use Kafka\MetaDataFromKafka;
use Kafka\Produce;
+use Kafka\Protocol\Decoder;
use MediaWiki\Logger\LoggerFactory;
use Monolog\Handler\AbstractProcessingHandler;
use Monolog\Logger;
'alias' => [], // map from monolog channel to kafka topic
'swallowExceptions' => false, // swallow exceptions sending records
'logExceptions' => null, // A PSR3 logger to inform about errors
+ 'requireAck' => 0,
];
/**
$options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
}
+ if ( isset( $options['requireAck'] ) ) {
+ $produce->setRequireAck( $options['requireAck'] );
+ }
+
return new self( $produce, $options, $level, $bubble );
}
*/
protected function send() {
try {
- $this->produce->send();
+ $response = $this->produce->send();
} catch ( \Kafka\Exception $e ) {
$ignore = $this->warning(
'Error sending records to kafka: {exception}',
[ 'exception' => $e ] );
if ( !$ignore ) {
throw $e;
+ } else {
+ return;
+ }
+ }
+
+ if ( is_bool( $response ) ) {
+ return;
+ }
+
+ $errors = [];
+ foreach ( $response as $topicName => $partitionResponse ) {
+ foreach ( $partitionResponse as $partition => $info ) {
+ if ( $info['errCode'] === 0 ) {
+ // no error
+ continue;
+ }
+ $errors[] = sprintf(
+ 'Error producing to %s (errno %d): %s',
+ $topicName,
+ $info['errCode'],
+ Decoder::getError( $info['errCode'] )
+ );
+ }
+ }
+
+ if ( $errors ) {
+ $error = implode( "\n", $errors );
+ if ( !$this->warning( $error ) ) {
+ throw new \RuntimeException( $error );
}
}
}