[JobQueue] Reduced deadlocks in claim() function.
authorAaron Schulz <aschulz@wikimedia.org>
Wed, 31 Oct 2012 21:34:35 +0000 (14:34 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Thu, 1 Nov 2012 02:59:39 +0000 (19:59 -0700)
* Split claim() into claimRandom() and claimOldest().
* Added a new SELECT+UPDATE method that will automatically be used if there are slaves.
  This is what claimRandom() uses, which is the claim function used for random queues.
  This can handle torture testing with dozens of processes using NullJob without deadlocks.
* Made claimOldest() work using the same method as the old claim() method. Doing SELECT
  first won't really work that well in this case. The useless "job_random > 0" is now gone
  from the query, which actually alleviates deadlock problems too. This method is used for
  "timestamp" ordered queues.

Change-Id: Iaea96ff8eba2c918376f9465b54e9bbc3124f473

includes/job/JobQueue.php
includes/job/JobQueueDB.php

index 4637bd2..2778829 100644 (file)
@@ -47,7 +47,7 @@ abstract class JobQueue {
        /**
         * Get a job queue object of the specified type.
         * $params includes:
-        *     class : what job class to use (determines job type)
+        *     class : What job class to use (determines job type)
         *     wiki  : wiki ID of the wiki the jobs are for (defaults to current wiki)
         *     type  : The name of the job types this queue handles
         *     order : Order that pop() selects jobs, either "timestamp" or "random".
index d9465fb..4a9c47a 100644 (file)
@@ -126,26 +126,20 @@ class JobQueueDB extends JobQueue {
                        do { // retry when our row is invalid or deleted as a duplicate
                                // Try to reserve a row in the DB...
                                if ( $this->order === 'timestamp' ) { // oldest first
-                                       $found = $this->claim( $uuid, 0, true );
+                                       $row = $this->claimOldest( $uuid );
                                } else { // random first
-                                       $rand  = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
-                                       $gte   = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
-                                       $found = $this->claim( $uuid, $rand, $gte )
-                                               || $this->claim( $uuid, $rand, !$gte ); // try both directions
+                                       $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
+                                       $gte  = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
+                                       $row  = $this->claimRandom( $uuid, $rand, $gte );
+                                       if ( !$row ) { // need to try the other direction
+                                               $row = $this->claimRandom( $uuid, $rand, !$gte );
+                                       }
                                }
                                // Check if we found a row to reserve...
-                               if ( !$found ) {
+                               if ( !$row ) {
                                        $wgMemc->set( $this->getEmptinessCacheKey(), 'true', self::CACHE_TTL );
                                        break; // nothing to do
                                }
-                               // Fetch any row that we just reserved...
-                               $row = $dbw->selectRow( 'job', '*',
-                                       array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ );
-                               // Check if another process deleted it as a duplicate
-                               if ( !$row ) {
-                                       wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." );
-                                       continue; // try again
-                               }
                                // Get the job object from the row...
                                $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
                                if ( !$title ) {
@@ -176,51 +170,107 @@ class JobQueueDB extends JobQueue {
 
        /**
         * Reserve a row with a single UPDATE without holding row locks over RTTs...
+        *
         * @param $uuid string 32 char hex string
         * @param $rand integer Random unsigned integer (31 bits)
         * @param $gte bool Search for job_random >= $random (otherwise job_random <= $random)
-        * @return integer Number of affected rows
+        * @return Row|false
         */
-       protected function claim( $uuid, $rand, $gte ) {
+       protected function claimRandom( $uuid, $rand, $gte ) {
                $dbw  = $this->getMasterDB();
                $dir  = $gte ? 'ASC' : 'DESC';
                $ineq = $gte ? '>=' : '<=';
-               if ( $dbw->getType() === 'mysql' ) {
-                       // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
-                       // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
-                       // Oracle and Postgre have no such limitation. However, MySQL offers an
-                       // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
-                       // The DB wrapper functions do not support this, so it's done manually.
-                       $dbw->query( "UPDATE {$dbw->tableName( 'job' )}
-                               SET
-                                       job_token = {$dbw->addQuotes( $uuid ) },
-                                       job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}
-                               WHERE (
-                                       job_cmd = {$dbw->addQuotes( $this->type )}
-                                       AND job_token = {$dbw->addQuotes( '' )}
-                                       AND job_random {$ineq} {$dbw->addQuotes( $rand )}
-                               ) ORDER BY job_random {$dir} LIMIT 1",
-                               __METHOD__
-                       );
-               } else {
-                       // Use a subquery to find the job, within an UPDATE to claim it.
-                       // This uses as much of the DB wrapper functions as possible.
-                       $dbw->update( 'job',
-                               array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ),
-                               array( 'job_id = (' .
-                                       $dbw->selectSQLText( 'job', 'job_id',
-                                               array(
-                                                       'job_cmd'   => $this->type,
-                                                       'job_token' => '',
-                                                       "job_random {$ineq} {$dbw->addQuotes( $rand )}" ),
-                                               __METHOD__,
-                                               array( 'ORDER BY' => "job_random {$dir}", 'LIMIT' => 1 ) ) .
-                                       ')'
-                               ),
-                               __METHOD__
+
+               $row = false; // the row acquired
+               // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
+               // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
+               // not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
+               // be used here with MySQL.
+               while ( true ) {
+                       $row = $dbw->selectRow( 'job', '*', // find a random job
+                               array(
+                                       'job_cmd'   => $this->type,
+                                       'job_token' => '',
+                                       "job_random {$ineq} {$dbw->addQuotes( $rand )}" ),
+                               __METHOD__,
+                               array( 'ORDER BY' => "job_random {$dir}" )
                        );
+                       if ( $row ) { // claim the job
+                               $dbw->update( 'job', // update by PK
+                                       array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ),
+                                       array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ),
+                                       __METHOD__
+                               );
+                               // This might get raced out by another runner when claiming the previously
+                               // selected row. The use of job_random should minimize this problem, however.
+                               if ( $dbw->affectedRows() ) {
+                                       break; // acquired
+                               } else {
+                                       $row = false; // raced out
+                               }
+                       } else {
+                               break; // nothing to do
+                       }
                }
-               return $dbw->affectedRows();
+
+               return $row;
+       }
+
+       /**
+        * Reserve a row with a single UPDATE without holding row locks over RTTs...
+        *
+        * @param $uuid string 32 char hex string
+        * @return Row|false
+        */
+       protected function claimOldest( $uuid ) {
+               $dbw  = $this->getMasterDB();
+
+               $row = false; // the row acquired
+               while ( true ) {
+                       if ( $dbw->getType() === 'mysql' ) {
+                               // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
+                               // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
+                               // Oracle and Postgre have no such limitation. However, MySQL offers an
+                               // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
+                               $dbw->query( "UPDATE {$dbw->tableName( 'job' )}
+                                       SET
+                                               job_token = {$dbw->addQuotes( $uuid ) },
+                                               job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}
+                                       WHERE (
+                                               job_cmd = {$dbw->addQuotes( $this->type )}
+                                               AND job_token = {$dbw->addQuotes( '' )}
+                                       ) ORDER BY job_random ASC LIMIT 1",
+                                       __METHOD__
+                               );
+                       } else {
+                               // Use a subquery to find the job, within an UPDATE to claim it.
+                               // This uses as much of the DB wrapper functions as possible.
+                               $dbw->update( 'job',
+                                       array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ),
+                                       array( 'job_id = (' .
+                                               $dbw->selectSQLText( 'job', 'job_id',
+                                                       array( 'job_cmd' => $this->type, 'job_token' => '' ),
+                                                       __METHOD__,
+                                                       array( 'ORDER BY' => 'job_random ASC', 'LIMIT' => 1 ) ) .
+                                               ')'
+                                       ),
+                                       __METHOD__
+                               );
+                       }
+                       // Fetch any row that we just reserved...
+                       if ( $dbw->affectedRows() ) {
+                               $row = $dbw->selectRow( 'job', '*',
+                                       array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__
+                               );
+                               if ( !$row ) { // raced out by duplicate job removal
+                                       wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." );
+                               }
+                       } else {
+                               break; // nothing to do
+                       }
+               }
+
+               return $row;
        }
 
        /**