Merge "Removed deprecated $wgDeferredUpdateList"
authorjenkins-bot <jenkins-bot@gerrit.wikimedia.org>
Tue, 22 Sep 2015 00:50:22 +0000 (00:50 +0000)
committerGerrit Code Review <gerrit@wikimedia.org>
Tue, 22 Sep 2015 00:50:22 +0000 (00:50 +0000)
autoload.php
composer.json
includes/debug/logger/MonologSpi.php
includes/debug/logger/monolog/AvroFormatter.php [new file with mode: 0644]
includes/debug/logger/monolog/BufferHandler.php [new file with mode: 0644]
includes/debug/logger/monolog/KafkaHandler.php [new file with mode: 0644]
includes/utils/AvroValidator.php [new file with mode: 0644]
tests/phpunit/includes/ConsecutiveParametersMatcher.php [new file with mode: 0644]
tests/phpunit/includes/debug/logger/monolog/AvroFormatterTest.php [new file with mode: 0644]
tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php [new file with mode: 0644]
tests/phpunit/includes/utils/AvroValidatorTest.php [new file with mode: 0644]

index 5adfbe5..03728e9 100644 (file)
@@ -149,6 +149,7 @@ $wgAutoloadLocalClasses = array(
        'AutoLoader' => __DIR__ . '/includes/AutoLoader.php',
        'AutoloadGenerator' => __DIR__ . '/includes/utils/AutoloadGenerator.php',
        'Autopromote' => __DIR__ . '/includes/Autopromote.php',
+       'AvroValidator' => __DIR__ . '/includes/utils/AvroValidator.php',
        'BacklinkCache' => __DIR__ . '/includes/cache/BacklinkCache.php',
        'BacklinkJobUtils' => __DIR__ . '/includes/jobqueue/utils/BacklinkJobUtils.php',
        'BackupDumper' => __DIR__ . '/maintenance/backup.inc',
@@ -753,6 +754,9 @@ $wgAutoloadLocalClasses = array(
        'MediaWiki\\Logger\\LegacySpi' => __DIR__ . '/includes/debug/logger/LegacySpi.php',
        'MediaWiki\\Logger\\LoggerFactory' => __DIR__ . '/includes/debug/logger/LoggerFactory.php',
        'MediaWiki\\Logger\\MonologSpi' => __DIR__ . '/includes/debug/logger/MonologSpi.php',
+       'MediaWiki\\Logger\\Monolog\\AvroFormatter' => __DIR__ . '/includes/debug/logger/monolog/AvroFormatter.php',
+       'MediaWiki\\Logger\\Monolog\\BufferHandler' => __DIR__ . '/includes/debug/logger/monolog/BufferHandler.php',
+       'MediaWiki\\Logger\\Monolog\\KafkaHandler' => __DIR__ . '/includes/debug/logger/monolog/KafkaHandler.php',
        'MediaWiki\\Logger\\Monolog\\LegacyFormatter' => __DIR__ . '/includes/debug/logger/monolog/LegacyFormatter.php',
        'MediaWiki\\Logger\\Monolog\\LegacyHandler' => __DIR__ . '/includes/debug/logger/monolog/LegacyHandler.php',
        'MediaWiki\\Logger\\Monolog\\LineFormatter' => __DIR__ . '/includes/debug/logger/monolog/LineFormatter.php',
index a1987cc..e698862 100644 (file)
                "ext-wikidiff2": "Diff accelerator",
                "ext-apc": "Local data and opcode cache",
                "monolog/monolog": "Flexible debug logging system",
+               "nmred/kafka-php": "Send debug log events to kafka",
                "pear/mail": "Mail sending support",
                "pear/mail_mime": "Mail sending support",
-               "pear/mail_mime-decode": "Mail sending support"
+               "pear/mail_mime-decode": "Mail sending support",
+               "wikimedia/avro": "Binary serialization format used with kafka"
        },
        "autoload": {
                "psr-0": {
index 7b54861..274e18e 100644 (file)
@@ -20,6 +20,7 @@
 
 namespace MediaWiki\Logger;
 
+use MediaWiki\Logger\Monolog\BufferHandler;
 use Monolog\Logger;
 use ObjectFactory;
 
@@ -84,6 +85,7 @@ use ObjectFactory;
  *                   'logstash'
  *               ),
  *               'formatter' => 'logstash',
+ *               'buffer' => true,
  *           ),
  *           'udp2log' => array(
  *               'class' => '\\MediaWiki\\Logger\\Monolog\\LegacyHandler',
@@ -247,6 +249,9 @@ class MonologSpi implements Spi {
                                        $this->getFormatter( $spec['formatter'] )
                                );
                        }
+                       if ( isset( $spec['buffer'] ) && $spec['buffer'] ) {
+                               $handler = new BufferHandler( $handler );
+                       }
                        $this->singletons['handlers'][$name] = $handler;
                }
                return $this->singletons['handlers'][$name];
diff --git a/includes/debug/logger/monolog/AvroFormatter.php b/includes/debug/logger/monolog/AvroFormatter.php
new file mode 100644 (file)
index 0000000..b6adab4
--- /dev/null
@@ -0,0 +1,139 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+namespace MediaWiki\Logger\Monolog;
+
+use AvroIODatumWriter;
+use AvroIOBinaryEncoder;
+use AvroIOTypeException;
+use AvroNamedSchemata;
+use AvroSchema;
+use AvroStringIO;
+use AvroValidator;
+use Monolog\Formatter\FormatterInterface;
+
+/**
+ * Log message formatter that uses the apache Avro format.
+ *
+ * @since 1.26
+ * @author Erik Bernhardson <ebernhardson@wikimedia.org>
+ * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
+ */
+class AvroFormatter implements FormatterInterface {
+       /**
+        * @var array Map from schema name to schema definition
+        */
+       protected $schemas;
+
+       /**
+        * @var AvroStringIO
+        */
+       protected $io;
+
+       /**
+        * @var AvroIOBinaryEncoder
+        */
+       protected $encoder;
+
+       /**
+        * @var AvroIODatumWriter
+        */
+       protected $writer;
+
+       /**
+        * @var array $schemas Map from Monolog channel to Avro schema.
+        *  Each schema can be either the JSON string or decoded into PHP
+        *  arrays.
+        */
+       public function __construct( array $schemas ) {
+               $this->schemas = $schemas;
+               $this->io = new AvroStringIO( '' );
+               $this->encoder = new AvroIOBinaryEncoder( $this->io );
+               $this->writer = new AvroIODatumWriter();
+       }
+
+       /**
+        * Formats the record context into a binary string per the
+        * schema configured for the records channel.
+        *
+        * @param array $record
+        * @return string|null The serialized record, or null if
+        *  the record is not valid for the selected schema.
+        */
+       public function format( array $record ) {
+               $this->io->truncate();
+               $schema = $this->getSchema( $record['channel'] );
+               if ( $schema === null ) {
+                       trigger_error( "The schema for channel '{$record['channel']}' is not available" );
+                       return null;
+               }
+               try {
+                       $this->writer->write_data( $schema, $record['context'], $this->encoder );
+               } catch ( AvroIOTypeException $e ) {
+                       $errors = AvroValidator::getErrors( $schema, $record['context'] );
+                       $json = json_encode( $errors );
+                       trigger_error( "Avro failed to serialize record for {$record['channel']} : {$json}" );
+                       return null;
+               }
+               return $this->io->string();
+       }
+
+       /**
+        * Format a set of records into a list of binary strings
+        * conforming to the configured schema.
+        *
+        * @param array $records
+        * @return string[]
+        */
+       public function formatBatch( array $records ) {
+               $result = array();
+               foreach ( $records as $record ) {
+                       $message = $this->format( $record );
+                       if ( $message !== null ) {
+                               $result[] = $message;
+                       }
+               }
+               return $result;
+       }
+
+       /**
+        * Get the writer for the named channel
+        *
+        * @var string $channel Name of the schema to fetch
+        * @return AvroSchema|null
+        */
+       protected function getSchema( $channel ) {
+               if ( !isset( $this->schemas[$channel] ) ) {
+                       return null;
+               }
+               if ( !$this->schemas[$channel] instanceof AvroSchema ) {
+                       if ( is_string( $this->schemas[$channel] ) ) {
+                               $this->schemas[$channel] = AvroSchema::parse( $this->schemas[$channel] );
+                       } else {
+                               $this->schemas[$channel] = AvroSchema::real_parse(
+                                       $this->schemas[$channel],
+                                       null,
+                                       new AvroNamedSchemata()
+                               );
+                       }
+               }
+               return $this->schemas[$channel];
+       }
+}
diff --git a/includes/debug/logger/monolog/BufferHandler.php b/includes/debug/logger/monolog/BufferHandler.php
new file mode 100644 (file)
index 0000000..3ebd0b1
--- /dev/null
@@ -0,0 +1,47 @@
+<?php
+/**
+ * Helper class for the index.php entry point.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+namespace MediaWiki\Logger\Monolog;
+
+use DeferredUpdates;
+use Monolog\Handler\BufferHandler as BaseBufferHandler;
+
+/**
+ * Updates the Monolog BufferHandler to use DeferredUpdates rather
+ * than register_shutdown_function. On supported platforms this will
+ * use register_postsend_function or fastcgi_finish_request() to delay
+ * until after the request has shutdown and we are no longer delaying
+ * the web request.
+ */
+class BufferHandler extends BaseBufferHandler {
+       /**
+        * {@inheritDoc}
+        */
+       public function handle( array $record ) {
+               if (!$this->initialized) {
+                       DeferredUpdates::addCallableUpdate( array( $this, 'close' ) );
+                       $this->initialized = true;
+               }
+               return parent::handle( $record );
+       }
+}
+
diff --git a/includes/debug/logger/monolog/KafkaHandler.php b/includes/debug/logger/monolog/KafkaHandler.php
new file mode 100644 (file)
index 0000000..59d7764
--- /dev/null
@@ -0,0 +1,224 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+namespace MediaWiki\Logger\Monolog;
+
+use Kafka\MetaDataFromKafka;
+use Kafka\Produce;
+use MediaWiki\Logger\LoggerFactory;
+use Monolog\Handler\AbstractProcessingHandler;
+use Monolog\Logger;
+use Psr\Log\LoggerInterface;
+
+/**
+ * Log handler sends log events to a kafka server.
+ *
+ * Constructor options array arguments:
+ * * alias: map from monolog channel to kafka topic name. When no
+ *       alias exists the topic "monolog_$channel" will be used.
+ * * swallowExceptions: Swallow exceptions that occur while talking to
+ *    kafka. Defaults to false.
+ * * logExceptions: Log exceptions talking to kafka here. Either null,
+ *    the name of a channel to log to, or an object implementing
+ *    FormatterInterface. Defaults to null.
+ *
+ * Requires the nmred/kafka-php library, version >= 1.3.0
+ *
+ * @since 1.26
+ * @author Erik Bernhardson <ebernhardson@wikimedia.org>
+ * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
+ */
+class KafkaHandler extends AbstractProcessingHandler {
+       /**
+        * @var Produce Sends requests to kafka
+        */
+       protected $produce;
+
+       /**
+        * @var array Optional handler configuration
+        */
+       protected $options;
+
+       /**
+        * @var array Map from topic name to partition this request produces to
+        */
+       protected $partitions = array();
+
+       /**
+        * @var array defaults for constructor options
+        */
+       private static $defaultOptions = array(
+               'alias' => array(), // map from monolog channel to kafka topic
+               'swallowExceptions' => false, // swallow exceptions sending records
+               'logExceptions' => null, // A PSR3 logger to inform about errors
+       );
+
+       /**
+        * @param Produce $produce Kafka instance to produce through
+        * @param array $options optional handler configuration
+        * @param int $level The minimum logging level at which this handler will be triggered
+        * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
+        */
+       public function __construct( Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true ) {
+               parent::__construct( $level, $bubble );
+               $this->produce = $produce;
+               $this->options = array_merge( self::$defaultOptions, $options );
+       }
+
+       /**
+        * Constructs the necessary support objects and returns a KafkaHandler
+        * instance.
+        *
+        * @param string[] $kafkaServers
+        * @param array $options
+        * @param int $level The minimum logging level at which this handle will be triggered
+        * @param bool $bubble Whether the messages that are handled can bubble the stack or not
+        * @return KafkaHandler
+        */
+       public static function factory( $kafkaServers, array $options = array(), $level = Logger::DEBUG, $bubble = true ) {
+               $metadata = new MetaDataFromKafka( $kafkaServers );
+               $produce = new Produce( $metadata );
+               if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
+                       $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
+               }
+               return new self( $produce, $options, $level, $bubble );
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       protected function write( array $record ) {
+               if ( $record['formatted'] !== null ) {
+                       $this->addMessages( $record['channel'], array( $record['formatted'] ) );
+                       $this->send();
+               }
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public function handleBatch( array $batch ) {
+               $channels = array();
+               foreach ( $batch as $record ) {
+                       if ( $record['level'] < $this->level ) {
+                               continue;
+                       }
+                       $channels[$record['channel']][] = $this->processRecord( $record );
+               }
+
+               $formatter = $this->getFormatter();
+               foreach ( $channels as $channel => $records ) {
+                       $messages = array();
+                       foreach ( $records as $idx => $record ) {
+                               $message = $formatter->format( $record );
+                               if ( $message !== null ) {
+                                       $messages[] = $message;
+                               }
+                       }
+                       if ( $messages ) {
+                               $this->addMessages($channel, $messages);
+                       }
+               }
+
+               $this->send();
+       }
+
+       /**
+        * Send any records in the kafka client internal queue.
+        */
+       protected function send() {
+               try {
+                       $this->produce->send();
+               } catch ( \Kafka\Exception $e ) {
+                       $ignore = $this->warning(
+                               'Error sending records to kafka: {exception}',
+                               array( 'exception' => $e ) );
+                       if ( !$ignore ) {
+                               throw $e;
+                       }
+               }
+       }
+
+       /**
+        * @param string $topic Name of topic to get partition for
+        * @return int|null The random partition to produce to for this request,
+        *  or null if a partition could not be determined.
+        */
+       protected function getRandomPartition( $topic ) {
+               if ( !array_key_exists( $topic, $this->partitions ) ) {
+                       try {
+                               $partitions = $this->produce->getAvailablePartitions( $topic );
+                       } catch ( \Kafka\Exception $e ) {
+                               $ignore = $this->warning(
+                                       'Error getting metadata for kafka topic {topic}: {exception}',
+                                       array( 'topic' => $topic, 'exception' => $e ) );
+                               if ( $ignore ) {
+                                       return null;
+                               }
+                               throw $e;
+                       }
+                       if ( $partitions ) {
+                               $key = array_rand( $partitions );
+                               $this->partitions[$topic] = $partitions[$key];
+                       } else {
+                               $details = $this->produce->getClient()->getTopicDetail( $topic );
+                               $ignore = $this->warning(
+                                       'No partitions available for kafka topic {topic}',
+                                       array( 'topic' => $topic, 'kafka' => $details )
+                               );
+                               if ( !$ignore ) {
+                                       throw new \RuntimeException( "No partitions available for kafka topic $topic" );
+                               }
+                               $this->partitions[$topic] = null;
+                       }
+               }
+               return $this->partitions[$topic];
+       }
+
+       /**
+        * Adds records for a channel to the Kafka client internal queue.
+        *
+        * @param string $channel Name of Monolog channel records belong to
+        * @param array $records List of records to append
+        */
+       protected function addMessages( $channel, array $records ) {
+               if ( isset( $this->options['alias'][$channel] ) ) {
+                       $topic = $this->options['alias'][$channel];
+               } else {
+                       $topic = "monolog_$channel";
+               }
+               $partition = $this->getRandomPartition( $topic );
+               if ( $partition !== null ) {
+                       $this->produce->setMessages( $topic, $partition, $records );
+               }
+       }
+
+       /**
+        * @param string $message PSR3 compatible message string
+        * @param array $context PSR3 compatible log context
+        * @return bool true if caller should ignore warning
+        */
+       protected function warning( $message, array $context = array() ) {
+               if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
+                       $this->options['logExceptions']->warning( $message, $context );
+               }
+               return $this->options['swallowExceptions'];
+       }
+}
diff --git a/includes/utils/AvroValidator.php b/includes/utils/AvroValidator.php
new file mode 100644 (file)
index 0000000..4f8e0b1
--- /dev/null
@@ -0,0 +1,184 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+/**
+ * Generate error strings for data that doesn't match the specified
+ * Avro schema. This is very similar to AvroSchema::is_valid_datum(),
+ * but returns error messages instead of a boolean.
+ *
+ * @since 1.26
+ * @author Erik Bernhardson <ebernhardson@wikimedia.org>
+ * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
+ */
+class AvroValidator {
+       /**
+        * @param AvroSchema $schema The rules to conform to.
+        * @param mixed $datum The value to validate against $schema.
+        * @return string|string[] An error or list of errors in the
+        *  provided $datum. When no errors exist the empty array is
+        *  returned.
+        */
+       public static function getErrors( AvroSchema $schema, $datum ) {
+               switch ( $schema->type) {
+               case AvroSchema::NULL_TYPE:
+                       if ( !is_null($datum) ) {
+                               return self::wrongType( 'null', $datum );
+                       }
+                       return array();
+               case AvroSchema::BOOLEAN_TYPE:
+                       if ( !is_bool($datum) ) {
+                               return self::wrongType( 'boolean', $datum );
+                       }
+                       return array();
+               case AvroSchema::STRING_TYPE:
+               case AvroSchema::BYTES_TYPE:
+                       if ( !is_string($datum) ) {
+                               return self::wrongType( 'string', $datum );
+                       }
+                       return array();
+               case AvroSchema::INT_TYPE:
+                       if ( !is_int($datum) ) {
+                               return self::wrongType( 'integer', $datum );
+                       }
+                       if ( AvroSchema::INT_MIN_VALUE > $datum
+                               || $datum > AvroSchema::INT_MAX_VALUE
+                       ) {
+                               return self::outOfRange(
+                                       AvroSchema::INT_MIN_VALUE,
+                                       AvroSchema::INT_MAX_VALUE,
+                                       $datum
+                               );
+                       }
+                       return array();
+               case AvroSchema::LONG_TYPE:
+                       if ( !is_int($datum) ) {
+                               return self::wrongType( 'integer', $datum );
+                       }
+                       if ( AvroSchema::LONG_MIN_VALUE > $datum
+                               || $datum > AvroSchema::LONG_MAX_VALUE
+                       ) {
+                               return self::outOfRange(
+                                       AvroSchema::LONG_MIN_VALUE,
+                                       AvroSchema::LONG_MAX_VALUE,
+                                       $datum
+                               );
+                       }
+                       return array();
+               case AvroSchema::FLOAT_TYPE:
+               case AvroSchema::DOUBLE_TYPE:
+                       if ( !is_float($datum) && !is_int($datum) ) {
+                               return self::wrongType( 'float or integer', $datum );
+                       }
+                       return array();
+               case AvroSchema::ARRAY_SCHEMA:
+                       if (!is_array($datum)) {
+                               return self::wrongType( 'array', $datum );
+                       }
+                       $errors = array();
+                       foreach ($datum as $d) {
+                               $result = $this->validate( $schema->items(), $d );
+                               if ( $result ) {
+                                       $errors[] = $result;
+                               }
+                       }
+                       if ( $errors ) {
+                               return $errors;
+                       }
+                       return array();
+               case AvroSchema::MAP_SCHEMA:
+                       if (!is_array($datum)) {
+                               return self::wrongType( 'array', $datum );
+                       }
+                       $errors = array();
+                       foreach ($datum as $k => $v) {
+                       if ( !is_string($k) ) {
+                                       $errors[] = self::wrongType( 'string key', $k );
+                               }
+                               $result = self::getErrors( $schema->values(), $v );
+                               if ( $result ) {
+                                       $errors[$k] = $result;
+                               }
+                       }
+                       return $errors;
+               case AvroSchema::UNION_SCHEMA:
+                       $errors = array();
+                       foreach ($schema->schemas() as $schema) {
+                               $result = self::getErrors( $schema, $datum );
+                               if ( !$result ) {
+                                       return array();
+                               }
+                               $errors[] = $result;
+                       }
+                       if ( $errors ) {
+                               return array( "Expected any one of these to be true", $errors );
+                       }
+                       return "No schemas provided to union";
+               case AvroSchema::ENUM_SCHEMA:
+                       if ( !in_array( $datum, $schema->symbols() ) ) {
+                               $symbols = implode( ', ', $schema->symbols );
+                               return "Expected one of $symbols but recieved $datum";
+                       }
+                       return array();
+               case AvroSchema::FIXED_SCHEMA:
+                       if ( !is_string( $datum ) ) {
+                               return self::wrongType( 'string', $datum );
+                       }
+                       $len = strlen( $datum );
+                       if ( $len !== $schema->size() ) {
+                               return "Expected string of length {$schema->size()}, "
+                                       . "but recieved one of length $len";
+                       }
+                       return array();
+               case AvroSchema::RECORD_SCHEMA:
+               case AvroSchema::ERROR_SCHEMA:
+               case AvroSchema::REQUEST_SCHEMA:
+                       if ( !is_array( $datum ) ) {
+                               return self::wrongType( 'array', $datum );
+                       }
+                       $errors = array();
+                       foreach ( $schema->fields() as $field ) {
+                               $name = $field->name();
+                               if ( !array_key_exists( $name, $datum ) ) {
+                                       $errors[$name] = 'Missing expected field';
+                                       continue;
+                               }
+                               $result = self::getErrors( $field->type(), $datum[$name] );
+                               if ( $result ) {
+                                       $errors[$name] = $result;
+                               }
+                       }
+                       return $errors;
+               default:
+                       return "Unknown avro schema type: {$schema->type}";
+               }
+       }
+
+       public static function typeOf( $datum ) {
+               return is_object( $datum ) ? get_class( $datum ) : gettype( $datum );
+       }
+
+       public static function wrongType( $expected, $datum ) {
+               return "Expected $expected, but recieved " . self::typeOf( $datum );
+       }
+
+       public static function outOfRange( $min, $max, $datum ) {
+               return "Expected value between $min and $max, but recieved $datum";
+       }
+}
diff --git a/tests/phpunit/includes/ConsecutiveParametersMatcher.php b/tests/phpunit/includes/ConsecutiveParametersMatcher.php
new file mode 100644 (file)
index 0000000..adf74bb
--- /dev/null
@@ -0,0 +1,123 @@
+<?php
+/*
+ * This file is part of the PHPUnit_MockObject package.
+ *
+ * (c) Sebastian Bergmann <sebastian@phpunit.de>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+/**
+ * Invocation matcher which looks for sets of specific parameters in the invocations.
+ *
+ * Checks the parameters of the incoming invocations, the parameter list is
+ * checked against the defined constraints in $parameters. If the constraint
+ * is met it will return true in matches().
+ *
+ * It takes a list of match groups and and increases a call index after each invocation.
+ * So the first invocation uses the first group of constraints, the second the next and so on.
+ */
+class PHPUnit_Framework_MockObject_Matcher_ConsecutiveParameters extends PHPUnit_Framework_MockObject_Matcher_StatelessInvocation
+{
+    /**
+     * @var array
+     */
+    private $_parameterGroups = array();
+
+    /**
+     * @var array
+     */
+    private $_invocations = array();
+
+    /**
+     * @param array $parameterGroups
+     */
+    public function __construct(array $parameterGroups)
+    {
+        foreach ($parameterGroups as $index => $parameters) {
+            foreach ($parameters as $parameter) {
+                if (!($parameter instanceof \PHPUnit_Framework_Constraint)) {
+                    $parameter = new \PHPUnit_Framework_Constraint_IsEqual($parameter);
+                }
+                $this->_parameterGroups[$index][] = $parameter;
+            }
+        }
+    }
+
+    /**
+     * @return string
+     */
+    public function toString()
+    {
+        $text = 'with consecutive parameters';
+
+        return $text;
+    }
+
+    /**
+     * @param  PHPUnit_Framework_MockObject_Invocation $invocation
+     * @return bool
+     */
+    public function matches(PHPUnit_Framework_MockObject_Invocation $invocation)
+    {
+        $this->_invocations[] = $invocation;
+        $callIndex            = count($this->_invocations) - 1;
+        $this->verifyInvocation($invocation, $callIndex);
+
+        return false;
+    }
+
+    public function verify()
+    {
+        foreach ($this->_invocations as $callIndex => $invocation) {
+            $this->verifyInvocation($invocation, $callIndex);
+        }
+    }
+
+    /**
+     * Verify a single invocation
+     *
+     * @param  PHPUnit_Framework_MockObject_Invocation      $invocation
+     * @param  int                                          $callIndex
+     * @throws PHPUnit_Framework_ExpectationFailedException
+     */
+    private function verifyInvocation(PHPUnit_Framework_MockObject_Invocation $invocation, $callIndex)
+    {
+
+        if (isset($this->_parameterGroups[$callIndex])) {
+            $parameters = $this->_parameterGroups[$callIndex];
+        } else {
+          // no parameter assertion for this call index
+            return;
+        }
+
+        if ($invocation === null) {
+            throw new PHPUnit_Framework_ExpectationFailedException(
+                'Mocked method does not exist.'
+            );
+        }
+
+        if (count($invocation->parameters) < count($parameters)) {
+            throw new PHPUnit_Framework_ExpectationFailedException(
+                sprintf(
+                    'Parameter count for invocation %s is too low.',
+                    $invocation->toString()
+                )
+            );
+        }
+
+        foreach ($parameters as $i => $parameter) {
+            $parameter->evaluate(
+                $invocation->parameters[$i],
+                sprintf(
+                    'Parameter %s for invocation #%d %s does not match expected ' .
+                    'value.',
+                    $i,
+                    $callIndex,
+                    $invocation->toString()
+                )
+            );
+        }
+    }
+}
diff --git a/tests/phpunit/includes/debug/logger/monolog/AvroFormatterTest.php b/tests/phpunit/includes/debug/logger/monolog/AvroFormatterTest.php
new file mode 100644 (file)
index 0000000..4c6d25e
--- /dev/null
@@ -0,0 +1,64 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+namespace MediaWiki\Logger\Monolog;
+
+use MediaWikiTestCase;
+use PHPUnit_Framework_Error_Notice;
+
+class AvroFormatterTest extends MediaWikiTestCase {
+
+       public function setUo() {
+               if ( !class_exists( 'AvroStringIO' ) ) {
+                       $this->markTestSkipped( 'Avro is required for the AvroFormatterTest' );
+               }
+               parent::setUp();
+       }
+
+       public function testSchemaNotAvailable() {
+               $formatter = new AvroFormatter( array() );      
+               $this->setExpectedException( 'PHPUnit_Framework_Error_Notice', "The schema for channel 'marty' is not available" );
+               $formatter->format( array( 'channel' => 'marty' ) );
+       }
+
+       public function testSchemaNotAvailableReturnValue() {
+               $formatter = new AvroFormatter( array() );      
+               $noticeEnabled = PHPUnit_Framework_Error_Notice::$enabled;
+               // disable conversion of notices 
+               PHPUnit_Framework_Error_Notice::$enabled = false;
+               // have to keep the user notice from being output
+               wfSuppressWarnings();
+               $res = $formatter->format( array( 'channel' => 'marty' ) );
+               wfRestoreWarnings();
+               PHPUnit_Framework_Error_Notice::$enabled = $noticeEnabled;
+               $this->assertNull( $res );
+       }
+
+       public function testDoesSomethingWhenSchemaAvailable() {
+               $formatter = new AvroFormatter( array( 'string' => array( 'type' => 'string' ) ) );
+               $res = $formatter->format( array(
+                       'channel' => 'string',
+                       'context' => 'better to be',
+               ) );
+               $this->assertNotNull( $res );
+               // basically just tell us if avro changes its string encoding
+               $this->assertEquals( base64_decode( 'GGJldHRlciB0byBiZQ==' ), $res );
+       }
+}
diff --git a/tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php b/tests/phpunit/includes/debug/logger/monolog/KafkaHandlerTest.php
new file mode 100644 (file)
index 0000000..272f6e4
--- /dev/null
@@ -0,0 +1,204 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+namespace MediaWiki\Logger\Monolog;
+
+use MediaWikiTestCase;
+use Monolog\Logger;
+
+// not available in the version of phpunit mw uses, so copied into repo
+require_once __DIR__ . '/../../../ConsecutiveParametersMatcher.php';
+
+class KafkaHandlerTest extends MediaWikiTestCase {
+
+       public function setUo() {
+               if ( !class_exists( 'Monolog\Handler\AbstractProcessingHandler' ) ) {
+                       $this->markTestSkipped( 'Monolog is required for the KafkaHandlerTest' );
+               }
+               parent::setUp();
+       }
+
+       public function topicNamingProvider() {
+               return array(
+                       array( array(), 'monolog_foo' ),
+                       array( array( 'alias' => array( 'foo' => 'bar' ) ), 'bar' )
+               );
+       }
+
+       /**
+        * @dataProvider topicNamingProvider
+        */
+       public function testTopicNaming( $options, $expect ) {
+               $produce = $this->getMockBuilder( 'Kafka\Produce' )
+                       ->disableOriginalConstructor()
+                       ->getMock();
+               $produce->expects($this->any())
+                       ->method('getAvailablePartitions')
+                       ->will($this->returnValue( array( 'A' ) ) );
+               $produce->expects($this->once())
+                       ->method( 'setMessages' )
+                       ->with( $expect, $this->anything(), $this->anything() );
+
+               $handler = new KafkaHandler( $produce, $options );
+               $handler->handle( array(
+                       'channel' => 'foo',
+                       'level' => Logger::EMERGENCY,
+                       'extra' => array(),
+               ) );
+       }
+
+       public function swallowsExceptionsWhenRequested() {
+               return array(
+                       // defaults to false
+                       array( array(), true ),
+                       // also try false explicitly
+                       array( array( 'swallowExceptions' => false ), true ),
+                       // turn it on
+                       array( array( 'swallowExceptions' => true ), false ),
+               );
+       }
+
+       /**
+        * @dataProvider swallowsExceptionsWhenRequested
+        */
+       public function testGetAvailablePartitionsException( $options, $expectException ) {
+               $produce = $this->getMockBuilder( 'Kafka\Produce' )
+                       ->disableOriginalConstructor()
+                       ->getMock();
+               $produce->expects( $this->any() )
+                       ->method( 'getAvailablePartitions' )
+                       ->will( $this->throwException( new \Kafka\Exception ) );
+
+               if ( $expectException ) {
+                       $this->setExpectedException( 'Kafka\Exception' );
+               }
+
+               $handler = new KafkaHandler( $produce, $options );
+               $handler->handle( array(
+                       'channel' => 'foo',
+                       'level' => Logger::EMERGENCY,
+                       'extra' => array(),
+               ) );
+
+               if ( !$expectException ) {
+                       $this->assertTrue( true, 'no exception was thrown' );
+               }
+       }
+
+       /**
+        * @dataProvider swallowsExceptionsWhenRequested
+        */
+       public function testSendException( $options, $expectException ) {
+               $produce = $this->getMockBuilder( 'Kafka\Produce' )
+                       ->disableOriginalConstructor()
+                       ->getMock();
+               $produce->expects( $this->any() )
+                       ->method( 'getAvailablePartitions' )
+                       ->will( $this->returnValue( array( 'A' ) ) );
+               $produce->expects( $this->any() )
+                       ->method( 'send' )
+                       ->will( $this->throwException( new \Kafka\Exception ) );
+
+               if ( $expectException ) {
+                       $this->setExpectedException( 'Kafka\Exception' );
+               }
+
+               $handler = new KafkaHandler( $produce, $options );
+               $handler->handle( array(
+                       'channel' => 'foo',
+                       'level' => Logger::EMERGENCY,
+                       'extra' => array(),
+               ) );
+
+               if ( !$expectException ) {
+                       $this->assertTrue( true, 'no exception was thrown' );
+               }
+       }
+
+       public function testHandlesNullFormatterResult() {
+               $produce = $this->getMockBuilder( 'Kafka\Produce' )
+                       ->disableOriginalConstructor()
+                       ->getMock();
+               $produce->expects( $this->any() )
+                       ->method( 'getAvailablePartitions' )
+                       ->will( $this->returnValue( array( 'A' ) ) );
+               $mockMethod = $produce->expects( $this->exactly( 2 ) )
+                       ->method( 'setMessages' );
+               // evil hax
+               \TestingAccessWrapper::newFromObject( $mockMethod )->matcher->parametersMatcher =
+                       new \PHPUnit_Framework_MockObject_Matcher_ConsecutiveParameters( array(
+                               array( $this->anything(), $this->anything(), array( 'words' ) ),
+                               array( $this->anything(), $this->anything(), array( 'lines' ) )
+                       ) );
+
+               $formatter = $this->getMock( 'Monolog\Formatter\FormatterInterface' );
+               $formatter->expects( $this->any() )
+                       ->method( 'format' )
+                       ->will( $this->onConsecutiveCalls( 'words', null, 'lines' ) );
+
+               $handler = new KafkaHandler( $produce, array() );
+               $handler->setFormatter( $formatter );
+               for ( $i = 0; $i < 3; ++$i ) {
+                       $handler->handle( array(
+                               'channel' => 'foo',
+                               'level' => Logger::EMERGENCY,
+                               'extra' => array(),
+                       ) );
+               }
+       }
+
+
+       public function testBatchHandlesNullFormatterResult() {
+               $produce = $this->getMockBuilder( 'Kafka\Produce' )
+                       ->disableOriginalConstructor()
+                       ->getMock();
+               $produce->expects( $this->any() )
+                       ->method( 'getAvailablePartitions' )
+                       ->will( $this->returnValue( array( 'A' ) ) );
+               $produce->expects( $this->once() )
+                       ->method( 'setMessages' )
+                       ->with( $this->anything(), $this->anything(), array( 'words', 'lines' ) );
+
+               $formatter = $this->getMock( 'Monolog\Formatter\FormatterInterface' );
+               $formatter->expects( $this->any() )
+                       ->method( 'format' )
+                       ->will( $this->onConsecutiveCalls( 'words', null, 'lines' ) );
+
+               $handler = new KafkaHandler( $produce, array() );
+               $handler->setFormatter( $formatter );
+               $handler->handleBatch( array(
+                       array(
+                               'channel' => 'foo',
+                               'level' => Logger::EMERGENCY,
+                               'extra' => array(),
+                       ),
+                       array(
+                               'channel' => 'foo',
+                               'level' => Logger::EMERGENCY,
+                               'extra' => array(),
+                       ),
+                       array(
+                               'channel' => 'foo',
+                               'level' => Logger::EMERGENCY,
+                               'extra' => array(),
+                       ),
+               ) );
+       }
+}
diff --git a/tests/phpunit/includes/utils/AvroValidatorTest.php b/tests/phpunit/includes/utils/AvroValidatorTest.php
new file mode 100644 (file)
index 0000000..52c242c
--- /dev/null
@@ -0,0 +1,96 @@
+<?php
+/**
+ * Tests for IP validity functions.
+ *
+ * Ported from /t/inc/IP.t by avar.
+ *
+ * @group IP
+ * @todo Test methods in this call should be split into a method and a
+ * dataprovider.
+ */
+
+class AvroValidatorTest extends PHPUnit_Framework_TestCase {
+       public function setUp() {
+               if ( !class_exists( 'AvroSchema' ) ) {
+                       $this->markTestSkipped( 'Avro is required to run the AvroValidatorTest' );
+               }
+               parent::setUp();
+       }
+
+       public function getErrorsProvider() {
+               $stringSchema = AvroSchema::parse( json_encode( array( 'type' => 'string' ) ) );
+               $recordSchema = AvroSchema::parse( json_encode( array(
+                       'type' => 'record',
+                       'name' => 'ut',
+                       'fields' => array(
+                               array( 'name' => 'id', 'type' => 'int', 'required' => true ),
+                       ),
+               ) ) );
+               $enumSchema = AvroSchema::parse( json_encode( array(
+                       'type' => 'record',
+                       'name' => 'ut',
+                       'fields' => array(
+                               array( 'name' => 'count', 'type' => array( 'int', 'null' ) ),
+                       ),
+               ) ) );
+
+               return array(
+                       array(
+                               'No errors with a simple string serialization',
+                               $stringSchema, 'foobar', array(),
+                       ),
+
+                       array(
+                               'Cannot serialize integer into string',
+                               $stringSchema, 5, 'Expected string, but recieved integer',
+                       ),
+
+                       array(
+                               'Cannot serialize array into string',
+                               $stringSchema, array(), 'Expected string, but recieved array',
+                       ),
+
+                       array(
+                               'allows and ignores extra fields',
+                               $recordSchema, array( 'id' => 4, 'foo' => 'bar' ), array(),
+                       ),
+
+                       array(
+                               'detects missing fields',
+                               $recordSchema, array(), array( 'id' => 'Missing expected field' ),
+                       ),
+
+                       array(
+                               'handles first element in enum',
+                               $enumSchema, array( 'count' => 4 ), array(),
+                       ),
+
+                       array(
+                               'handles second element in enum',
+                               $enumSchema, array( 'count' => null ), array(),
+                       ),
+
+                       array(
+                               'rejects element not in union',
+                               $enumSchema, array( 'count' => 'invalid' ), array( 'count' => array(
+                                       'Expected any one of these to be true',
+                                       array(
+                                               'Expected integer, but recieved string',
+                                               'Expected null, but recieved string',
+                                       )
+                               ) )
+                       ),
+               );
+       }
+
+       /**
+        * @dataProvider getErrorsProvider
+        */
+       public function testGetErrors( $message, $schema, $datum, $expected ) {
+               $this->assertEquals(
+                       $expected,
+                       AvroValidator::getErrors( $schema, $datum ),
+                       $message
+               );
+       }
+}