* @param int $level The minimum logging level at which this handler will be triggered
* @param bool $bubble Whether the messages that are handled can bubble up the stack or not
*/
- public function __construct( Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true ) {
+ public function __construct(
+ Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true
+ ) {
parent::__construct( $level, $bubble );
$this->produce = $produce;
$this->options = array_merge( self::$defaultOptions, $options );
* @param bool $bubble Whether the messages that are handled can bubble the stack or not
* @return KafkaHandler
*/
- public static function factory( $kafkaServers, array $options = array(), $level = Logger::DEBUG, $bubble = true ) {
+ public static function factory(
+ $kafkaServers, array $options = array(), $level = Logger::DEBUG, $bubble = true
+ ) {
$metadata = new MetaDataFromKafka( $kafkaServers );
$produce = new Produce( $metadata );
+
+ if ( isset( $options['sendTimeout'] ) ) {
+ $timeOut = $options['sendTimeout'];
+ $produce->getClient()->setStreamOption( 'SendTimeoutSec', 0 );
+ $produce->getClient()->setStreamOption( 'SendTimeoutUSec',
+ intval( $timeOut * 1000000 )
+ );
+ }
+ if ( isset( $options['recvTimeout'] ) ) {
+ $timeOut = $options['recvTimeout'];
+ $produce->getClient()->setStreamOption( 'RecvTimeoutSec', 0 );
+ $produce->getClient()->setStreamOption( 'RecvTimeoutUSec',
+ intval( $timeOut * 1000000 )
+ );
+ }
if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
$options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
}
+
return new self( $produce, $options, $level, $bubble );
}
}
}
if ( $messages ) {
- $this->addMessages($channel, $messages);
+ $this->addMessages( $channel, $messages );
}
}