Enable users to watch category membership changes #2
[lhc/web/wiklou.git] / includes / jobqueue / JobQueueDB.php
index 491092a..7907614 100644 (file)
@@ -177,7 +177,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @see JobQueue::doBatchPush()
-        * @param array $jobs
+        * @param IJobSpecification[] $jobs
         * @param int $flags
         * @throws DBError|Exception
         * @return void
@@ -198,7 +198,7 @@ class JobQueueDB extends JobQueue {
         * This function should *not* be called outside of JobQueueDB
         *
         * @param IDatabase $dbw
-        * @param array $jobs
+        * @param IJobSpecification[] $jobs
         * @param int $flags
         * @param string $method
         * @throws DBError
@@ -221,7 +221,7 @@ class JobQueueDB extends JobQueue {
                }
 
                if ( $flags & self::QOS_ATOMIC ) {
-                       $dbw->begin( $method ); // wrap all the job additions in one transaction
+                       $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
                }
                try {
                        // Strip out any duplicate jobs that are already in the queue...
@@ -245,10 +245,8 @@ class JobQueueDB extends JobQueue {
                        foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
                                $dbw->insert( 'job', $rowBatch, $method );
                        }
-                       JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) );
-                       JobQueue::incrStats(
-                               'job-insert-duplicate',
-                               $this->type,
+                       JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
+                       JobQueue::incrStats( 'dupe_inserts', $this->type,
                                count( $rowSet ) + count( $rowList ) - count( $rows )
                        );
                } catch ( DBError $e ) {
@@ -258,7 +256,7 @@ class JobQueueDB extends JobQueue {
                        throw $e;
                }
                if ( $flags & self::QOS_ATOMIC ) {
-                       $dbw->commit( $method );
+                       $dbw->endAtomic( $method );
                }
 
                return;
@@ -293,12 +291,13 @@ class JobQueueDB extends JobQueue {
                                if ( !$row ) {
                                        break; // nothing to do
                                }
-                               JobQueue::incrStats( 'job-pop', $this->type );
+                               JobQueue::incrStats( 'pops', $this->type );
                                // Get the job object from the row...
                                $title = Title::makeTitle( $row->job_namespace, $row->job_title );
                                $job = Job::factory( $row->job_cmd, $title,
                                        self::extractBlob( $row->job_params ), $row->job_id );
                                $job->metadata['id'] = $row->job_id;
+                               $job->metadata['timestamp'] = $row->job_timestamp;
                                break; // done
                        } while ( true );
 
@@ -478,7 +477,7 @@ class JobQueueDB extends JobQueue {
                        $dbw->delete( 'job',
                                array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
 
-                       JobQueue::incrStats( 'job-ack', $this->type );
+                       JobQueue::incrStats( 'acks', $this->type );
                } catch ( DBError $e ) {
                        $this->throwDBException( $e );
                }
@@ -488,11 +487,11 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @see JobQueue::doDeduplicateRootJob()
-        * @param Job $job
+        * @param IJobSpecification $job
         * @throws MWException
         * @return bool
         */
-       protected function doDeduplicateRootJob( Job $job ) {
+       protected function doDeduplicateRootJob( IJobSpecification $job ) {
                $params = $job->getParams();
                if ( !isset( $params['rootJobSignature'] ) ) {
                        throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
@@ -557,18 +556,35 @@ class JobQueueDB extends JobQueue {
         * @return Iterator
         */
        public function getAllQueuedJobs() {
+               return $this->getJobIterator( array( 'job_cmd' => $this->getType(), 'job_token' => '' ) );
+       }
+
+       /**
+        * @see JobQueue::getAllAcquiredJobs()
+        * @return Iterator
+        */
+       public function getAllAcquiredJobs() {
+               return $this->getJobIterator( array( 'job_cmd' => $this->getType(), "job_token > ''" ) );
+       }
+
+       /**
+        * @param array $conds Query conditions
+        * @return Iterator
+        */
+       protected function getJobIterator( array $conds ) {
                $dbr = $this->getSlaveDB();
                try {
                        return new MappedIterator(
-                               $dbr->select( 'job', self::selectFields(),
-                                       array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
-                               function ( $row ) use ( $dbr ) {
+                               $dbr->select( 'job', self::selectFields(), $conds ),
+                               function ( $row ) {
                                        $job = Job::factory(
                                                $row->job_cmd,
                                                Title::makeTitle( $row->job_namespace, $row->job_title ),
-                                               strlen( $row->job_params ) ? unserialize( $row->job_params ) : false
+                                               strlen( $row->job_params ) ? unserialize( $row->job_params ) : array()
                                        );
                                        $job->metadata['id'] = $row->job_id;
+                                       $job->metadata['timestamp'] = $row->job_timestamp;
+
                                        return $job;
                                }
                        );
@@ -661,7 +677,7 @@ class JobQueueDB extends JobQueue {
                                        );
                                        $affected = $dbw->affectedRows();
                                        $count += $affected;
-                                       JobQueue::incrStats( 'job-recycle', $this->type, $affected );
+                                       JobQueue::incrStats( 'recycles', $this->type, $affected );
                                        $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
                                }
                        }
@@ -688,7 +704,7 @@ class JobQueueDB extends JobQueue {
                                $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
                                $affected = $dbw->affectedRows();
                                $count += $affected;
-                               JobQueue::incrStats( 'job-abandon', $this->type, $affected );
+                               JobQueue::incrStats( 'abandons', $this->type, $affected );
                        }
 
                        $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );