Merge "Try to predict the rev_id when preparing edits"
[lhc/web/wiklou.git] / includes / debug / logger / monolog / KafkaHandler.php
index 0fd3b08..432a9e1 100644 (file)
@@ -22,6 +22,7 @@ namespace MediaWiki\Logger\Monolog;
 
 use Kafka\MetaDataFromKafka;
 use Kafka\Produce;
+use Kafka\Protocol\Decoder;
 use MediaWiki\Logger\LoggerFactory;
 use Monolog\Handler\AbstractProcessingHandler;
 use Monolog\Logger;
@@ -68,6 +69,7 @@ class KafkaHandler extends AbstractProcessingHandler {
                'alias' => [], // map from monolog channel to kafka topic
                'swallowExceptions' => false, // swallow exceptions sending records
                'logExceptions' => null, // A PSR3 logger to inform about errors
+               'requireAck' => 0,
        ];
 
        /**
@@ -118,6 +120,10 @@ class KafkaHandler extends AbstractProcessingHandler {
                        $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
                }
 
+               if ( isset( $options['requireAck'] ) ) {
+                       $produce->setRequireAck( $options['requireAck'] );
+               }
+
                return new self( $produce, $options, $level, $bubble );
        }
 
@@ -165,13 +171,42 @@ class KafkaHandler extends AbstractProcessingHandler {
         */
        protected function send() {
                try {
-                       $this->produce->send();
+                       $response = $this->produce->send();
                } catch ( \Kafka\Exception $e ) {
                        $ignore = $this->warning(
                                'Error sending records to kafka: {exception}',
                                [ 'exception' => $e ] );
                        if ( !$ignore ) {
                                throw $e;
+                       } else {
+                               return;
+                       }
+               }
+
+               if ( is_bool( $response ) ) {
+                       return;
+               }
+
+               $errors = [];
+               foreach ( $response as $topicName => $partitionResponse ) {
+                       foreach ( $partitionResponse as $partition => $info ) {
+                               if ( $info['errCode'] === 0 ) {
+                                       // no error
+                                       continue;
+                               }
+                               $errors[] = sprintf(
+                                       'Error producing to %s (errno %d): %s',
+                                       $topicName,
+                                       $info['errCode'],
+                                       Decoder::getError( $info['errCode'] )
+                               );
+                       }
+               }
+
+               if ( $errors ) {
+                       $error = implode( "\n", $errors );
+                       if ( !$this->warning( $error ) ) {
+                               throw new \RuntimeException( $error );
                        }
                }
        }