use Wikimedia\Rdbms\DBConnRef;
use Wikimedia\Rdbms\MaintainableDBConnRef;
use Wikimedia\Rdbms\DatabaseDomain;
+use Wikimedia\Rdbms\DBUnexpectedError;
/**
* DB accessible external objects.
*/
public function store( $location, $data ) {
$dbw = $this->getMaster( $location );
- $dbw->insert( $this->getTable( $dbw ), [ 'blob_text' => $data ], __METHOD__ );
+ $dbw->insert(
+ $this->getTable( $dbw, $location ),
+ [ 'blob_text' => $data ],
+ __METHOD__
+ );
$id = $dbw->insertId();
if ( !$id ) {
throw new MWException( __METHOD__ . ': no insert ID' );
/**
* Get a replica DB connection for the specified cluster
*
+ * The connection reference is requested from the load balancer that
+ * getLoadBalancer() returns for this cluster. getSlave() is kept as a
+ * deprecated alias of this method.
+ *
+ * @since 1.34
* @param string $cluster Cluster name
* @return DBConnRef
*/
- public function getSlave( $cluster ) {
+ public function getReplica( $cluster ) {
$lb = $this->getLoadBalancer( $cluster );
return $lb->getConnectionRef(
);
}
+ /**
+ * Get a replica DB connection for the specified cluster
+ *
+ * Backwards-compatibility wrapper: forwards the call unchanged to getReplica().
+ *
+ * @param string $cluster Cluster name
+ * @return DBConnRef
+ * @deprecated since 1.34 Use getReplica() instead
+ */
+ public function getSlave( $cluster ) {
+ return $this->getReplica( $cluster );
+ }
+
/**
* Get a master database connection for the specified cluster
*
* Get the 'blobs' table name for this database
*
* @param IDatabase $db
+ * @param string|null $cluster Cluster name
* @return string Table name ('blobs' by default)
*/
- public function getTable( $db ) {
- $table = $db->getLBInfo( 'blobs table' );
- if ( is_null( $table ) ) {
- $table = 'blobs';
+ public function getTable( $db, $cluster = null ) {
+ if ( $cluster !== null ) { // with a cluster name, prefer the cluster's own server config
+ $lb = $this->getLoadBalancer( $cluster );
+ $info = $lb->getServerInfo( $lb->getWriterIndex() ); // server info at the load balancer's writer index
+ if ( isset( $info['blobs table'] ) ) {
+ return $info['blobs table'];
+ }
}
- return $table;
+ return $db->getLBInfo( 'blobs table' ) ?? 'blobs'; // b/c: fall back to the connection's LB info, then to 'blobs'
+ }
+
+ /**
+ * Create the appropriate blobs table on this cluster
+ *
+ * Reads the table definition from maintenance/storage/blobs.sql, substitutes
+ * the table name returned by getTable() for the placeholder table names in
+ * that file, and runs the result on the cluster's master connection.
+ *
+ * @see getTable()
+ * @since 1.34
+ * @param string $cluster
+ * @throws DBUnexpectedError If the connection's database type is not 'mysql' or 'sqlite'
+ * @throws RuntimeException If the SQL file cannot be read
+ */
+ public function initializeTable( $cluster ) {
+ global $IP;
+
+ static $supportedTypes = [ 'mysql', 'sqlite' ];
+
+ $dbw = $this->getMaster( $cluster );
+ if ( !in_array( $dbw->getType(), $supportedTypes, true ) ) {
+ throw new DBUnexpectedError( $dbw, "RDBMS type '{$dbw->getType()}' not supported." );
+ }
+
+ $sqlFilePath = "$IP/maintenance/storage/blobs.sql";
+ $sql = file_get_contents( $sqlFilePath );
+ if ( $sql === false ) {
+ throw new RuntimeException( "Failed to read '$sqlFilePath'." );
+ }
+
+ $rawTable = $this->getTable( $dbw, $cluster ); // e.g. "blobs_cluster23"
+ $encTable = $dbw->tableName( $rawTable );
+ $dbw->query(
+ str_replace( // both placeholder spellings are swapped for the same table name
+ [ '/*$wgDBprefix*/blobs', '/*_*/blobs' ],
+ [ $encTable, $encTable ],
+ $sql
+ ),
+ __METHOD__,
+ $dbw::QUERY_IGNORE_DBO_TRX
+ );
}
/**
$this->logger->debug( "ExternalStoreDB::fetchBlob cache miss on $cacheID" );
- $dbr = $this->getSlave( $cluster );
- $ret = $dbr->selectField( $this->getTable( $dbr ),
- 'blob_text', [ 'blob_id' => $id ], __METHOD__ );
+ $dbr = $this->getReplica( $cluster );
+ $ret = $dbr->selectField(
+ $this->getTable( $dbr, $cluster ),
+ 'blob_text',
+ [ 'blob_id' => $id ],
+ __METHOD__
+ );
if ( $ret === false ) {
$this->logger->info( "ExternalStoreDB::fetchBlob master fallback on $cacheID" );
// Try the master
$dbw = $this->getMaster( $cluster );
- $ret = $dbw->selectField( $this->getTable( $dbw ),
- 'blob_text', [ 'blob_id' => $id ], __METHOD__ );
+ $ret = $dbw->selectField(
+ $this->getTable( $dbw, $cluster ),
+ 'blob_text',
+ [ 'blob_id' => $id ],
+ __METHOD__
+ );
if ( $ret === false ) {
$this->logger->error( "ExternalStoreDB::fetchBlob master failed to find $cacheID" );
}
* Unlocated ids are not represented
*/
private function batchFetchBlobs( $cluster, array $ids ) {
- $dbr = $this->getSlave( $cluster );
+ $dbr = $this->getReplica( $cluster );
$res = $dbr->select(
- $this->getTable( $dbr ),
+ $this->getTable( $dbr, $cluster ),
[ 'blob_id', 'blob_text' ],
[ 'blob_id' => array_keys( $ids ) ],
__METHOD__
);
// Try the master
$dbw = $this->getMaster( $cluster );
- $res = $dbw->select( $this->getTable( $dbr ),
+ $res = $dbw->select(
+ $this->getTable( $dbr, $cluster ),
[ 'blob_id', 'blob_text' ],
[ 'blob_id' => array_keys( $ids ) ],
__METHOD__ );
* @since 1.23
*/
class MultiHttpClient implements LoggerAwareInterface {
- /** @var resource curl_multi_init() handle */
- protected $cmh;
+ /** @var resource */
+ protected $multiHandle = null; // curl_multi handle
/** @var string|null SSL certificates path */
protected $caBundlePath;
/** @var float */
* @endcode
* @param array $req HTTP request array
* @param array $opts
- * - connTimeout : connection timeout per request (seconds)
- * - reqTimeout : post-connection timeout per request (seconds)
- * - usePipelining : whether to use HTTP pipelining if possible (for all hosts)
- * - maxConnsPerHost : maximum number of concurrent connections (per host)
+ * - connTimeout : connection timeout per request (seconds)
+ * - reqTimeout : post-connection timeout per request (seconds)
* @return array Response array for request
*/
public function run( array $req, array $opts = [] ) {
* method/URL entries will also be changed to use the corresponding string keys.
*
* @param array[] $reqs Map of HTTP request arrays
- * @param array $opts Options
+ * @param array $opts
* - connTimeout : connection timeout per request (seconds)
* - reqTimeout : post-connection timeout per request (seconds)
- * - usePipelining : whether to use HTTP pipelining if possible (for all hosts)
+ * - usePipelining : whether to use HTTP pipelining if possible
* - maxConnsPerHost : maximum number of concurrent connections (per host)
* @return array $reqs With response array populated for each
* @throws Exception
* @suppress PhanTypeInvalidDimOffset
*/
private function runMultiCurl( array $reqs, array $opts ) {
- $chm = $this->getCurlMulti( $opts );
+ // Runs the requests in batches on the cached curl_multi handle, then puts
+ // the handle's pipelining/max-connection options back to this client's defaults
+ $chm = $this->getCurlMulti();
$selectTimeout = $this->getSelectTimeout( $opts );
$handles = [];
foreach ( $reqs as $index => &$req ) {
$handles[$index] = $this->getCurlHandle( $req, $opts );
- curl_multi_add_handle( $chm, $handles[$index] );
+ if ( count( $reqs ) > 1 ) {
+ // https://github.com/guzzle/guzzle/issues/349
+ curl_setopt( $handles[$index], CURLOPT_FORBID_REUSE, true );
+ }
}
unset( $req ); // don't assign over this by accident
+ $indexes = array_keys( $reqs );
+ if ( isset( $opts['usePipelining'] ) ) {
+ curl_multi_setopt( $chm, CURLMOPT_PIPELINING, (int)$opts['usePipelining'] );
+ }
+ if ( isset( $opts['maxConnsPerHost'] ) ) {
+ // Keep these sockets around as they may be needed later in the request
+ curl_multi_setopt( $chm, CURLMOPT_MAXCONNECTS, (int)$opts['maxConnsPerHost'] );
+ }
+
+ // @TODO: use a per-host rolling handle window (e.g. CURLMOPT_MAX_HOST_CONNECTIONS)
+ // Size batches from the per-call maxConnsPerHost override when one is given,
+ // so the option actually bounds how many handles run at once; clamp to 1
+ // because array_chunk() rejects sizes below 1
+ $batchSize = max( 1, (int)( $opts['maxConnsPerHost'] ?? $this->maxConnsPerHost ) );
+ $batches = array_chunk( $indexes, $batchSize );
$infos = [];
- // Execute the cURL handles concurrently...
- $active = null; // handles still being processed
- do {
- // Do any available work...
+
+ foreach ( $batches as $batch ) {
+ // Attach all cURL handles for this batch
+ foreach ( $batch as $index ) {
+ curl_multi_add_handle( $chm, $handles[$index] );
+ }
+ // Execute the cURL handles concurrently...
+ $active = null; // handles still being processed
do {
- $mrc = curl_multi_exec( $chm, $active );
- $info = curl_multi_info_read( $chm );
- if ( $info !== false ) {
- $infos[(int)$info['handle']] = $info;
+ // Do any available work...
+ do {
+ $mrc = curl_multi_exec( $chm, $active );
+ $info = curl_multi_info_read( $chm );
+ if ( $info !== false ) {
+ $infos[(int)$info['handle']] = $info;
+ }
+ } while ( $mrc == CURLM_CALL_MULTI_PERFORM );
+ // Wait (if possible) for available work...
+ if ( $active > 0 && $mrc == CURLM_OK && curl_multi_select( $chm, $selectTimeout ) == -1 ) {
+ // PHP bug 63411; https://curl.haxx.se/libcurl/c/curl_multi_fdset.html
+ usleep( 5000 ); // 5ms
}
- } while ( $mrc == CURLM_CALL_MULTI_PERFORM );
- // Wait (if possible) for available work...
- if ( $active > 0 && $mrc == CURLM_OK && curl_multi_select( $chm, $selectTimeout ) == -1 ) {
- // PHP bug 63411; https://curl.haxx.se/libcurl/c/curl_multi_fdset.html
- usleep( 5000 ); // 5ms
- }
- } while ( $active > 0 && $mrc == CURLM_OK );
+ } while ( $active > 0 && $mrc == CURLM_OK );
+ }
// Remove all of the added cURL handles and check for errors...
foreach ( $reqs as $index => &$req ) {
}
unset( $req ); // don't assign over this by accident
+ // Restore the default settings
+ curl_multi_setopt( $chm, CURLMOPT_PIPELINING, (int)$this->usePipelining );
+ curl_multi_setopt( $chm, CURLMOPT_MAXCONNECTS, (int)$this->maxConnsPerHost );
+
return $reqs;
}
}
/**
+ * Get the curl_multi handle for this client, creating it on first use
+ *
+ * A newly created handle is configured with this client's usePipelining and
+ * maxConnsPerHost settings and is then cached for later calls.
+ *
- * @param array $opts
* @return resource
* @throws Exception
*/
- protected function getCurlMulti( array $opts ) {
- if ( !$this->cmh ) {
+ protected function getCurlMulti() {
+ if ( !$this->multiHandle ) {
if ( !function_exists( 'curl_multi_init' ) ) {
throw new Exception( "PHP cURL function curl_multi_init missing. " .
"Check https://www.mediawiki.org/wiki/Manual:CURL" );
}
$cmh = curl_multi_init();
- // Limit the size of the idle connection cache such that consecutive parallel
- // request batches to the same host can avoid having to keep making connections
+ curl_multi_setopt( $cmh, CURLMOPT_PIPELINING, (int)$this->usePipelining );
curl_multi_setopt( $cmh, CURLMOPT_MAXCONNECTS, (int)$this->maxConnsPerHost );
- $this->cmh = $cmh;
+ $this->multiHandle = $cmh; // cache the handle so later calls reuse it
}
-
- // Limit the number of in-flight requests for any given host
- $maxHostConns = $opts['maxConnsPerHost'] ?? $this->maxConnsPerHost;
- curl_multi_setopt( $this->cmh, CURLMOPT_MAX_HOST_CONNECTIONS, (int)$maxHostConns );
- // Configure when to multiplex multiple requests onto single TCP handles
- $pipelining = $opts['usePipelining'] ?? $this->usePipelining;
- curl_multi_setopt( $this->cmh, CURLMOPT_PIPELINING, (int)$pipelining );
-
- return $this->cmh;
+ return $this->multiHandle;
}
/**
}
function __destruct() {
- if ( $this->cmh ) {
- curl_multi_close( $this->cmh );
+ if ( $this->multiHandle ) { // stays null until getCurlMulti() has created a handle
+ curl_multi_close( $this->multiHandle ); // release the cached curl_multi handle
}
}
}