use Kafka\MetaDataFromKafka;
use Kafka\Produce;
+use Kafka\Protocol\Decoder;
use MediaWiki\Logger\LoggerFactory;
use Monolog\Handler\AbstractProcessingHandler;
use Monolog\Logger;
/**
* @var array Map from topic name to partition this request produces to
*/
- protected $partitions = array();
+ protected $partitions = [];
/**
* @var array defaults for constructor options
*/
- private static $defaultOptions = array(
- 'alias' => array(), // map from monolog channel to kafka topic
+ private static $defaultOptions = [
+ 'alias' => [], // map from monolog channel to kafka topic
'swallowExceptions' => false, // swallow exceptions sending records
'logExceptions' => null, // A PSR3 logger to inform about errors
- );
+ 'requireAck' => 0,
+ ];
/**
* @param Produce $produce Kafka instance to produce through
* @return KafkaHandler
*/
public static function factory(
- $kafkaServers, array $options = array(), $level = Logger::DEBUG, $bubble = true
+ $kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true
) {
$metadata = new MetaDataFromKafka( $kafkaServers );
$produce = new Produce( $metadata );
$options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
}
+ if ( isset( $options['requireAck'] ) ) {
+ $produce->setRequireAck( $options['requireAck'] );
+ }
+
return new self( $produce, $options, $level, $bubble );
}
*/
protected function write( array $record ) {
if ( $record['formatted'] !== null ) {
- $this->addMessages( $record['channel'], array( $record['formatted'] ) );
+ $this->addMessages( $record['channel'], [ $record['formatted'] ] );
$this->send();
}
}
* {@inheritDoc}
*/
public function handleBatch( array $batch ) {
- $channels = array();
+ $channels = [];
foreach ( $batch as $record ) {
if ( $record['level'] < $this->level ) {
continue;
$formatter = $this->getFormatter();
foreach ( $channels as $channel => $records ) {
- $messages = array();
+ $messages = [];
foreach ( $records as $idx => $record ) {
$message = $formatter->format( $record );
if ( $message !== null ) {
*/
protected function send() {
try {
- $this->produce->send();
+ $response = $this->produce->send();
} catch ( \Kafka\Exception $e ) {
$ignore = $this->warning(
'Error sending records to kafka: {exception}',
- array( 'exception' => $e ) );
+ [ 'exception' => $e ] );
if ( !$ignore ) {
throw $e;
+ } else {
+ return;
+ }
+ }
+
+ if ( is_bool( $response ) ) {
+ return;
+ }
+
+ $errors = [];
+ foreach ( $response as $topicName => $partitionResponse ) {
+ foreach ( $partitionResponse as $partition => $info ) {
+ if ( $info['errCode'] === 0 ) {
+ // no error
+ continue;
+ }
+ $errors[] = sprintf(
+ 'Error producing to %s (errno %d): %s',
+ $topicName,
+ $info['errCode'],
+ Decoder::getError( $info['errCode'] )
+ );
+ }
+ }
+
+ if ( $errors ) {
+ $error = implode( "\n", $errors );
+ if ( !$this->warning( $error ) ) {
+ throw new \RuntimeException( $error );
}
}
}
} catch ( \Kafka\Exception $e ) {
$ignore = $this->warning(
'Error getting metadata for kafka topic {topic}: {exception}',
- array( 'topic' => $topic, 'exception' => $e ) );
+ [ 'topic' => $topic, 'exception' => $e ] );
if ( $ignore ) {
return null;
}
$details = $this->produce->getClient()->getTopicDetail( $topic );
$ignore = $this->warning(
'No partitions available for kafka topic {topic}',
- array( 'topic' => $topic, 'kafka' => $details )
+ [ 'topic' => $topic, 'kafka' => $details ]
);
if ( !$ignore ) {
throw new \RuntimeException( "No partitions available for kafka topic $topic" );
* @param array $context PSR3 compatible log context
* @return bool true if caller should ignore warning
*/
- protected function warning( $message, array $context = array() ) {
+ protected function warning( $message, array $context = [] ) {
if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
$this->options['logExceptions']->warning( $message, $context );
}