* http://www.gnu.org/copyleft/gpl.html
*
* @file
- * @todo Make this work on PostgreSQL and maybe other database servers
* @ingroup Maintenance
*/
public function __construct() {
parent::__construct();
$this->mDescription = "Pick a database that has pending jobs";
- $this->addOption( 'type', "The type of job to search for", false, true );
+ $this->addOption( 'type', "Search by job type", false, true );
+ $this->addOption( 'types', "Space separated list of job types to search for", false, true );
}
public function execute() {
global $wgMemc;
- $type = $this->getOption( 'type', false );
+ $type = false; // job type required/picked
+ if ( $this->hasOption( 'types' ) ) {
+ $types = explode( ' ', $this->getOption( 'types' ) );
+ } elseif ( $this->hasOption( 'type' ) ) {
+ $types = array( $this->getOption( 'type' ) );
+ } else {
+ $types = JobQueueGroup::singleton()->getDefaultQueueTypes();
+ }
+
+ // Handle any required periodic queue maintenance
+ $this->executeReadyPeriodicTasks();
$memcKey = 'jobqueue:dbs:v3';
$pendingDbInfo = $wgMemc->get( $memcKey );
- // If the cache entry wasn't present, or in 1% of cases otherwise,
- // regenerate the cache. Use any available stale cache if another
- // process is currently regenerating the pending DB information.
- if ( !$pendingDbInfo || mt_rand( 0, 100 ) == 0 ) {
- $lock = $wgMemc->add( 'jobqueue:dbs:v3:lock', 1, 1800 ); // lock
- if ( $lock ) {
+ // If the cache entry wasn't present, is stale, or in .1% of cases otherwise,
+ // regenerate the cache. Use any available stale cache if another process is
+ // currently regenerating the pending DB information.
+ if ( !is_array( $pendingDbInfo )
+ || ( time() - $pendingDbInfo['timestamp'] ) > 300 // 5 minutes
+ || mt_rand( 0, 999 ) == 0
+ ) {
+ if ( $wgMemc->add( "$memcKey:rebuild", 1, 1800 ) ) { // lock
$pendingDbInfo = array(
'pendingDBs' => $this->getPendingDbs(),
'timestamp' => time()
);
- $wgMemc->set( $memcKey, $pendingDbInfo );
- $wgMemc->delete( 'jobqueue:dbs:v3:lock' ); // unlock
+ for ( $attempts=1; $attempts <= 25; ++$attempts ) {
+ if ( $wgMemc->add( "$memcKey:lock", 1, 60 ) ) { // lock
+ $wgMemc->set( $memcKey, $pendingDbInfo );
+ $wgMemc->delete( "$memcKey:lock" ); // unlock
+ break;
+ }
+ }
+ $wgMemc->delete( "$memcKey:rebuild" ); // unlock
}
}
- if ( !$pendingDbInfo || !$pendingDbInfo['pendingDBs'] ) {
+ if ( !is_array( $pendingDbInfo ) || !$pendingDbInfo['pendingDBs'] ) {
return; // no DBs with jobs or cache is both empty and locked
}
- $pendingDBs = $pendingDbInfo['pendingDBs'];
+ $pendingDBs = $pendingDbInfo['pendingDBs']; // convenience
do {
$again = false;
- if ( $type === false ) {
- $candidates = call_user_func_array( 'array_merge', $pendingDBs );
- } elseif ( isset( $pendingDBs[$type] ) ) {
- $candidates = $pendingDBs[$type];
- } else {
- $candidates = array();
+ $candidates = array(); // list of (type, db)
+ // Flatten the tree of candidates into a flat list so that a random
+ // item can be selected, weighing each queue (type/db tuple) equally.
+ foreach ( $pendingDBs as $type => $dbs ) {
+ if ( in_array( $type, $types ) ) {
+ foreach ( $dbs as $db ) {
+ $candidates[] = array( $type, $db );
+ }
+ }
}
- if ( !$candidates ) {
- return;
+ if ( !count( $candidates ) ) {
+ return; // no jobs for this type
}
- $candidates = array_values( $candidates );
- $db = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ];
- if ( !$this->checkJob( $type, $db ) ) {
- if ( $type === false ) {
- // There are no jobs available in the current database
- foreach ( $pendingDBs as $type2 => $dbs ) {
- $pendingDBs[$type2] = array_diff( $pendingDBs[$type2], array( $db ) );
+ list( $type, $db ) = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ];
+ if ( !$this->checkJob( $type, $db ) ) { // queue is actually empty?
+ $pendingDBs = $this->delistDB( $pendingDBs, $db, $type );
+ // Update the cache to remove the outdated information.
+ // Make sure that this does not race (especially with full rebuilds).
+ if ( $wgMemc->add( "$memcKey:lock", 1, 60 ) ) { // lock
+ $curInfo = $wgMemc->get( $memcKey );
+ if ( is_array( $curInfo ) ) {
+ $curInfo['pendingDBs'] =
+ $this->delistDB( $curInfo['pendingDBs'], $db, $type );
+ $wgMemc->set( $memcKey, $curInfo );
+ // May as well make use of this newer information
+ $pendingDBs = $curInfo['pendingDBs'];
}
- } else {
- // There are no jobs of this type available in the current database
- $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) );
+ $wgMemc->delete( "$memcKey:lock" ); // unlock
}
- // Update the cache to remove the outdated information
- $pendingDbInfo['pendingDBs'] = $pendingDBs;
- // @TODO: fix race condition with these updates
- $wgMemc->set( $memcKey, $pendingDbInfo );
$again = true;
}
} while ( $again );
- $this->output( $db . "\n" );
+ if ( $this->hasOption( 'types' ) ) {
+ $this->output( $db . " " . $type . "\n" );
+ } else {
+ $this->output( $db . "\n" );
+ }
+ }
+
+ /**
+ * Remove a type/DB entry from the list of queues with jobs
+ *
+ * @param $pendingDBs array
+ * @param $db string
+ * @param $type string
+ * @return Array
+ */
+ private function delistDB( array $pendingDBs, $db, $type ) {
+ $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) );
+ return $pendingDBs;
}
/**
* @param $dbName string
* @return bool
*/
- function checkJob( $type, $dbName ) {
- $group = JobQueueGroup::singleton( $dbName );
- if ( $type === false ) {
- foreach ( $group->getDefaultQueueTypes() as $type ) {
- if ( !$group->get( $type )->isEmpty() ) {
- return true;
- }
- }
- return false;
- } else {
- return !$group->get( $type )->isEmpty();
- }
+ private function checkJob( $type, $dbName ) {
+ return !JobQueueGroup::singleton( $dbName )->get( $type )->isEmpty();
}
/**
$pendingDBs = array(); // (job type => (db list))
foreach ( $wgLocalDatabases as $db ) {
- $types = JobQueueGroup::singleton( $db )->getQueuesWithJobs();
- foreach ( $types as $type ) {
+ foreach ( JobQueueGroup::singleton( $db )->getQueuesWithJobs() as $type ) {
$pendingDBs[$type][] = $db;
}
}
return $pendingDBs;
}
+
+ /**
+ * Do all ready periodic jobs for all databases every 5 minutes (and .1% of the time)
+ * @return integer
+ */
+ private function executeReadyPeriodicTasks() {
+ global $wgLocalDatabases, $wgMemc;
+
+ $count = 0;
+ $memcKey = 'jobqueue:periodic:lasttime';
+ $timestamp = (int)$wgMemc->get( $memcKey ); // UNIX timestamp or 0
+ if ( ( time() - $timestamp ) > 300 || mt_rand( 0, 999 ) == 0 ) { // 5 minutes
+ if ( $wgMemc->add( "$memcKey:rebuild", 1, 1800 ) ) { // lock
+ foreach ( $wgLocalDatabases as $db ) {
+ $count += JobQueueGroup::singleton( $db )->executeReadyPeriodicTasks();
+ }
+ $wgMemc->set( $memcKey, time() );
+ $wgMemc->delete( "$memcKey:rebuild" ); // unlock
+ }
+ }
+
+ return $count;
+ }
}
$maintClass = "nextJobDb";