kafka: Implement ack handling
authorErik Bernhardson <ebernhardson@wikimedia.org>
Fri, 3 Jun 2016 00:44:43 +0000 (17:44 -0700)
committerErik Bernhardson <ebernhardson@wikimedia.org>
Thu, 23 Jun 2016 15:12:29 +0000 (08:12 -0700)
By default the kafka implementation we use doesn't require any kind of
acknowledgment, it just throws messages into the wind and lets them sit
where they may. Implement an option for KafkaHandler to specify the
number of acks (number of replicas that must record the message) and
some error handling to throw exceptions as necessary when there is a
problem.

Bug: T135159
Change-Id: I859dc791072db407f908b2f36be0d6704f1a6256

includes/debug/logger/monolog/KafkaHandler.php
tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php

index 0fd3b08..432a9e1 100644 (file)
@@ -22,6 +22,7 @@ namespace MediaWiki\Logger\Monolog;
 
 use Kafka\MetaDataFromKafka;
 use Kafka\Produce;
+use Kafka\Protocol\Decoder;
 use MediaWiki\Logger\LoggerFactory;
 use Monolog\Handler\AbstractProcessingHandler;
 use Monolog\Logger;
@@ -68,6 +69,7 @@ class KafkaHandler extends AbstractProcessingHandler {
                'alias' => [], // map from monolog channel to kafka topic
                'swallowExceptions' => false, // swallow exceptions sending records
                'logExceptions' => null, // A PSR3 logger to inform about errors
+               'requireAck' => 0,
        ];
 
        /**
@@ -118,6 +120,10 @@ class KafkaHandler extends AbstractProcessingHandler {
                        $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
                }
 
+               if ( isset( $options['requireAck'] ) ) {
+                       $produce->setRequireAck( $options['requireAck'] );
+               }
+
                return new self( $produce, $options, $level, $bubble );
        }
 
@@ -165,13 +171,42 @@ class KafkaHandler extends AbstractProcessingHandler {
         */
        protected function send() {
                try {
-                       $this->produce->send();
+                       $response = $this->produce->send();
                } catch ( \Kafka\Exception $e ) {
                        $ignore = $this->warning(
                                'Error sending records to kafka: {exception}',
                                [ 'exception' => $e ] );
                        if ( !$ignore ) {
                                throw $e;
+                       } else {
+                               return;
+                       }
+               }
+
+               if ( is_bool( $response ) ) {
+                       return;
+               }
+
+               $errors = [];
+               foreach ( $response as $topicName => $partitionResponse ) {
+                       foreach ( $partitionResponse as $partition => $info ) {
+                               if ( $info['errCode'] === 0 ) {
+                                       // no error
+                                       continue;
+                               }
+                               $errors[] = sprintf(
+                                       'Error producing to %s (errno %d): %s',
+                                       $topicName,
+                                       $info['errCode'],
+                                       Decoder::getError( $info['errCode'] )
+                               );
+                       }
+               }
+
+               if ( $errors ) {
+                       $error = implode( "\n", $errors );
+                       if ( !$this->warning( $error ) ) {
+                               throw new \RuntimeException( $error );
                        }
                }
        }
index e29d207..cf97071 100644 (file)
@@ -58,6 +58,9 @@ class KafkaHandlerTest extends MediaWikiTestCase {
                $produce->expects( $this->once() )
                        ->method( 'setMessages' )
                        ->with( $expect, $this->anything(), $this->anything() );
+               $produce->expects( $this->any() )
+                       ->method( 'send' )
+                       ->will( $this->returnValue( true ) );
 
                $handler = new KafkaHandler( $produce, $options );
                $handler->handle( [
@@ -89,6 +92,9 @@ class KafkaHandlerTest extends MediaWikiTestCase {
                $produce->expects( $this->any() )
                        ->method( 'getAvailablePartitions' )
                        ->will( $this->throwException( new \Kafka\Exception ) );
+               $produce->expects( $this->any() )
+                       ->method( 'send' )
+                       ->will( $this->returnValue( true ) );
 
                if ( $expectException ) {
                        $this->setExpectedException( 'Kafka\Exception' );
@@ -147,6 +153,9 @@ class KafkaHandlerTest extends MediaWikiTestCase {
                        ->will( $this->returnValue( [ 'A' ] ) );
                $mockMethod = $produce->expects( $this->exactly( 2 ) )
                        ->method( 'setMessages' );
+               $produce->expects( $this->any() )
+                       ->method( 'send' )
+                       ->will( $this->returnValue( true ) );
                // evil hax
                \TestingAccessWrapper::newFromObject( $mockMethod )->matcher->parametersMatcher =
                        new \PHPUnit_Framework_MockObject_Matcher_ConsecutiveParameters( [
@@ -181,6 +190,9 @@ class KafkaHandlerTest extends MediaWikiTestCase {
                $produce->expects( $this->once() )
                        ->method( 'setMessages' )
                        ->with( $this->anything(), $this->anything(), [ 'words', 'lines' ] );
+               $produce->expects( $this->any() )
+                       ->method( 'send' )
+                       ->will( $this->returnValue( true ) );
 
                $formatter = $this->getMock( 'Monolog\Formatter\FormatterInterface' );
                $formatter->expects( $this->any() )