use Kafka\MetaDataFromKafka;
use Kafka\Produce;
+use Kafka\Protocol\Decoder;
use MediaWiki\Logger\LoggerFactory;
use Monolog\Handler\AbstractProcessingHandler;
use Monolog\Logger;
*
* Constructor options array arguments:
* * alias: map from monolog channel to kafka topic name. When no
- * alias exists the topic "monolog_$channel" will be used.
+ * alias exists the topic "monolog_$channel" will be used.
* * swallowExceptions: Swallow exceptions that occur while talking to
- * kafka. Defaults to false.
+ * kafka. Defaults to false.
* * logExceptions: Log exceptions talking to kafka here. Either null,
- * the name of a channel to log to, or an object implementing
- * FormatterInterface. Defaults to null.
+ * the name of a channel to log to, or an object implementing
+ * FormatterInterface. Defaults to null.
*
* Requires the nmred/kafka-php library, version >= 1.3.0
*
'alias' => [], // map from monolog channel to kafka topic
'swallowExceptions' => false, // swallow exceptions sending records
'logExceptions' => null, // A PSR3 logger to inform about errors
+ 'requireAck' => 0,
];
/**
$options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
}
+ if ( isset( $options['requireAck'] ) ) {
+ $produce->setRequireAck( $options['requireAck'] );
+ }
+
return new self( $produce, $options, $level, $bubble );
}
*/
protected function send() {
try {
- $this->produce->send();
+ $response = $this->produce->send();
} catch ( \Kafka\Exception $e ) {
$ignore = $this->warning(
'Error sending records to kafka: {exception}',
[ 'exception' => $e ] );
if ( !$ignore ) {
throw $e;
+ } else {
+ return;
+ }
+ }
+
+ if ( is_bool( $response ) ) {
+ return;
+ }
+
+ $errors = [];
+ foreach ( $response as $topicName => $partitionResponse ) {
+ foreach ( $partitionResponse as $partition => $info ) {
+ if ( $info['errCode'] === 0 ) {
+ // no error
+ continue;
+ }
+ $errors[] = sprintf(
+ 'Error producing to %s (errno %d): %s',
+ $topicName,
+ $info['errCode'],
+ Decoder::getError( $info['errCode'] )
+ );
+ }
+ }
+
+ if ( $errors ) {
+ $error = implode( "\n", $errors );
+ if ( !$this->warning( $error ) ) {
+ throw new \RuntimeException( $error );
}
}
}