Merge "maintenance: Script to rename titles for Unicode uppercasing changes"
[lhc/web/wiklou.git] / includes / libs / eventrelayer / EventRelayerKafka.php
1 <?php
2
3 use Kafka\Produce;
4
5 /**
6 * Event relayer for Apache Kafka.
7 * Configuring for WANCache:
8 * 'relayerConfig' => [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' => 'localhost:9092' ],
9 */
class EventRelayerKafka extends EventRelayer {
	/**
	 * Relayer configuration, wrapping the parameters passed to the constructor.
	 *
	 * @var Config
	 */
	protected $config;

	/**
	 * Kafka producer; stays unset until getKafkaProducer() is first called.
	 *
	 * @var Produce|null
	 */
	protected $producer;

	/**
	 * Store the relayer configuration.
	 *
	 * No producer is created here; that is deferred to getKafkaProducer().
	 *
	 * @param array $params Relayer parameters; the 'KafkaEventHost' key is required
	 * @throws InvalidArgumentException If 'KafkaEventHost' is missing from $params
	 */
	public function __construct( array $params ) {
		parent::__construct( $params );

		$this->config = new HashConfig( $params );
		if ( !$this->config->has( 'KafkaEventHost' ) ) {
			throw new InvalidArgumentException( "KafkaEventHost must be configured" );
		}
	}

	/**
	 * Get the producer object from kafka-php.
	 *
	 * The producer is requested from the library on the first call, using the
	 * configured 'KafkaEventHost' value, and the same object is returned afterwards.
	 *
	 * @return Produce
	 */
	protected function getKafkaProducer() {
		if ( !$this->producer ) {
			$this->producer = Produce::getInstance(
				null, null, $this->config->get( 'KafkaEventHost' ) );
		}
		return $this->producer;
	}

	/**
	 * Send a batch of events to Kafka, each one serialized as JSON.
	 *
	 * Events that cannot be serialized are logged and left out of the batch
	 * instead of being handed to the producer as a boolean false.
	 *
	 * @param string $channel Passed to the producer as the first argument of
	 *  setMessages() (presumably the Kafka topic name; confirm against kafka-php)
	 * @param array $events Events to send; each one is passed through json_encode()
	 * @return bool True only if every event was encoded and the send raised no
	 *  \Kafka\Exception
	 */
	protected function doNotify( $channel, array $events ) {
		$jsonEvents = [];
		$allEncoded = true;
		foreach ( $events as $key => $event ) {
			$json = json_encode( $event );
			if ( $json === false ) {
				// json_encode() signals failure with false (e.g. invalid UTF-8);
				// forwarding that value would publish a bogus message.
				$this->logger->warning( "Encoding event failed: " . json_last_error_msg() );
				$allEncoded = false;
				continue;
			}
			// Keep the original keys, as array_map() did for a single input array.
			$jsonEvents[$key] = $json;
		}

		if ( !$allEncoded && !$jsonEvents ) {
			// Every event was unencodable, so there is nothing left to send.
			return false;
		}

		try {
			$producer = $this->getKafkaProducer();
			// NOTE(review): the 0 is presumably the partition id; confirm against kafka-php.
			$producer->setMessages( $channel, 0, $jsonEvents );
			$producer->send();
		} catch ( \Kafka\Exception $e ) {
			$this->logger->warning( "Sending events failed: $e" );
			return false;
		}

		// Report partial failure if any event had to be dropped before sending.
		return $allEncoded;
	}
}