Merge "Try to predict the rev_id when preparing edits"
[lhc/web/wiklou.git] / includes / debug / logger / monolog / KafkaHandler.php
index 2465918..432a9e1 100644 (file)
@@ -22,6 +22,7 @@ namespace MediaWiki\Logger\Monolog;
 
 use Kafka\MetaDataFromKafka;
 use Kafka\Produce;
+use Kafka\Protocol\Decoder;
 use MediaWiki\Logger\LoggerFactory;
 use Monolog\Handler\AbstractProcessingHandler;
 use Monolog\Logger;
@@ -59,16 +60,17 @@ class KafkaHandler extends AbstractProcessingHandler {
        /**
         * @var array Map from topic name to partition this request produces to
         */
-       protected $partitions = array();
+       protected $partitions = [];
 
        /**
         * @var array defaults for constructor options
         */
-       private static $defaultOptions = array(
-               'alias' => array(), // map from monolog channel to kafka topic
+       private static $defaultOptions = [
+               'alias' => [], // map from monolog channel to kafka topic
                'swallowExceptions' => false, // swallow exceptions sending records
                'logExceptions' => null, // A PSR3 logger to inform about errors
-       );
+               'requireAck' => 0,
+       ];
 
        /**
         * @param Produce $produce Kafka instance to produce through
@@ -95,7 +97,7 @@ class KafkaHandler extends AbstractProcessingHandler {
         * @return KafkaHandler
         */
        public static function factory(
-               $kafkaServers, array $options = array(), $level = Logger::DEBUG, $bubble = true
+               $kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true
        ) {
                $metadata = new MetaDataFromKafka( $kafkaServers );
                $produce = new Produce( $metadata );
@@ -118,6 +120,10 @@ class KafkaHandler extends AbstractProcessingHandler {
                        $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
                }
 
+               if ( isset( $options['requireAck'] ) ) {
+                       $produce->setRequireAck( $options['requireAck'] );
+               }
+
                return new self( $produce, $options, $level, $bubble );
        }
 
@@ -126,7 +132,7 @@ class KafkaHandler extends AbstractProcessingHandler {
         */
        protected function write( array $record ) {
                if ( $record['formatted'] !== null ) {
-                       $this->addMessages( $record['channel'], array( $record['formatted'] ) );
+                       $this->addMessages( $record['channel'], [ $record['formatted'] ] );
                        $this->send();
                }
        }
@@ -135,7 +141,7 @@ class KafkaHandler extends AbstractProcessingHandler {
         * {@inheritDoc}
         */
        public function handleBatch( array $batch ) {
-               $channels = array();
+               $channels = [];
                foreach ( $batch as $record ) {
                        if ( $record['level'] < $this->level ) {
                                continue;
@@ -145,7 +151,7 @@ class KafkaHandler extends AbstractProcessingHandler {
 
                $formatter = $this->getFormatter();
                foreach ( $channels as $channel => $records ) {
-                       $messages = array();
+                       $messages = [];
                        foreach ( $records as $idx => $record ) {
                                $message = $formatter->format( $record );
                                if ( $message !== null ) {
@@ -165,13 +171,42 @@ class KafkaHandler extends AbstractProcessingHandler {
         */
        protected function send() {
                try {
-                       $this->produce->send();
+                       $response = $this->produce->send();
                } catch ( \Kafka\Exception $e ) {
                        $ignore = $this->warning(
                                'Error sending records to kafka: {exception}',
-                               array( 'exception' => $e ) );
+                               [ 'exception' => $e ] );
                        if ( !$ignore ) {
                                throw $e;
+                       } else {
+                               return;
+                       }
+               }
+
+               if ( is_bool( $response ) ) {
+                       return;
+               }
+
+               $errors = [];
+               foreach ( $response as $topicName => $partitionResponse ) {
+                       foreach ( $partitionResponse as $partition => $info ) {
+                               if ( $info['errCode'] === 0 ) {
+                                       // no error
+                                       continue;
+                               }
+                               $errors[] = sprintf(
+                                       'Error producing to %s (errno %d): %s',
+                                       $topicName,
+                                       $info['errCode'],
+                                       Decoder::getError( $info['errCode'] )
+                               );
+                       }
+               }
+
+               if ( $errors ) {
+                       $error = implode( "\n", $errors );
+                       if ( !$this->warning( $error ) ) {
+                               throw new \RuntimeException( $error );
                        }
                }
        }
@@ -188,7 +223,7 @@ class KafkaHandler extends AbstractProcessingHandler {
                        } catch ( \Kafka\Exception $e ) {
                                $ignore = $this->warning(
                                        'Error getting metadata for kafka topic {topic}: {exception}',
-                                       array( 'topic' => $topic, 'exception' => $e ) );
+                                       [ 'topic' => $topic, 'exception' => $e ] );
                                if ( $ignore ) {
                                        return null;
                                }
@@ -201,7 +236,7 @@ class KafkaHandler extends AbstractProcessingHandler {
                                $details = $this->produce->getClient()->getTopicDetail( $topic );
                                $ignore = $this->warning(
                                        'No partitions available for kafka topic {topic}',
-                                       array( 'topic' => $topic, 'kafka' => $details )
+                                       [ 'topic' => $topic, 'kafka' => $details ]
                                );
                                if ( !$ignore ) {
                                        throw new \RuntimeException( "No partitions available for kafka topic $topic" );
@@ -235,7 +270,7 @@ class KafkaHandler extends AbstractProcessingHandler {
         * @param array $context PSR3 compatible log context
         * @return bool true if caller should ignore warning
         */
-       protected function warning( $message, array $context = array() ) {
+       protected function warning( $message, array $context = [] ) {
                if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
                        $this->options['logExceptions']->warning( $message, $context );
                }