Merge "Handle missing namespace prefix in XML dumps more gracefully"
[lhc/web/wiklou.git] / includes / debug / logger / monolog / KafkaHandler.php
index 0fd3b08..6670fe9 100644 (file)
@@ -22,6 +22,7 @@ namespace MediaWiki\Logger\Monolog;
 
 use Kafka\MetaDataFromKafka;
 use Kafka\Produce;
+use Kafka\Protocol\Decoder;
 use MediaWiki\Logger\LoggerFactory;
 use Monolog\Handler\AbstractProcessingHandler;
 use Monolog\Logger;
@@ -32,12 +33,12 @@ use Psr\Log\LoggerInterface;
  *
  * Constructor options array arguments:
  * * alias: map from monolog channel to kafka topic name. When no
- *       alias exists the topic "monolog_$channel" will be used.
+ *   alias exists the topic "monolog_$channel" will be used.
  * * swallowExceptions: Swallow exceptions that occur while talking to
- *    kafka. Defaults to false.
+ *   kafka. Defaults to false.
  * * logExceptions: Log exceptions talking to kafka here. Either null,
- *    the name of a channel to log to, or an object implementing
- *    FormatterInterface. Defaults to null.
+ *   the name of a channel to log to, or an object implementing
+ *   FormatterInterface. Defaults to null.
  *
  * Requires the nmred/kafka-php library, version >= 1.3.0
  *
@@ -68,6 +69,7 @@ class KafkaHandler extends AbstractProcessingHandler {
                'alias' => [], // map from monolog channel to kafka topic
                'swallowExceptions' => false, // swallow exceptions sending records
                'logExceptions' => null, // A PSR3 logger to inform about errors
+               'requireAck' => 0,
        ];
 
        /**
@@ -118,6 +120,10 @@ class KafkaHandler extends AbstractProcessingHandler {
                        $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
                }
 
+               if ( isset( $options['requireAck'] ) ) {
+                       $produce->setRequireAck( $options['requireAck'] );
+               }
+
                return new self( $produce, $options, $level, $bubble );
        }
 
@@ -165,13 +171,42 @@ class KafkaHandler extends AbstractProcessingHandler {
         */
        protected function send() {
                try {
-                       $this->produce->send();
+                       $response = $this->produce->send();
                } catch ( \Kafka\Exception $e ) {
                        $ignore = $this->warning(
                                'Error sending records to kafka: {exception}',
                                [ 'exception' => $e ] );
                        if ( !$ignore ) {
                                throw $e;
+                       } else {
+                               return;
+                       }
+               }
+
+               if ( is_bool( $response ) ) {
+                       return;
+               }
+
+               $errors = [];
+               foreach ( $response as $topicName => $partitionResponse ) {
+                       foreach ( $partitionResponse as $partition => $info ) {
+                               if ( $info['errCode'] === 0 ) {
+                                       // no error
+                                       continue;
+                               }
+                               $errors[] = sprintf(
+                                       'Error producing to %s (errno %d): %s',
+                                       $topicName,
+                                       $info['errCode'],
+                                       Decoder::getError( $info['errCode'] )
+                               );
+                       }
+               }
+
+               if ( $errors ) {
+                       $error = implode( "\n", $errors );
+                       if ( !$this->warning( $error ) ) {
+                               throw new \RuntimeException( $error );
                        }
                }
        }