From c6b668c2eca22549d5112cc44cef573cf2dea74c Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Mon, 16 Apr 2018 13:38:01 -0700 Subject: [PATCH] Do not start explicit transaction rounds for RecentChangesUpdateJob This replaces the hacky use of onTransactionIdle(), which no longer runs immediately in explicit transaction rounds since d4c31cf841. Also clarified TransactionRoundDefiningUpdate comment about rounds. Change-Id: Ie17eacdcaea4e47019cc94e1c7beed9d7fec5cf2 --- .../TransactionRoundDefiningUpdate.php | 3 +- includes/jobqueue/Job.php | 15 ++ includes/jobqueue/JobRunner.php | 4 +- .../jobqueue/jobs/RecentChangesUpdateJob.php | 223 +++++++++--------- 4 files changed, 128 insertions(+), 117 deletions(-) diff --git a/includes/deferred/TransactionRoundDefiningUpdate.php b/includes/deferred/TransactionRoundDefiningUpdate.php index 65baec5d51..a32d4a0703 100644 --- a/includes/deferred/TransactionRoundDefiningUpdate.php +++ b/includes/deferred/TransactionRoundDefiningUpdate.php @@ -1,8 +1,7 @@ executionFlags & $flag ) === $flag; + } + /** * Batch-insert a group of jobs into the queue. * This will be wrapped in a transaction with a forced commit. diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php index fa7d605731..977fbdaaa5 100644 --- a/includes/jobqueue/JobRunner.php +++ b/includes/jobqueue/JobRunner.php @@ -290,7 +290,9 @@ class JobRunner implements LoggerAwareInterface { $jobStartTime = microtime( true ); try { $fnameTrxOwner = get_class( $job ) . 
'::run'; // give run() outer scope - $lbFactory->beginMasterChanges( $fnameTrxOwner ); + if ( !$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) { + $lbFactory->beginMasterChanges( $fnameTrxOwner ); + } $status = $job->run(); $error = $job->getLastError(); $this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner ); diff --git a/includes/jobqueue/jobs/RecentChangesUpdateJob.php b/includes/jobqueue/jobs/RecentChangesUpdateJob.php index 77daca7676..8f508283d7 100644 --- a/includes/jobqueue/jobs/RecentChangesUpdateJob.php +++ b/includes/jobqueue/jobs/RecentChangesUpdateJob.php @@ -35,6 +35,7 @@ class RecentChangesUpdateJob extends Job { throw new Exception( "Missing 'type' parameter." ); } + $this->executionFlags |= self::JOB_NO_EXPLICIT_TRX_ROUND; $this->removeDuplicates = true; } @@ -127,124 +128,118 @@ class RecentChangesUpdateJob extends Job { $window = $wgActiveUserDays * 86400; $dbw = wfGetDB( DB_MASTER ); - // JobRunner uses DBO_TRX, but doesn't call begin/commit itself; - // onTransactionIdle() will run immediately since there is no trx. - $dbw->onTransactionIdle( - function () use ( $dbw, $days, $window ) { - $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); - $ticket = $factory->getEmptyTransactionTicket( __METHOD__ ); - // Avoid disconnect/ping() cycle that makes locks fall off - $dbw->setSessionOptions( [ 'connTimeout' => 900 ] ); - - $lockKey = wfWikiID() . '-activeusers'; - if ( !$dbw->lock( $lockKey, __METHOD__, 0 ) ) { - // Exclusive update (avoids duplicate entries)… it's usually fine to just drop out here, - // if the Job is already running. - return; - } + $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $ticket = $factory->getEmptyTransactionTicket( __METHOD__ ); - $nowUnix = time(); - // Get the last-updated timestamp for the cache - $cTime = $dbw->selectField( 'querycache_info', - 'qci_timestamp', - [ 'qci_type' => 'activeusers' ] - ); - $cTimeUnix = $cTime ? 
wfTimestamp( TS_UNIX, $cTime ) : 1; - - // Pick the date range to fetch from. This is normally from the last - // update to till the present time, but has a limited window for sanity. - // If the window is limited, multiple runs are need to fully populate it. - $sTimestamp = max( $cTimeUnix, $nowUnix - $days * 86400 ); - $eTimestamp = min( $sTimestamp + $window, $nowUnix ); - - // Get all the users active since the last update - $actorQuery = ActorMigration::newMigration()->getJoin( 'rc_user' ); - $res = $dbw->select( - [ 'recentchanges' ] + $actorQuery['tables'], - [ - 'rc_user_text' => $actorQuery['fields']['rc_user_text'], - 'lastedittime' => 'MAX(rc_timestamp)' - ], - [ - $actorQuery['fields']['rc_user'] . ' > 0', // actual accounts - 'rc_type != ' . $dbw->addQuotes( RC_EXTERNAL ), // no wikidata - 'rc_log_type IS NULL OR rc_log_type != ' . $dbw->addQuotes( 'newusers' ), - 'rc_timestamp >= ' . $dbw->addQuotes( $dbw->timestamp( $sTimestamp ) ), - 'rc_timestamp <= ' . $dbw->addQuotes( $dbw->timestamp( $eTimestamp ) ) - ], - __METHOD__, - [ - 'GROUP BY' => [ 'rc_user_text' ], - 'ORDER BY' => 'NULL' // avoid filesort - ], - $actorQuery['joins'] - ); - $names = []; - foreach ( $res as $row ) { - $names[$row->rc_user_text] = $row->lastedittime; - } + $lockKey = wfWikiID() . '-activeusers'; + if ( !$dbw->lock( $lockKey, __METHOD__, 0 ) ) { + // Exclusive update (avoids duplicate entries)… it's usually fine to just + // drop out here, if the Job is already running. + return; + } - // Find which of the recently active users are already accounted for - if ( count( $names ) ) { - $res = $dbw->select( 'querycachetwo', - [ 'user_name' => 'qcc_title' ], - [ - 'qcc_type' => 'activeusers', - 'qcc_namespace' => NS_USER, - 'qcc_title' => array_keys( $names ), - 'qcc_value >= ' . $dbw->addQuotes( $nowUnix - $days * 86400 ), // TS_UNIX - ], - __METHOD__ - ); - // Note: In order for this to be actually consistent, we would need - // to update these rows with the new lastedittime. 
- foreach ( $res as $row ) { - unset( $names[$row->user_name] ); - } - } + // Long-running queries expected + $dbw->setSessionOptions( [ 'connTimeout' => 900 ] ); - // Insert the users that need to be added to the list - if ( count( $names ) ) { - $newRows = []; - foreach ( $names as $name => $lastEditTime ) { - $newRows[] = [ - 'qcc_type' => 'activeusers', - 'qcc_namespace' => NS_USER, - 'qcc_title' => $name, - 'qcc_value' => wfTimestamp( TS_UNIX, $lastEditTime ), - 'qcc_namespacetwo' => 0, // unused - 'qcc_titletwo' => '' // unused - ]; - } - foreach ( array_chunk( $newRows, 500 ) as $rowBatch ) { - $dbw->insert( 'querycachetwo', $rowBatch, __METHOD__ ); - $factory->commitAndWaitForReplication( __METHOD__, $ticket ); - } - } + $nowUnix = time(); + // Get the last-updated timestamp for the cache + $cTime = $dbw->selectField( 'querycache_info', + 'qci_timestamp', + [ 'qci_type' => 'activeusers' ] + ); + $cTimeUnix = $cTime ? wfTimestamp( TS_UNIX, $cTime ) : 1; + + // Pick the date range to fetch from. This is normally from the last + // update to till the present time, but has a limited window for sanity. + // If the window is limited, multiple runs are need to fully populate it. + $sTimestamp = max( $cTimeUnix, $nowUnix - $days * 86400 ); + $eTimestamp = min( $sTimestamp + $window, $nowUnix ); + + // Get all the users active since the last update + $actorQuery = ActorMigration::newMigration()->getJoin( 'rc_user' ); + $res = $dbw->select( + [ 'recentchanges' ] + $actorQuery['tables'], + [ + 'rc_user_text' => $actorQuery['fields']['rc_user_text'], + 'lastedittime' => 'MAX(rc_timestamp)' + ], + [ + $actorQuery['fields']['rc_user'] . ' > 0', // actual accounts + 'rc_type != ' . $dbw->addQuotes( RC_EXTERNAL ), // no wikidata + 'rc_log_type IS NULL OR rc_log_type != ' . $dbw->addQuotes( 'newusers' ), + 'rc_timestamp >= ' . $dbw->addQuotes( $dbw->timestamp( $sTimestamp ) ), + 'rc_timestamp <= ' . 
$dbw->addQuotes( $dbw->timestamp( $eTimestamp ) ) + ], + __METHOD__, + [ + 'GROUP BY' => [ 'rc_user_text' ], + 'ORDER BY' => 'NULL' // avoid filesort + ], + $actorQuery['joins'] + ); + $names = []; + foreach ( $res as $row ) { + $names[$row->rc_user_text] = $row->lastedittime; + } + + // Find which of the recently active users are already accounted for + if ( count( $names ) ) { + $res = $dbw->select( 'querycachetwo', + [ 'user_name' => 'qcc_title' ], + [ + 'qcc_type' => 'activeusers', + 'qcc_namespace' => NS_USER, + 'qcc_title' => array_keys( $names ), + 'qcc_value >= ' . $dbw->addQuotes( $nowUnix - $days * 86400 ), // TS_UNIX + ], + __METHOD__ + ); + // Note: In order for this to be actually consistent, we would need + // to update these rows with the new lastedittime. + foreach ( $res as $row ) { + unset( $names[$row->user_name] ); + } + } + + // Insert the users that need to be added to the list + if ( count( $names ) ) { + $newRows = []; + foreach ( $names as $name => $lastEditTime ) { + $newRows[] = [ + 'qcc_type' => 'activeusers', + 'qcc_namespace' => NS_USER, + 'qcc_title' => $name, + 'qcc_value' => wfTimestamp( TS_UNIX, $lastEditTime ), + 'qcc_namespacetwo' => 0, // unused + 'qcc_titletwo' => '' // unused + ]; + } + foreach ( array_chunk( $newRows, 500 ) as $rowBatch ) { + $dbw->insert( 'querycachetwo', $rowBatch, __METHOD__ ); + $factory->commitAndWaitForReplication( __METHOD__, $ticket ); + } + } + + // If a transaction was already started, it might have an old + // snapshot, so kludge the timestamp range back as needed. 
+ $asOfTimestamp = min( $eTimestamp, (int)$dbw->trxTimestamp() ); + + // Touch the data freshness timestamp + $dbw->replace( 'querycache_info', + [ 'qci_type' ], + [ 'qci_type' => 'activeusers', + 'qci_timestamp' => $dbw->timestamp( $asOfTimestamp ) ], // not always $now + __METHOD__ + ); + + $dbw->unlock( $lockKey, __METHOD__ ); - // If a transaction was already started, it might have an old - // snapshot, so kludge the timestamp range back as needed. - $asOfTimestamp = min( $eTimestamp, (int)$dbw->trxTimestamp() ); - - // Touch the data freshness timestamp - $dbw->replace( 'querycache_info', - [ 'qci_type' ], - [ 'qci_type' => 'activeusers', - 'qci_timestamp' => $dbw->timestamp( $asOfTimestamp ) ], // not always $now - __METHOD__ - ); - - $dbw->unlock( $lockKey, __METHOD__ ); - - // Rotate out users that have not edited in too long (according to old data set) - $dbw->delete( 'querycachetwo', - [ - 'qcc_type' => 'activeusers', - 'qcc_value < ' . $dbw->addQuotes( $nowUnix - $days * 86400 ) // TS_UNIX - ], - __METHOD__ - ); - }, + // Rotate out users that have not edited in too long (according to old data set) + $dbw->delete( 'querycachetwo', + [ + 'qcc_type' => 'activeusers', + 'qcc_value < ' . $dbw->addQuotes( $nowUnix - $days * 86400 ) // TS_UNIX + ], __METHOD__ ); } -- 2.20.1