Merge "Delete maintenance/language/zhtable/trad2simp_supp_unset.manual"
[lhc/web/wiklou.git] / includes / job / JobQueueDB.php
index 3fa0655..c39083d 100644 (file)
@@ -79,7 +79,7 @@ class JobQueueDB extends JobQueue {
                        return false;
                }
 
-               list( $dbr, $scope ) = $this->getSlaveDB();
+               $dbr = $this->getSlaveDB();
                try {
                        $found = $dbr->selectField( // unclaimed job
                                'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
@@ -105,7 +105,7 @@ class JobQueueDB extends JobQueue {
                }
 
                try {
-                       list( $dbr, $scope ) = $this->getSlaveDB();
+                       $dbr = $this->getSlaveDB();
                        $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
                                array( 'job_cmd' => $this->type, 'job_token' => '' ),
                                __METHOD__
@@ -134,7 +134,7 @@ class JobQueueDB extends JobQueue {
                        return $count;
                }
 
-               list( $dbr, $scope ) = $this->getSlaveDB();
+               $dbr = $this->getSlaveDB();
                try {
                        $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
                                array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
@@ -167,7 +167,7 @@ class JobQueueDB extends JobQueue {
                        return $count;
                }
 
-               list( $dbr, $scope ) = $this->getSlaveDB();
+               $dbr = $this->getSlaveDB();
                try {
                        $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
                                array(
@@ -193,12 +193,12 @@ class JobQueueDB extends JobQueue {
         * @return bool
         */
        protected function doBatchPush( array $jobs, $flags ) {
-               list( $dbw, $scope ) = $this->getMasterDB();
+               $dbw = $this->getMasterDB();
 
                $that = $this;
                $method = __METHOD__;
                $dbw->onTransactionIdle(
-                       function() use ( $dbw, $that, $jobs, $flags, $method, $scope ) {
+                       function() use ( $dbw, $that, $jobs, $flags, $method ) {
                                $that->doBatchPushInternal( $dbw, $jobs, $flags, $method );
                        }
                );
@@ -216,7 +216,7 @@ class JobQueueDB extends JobQueue {
         * @return boolean
         * @throws type
         */
-       public function doBatchPushInternal( DatabaseBase $dbw, array $jobs, $flags, $method ) {
+       public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
                if ( !count( $jobs ) ) {
                        return true;
                }
@@ -284,7 +284,7 @@ class JobQueueDB extends JobQueue {
                        return false; // queue is empty
                }
 
-               list( $dbw, $scope ) = $this->getMasterDB();
+               $dbw = $this->getMasterDB();
                try {
                        $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
                        $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
@@ -339,7 +339,7 @@ class JobQueueDB extends JobQueue {
         * @return Row|false
         */
        protected function claimRandom( $uuid, $rand, $gte ) {
-               list( $dbw, $scope ) = $this->getMasterDB();
+               $dbw = $this->getMasterDB();
                // Check cache to see if the queue has <= OFFSET items
                $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
 
@@ -415,7 +415,7 @@ class JobQueueDB extends JobQueue {
         * @return Row|false
         */
        protected function claimOldest( $uuid ) {
-               list( $dbw, $scope ) = $this->getMasterDB();
+               $dbw = $this->getMasterDB();
 
                $row = false; // the row acquired
                do {
@@ -480,7 +480,7 @@ class JobQueueDB extends JobQueue {
                        throw new MWException( "Job of type '{$job->getType()}' has no ID." );
                }
 
-               list( $dbw, $scope ) = $this->getMasterDB();
+               $dbw = $this->getMasterDB();
                try {
                        $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
                        $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
@@ -518,9 +518,9 @@ class JobQueueDB extends JobQueue {
                // deferred till "transaction idle", do the same here, so that the ordering is
                // maintained. Having only the de-duplication registration succeed would cause
                // jobs to become no-ops without any actual jobs that made them redundant.
-               list( $dbw, $scope ) = $this->getMasterDB();
-               $cache = $this->cache;
-               $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $scope ) {
+               $dbw = $this->getMasterDB();
+               $cache = $this->dupCache;
+               $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $dbw ) {
                        $timestamp = $cache->get( $key ); // current last timestamp of this job
                        if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
                                return true; // a newer version of this root job was enqueued
@@ -538,8 +538,7 @@ class JobQueueDB extends JobQueue {
         * @return bool
         */
        protected function doDelete() {
-               list( $dbw, $scope ) = $this->getMasterDB();
-
+               $dbw = $this->getMasterDB();
                try {
                        $dbw->delete( 'job', array( 'job_cmd' => $this->type ) );
                } catch ( DBError $e ) {
@@ -582,12 +581,12 @@ class JobQueueDB extends JobQueue {
         * @return Iterator
         */
        public function getAllQueuedJobs() {
-               list( $dbr, $scope ) = $this->getSlaveDB();
+               $dbr = $this->getSlaveDB();
                try {
                        return new MappedIterator(
                                $dbr->select( 'job', '*',
                                        array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
-                               function( $row ) use ( $scope ) {
+                               function( $row ) use ( $dbr ) {
                                        $job = Job::factory(
                                                $row->job_cmd,
                                                Title::makeTitle( $row->job_namespace, $row->job_title ),
@@ -604,6 +603,36 @@ class JobQueueDB extends JobQueue {
                }
        }
 
+       public function getCoalesceLocationInternal() {
+               return $this->cluster
+                       ? "DBCluster:{$this->cluster}:{$this->wiki}"
+                       : "LBFactory:{$this->wiki}";
+       }
+
+       protected function doGetSiblingQueuesWithJobs( array $types ) {
+               $dbr = $this->getSlaveDB();
+               $res = $dbr->select( 'job', 'DISTINCT job_cmd',
+                       array( 'job_cmd' => $types ), __METHOD__ );
+
+               $types = array();
+               foreach ( $res as $row ) {
+                       $types[] = $row->job_cmd;
+               }
+               return $types;
+       }
+
+       protected function doGetSiblingQueueSizes( array $types ) {
+               $dbr = $this->getSlaveDB();
+               $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ),
+                       array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) );
+
+               $sizes = array();
+               foreach ( $res as $row ) {
+                       $sizes[$row->job_cmd] = (int)$row->count;
+               }
+               return $sizes;
+       }
+
        /**
         * Recycle or destroy any jobs that have been claimed for too long
         *
@@ -612,7 +641,7 @@ class JobQueueDB extends JobQueue {
        public function recycleAndDeleteStaleJobs() {
                $now = time();
                $count = 0; // affected rows
-               list( $dbw, $scope ) = $this->getMasterDB();
+               $dbw = $this->getMasterDB();
 
                try {
                        if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
@@ -689,7 +718,30 @@ class JobQueueDB extends JobQueue {
        }
 
        /**
-        * @return Array (DatabaseBase, ScopedCallback)
+        * @param $job Job
+        * @return array
+        */
+       protected function insertFields( Job $job ) {
+               $dbw = $this->getMasterDB();
+               return array(
+                       // Fields that describe the nature of the job
+                       'job_cmd'       => $job->getType(),
+                       'job_namespace' => $job->getTitle()->getNamespace(),
+                       'job_title'     => $job->getTitle()->getDBkey(),
+                       'job_params'    => self::makeBlob( $job->getParams() ),
+                       // Additional job metadata
+                       'job_id'        => $dbw->nextSequenceValue( 'job_job_id_seq' ),
+                       'job_timestamp' => $dbw->timestamp(),
+                       'job_sha1'      => wfBaseConvert(
+                               sha1( serialize( $job->getDeduplicationInfo() ) ),
+                               16, 36, 31
+                       ),
+                       'job_random'    => mt_rand( 0, self::MAX_JOB_RANDOM )
+               );
+       }
+
+       /**
+        * @return DBConnRef
         */
        protected function getSlaveDB() {
                try {
@@ -700,7 +752,7 @@ class JobQueueDB extends JobQueue {
        }
 
        /**
-        * @return Array (DatabaseBase, ScopedCallback)
+        * @return DBConnRef
         */
        protected function getMasterDB() {
                try {
@@ -712,42 +764,13 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @param $index integer (DB_SLAVE/DB_MASTER)
-        * @return Array (DatabaseBase, ScopedCallback)
+        * @return DBConnRef
         */
        protected function getDB( $index ) {
                $lb = ( $this->cluster !== false )
                        ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki )
                        : wfGetLB( $this->wiki );
-               $conn = $lb->getConnection( $index, array(), $this->wiki );
-               return array(
-                       $conn,
-                       new ScopedCallback( function() use ( $lb, $conn ) {
-                               $lb->reuseConnection( $conn );
-                       } )
-               );
-       }
-
-       /**
-        * @param $job Job
-        * @return array
-        */
-       protected function insertFields( Job $job ) {
-               list( $dbw, $scope ) = $this->getMasterDB();
-               return array(
-                       // Fields that describe the nature of the job
-                       'job_cmd'       => $job->getType(),
-                       'job_namespace' => $job->getTitle()->getNamespace(),
-                       'job_title'     => $job->getTitle()->getDBkey(),
-                       'job_params'    => self::makeBlob( $job->getParams() ),
-                       // Additional job metadata
-                       'job_id'        => $dbw->nextSequenceValue( 'job_job_id_seq' ),
-                       'job_timestamp' => $dbw->timestamp(),
-                       'job_sha1'      => wfBaseConvert(
-                               sha1( serialize( $job->getDeduplicationInfo() ) ),
-                               16, 36, 31
-                       ),
-                       'job_random'    => mt_rand( 0, self::MAX_JOB_RANDOM )
-               );
+               return $lb->getConnectionRef( $index, array(), $this->wiki );
        }
 
        /**