Add Job::getMetadata() and Job::setMetadata() accessors
[lhc/web/wiklou.git] / includes / jobqueue / JobQueueDB.php
index cda0636..c52a948 100644 (file)
@@ -20,7 +20,7 @@
  * @file
  */
 use Wikimedia\Rdbms\IDatabase;
-use Wikimedia\Rdbms\DBConnRef;
+use Wikimedia\Rdbms\Database;
 use Wikimedia\Rdbms\DBConnectionError;
 use Wikimedia\Rdbms\DBError;
 use MediaWiki\MediaWikiServices;
@@ -40,12 +40,17 @@ class JobQueueDB extends JobQueue {
 
        /** @var WANObjectCache */
        protected $cache;
+       /** @var IDatabase|DBError|null */
+       protected $conn;
 
-       /** @var bool|string Name of an external DB cluster. False if not set */
-       protected $cluster = false;
+       /** @var array|null Server configuration array */
+       protected $server;
+       /** @var string|null Name of an external DB cluster or null for the local DB cluster */
+       protected $cluster;
 
        /**
         * Additional parameters include:
+        *   - server  : Server configuration array for Database::factory. Overrides "cluster".
         *   - cluster : The name of an external cluster registered via LBFactory.
         *               If not specified, the primary DB cluster for the wiki will be used.
         *               This can be overridden with a custom cluster so that DB handles will
@@ -55,8 +60,13 @@ class JobQueueDB extends JobQueue {
        protected function __construct( array $params ) {
                parent::__construct( $params );
 
-               $this->cluster = $params['cluster'] ?? false;
-               $this->cache = ObjectCache::getMainWANInstance();
+               if ( isset( $params['server'] ) ) {
+                       $this->server = $params['server'];
+               } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
+                       $this->cluster = $params['cluster'];
+               }
+
+               $this->cache = MediaWikiServices::getInstance()->getMainWANObjectCache();
        }
 
        protected function supportedOrders() {
@@ -73,6 +83,8 @@ class JobQueueDB extends JobQueue {
         */
        protected function doIsEmpty() {
                $dbr = $this->getReplicaDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbr );
                try {
                        $found = $dbr->selectField( // unclaimed job
                                'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
@@ -96,8 +108,10 @@ class JobQueueDB extends JobQueue {
                        return $size;
                }
 
+               $dbr = $this->getReplicaDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbr );
                try {
-                       $dbr = $this->getReplicaDB();
                        $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
                                [ 'job_cmd' => $this->type, 'job_token' => '' ],
                                __METHOD__
@@ -127,6 +141,8 @@ class JobQueueDB extends JobQueue {
                }
 
                $dbr = $this->getReplicaDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbr );
                try {
                        $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
                                [ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ],
@@ -158,6 +174,8 @@ class JobQueueDB extends JobQueue {
                }
 
                $dbr = $this->getReplicaDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbr );
                try {
                        $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
                                [
@@ -185,6 +203,8 @@ class JobQueueDB extends JobQueue {
         */
        protected function doBatchPush( array $jobs, $flags ) {
                $dbw = $this->getMasterDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
                // In general, there will be two cases here:
                // a) sqlite; DB connection is probably a regular round-aware handle.
                // If the connection is busy with a transaction, then defer the job writes
@@ -273,15 +293,12 @@ class JobQueueDB extends JobQueue {
         */
        protected function doPop() {
                $dbw = $this->getMasterDB();
-               try {
-                       $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
-                       $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
-                       $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
-                               $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
-                       } );
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
 
+               $job = false; // job popped off
+               try {
                        $uuid = wfRandomString( 32 ); // pop attempt
-                       $job = false; // job popped off
                        do { // retry when our row is invalid or deleted as a duplicate
                                // Try to reserve a row in the DB...
                                if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
@@ -300,8 +317,8 @@ class JobQueueDB extends JobQueue {
                                $title = Title::makeTitle( $row->job_namespace, $row->job_title );
                                $job = Job::factory( $row->job_cmd, $title,
                                        self::extractBlob( $row->job_params ) );
-                               $job->metadata['id'] = $row->job_id;
-                               $job->metadata['timestamp'] = $row->job_timestamp;
+                               $job->setMetadata( 'id', $row->job_id );
+                               $job->setMetadata( 'timestamp', $row->job_timestamp );
                                break; // done
                        } while ( true );
 
@@ -327,6 +344,8 @@ class JobQueueDB extends JobQueue {
         */
        protected function claimRandom( $uuid, $rand, $gte ) {
                $dbw = $this->getMasterDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
                // Check cache to see if the queue has <= OFFSET items
                $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
 
@@ -404,6 +423,8 @@ class JobQueueDB extends JobQueue {
         */
        protected function claimOldest( $uuid ) {
                $dbw = $this->getMasterDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
 
                $row = false; // the row acquired
                do {
@@ -463,21 +484,21 @@ class JobQueueDB extends JobQueue {
         * @throws MWException
         */
        protected function doAck( Job $job ) {
-               if ( !isset( $job->metadata['id'] ) ) {
+               $id = $job->getMetadata( 'id' );
+               if ( $id === null ) {
                        throw new MWException( "Job of type '{$job->getType()}' has no ID." );
                }
 
                $dbw = $this->getMasterDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
                try {
-                       $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
-                       $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
-                       $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
-                               $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
-                       } );
-
                        // Delete a row with a single DELETE without holding row locks over RTTs...
-                       $dbw->delete( 'job',
-                               [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ], __METHOD__ );
+                       $dbw->delete(
+                               'job',
+                               [ 'job_cmd' => $this->type, 'job_id' => $id ],
+                               __METHOD__
+                       );
 
                        JobQueue::incrStats( 'acks', $this->type );
                } catch ( DBError $e ) {
@@ -505,6 +526,9 @@ class JobQueueDB extends JobQueue {
                // maintained. Having only the de-duplication registration succeed would cause
                // jobs to become no-ops without any actual jobs that made them redundant.
                $dbw = $this->getMasterDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
+
                $cache = $this->dupCache;
                $dbw->onTransactionCommitOrIdle(
                        function () use ( $cache, $params, $key ) {
@@ -528,6 +552,8 @@ class JobQueueDB extends JobQueue {
         */
        protected function doDelete() {
                $dbw = $this->getMasterDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
                try {
                        $dbw->delete( 'job', [ 'job_cmd' => $this->type ] );
                } catch ( DBError $e ) {
@@ -542,9 +568,15 @@ class JobQueueDB extends JobQueue {
         * @return void
         */
        protected function doWaitForBackups() {
+               if ( $this->server ) {
+                       return; // not using LBFactory instance
+               }
+
                $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
-               $lbFactory->waitForReplication(
-                       [ 'domain' => $this->domain, 'cluster' => $this->cluster ] );
+               $lbFactory->waitForReplication( [
+                       'domain' => $this->domain,
+                       'cluster' => is_string( $this->cluster ) ? $this->cluster : false
+               ] );
        }
 
        /**
@@ -578,6 +610,8 @@ class JobQueueDB extends JobQueue {
         */
        protected function getJobIterator( array $conds ) {
                $dbr = $this->getReplicaDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbr );
                try {
                        return new MappedIterator(
                                $dbr->select( 'job', self::selectFields(), $conds ),
@@ -587,8 +621,8 @@ class JobQueueDB extends JobQueue {
                                                Title::makeTitle( $row->job_namespace, $row->job_title ),
                                                strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
                                        );
-                                       $job->metadata['id'] = $row->job_id;
-                                       $job->metadata['timestamp'] = $row->job_timestamp;
+                                       $job->setMetadata( 'id', $row->job_id );
+                                       $job->setMetadata( 'timestamp', $row->job_timestamp );
 
                                        return $job;
                                }
@@ -599,13 +633,19 @@ class JobQueueDB extends JobQueue {
        }
 
        public function getCoalesceLocationInternal() {
-               return $this->cluster
+               if ( $this->server ) {
+                       return null; // not using the LBFactory instance
+               }
+
+               return is_string( $this->cluster )
                        ? "DBCluster:{$this->cluster}:{$this->domain}"
                        : "LBFactory:{$this->domain}";
        }
 
        protected function doGetSiblingQueuesWithJobs( array $types ) {
                $dbr = $this->getReplicaDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbr );
                // @note: this does not check whether the jobs are claimed or not.
                // This is useful so JobQueueGroup::pop() also sees queues that only
                // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
@@ -623,6 +663,9 @@ class JobQueueDB extends JobQueue {
 
        protected function doGetSiblingQueueSizes( array $types ) {
                $dbr = $this->getReplicaDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbr );
+
                $res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ],
                        [ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] );
 
@@ -643,6 +686,8 @@ class JobQueueDB extends JobQueue {
                $now = time();
                $count = 0; // affected rows
                $dbw = $this->getMasterDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
 
                try {
                        if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
@@ -744,7 +789,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @throws JobQueueConnectionError
-        * @return DBConnRef
+        * @return IDatabase
         */
        protected function getReplicaDB() {
                try {
@@ -756,7 +801,7 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @throws JobQueueConnectionError
-        * @return DBConnRef
+        * @return IDatabase
         */
        protected function getMasterDB() {
                try {
@@ -768,20 +813,52 @@ class JobQueueDB extends JobQueue {
 
        /**
         * @param int $index (DB_REPLICA/DB_MASTER)
-        * @return DBConnRef
+        * @return IDatabase
         */
        protected function getDB( $index ) {
-               $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
-               $lb = ( $this->cluster !== false )
-                       ? $lbFactory->getExternalLB( $this->cluster )
-                       : $lbFactory->getMainLB( $this->domain );
-
-               return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' )
-                       // Keep a separate connection to avoid contention and deadlocks;
-                       // However, SQLite has the opposite behavior due to DB-level locking.
-                       ? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT )
-                       // Jobs insertion will be defered until the PRESEND stage to reduce contention.
-                       : $lb->getConnectionRef( $index, [], $this->domain );
+               if ( $this->server ) {
+                       if ( $this->conn instanceof IDatabase ) {
+                               return $this->conn;
+                       } elseif ( $this->conn instanceof DBError ) {
+                               throw $this->conn;
+                       }
+
+                       try {
+                               $this->conn = Database::factory( $this->server['type'], $this->server );
+                       } catch ( DBError $e ) {
+                               $this->conn = $e;
+                               throw $e;
+                       }
+
+                       return $this->conn;
+               } else {
+                       $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+                       $lb = is_string( $this->cluster )
+                               ? $lbFactory->getExternalLB( $this->cluster )
+                               : $lbFactory->getMainLB( $this->domain );
+
+                       return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' )
+                               // Keep a separate connection to avoid contention and deadlocks;
+                               // However, SQLite has the opposite behavior due to DB-level locking.
+                               ? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT )
+                               // Jobs insertion will be defered until the PRESEND stage to reduce contention.
+                               : $lb->getConnectionRef( $index, [], $this->domain );
+               }
+       }
+
+       /**
+        * @param IDatabase $db
+        * @return ScopedCallback
+        */
+       private function getScopedNoTrxFlag( IDatabase $db ) {
+               $autoTrx = $db->getFlag( DBO_TRX ); // get current setting
+               $db->clearFlag( DBO_TRX ); // make each query its own transaction
+
+               return new ScopedCallback( function () use ( $db, $autoTrx ) {
+                       if ( $autoTrx ) {
+                               $db->setFlag( DBO_TRX ); // restore old setting
+                       }
+               } );
        }
 
        /**