) {
$metadata = new MetaDataFromKafka( $kafkaServers );
$produce = new Produce( $metadata );
+
+ if ( isset( $options['sendTimeout'] ) ) {
+ $timeOut = $options['sendTimeout'];
+ $produce->getClient()->setStreamOption( 'SendTimeoutSec', 0 );
+ $produce->getClient()->setStreamOption( 'SendTimeoutUSec',
+ intval( $timeOut * 1000000 )
+ );
+ }
+ if ( isset( $options['recvTimeout'] ) ) {
+ $timeOut = $options['recvTimeout'];
+ $produce->getClient()->setStreamOption( 'RecvTimeoutSec', 0 );
+ $produce->getClient()->setStreamOption( 'RecvTimeoutUSec',
+ intval( $timeOut * 1000000 )
+ );
+ }
if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
$options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
}
+
return new self( $produce, $options, $level, $bubble );
}