6 * Event relayer for Apache Kafka.
7 * Configuring for WANCache:
8 * 'relayerConfig' => [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' => 'localhost:9092' ],
10 class EventRelayerKafka
extends EventRelayer
{
26 * Create Kafka producer.
28 * @param array $params
30 public function __construct( array $params ) {
31 parent
::__construct( $params );
33 $this->config
= new HashConfig( $params );
34 if ( !$this->config
->has( 'KafkaEventHost' ) ) {
35 throw new InvalidArgumentException( "KafkaEventHost must be configured" );
40 * Get the producer object from kafka-php.
43 protected function getKafkaProducer() {
44 if ( !$this->producer
) {
45 $this->producer
= Produce
::getInstance(
46 null, null, $this->config
->get( 'KafkaEventHost' ) );
48 return $this->producer
;
51 protected function doNotify( $channel, array $events ) {
52 $jsonEvents = array_map( 'json_encode', $events );
54 $producer = $this->getKafkaProducer();
55 $producer->setMessages( $channel, 0, $jsonEvents );
57 } catch ( \Kafka\Exception
$e ) {
58 $this->logger
->warning( "Sending events failed: $e" );