3555a232a85be11f5feb3d1bb73fa771b3e7b4d4
[lhc/web/wiklou.git] / includes / libs / eventrelayer / EventRelayerKafka.php
1 <?php
2 use Kafka\Produce;
3
/**
 * Event relayer for Apache Kafka.
 *
 * Events are JSON-encoded and handed to a kafka-php producer, using the
 * channel name as the Kafka topic.
 *
 * Configuring for WANCache:
 * 'relayerConfig' => [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' => 'localhost:9092' ],
 */
class EventRelayerKafka extends EventRelayer {

	/**
	 * Configuration.
	 *
	 * @var Config
	 */
	protected $config;

	/**
	 * Kafka producer; stays unset until getKafkaProducer() is first called.
	 *
	 * @var Produce|null
	 */
	protected $producer;

	/**
	 * Store the relayer settings and verify that a Kafka host was supplied.
	 *
	 * No connection is attempted here; the producer is created on first use
	 * by getKafkaProducer().
	 *
	 * @param array $params Relayer settings; must contain the key 'KafkaEventHost'
	 * @throws InvalidArgumentException If 'KafkaEventHost' is not configured
	 */
	public function __construct( array $params ) {
		// NOTE(review): the parent constructor is not chained from here, so anything
		// EventRelayer::__construct() may set up (e.g. a default logger) is skipped.
		// Confirm against the base class whether parent::__construct( $params )
		// should be called; until then, logging in this class is null-guarded.
		$this->config = new HashConfig( $params );
		if ( !$this->config->has( 'KafkaEventHost' ) ) {
			throw new InvalidArgumentException( "KafkaEventHost must be configured" );
		}
	}

	/**
	 * Get the producer object from kafka-php.
	 *
	 * The instance is created on the first call, with the configured
	 * 'KafkaEventHost' value as the third argument of Produce::getInstance(),
	 * and reused on subsequent calls.
	 *
	 * @return Produce
	 */
	protected function getKafkaProducer() {
		if ( !$this->producer ) {
			$this->producer = Produce::getInstance( null, null, $this->config->get( 'KafkaEventHost' ) );
		}
		return $this->producer;
	}

	/**
	 * Emit a warning through the logger, if one has been set.
	 *
	 * @param string $message
	 */
	private function logKafkaWarning( $message ) {
		// This class never assigns $this->logger itself, so it may still be unset.
		if ( $this->logger ) {
			$this->logger->warning( $message );
		}
	}

	/**
	 * Send a batch of events to Kafka on the given channel.
	 *
	 * Each event is JSON-encoded individually. Events that cannot be encoded
	 * are skipped and reported rather than being passed on as a bogus
	 * message; the remaining events are still sent to partition 0 of the
	 * topic named by $channel.
	 *
	 * @see EventRelayer::doNotify()
	 *
	 * @param string $channel Used as the Kafka topic name
	 * @param array $events Events to relay; each must be JSON-encodable
	 * @return bool True if every event was encoded and the send did not throw;
	 *  false if any event was dropped or the producer raised \Kafka\Exception
	 */
	protected function doNotify( $channel, array $events ) {
		$jsonEvents = [];
		$allEncoded = true;
		foreach ( $events as $key => $event ) {
			$json = json_encode( $event );
			if ( $json === false ) {
				// json_encode() signals failure with false; do not relay that.
				$allEncoded = false;
				$this->logKafkaWarning(
					'Encoding event failed (JSON error code ' . json_last_error() . ')'
				);
				continue;
			}
			// Keep the original keys, as array_map() on a single array would.
			$jsonEvents[$key] = $json;
		}

		if ( !$jsonEvents ) {
			// Nothing to send; avoid contacting Kafka for an empty payload.
			return $allEncoded;
		}

		try {
			$producer = $this->getKafkaProducer();
			$producer->setMessages( $channel, 0, $jsonEvents );
			$producer->send();
		} catch ( \Kafka\Exception $e ) {
			$this->logKafkaWarning( "Sending events failed: $e" );
			return false;
		}

		return $allEncoded;
	}
}