Send job ACKs to statsd
[lhc/web/wiklou.git] / includes / jobqueue / JobQueueDB.php
index 5e8399c..000fa4f 100644 (file)
@@ -29,7 +29,6 @@
  */
 class JobQueueDB extends JobQueue {
        const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
-       const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
        const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
        const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
        const MAX_OFFSET = 255; // integer; maximum number of rows to skip
@@ -71,15 +70,6 @@ class JobQueueDB extends JobQueue {
         * @return bool
         */
        protected function doIsEmpty() {
-               $key = $this->getCacheKey( 'empty' );
-
-               $isEmpty = $this->cache->get( $key );
-               if ( $isEmpty === 'true' ) {
-                       return true;
-               } elseif ( $isEmpty === 'false' ) {
-                       return false;
-               }
-
                $dbr = $this->getSlaveDB();
                try {
                        $found = $dbr->selectField( // unclaimed job
@@ -88,7 +78,6 @@ class JobQueueDB extends JobQueue {
                } catch ( DBError $e ) {
                        $this->throwDBException( $e );
                }
-               $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
 
                return !$found;
        }
@@ -256,12 +245,11 @@ class JobQueueDB extends JobQueue {
                        foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
                                $dbw->insert( 'job', $rowBatch, $method );
                        }
-                       JobQueue::incrStats( 'job-insert', $this->type, count( $rows ), $this->wiki );
+                       JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) );
                        JobQueue::incrStats(
                                'job-insert-duplicate',
                                $this->type,
-                               count( $rowSet ) + count( $rowList ) - count( $rows ),
-                               $this->wiki
+                               count( $rowSet ) + count( $rowList ) - count( $rows )
                        );
                } catch ( DBError $e ) {
                        if ( $flags & self::QOS_ATOMIC ) {
@@ -273,8 +261,6 @@ class JobQueueDB extends JobQueue {
                        $dbw->commit( $method );
                }
 
-               $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG );
-
                return;
        }
 
@@ -283,10 +269,6 @@ class JobQueueDB extends JobQueue {
         * @return Job|bool
         */
        protected function doPop() {
-               if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
-                       return false; // queue is empty
-               }
-
                $dbw = $this->getMasterDB();
                try {
                        $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
@@ -309,17 +291,11 @@ class JobQueueDB extends JobQueue {
                                }
                                // Check if we found a row to reserve...
                                if ( !$row ) {
-                                       $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
                                        break; // nothing to do
                                }
-                               JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
+                               JobQueue::incrStats( 'job-pop', $this->type );
                                // Get the job object from the row...
-                               $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
-                               if ( !$title ) {
-                                       $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
-                                       wfDebug( "Row has invalid title '{$row->job_title}'.\n" );
-                                       continue; // try again
-                               }
+                               $title = Title::makeTitle( $row->job_namespace, $row->job_title );
                                $job = Job::factory( $row->job_cmd, $title,
                                        self::extractBlob( $row->job_params ), $row->job_id );
                                $job->metadata['id'] = $row->job_id;
@@ -495,6 +471,8 @@ class JobQueueDB extends JobQueue {
                        // Delete a row with a single DELETE without holding row locks over RTTs...
                        $dbw->delete( 'job',
                                array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
+
+                       JobQueue::incrStats( 'job-ack', $this->type );
                } catch ( DBError $e ) {
                        $this->throwDBException( $e );
                }
@@ -575,7 +553,7 @@ class JobQueueDB extends JobQueue {
         * @return void
         */
        protected function doFlushCaches() {
-               foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) {
+               foreach ( array( 'size', 'acquiredcount' ) as $type ) {
                        $this->cache->delete( $this->getCacheKey( $type ) );
                }
        }
@@ -685,8 +663,8 @@ class JobQueueDB extends JobQueue {
                                        );
                                        $affected = $dbw->affectedRows();
                                        $count += $affected;
-                                       JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki );
-                                       $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
+                                       JobQueue::incrStats( 'job-recycle', $this->type, $affected );
+                                       $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
                                }
                        }
 
@@ -712,7 +690,7 @@ class JobQueueDB extends JobQueue {
                                $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
                                $affected = $dbw->affectedRows();
                                $count += $affected;
-                               JobQueue::incrStats( 'job-abandon', $this->type, $affected, $this->wiki );
+                               JobQueue::incrStats( 'job-abandon', $this->type, $affected );
                        }
 
                        $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );