X-Git-Url: http://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Fjob%2FJobQueueDB.php;h=c39083df168d72ec3cba4d683531952b39be6aac;hb=75bbe1bbffd6009959ba94ba44b2e4bc9551f7fc;hp=3fa06556cda7f3f1a58ac0aece2c16ce22f403b2;hpb=75da5baa3f353a326edaf9ee4e0bbaed097f725a;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index 3fa06556cd..c39083df16 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -79,7 +79,7 @@ class JobQueueDB extends JobQueue { return false; } - list( $dbr, $scope ) = $this->getSlaveDB(); + $dbr = $this->getSlaveDB(); try { $found = $dbr->selectField( // unclaimed job 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ @@ -105,7 +105,7 @@ class JobQueueDB extends JobQueue { } try { - list( $dbr, $scope ) = $this->getSlaveDB(); + $dbr = $this->getSlaveDB(); $size = (int)$dbr->selectField( 'job', 'COUNT(*)', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ @@ -134,7 +134,7 @@ class JobQueueDB extends JobQueue { return $count; } - list( $dbr, $scope ) = $this->getSlaveDB(); + $dbr = $this->getSlaveDB(); try { $count = (int)$dbr->selectField( 'job', 'COUNT(*)', array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ), @@ -167,7 +167,7 @@ class JobQueueDB extends JobQueue { return $count; } - list( $dbr, $scope ) = $this->getSlaveDB(); + $dbr = $this->getSlaveDB(); try { $count = (int)$dbr->selectField( 'job', 'COUNT(*)', array( @@ -193,12 +193,12 @@ class JobQueueDB extends JobQueue { * @return bool */ protected function doBatchPush( array $jobs, $flags ) { - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); $that = $this; $method = __METHOD__; $dbw->onTransactionIdle( - function() use ( $dbw, $that, $jobs, $flags, $method, $scope ) { + function() use ( $dbw, $that, $jobs, $flags, $method ) { $that->doBatchPushInternal( $dbw, $jobs, $flags, $method ); } ); @@ -216,7 +216,7 @@ class JobQueueDB extends JobQueue { * @return boolean * @throws type */ - public function doBatchPushInternal( DatabaseBase $dbw, array $jobs, $flags, $method ) { + public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) { if ( !count( $jobs ) ) { return true; } @@ -284,7 +284,7 @@ class JobQueueDB extends JobQueue { return false; // queue is empty } - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); try { $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting @@ -339,7 +339,7 @@ class JobQueueDB extends JobQueue { * @return Row|false */ protected function claimRandom( $uuid, $rand, $gte ) { - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); // Check cache to see if the queue has <= OFFSET items $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) ); @@ -415,7 +415,7 @@ class JobQueueDB extends JobQueue { * @return Row|false */ protected function claimOldest( $uuid ) { - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); $row = false; // the row acquired do { @@ -480,7 +480,7 @@ class JobQueueDB extends JobQueue { throw new MWException( "Job of type '{$job->getType()}' has no ID." ); } - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); try { $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting @@ -518,9 +518,9 @@ class JobQueueDB extends JobQueue { // deferred till "transaction idle", do the same here, so that the ordering is // maintained. Having only the de-duplication registration succeed would cause // jobs to become no-ops without any actual jobs that made them redundant. - list( $dbw, $scope ) = $this->getMasterDB(); - $cache = $this->cache; - $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $scope ) { + $dbw = $this->getMasterDB(); + $cache = $this->dupCache; + $dbw->onTransactionIdle( function() use ( $cache, $params, $key, $dbw ) { $timestamp = $cache->get( $key ); // current last timestamp of this job if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { return true; // a newer version of this root job was enqueued @@ -538,8 +538,7 @@ class JobQueueDB extends JobQueue { * @return bool */ protected function doDelete() { - list( $dbw, $scope ) = $this->getMasterDB(); - + $dbw = $this->getMasterDB(); try { $dbw->delete( 'job', array( 'job_cmd' => $this->type ) ); } catch ( DBError $e ) { @@ -582,12 +581,12 @@ class JobQueueDB extends JobQueue { * @return Iterator */ public function getAllQueuedJobs() { - list( $dbr, $scope ) = $this->getSlaveDB(); + $dbr = $this->getSlaveDB(); try { return new MappedIterator( $dbr->select( 'job', '*', array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), - function( $row ) use ( $scope ) { + function( $row ) use ( $dbr ) { $job = Job::factory( $row->job_cmd, Title::makeTitle( $row->job_namespace, $row->job_title ), @@ -604,6 +603,36 @@ class JobQueueDB extends JobQueue { } } + public function getCoalesceLocationInternal() { + return $this->cluster + ? "DBCluster:{$this->cluster}:{$this->wiki}" + : "LBFactory:{$this->wiki}"; + } + + protected function doGetSiblingQueuesWithJobs( array $types ) { + $dbr = $this->getSlaveDB(); + $res = $dbr->select( 'job', 'DISTINCT job_cmd', + array( 'job_cmd' => $types ), __METHOD__ ); + + $types = array(); + foreach ( $res as $row ) { + $types[] = $row->job_cmd; + } + return $types; + } + + protected function doGetSiblingQueueSizes( array $types ) { + $dbr = $this->getSlaveDB(); + $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ), + array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) ); + + $sizes = array(); + foreach ( $res as $row ) { + $sizes[$row->job_cmd] = (int)$row->count; + } + return $sizes; + } + /** * Recycle or destroy any jobs that have been claimed for too long * @@ -612,7 +641,7 @@ class JobQueueDB extends JobQueue { public function recycleAndDeleteStaleJobs() { $now = time(); $count = 0; // affected rows - list( $dbw, $scope ) = $this->getMasterDB(); + $dbw = $this->getMasterDB(); try { if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { @@ -689,7 +718,30 @@ class JobQueueDB extends JobQueue { } /** - * @return Array (DatabaseBase, ScopedCallback) + * @param $job Job + * @return array + */ + protected function insertFields( Job $job ) { + $dbw = $this->getMasterDB(); + return array( + // Fields that describe the nature of the job + 'job_cmd' => $job->getType(), + 'job_namespace' => $job->getTitle()->getNamespace(), + 'job_title' => $job->getTitle()->getDBkey(), + 'job_params' => self::makeBlob( $job->getParams() ), + // Additional job metadata + 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), + 'job_timestamp' => $dbw->timestamp(), + 'job_sha1' => wfBaseConvert( + sha1( serialize( $job->getDeduplicationInfo() ) ), + 16, 36, 31 + ), + 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ) + ); + } + + /** + * @return DBConnRef */ protected function getSlaveDB() { try { @@ -700,7 +752,7 @@ class JobQueueDB extends JobQueue { } /** - * @return Array (DatabaseBase, ScopedCallback) + * @return DBConnRef */ protected function getMasterDB() { try { @@ -712,42 +764,13 @@ class JobQueueDB extends JobQueue { /** * @param $index integer (DB_SLAVE/DB_MASTER) - * @return Array (DatabaseBase, ScopedCallback) + * @return DBConnRef */ protected function getDB( $index ) { $lb = ( $this->cluster !== false ) ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki ) : wfGetLB( $this->wiki ); - $conn = $lb->getConnection( $index, array(), $this->wiki ); - return array( - $conn, - new ScopedCallback( function() use ( $lb, $conn ) { - $lb->reuseConnection( $conn ); - } ) - ); - } - - /** - * @param $job Job - * @return array - */ - protected function insertFields( Job $job ) { - list( $dbw, $scope ) = $this->getMasterDB(); - return array( - // Fields that describe the nature of the job - 'job_cmd' => $job->getType(), - 'job_namespace' => $job->getTitle()->getNamespace(), - 'job_title' => $job->getTitle()->getDBkey(), - 'job_params' => self::makeBlob( $job->getParams() ), - // Additional job metadata - 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), - 'job_timestamp' => $dbw->timestamp(), - 'job_sha1' => wfBaseConvert( - sha1( serialize( $job->getDeduplicationInfo() ) ), - 16, 36, 31 - ), - 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ) - ); + return $lb->getConnectionRef( $index, array(), $this->wiki ); } /**