use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
+use Wikimedia\WaitConditionLoop;
/**
* interface is intended to be more or less compatible with
abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface {
/** @var array[] Lock tracking */
protected $locks = [];
-
- /** @var integer */
+ /** @var integer ERR_* class constant */
protected $lastError = self::ERR_NONE;
-
/** @var string */
protected $keyspace = 'local';
-
/** @var LoggerInterface */
protected $logger;
-
/** @var callback|null */
protected $asyncHandler;
+ /** @var integer Seconds */
+ protected $syncTimeout;
/** @var bool */
private $debugMode = false;
-
/** @var array */
private $duplicateKeyLookups = [];
-
/** @var bool */
private $reportDupes = false;
-
/** @var bool */
private $dupeTrackScheduled = false;
+ /** @var callable[] */
+ protected $busyCallbacks = [];
+
+ /** @var integer[] Map of (ATTR_* class constant => QOS_* class constant) */
+ protected $attrMap = [];
+
/** Possible values for getLastError() */
const ERR_NONE = 0; // no error
const ERR_NO_RESPONSE = 1; // no response
* In CLI mode, it should run the task immediately.
* - reportDupes: Whether to emit warning log messages for all keys that were
* requested more than once (requires an asyncHandler).
+ * - syncTimeout: How long to wait with WRITE_SYNC in seconds.
* @param array $params
*/
public function __construct( array $params = [] ) {
if ( !empty( $params['reportDupes'] ) && is_callable( $this->asyncHandler ) ) {
$this->reportDupes = true;
}
+
+ $this->syncTimeout = isset( $params['syncTimeout'] ) ? $params['syncTimeout'] : 3;
}
/**
return $success;
}
+ /**
+ * Reset the TTL on a key if it exists
+ *
+ * @param string $key
+ * @param int $expiry
+ * @return bool Success Returns false if there is no key
+ * @since 1.28
+ */
+ public function changeTTL( $key, $expiry = 0 ) {
+ $value = $this->get( $key );
+
+ return ( $value === false ) ? false : $this->set( $key, $value, $expiry );
+ }
+
/**
* Acquire an advisory lock on a key string
*
}
$expiry = min( $expiry ?: INF, self::TTL_DAY );
-
- $this->clearLastError();
- $timestamp = microtime( true ); // starting UNIX timestamp
- if ( $this->add( "{$key}:lock", 1, $expiry ) ) {
- $locked = true;
- } elseif ( $this->getLastError() || $timeout <= 0 ) {
- $locked = false; // network partition or non-blocking
- } else {
- // Estimate the RTT (us); use 1ms minimum for sanity
- $uRTT = max( 1e3, ceil( 1e6 * ( microtime( true ) - $timestamp ) ) );
- $sleep = 2 * $uRTT; // rough time to do get()+set()
-
- $attempts = 0; // failed attempts
- do {
- if ( ++$attempts >= 3 && $sleep <= 5e5 ) {
- // Exponentially back off after failed attempts to avoid network spam.
- // About 2*$uRTT*(2^n-1) us of "sleep" happen for the next n attempts.
- $sleep *= 2;
- }
- usleep( $sleep ); // back off
+ $loop = new WaitConditionLoop(
+ function () use ( $key, $timeout, $expiry ) {
$this->clearLastError();
- $locked = $this->add( "{$key}:lock", 1, $expiry );
- if ( $this->getLastError() ) {
- $locked = false; // network partition
- break;
+ if ( $this->add( "{$key}:lock", 1, $expiry ) ) {
+ return true; // locked!
+ } elseif ( $this->getLastError() ) {
+ return WaitConditionLoop::CONDITION_ABORTED; // network partition?
}
- } while ( !$locked && ( microtime( true ) - $timestamp ) < $timeout );
- }
+ return WaitConditionLoop::CONDITION_CONTINUE;
+ },
+ $timeout
+ );
+
+ $locked = ( $loop->invoke() === $loop::CONDITION_REACHED );
if ( $locked ) {
$this->locks[$key] = [ 'class' => $rclass, 'depth' => 1 ];
}
$this->lastError = $err;
}
+ /**
+ * Let a callback be run to avoid wasting time on special blocking calls
+ *
+ * The callbacks may or may not be called ever, in any particular order.
+ * They are likely to be invoked when something WRITE_SYNC is used used.
+ * They should follow a caching pattern as shown below, so that any code
+ * using the word will get it's result no matter what happens.
+ * @code
+ * $result = null;
+ * $workCallback = function () use ( &$result ) {
+ * if ( !$result ) {
+ * $result = ....
+ * }
+ * return $result;
+ * }
+ * @endcode
+ *
+ * @param callable $workCallback
+ * @since 1.28
+ */
+ public function addBusyCallback( callable $workCallback ) {
+ $this->busyCallbacks[] = $workCallback;
+ }
+
/**
* Modify a cache update operation array for EventRelayer::notify()
*
public function makeKey() {
return $this->makeKeyInternal( $this->keyspace, func_get_args() );
}
+
+ /**
+ * @param integer $flag ATTR_* class constant
+ * @return integer QOS_* class constant
+ * @since 1.28
+ */
+ public function getQoS( $flag ) {
+ return isset( $this->attrMap[$flag] ) ? $this->attrMap[$flag] : self::QOS_UNKNOWN;
+ }
+
+ /**
+ * Merge the flag maps of one or more BagOStuff objects into a "lowest common denominator" map
+ *
+ * @param BagOStuff[] $bags
+ * @return integer[] Resulting flag map (class ATTR_* constant => class QOS_* constant)
+ */
+ protected function mergeFlagMaps( array $bags ) {
+ $map = [];
+ foreach ( $bags as $bag ) {
+ foreach ( $bag->attrMap as $attr => $rank ) {
+ if ( isset( $map[$attr] ) ) {
+ $map[$attr] = min( $map[$attr], $rank );
+ } else {
+ $map[$attr] = $rank;
+ }
+ }
+ }
+
+ return $map;
+ }
}