* @since 1.21
*/
class JobQueueGroup {
- /** @var array */
+ /** @var JobQueueGroup[] */
protected static $instances = array();
/** @var ProcessCacheLRU */
/** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
protected $coalescedQueues;
+ /** @var Job[] */
+ protected $bufferedJobs = array();
+
const TYPE_DEFAULT = 1; // integer; jobs popped by default
const TYPE_ANY = 2; // integer; any job
}
/**
- * Insert jobs into the respective queues of with the belong.
+ * Insert jobs into the respective queues of which they belong
*
* This inserts the jobs into the queue specified by $wgJobTypeConf
* and updates the aggregate job queue information cache as needed.
*
- * @param Job|Job[] $jobs A single Job or a list of Jobs
- * @throws MWException
+ * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
+ * @throws InvalidArgumentException
* @return void
*/
public function push( $jobs ) {
return;
}
+ $this->assertValidJobs( $jobs );
+
$jobsByType = array(); // (job type => list of jobs)
foreach ( $jobs as $job ) {
- if ( $job instanceof IJobSpecification ) {
- $jobsByType[$job->getType()][] = $job;
- } else {
- throw new MWException( "Attempted to push a non-Job object into a queue." );
- }
+ $jobsByType[$job->getType()][] = $job;
}
foreach ( $jobsByType as $type => $jobs ) {
}
}
+ /**
+ * Buffer jobs for insertion via push() or call it now if in CLI mode
+ *
+ * Note that MediaWiki::restInPeace() calls pushLazyJobs()
+ *
+ * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
+ * @return void
+ * @since 1.26
+ */
+ public function lazyPush( $jobs ) {
+ if ( PHP_SAPI === 'cli' ) {
+ $this->push( $jobs );
+ return;
+ }
+
+ $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+
+ // Throw errors now instead of on push(), when other jobs may be buffered
+ $this->assertValidJobs( $jobs );
+
+ $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );
+ }
+
+ /**
+ * Push all jobs buffered via lazyPush() into their respective queues
+ *
+ * @return void
+ * @since 1.26
+ */
+ public static function pushLazyJobs() {
+ foreach ( self::$instances as $group ) {
+ $group->push( $group->bufferedJobs );
+ $group->bufferedJobs = array();
+ }
+ }
+
/**
* Pop a job off one of the job queues
*
* Acknowledge that a job was completed
*
* @param Job $job
- * @return bool
+ * @return void
*/
public function ack( Job $job ) {
- return $this->get( $job->getType() )->ack( $job );
+ $this->get( $job->getType() )->ack( $job );
}
/**
* This does nothing for certain queue classes.
*
* @return void
- * @throws MWException
*/
public function waitForBackups() {
global $wgJobTypeConf;
* @since 1.23
*/
public function queuesHaveJobs( $type = self::TYPE_ANY ) {
- global $wgMemc;
-
$key = wfMemcKey( 'jobqueue', 'queueshavejobs', $type );
+ $cache = ObjectCache::getLocalClusterInstance();
- $value = $wgMemc->get( $key );
+ $value = $cache->get( $key );
if ( $value === false ) {
$queues = $this->getQueuesWithJobs();
if ( $type == self::TYPE_DEFAULT ) {
$queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
}
$value = count( $queues ) ? 'true' : 'false';
- $wgMemc->add( $key, $value, 15 );
+ $cache->add( $key, $value, 15 );
}
return ( $value === 'true' );
* @return mixed
*/
private function getCachedConfigVar( $name ) {
- global $wgConf, $wgMemc;
+ global $wgConf;
if ( $this->wiki === wfWikiID() ) {
return $GLOBALS[$name]; // common case
} else {
+ $cache = ObjectCache::getLocalClusterInstance();
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
$key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name );
- $value = $wgMemc->get( $key ); // ('v' => ...) or false
+ $value = $cache->get( $key ); // ('v' => ...) or false
if ( is_array( $value ) ) {
return $value['v'];
} else {
$value = $wgConf->getConfig( $this->wiki, $name );
- $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) );
+ $cache->set(
+ $key,
+ array( 'v' => $value ),
+ $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY )
+ );
return $value;
}
}
}
+
+ /**
+ * @param array $jobs
+ * @throws InvalidArgumentException
+ */
+ private function assertValidJobs( array $jobs ) {
+ foreach ( $jobs as $job ) { // sanity checks
+ if ( !( $job instanceof IJobSpecification ) ) {
+ throw new InvalidArgumentException( "Expected IJobSpecification objects" );
+ }
+ }
+ }
+
+ function __destruct() {
+ $n = count( $this->bufferedJobs );
+ if ( $n > 0 ) {
+ $type = implode( ', ', array_unique( array_map( 'get_class', $this->bufferedJobs ) ) );
+ trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." );
+ }
+ }
}