Merge "Add debug logging for the case that the API goes read only"
[lhc/web/wiklou.git] / includes / debug / logger / monolog / KafkaHandler.php
index 59d7764..2465918 100644 (file)
@@ -76,7 +76,9 @@ class KafkaHandler extends AbstractProcessingHandler {
         * @param int $level The minimum logging level at which this handler will be triggered
         * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
         */
-       public function __construct( Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true ) {
+       public function __construct(
+               Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true
+       ) {
                parent::__construct( $level, $bubble );
                $this->produce = $produce;
                $this->options = array_merge( self::$defaultOptions, $options );
@@ -92,12 +94,30 @@ class KafkaHandler extends AbstractProcessingHandler {
         * @param bool $bubble Whether the messages that are handled can bubble the stack or not
         * @return KafkaHandler
         */
-       public static function factory( $kafkaServers, array $options = array(), $level = Logger::DEBUG, $bubble = true ) {
+       public static function factory(
+               $kafkaServers, array $options = array(), $level = Logger::DEBUG, $bubble = true
+       ) {
                $metadata = new MetaDataFromKafka( $kafkaServers );
                $produce = new Produce( $metadata );
+
+               if ( isset( $options['sendTimeout'] ) ) {
+                       $timeOut = $options['sendTimeout'];
+                       $produce->getClient()->setStreamOption( 'SendTimeoutSec', 0 );
+                       $produce->getClient()->setStreamOption( 'SendTimeoutUSec',
+                               intval( $timeOut * 1000000 )
+                       );
+               }
+               if ( isset( $options['recvTimeout'] ) ) {
+                       $timeOut = $options['recvTimeout'];
+                       $produce->getClient()->setStreamOption( 'RecvTimeoutSec', 0 );
+                       $produce->getClient()->setStreamOption( 'RecvTimeoutUSec',
+                               intval( $timeOut * 1000000 )
+                       );
+               }
                if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
                        $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
                }
+
                return new self( $produce, $options, $level, $bubble );
        }
 
@@ -133,7 +153,7 @@ class KafkaHandler extends AbstractProcessingHandler {
                                }
                        }
                        if ( $messages ) {
-                               $this->addMessages($channel, $messages);
+                               $this->addMessages( $channel, $messages );
                        }
                }