Merge "Added a separate error message for mkdir failures"
[lhc/web/wiklou.git] / includes / libs / eventrelayer / EventRelayerKafka.php
1 <?php
2 use Kafka\Produce;
3
/**
 * Event relayer for Apache Kafka.
 *
 * Configuring for WANCache:
 * 'relayerConfig' => [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' => 'localhost:9092' ],
 */
class EventRelayerKafka extends EventRelayer {
	/**
	 * Configuration, built from the constructor parameters.
	 *
	 * @var Config
	 */
	protected $config;

	/**
	 * Kafka producer; stays unset until getKafkaProducer() is first called.
	 *
	 * @var Produce|null
	 */
	protected $producer;

	/**
	 * Store the relayer configuration. The Kafka producer is not created here;
	 * see getKafkaProducer().
	 *
	 * @param array $params Must contain the 'KafkaEventHost' key
	 * @throws InvalidArgumentException If 'KafkaEventHost' is not configured
	 */
	public function __construct( array $params ) {
		parent::__construct( $params );

		$this->config = new HashConfig( $params );
		if ( !$this->config->has( 'KafkaEventHost' ) ) {
			throw new InvalidArgumentException( "KafkaEventHost must be configured" );
		}
	}

	/**
	 * Get the producer object from kafka-php, creating it on first use.
	 *
	 * @return Produce
	 */
	protected function getKafkaProducer() {
		if ( !$this->producer ) {
			// NOTE(review): the two leading nulls are whatever kafka-php's
			// Produce::getInstance() takes before the broker host list
			// (presumably ZooKeeper host and timeout) - confirm against the
			// kafka-php version in use.
			$this->producer = Produce::getInstance(
				null, null, $this->config->get( 'KafkaEventHost' ) );
		}
		return $this->producer;
	}

	/**
	 * Send the given events to Kafka, each one encoded as a JSON string.
	 *
	 * Events that cannot be JSON-encoded are logged and skipped instead of being
	 * handed to the producer as a boolean false payload.
	 *
	 * @param string $channel Passed as the first argument of setMessages()
	 *  (presumably the Kafka topic name)
	 * @param array $events Events to encode and send
	 * @return bool True only if every event was encoded and sending raised no
	 *  \Kafka\Exception
	 */
	protected function doNotify( $channel, array $events ) {
		$jsonEvents = [];
		$allEncoded = true;
		foreach ( $events as $key => $event ) {
			$json = json_encode( $event );
			if ( $json === false ) {
				// json_encode() signals failure (bad UTF-8, depth, unsupported
				// type, ...) by returning false; never forward that as a message.
				$this->logger->warning(
					"Encoding event failed: JSON error code " . json_last_error() );
				$allEncoded = false;
				continue;
			}
			// Keep the original keys, as array_map() would have done.
			$jsonEvents[$key] = $json;
		}

		if ( !$allEncoded && !$jsonEvents ) {
			// Every event was rejected, so there is nothing valid left to send.
			return false;
		}

		try {
			$producer = $this->getKafkaProducer();
			// NOTE(review): 0 is presumably the partition ID - confirm against
			// the kafka-php Produce::setMessages() signature.
			$producer->setMessages( $channel, 0, $jsonEvents );
			$producer->send();
		} catch ( \Kafka\Exception $e ) {
			$this->logger->warning( "Sending events failed: $e" );
			return false;
		}

		// Report failure if any event had to be dropped, even though the rest went out.
		return $allEncoded;
	}
}