X-Git-Url: https://git.heureux-cyclage.org/?a=blobdiff_plain;f=maintenance%2FnextJobDB.php;h=6cc8566b4563705d29ac1149455632b37a9034e7;hb=9bdc558bc481218235b0b4f1b9c34ff6c86fd56c;hp=17a3f2e078ea459760c3088870a3e1afa32643f9;hpb=336b6f94a2218f46e523249eb10685829f54d2e4;p=lhc%2Fweb%2Fwiklou.git diff --git a/maintenance/nextJobDB.php b/maintenance/nextJobDB.php index 17a3f2e078..6cc8566b45 100644 --- a/maintenance/nextJobDB.php +++ b/maintenance/nextJobDB.php @@ -18,7 +18,6 @@ * http://www.gnu.org/copyleft/gpl.html * * @file - * @todo Make this work on PostgreSQL and maybe other database servers * @ingroup Maintenance */ @@ -33,86 +32,65 @@ class nextJobDB extends Maintenance { public function __construct() { parent::__construct(); $this->mDescription = "Pick a database that has pending jobs"; - $this->addOption( 'type', "The type of job to search for", false, true ); + $this->addOption( 'type', "Search by job type", false, true ); + $this->addOption( 'types', "Space separated list of job types to search for", false, true ); } public function execute() { - global $wgMemc; + global $wgMemc, $wgJobTypesExcludedFromDefaultQueue; - $type = $this->getOption( 'type', false ); + $type = false; // job type required/picked - $memcKey = 'jobqueue:dbs:v3'; - $pendingDbInfo = $wgMemc->get( $memcKey ); - - // If the cache entry wasn't present, is stale, or in .1% of cases otherwise, - // regenerate the cache. Use any available stale cache if another process is - // currently regenerating the pending DB information. - if ( !is_array( $pendingDbInfo ) - || ( time() - $pendingDbInfo['timestamp'] ) > 300 // 5 minutes - || mt_rand( 0, 999 ) == 0 - ) { - if ( $wgMemc->add( "$memcKey:rebuild", 1, 1800 ) ) { // lock - $pendingDbInfo = array( - 'pendingDBs' => $this->getPendingDbs(), - 'timestamp' => time() - ); - for ( $attempts=1; $attempts <= 25; ++$attempts ) { - if ( $wgMemc->add( "$memcKey:lock", 1, 60 ) ) { // lock - $wgMemc->set( $memcKey, $pendingDbInfo ); - $wgMemc->delete( "$memcKey:lock" ); // unlock - break; - } - } - $wgMemc->delete( "$memcKey:rebuild" ); // unlock - } + if ( $this->hasOption( 'types' ) ) { + $types = explode( ' ', $this->getOption( 'types' ) ); + } elseif ( $this->hasOption( 'type' ) ) { + $types = array( $this->getOption( 'type' ) ); + } else { + $types = false; } - if ( !is_array( $pendingDbInfo ) || !$pendingDbInfo['pendingDBs'] ) { + // Handle any required periodic queue maintenance + $this->executeReadyPeriodicTasks(); + + // Get all the queues with jobs in them + $pendingDBs = JobQueueAggregator::singleton()->getAllReadyWikiQueues(); + if ( !count( $pendingDBs ) ) { return; // no DBs with jobs or cache is both empty and locked } - $pendingDBs = $pendingDbInfo['pendingDBs']; // convenience do { $again = false; - if ( $type === false ) { - $candidates = call_user_func_array( 'array_merge', $pendingDBs ); - } elseif ( isset( $pendingDBs[$type] ) ) { - $candidates = $pendingDBs[$type]; - } else { - $candidates = array(); + $candidates = array(); // list of (type, db) + // Flatten the tree of candidates into a flat list so that a random + // item can be selected, weighing each queue (type/db tuple) equally. + foreach ( $pendingDBs as $type => $dbs ) { + if ( + ( is_array( $types ) && in_array( $type, $types ) ) || + ( $types === false && !in_array( $type, $wgJobTypesExcludedFromDefaultQueue ) ) + ) { + foreach ( $dbs as $db ) { + $candidates[] = array( $type, $db ); + } + } } - if ( !$candidates ) { - return; + if ( !count( $candidates ) ) { + return; // no jobs for this type } - $candidates = array_values( $candidates ); - $db = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ]; - if ( !$this->checkJob( $type, $db ) ) { - if ( $type === false ) { - // There are no jobs available in the current database - foreach ( $pendingDBs as $type2 => $dbs ) { - $pendingDBs[$type2] = array_diff( $pendingDBs[$type2], array( $db ) ); - } - } else { - // There are no jobs of this type available in the current database - $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) ); - } - // Update the cache to remove the outdated information. - // Make sure that this does not race (especially with full rebuilds). - $pendingDbInfo['pendingDBs'] = $pendingDBs; - if ( $wgMemc->add( "$memcKey:lock", 1, 60 ) ) { // lock - $curInfo = $wgMemc->get( $memcKey ); - if ( $curInfo && $curInfo['timestamp'] === $pendingDbInfo['timestamp'] ) { - $wgMemc->set( $memcKey, $pendingDbInfo ); - } - $wgMemc->delete( "$memcKey:lock" ); // unlock - } + list( $type, $db ) = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ]; + if ( !$this->checkJob( $type, $db ) ) { // queue is actually empty? + $pendingDBs[$type] = array_diff( $pendingDBs[$type], $db ); + JobQueueAggregator::singleton()->notifyQueueEmpty( $db, $type ); $again = true; } } while ( $again ); - $this->output( $db . "\n" ); + if ( $this->hasOption( 'types' ) ) { + $this->output( $db . " " . $type . "\n" ); + } else { + $this->output( $db . "\n" ); + } } /** @@ -122,36 +100,31 @@ class nextJobDB extends Maintenance { * @param $dbName string * @return bool */ - function checkJob( $type, $dbName ) { - $group = JobQueueGroup::singleton( $dbName ); - if ( $type === false ) { - foreach ( $group->getDefaultQueueTypes() as $type ) { - if ( !$group->get( $type )->isEmpty() ) { - return true; - } - } - return false; - } else { - return !$group->get( $type )->isEmpty(); - } + private function checkJob( $type, $dbName ) { + return !JobQueueGroup::singleton( $dbName )->get( $type )->isEmpty(); } /** - * Get all databases that have a pending job - * @return array + * Do all ready periodic jobs for all databases every 5 minutes (and .1% of the time) + * @return integer */ - private function getPendingDbs() { - global $wgLocalDatabases; - - $pendingDBs = array(); // (job type => (db list)) - foreach ( $wgLocalDatabases as $db ) { - $types = JobQueueGroup::singleton( $db )->getQueuesWithJobs(); - foreach ( $types as $type ) { - $pendingDBs[$type][] = $db; + private function executeReadyPeriodicTasks() { + global $wgLocalDatabases, $wgMemc; + + $count = 0; + $memcKey = 'jobqueue:periodic:lasttime'; + $timestamp = (int)$wgMemc->get( $memcKey ); // UNIX timestamp or 0 + if ( ( time() - $timestamp ) > 300 || mt_rand( 0, 999 ) == 0 ) { // 5 minutes + if ( $wgMemc->add( "$memcKey:rebuild", 1, 1800 ) ) { // lock + foreach ( $wgLocalDatabases as $db ) { + $count += JobQueueGroup::singleton( $db )->executeReadyPeriodicTasks(); + } + $wgMemc->set( $memcKey, time() ); + $wgMemc->delete( "$memcKey:rebuild" ); // unlock } } - return $pendingDBs; + return $count; } }