Supports schema revision id in avro binary formatter
authordcausse <dcausse@wikimedia.org>
Fri, 20 Nov 2015 17:09:56 +0000 (18:09 +0100)
committerdcausse <dcausse@wikimedia.org>
Tue, 24 Nov 2015 12:31:45 +0000 (13:31 +0100)
Avro formatter now supports a revision defined in schema configuration:

$wmgMonologAvroSchemas = array(
  'CirrusSearchRequestSet' => array(
    'schema' => file_get_contents( __DIR__ . '/schema.avsc' ),
    'revision' => 11144802,
  ),
);

The formatter still supports old style configuration:
$wmgMonologAvroSchemas = array(
  'CirrusSearchRequestSet' => file_get_contents( __DIR__ . '/schema.avsc' ),
);

Change-Id: Icc0f92be23305e77a69b92fee4d9f9de2edda81e

includes/debug/logger/monolog/AvroFormatter.php

index 019d028..4a39be8 100644 (file)
@@ -37,6 +37,10 @@ use Monolog\Formatter\FormatterInterface;
  * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
  */
 class AvroFormatter implements FormatterInterface {
+       /**
+        * @var Magic byte to encode schema revision id.
+        */
+       const MAGIC = 0x0;
        /**
         * @var array Map from schema name to schema definition
         */
@@ -80,6 +84,7 @@ class AvroFormatter implements FormatterInterface {
        public function format( array $record ) {
                $this->io->truncate();
                $schema = $this->getSchema( $record['channel'] );
+               $revId = $this->getSchemaRevisionId( $record['channel'] );
                if ( $schema === null ) {
                        trigger_error( "The schema for channel '{$record['channel']}' is not available" );
                        return null;
@@ -92,6 +97,10 @@ class AvroFormatter implements FormatterInterface {
                        trigger_error( "Avro failed to serialize record for {$record['channel']} : {$json}" );
                        return null;
                }
+               if ( $revId !== null ) {
+                       return chr( self::MAGIC ) . $this->encode_long( $revId ) . $this->io->string();
+               }
+               // @todo: remove backward compat code and do not send messages without rev id.
                return $this->io->string();
        }
 
@@ -123,17 +132,55 @@ class AvroFormatter implements FormatterInterface {
                if ( !isset( $this->schemas[$channel] ) ) {
                        return null;
                }
-               if ( !$this->schemas[$channel] instanceof AvroSchema ) {
-                       if ( is_string( $this->schemas[$channel] ) ) {
-                               $this->schemas[$channel] = AvroSchema::parse( $this->schemas[$channel] );
+               $schemaDetails = &$this->schemas[$channel];
+               $schema = null;
+               if ( isset( $schemaDetails['revision'] ) && isset( $schemaDetails['schema'] ) ) {
+                       $schema = &$schemaDetails['schema'];
+               } else {
+                       // @todo: Remove backward compat code
+                       $schema = &$schemaDetails;
+               }
+
+               if ( !$schema instanceof AvroSchema ) {
+                       if ( is_string( $schema ) ) {
+                               $schema = AvroSchema::parse( $schema );
                        } else {
-                               $this->schemas[$channel] = AvroSchema::real_parse(
+                               $schema = AvroSchema::real_parse(
                                        $this->schemas[$channel],
                                        null,
                                        new AvroNamedSchemata()
                                );
                        }
                }
-               return $this->schemas[$channel];
+               return $schema;
+       }
+
+       /**
+        * Get the writer for the named channel
+        *
+        * @var string $channel Name of the schema
+        * @return int|null
+        */
+       public function getSchemaRevisionId( $channel ) {
+               // @todo: remove backward compat code
+               if ( isset( $this->schemas[$channel] )
+                               && is_array( $this->schemas[$channel] )
+                               && isset( $this->schemas[$channel]['revision'] ) ) {
+                       return (int) $this->schemas[$channel]['revision'];
+               }
+               return null;
+       }
+
+
+       /**
+        * convert an integer to a 64bits big endian long (Java compatible)
+        * NOTE: certainly only compatible with PHP 64bits
+        * @param int $id
+        * @return string the binary representation of $id
+        */
+       private function encode_long( $id ) {
+               $high   = ( $id & 0xffffffff00000000 ) >> 32;
+               $low    = $id & 0x00000000ffffffff;
+               return pack( 'NN', $high, $low );
        }
 }