jobqueue: fix DBO_TRX logic in JobQueueDB for avoiding transactions
authorAaron Schulz <aschulz@wikimedia.org>
Sat, 30 Mar 2019 02:02:15 +0000 (19:02 -0700)
committerKrinkle <krinklemail@gmail.com>
Mon, 1 Apr 2019 22:29:31 +0000 (22:29 +0000)
Various methods were missing the flag setting logic and tests could
fail or have "outer scope" warnings in the logs for sqlite.

Change-Id: Ia0607d189a307667297f06109a34363c92e37d92

includes/jobqueue/JobQueueDB.php

index 495be73..74a6559 100644 (file)
@@ -83,6 +83,8 @@ class JobQueueDB extends JobQueue {
         */
        protected function doIsEmpty() {
                $dbr = $this->getReplicaDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbr );
                try {
                        $found = $dbr->selectField( // unclaimed job
                                'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
@@ -106,8 +108,10 @@ class JobQueueDB extends JobQueue {
                        return $size;
                }
 
+               $dbr = $this->getReplicaDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbr );
                try {
-                       $dbr = $this->getReplicaDB();
                        $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
                                [ 'job_cmd' => $this->type, 'job_token' => '' ],
                                __METHOD__
@@ -137,6 +141,8 @@ class JobQueueDB extends JobQueue {
                }
 
                $dbr = $this->getReplicaDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbr );
                try {
                        $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
                                [ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ],
@@ -168,6 +174,8 @@ class JobQueueDB extends JobQueue {
                }
 
                $dbr = $this->getReplicaDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbr );
                try {
                        $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
                                [
@@ -195,6 +203,8 @@ class JobQueueDB extends JobQueue {
         */
        protected function doBatchPush( array $jobs, $flags ) {
                $dbw = $this->getMasterDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
                // In general, there will be two cases here:
                // a) sqlite; DB connection is probably a regular round-aware handle.
                // If the connection is busy with a transaction, then defer the job writes
@@ -283,15 +293,12 @@ class JobQueueDB extends JobQueue {
         */
        protected function doPop() {
                $dbw = $this->getMasterDB();
-               try {
-                       $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
-                       $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
-                       $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
-                               $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
-                       } );
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
 
+               $job = false; // job popped off
+               try {
                        $uuid = wfRandomString( 32 ); // pop attempt
-                       $job = false; // job popped off
                        do { // retry when our row is invalid or deleted as a duplicate
                                // Try to reserve a row in the DB...
                                if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
@@ -337,6 +344,8 @@ class JobQueueDB extends JobQueue {
         */
        protected function claimRandom( $uuid, $rand, $gte ) {
                $dbw = $this->getMasterDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
                // Check cache to see if the queue has <= OFFSET items
                $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
 
@@ -414,6 +423,8 @@ class JobQueueDB extends JobQueue {
         */
        protected function claimOldest( $uuid ) {
                $dbw = $this->getMasterDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
 
                $row = false; // the row acquired
                do {
@@ -478,13 +489,9 @@ class JobQueueDB extends JobQueue {
                }
 
                $dbw = $this->getMasterDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
                try {
-                       $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
-                       $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
-                       $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
-                               $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
-                       } );
-
                        // Delete a row with a single DELETE without holding row locks over RTTs...
                        $dbw->delete( 'job',
                                [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ], __METHOD__ );
@@ -515,6 +522,9 @@ class JobQueueDB extends JobQueue {
                // maintained. Having only the de-duplication registration succeed would cause
                // jobs to become no-ops without any actual jobs that made them redundant.
                $dbw = $this->getMasterDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
+
                $cache = $this->dupCache;
                $dbw->onTransactionCommitOrIdle(
                        function () use ( $cache, $params, $key ) {
@@ -538,6 +548,8 @@ class JobQueueDB extends JobQueue {
         */
        protected function doDelete() {
                $dbw = $this->getMasterDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
                try {
                        $dbw->delete( 'job', [ 'job_cmd' => $this->type ] );
                } catch ( DBError $e ) {
@@ -594,6 +606,8 @@ class JobQueueDB extends JobQueue {
         */
        protected function getJobIterator( array $conds ) {
                $dbr = $this->getReplicaDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbr );
                try {
                        return new MappedIterator(
                                $dbr->select( 'job', self::selectFields(), $conds ),
@@ -626,6 +640,8 @@ class JobQueueDB extends JobQueue {
 
        protected function doGetSiblingQueuesWithJobs( array $types ) {
                $dbr = $this->getReplicaDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbr );
                // @note: this does not check whether the jobs are claimed or not.
                // This is useful so JobQueueGroup::pop() also sees queues that only
                // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
@@ -643,6 +659,9 @@ class JobQueueDB extends JobQueue {
 
        protected function doGetSiblingQueueSizes( array $types ) {
                $dbr = $this->getReplicaDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbr );
+
                $res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ],
                        [ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] );
 
@@ -663,6 +682,8 @@ class JobQueueDB extends JobQueue {
                $now = time();
                $count = 0; // affected rows
                $dbw = $this->getMasterDB();
+               /** @noinspection PhpUnusedLocalVariableInspection */
+               $scope = $this->getScopedNoTrxFlag( $dbw );
 
                try {
                        if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
@@ -821,6 +842,21 @@ class JobQueueDB extends JobQueue {
                }
        }
 
+       /**
+        * @param IDatabase $db
+        * @return ScopedCallback
+        */
+       private function getScopedNoTrxFlag( IDatabase $db ) {
+               $autoTrx = $db->getFlag( DBO_TRX ); // get current setting
+               $db->clearFlag( DBO_TRX ); // make each query its own transaction
+
+               return new ScopedCallback( function () use ( $db, $autoTrx ) {
+                       if ( $autoTrx ) {
+                               $db->setFlag( DBO_TRX ); // restore old setting
+                       }
+               } );
+       }
+
        /**
         * @param string $property
         * @return string