KafkaHandler: allow customizing timeouts
authorMax Semenik <maxsem.wiki@gmail.com>
Fri, 29 Jan 2016 00:23:22 +0000 (16:23 -0800)
committerMax Semenik <maxsem.wiki@gmail.com>
Fri, 29 Jan 2016 02:01:13 +0000 (18:01 -0800)
Bug: T125084
Change-Id: I8f01fa61d916aeaa831a84e12b6fae08d04ca046

includes/debug/logger/monolog/KafkaHandler.php

index 4e8e65b..2465918 100644 (file)
@@ -99,9 +99,25 @@ class KafkaHandler extends AbstractProcessingHandler {
        ) {
                $metadata = new MetaDataFromKafka( $kafkaServers );
                $produce = new Produce( $metadata );
+
+               if ( isset( $options['sendTimeout'] ) ) {
+                       $timeOut = $options['sendTimeout'];
+                       $produce->getClient()->setStreamOption( 'SendTimeoutSec', 0 );
+                       $produce->getClient()->setStreamOption( 'SendTimeoutUSec',
+                               intval( $timeOut * 1000000 )
+                       );
+               }
+               if ( isset( $options['recvTimeout'] ) ) {
+                       $timeOut = $options['recvTimeout'];
+                       $produce->getClient()->setStreamOption( 'RecvTimeoutSec', 0 );
+                       $produce->getClient()->setStreamOption( 'RecvTimeoutUSec',
+                               intval( $timeOut * 1000000 )
+                       );
+               }
                if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
                        $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
                }
+
                return new self( $produce, $options, $level, $bubble );
        }