Create Kafka event relayer
authorStanislav Malyshev <smalyshev@gmail.com>
Fri, 15 Apr 2016 21:01:23 +0000 (14:01 -0700)
committerStanislav Malyshev <smalyshev@gmail.com>
Wed, 20 Apr 2016 18:25:58 +0000 (11:25 -0700)
Bug: T125138
Change-Id: I9d7705cb164bc975c3a0ddf4a33ac54fe7de931c

autoload.php
includes/libs/eventrelayer/EventRelayer.php
includes/libs/eventrelayer/EventRelayerKafka.php [new file with mode: 0644]
includes/libs/objectcache/WANObjectCache.php

index 22411cd..e941fc1 100644 (file)
@@ -397,6 +397,7 @@ $wgAutoloadLocalClasses = [
        'ErrorPageError' => __DIR__ . '/includes/exception/ErrorPageError.php',
        'EventRelayer' => __DIR__ . '/includes/libs/eventrelayer/EventRelayer.php',
        'EventRelayerGroup' => __DIR__ . '/includes/EventRelayerGroup.php',
+       'EventRelayerKafka' => __DIR__ . '/includes/libs/eventrelayer/EventRelayerKafka.php',
        'EventRelayerNull' => __DIR__ . '/includes/libs/eventrelayer/EventRelayerNull.php',
        'Exif' => __DIR__ . '/includes/media/Exif.php',
        'ExifBitmapHandler' => __DIR__ . '/includes/media/ExifBitmap.php',
index b61cae7..f28a4c0 100644 (file)
  * @file
  * @author Aaron Schulz
  */
+use Psr\Log\LoggerInterface;
+use Psr\Log\LoggerAwareInterface;
+use Psr\Log\NullLogger;
 
 /**
  * Base class for reliable event relays
  */
-abstract class EventRelayer {
+abstract class EventRelayer implements LoggerAwareInterface {
+       /** @var LoggerInterface */
+       protected $logger;
+
        /**
         * @param array $params
         */
        public function __construct( array $params ) {
+               $this->logger = new NullLogger();
        }
 
        /**
@@ -47,6 +54,14 @@ abstract class EventRelayer {
                return $this->doNotify( $channel, $events );
        }
 
+       /**
+        * Set logger instance.
+        * @param LoggerInterface $logger
+        */
+       public function setLogger( LoggerInterface $logger ) {
+               $this->logger = $logger;
+       }
+
        /**
         * @param string $channel
         * @param array $events List of event data maps
diff --git a/includes/libs/eventrelayer/EventRelayerKafka.php b/includes/libs/eventrelayer/EventRelayerKafka.php
new file mode 100644 (file)
index 0000000..3555a23
--- /dev/null
@@ -0,0 +1,66 @@
+<?php
+use Kafka\Produce;
+
+/**
+ * Event relayer for Apache Kafka.
+ * Configuring for WANCache:
+ * 'relayerConfig' => [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' => 'localhost:9092' ],
+ */
+class EventRelayerKafka extends EventRelayer {
+
+       /**
+        * Configuration.
+        *
+        * @var Config
+        */
+       protected $config;
+
+       /**
+        * Kafka producer.
+        *
+        * @var Produce
+        */
+       protected $producer;
+
+       /**
+        * Create Kafka producer.
+        *
+        * @param Config $config
+        */
+       public function __construct( array $params ) {
+               $this->config = new HashConfig( $params );
+               if ( !$this->config->has( 'KafkaEventHost' ) ) {
+                       throw new InvalidArgumentException( "KafkaEventHost must be configured" );
+               }
+       }
+
+       /**
+        * Get the producer object from kafka-php.
+        * @return Produce
+        */
+       protected function getKafkaProducer() {
+               if ( !$this->producer ) {
+                       $this->producer = Produce::getInstance( null, null, $this->config->get( 'KafkaEventHost' ) );
+               }
+               return $this->producer;
+       }
+
+       /**
+        * (non-PHPdoc)
+        *
+        * @see EventRelayer::doNotify()
+        *
+        */
+       protected function doNotify( $channel, array $events ) {
+               $jsonEvents = array_map( 'json_encode', $events );
+               try {
+                       $producer = $this->getKafkaProducer();
+                       $producer->setMessages( $channel, 0, $jsonEvents );
+                       $producer->send();
+               } catch ( \Kafka\Exception $e ) {
+                       $this->logger->warning( "Sending events failed: $e" );
+                       return false;
+               }
+               return true;
+       }
+}
index b212e97..dd2e0d5 100644 (file)
@@ -149,6 +149,10 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
                $this->setLogger( isset( $params['logger'] ) ? $params['logger'] : new NullLogger() );
        }
 
+       /**
+        * Set logger instance.
+        * @param LoggerInterface $logger
+        */
        public function setLogger( LoggerInterface $logger ) {
                $this->logger = $logger;
        }