Merge "Http::getProxy() method to get proxy configuration"
[lhc/web/wiklou.git] / includes / debug / logger / monolog / KafkaHandler.php
1 <?php
2 /**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20
21 namespace MediaWiki\Logger\Monolog;
22
23 use Kafka\MetaDataFromKafka;
24 use Kafka\Produce;
25 use MediaWiki\Logger\LoggerFactory;
26 use Monolog\Handler\AbstractProcessingHandler;
27 use Monolog\Logger;
28 use Psr\Log\LoggerInterface;
29
30 /**
31 * Log handler sends log events to a kafka server.
32 *
33 * Constructor options array arguments:
34 * * alias: map from monolog channel to kafka topic name. When no
35 * alias exists the topic "monolog_$channel" will be used.
36 * * swallowExceptions: Swallow exceptions that occur while talking to
37 * kafka. Defaults to false.
38 * * logExceptions: Log exceptions talking to kafka here. Either null,
39 * the name of a channel to log to, or an object implementing
40 * Psr\Log\LoggerInterface. Defaults to null.
41 *
42 * Requires the nmred/kafka-php library, version >= 1.3.0
43 *
44 * @since 1.26
45 * @author Erik Bernhardson <ebernhardson@wikimedia.org>
46 * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
47 */
class KafkaHandler extends AbstractProcessingHandler {
	/**
	 * @var Produce Kafka producer that queued messages are sent through
	 */
	protected $produce;

	/**
	 * @var array Handler configuration: constructor options merged over the defaults
	 */
	protected $options;

	/**
	 * @var array Map from topic name to the partition this request produces to.
	 *  A null value records that the topic reported no available partitions.
	 */
	protected $partitions = [];

	/**
	 * @var array defaults for constructor options
	 */
	private static $defaultOptions = [
		'alias' => [], // map from monolog channel to kafka topic
		'swallowExceptions' => false, // swallow exceptions sending records
		'logExceptions' => null, // A PSR3 logger to inform about errors
	];

	/**
	 * @param Produce $produce Kafka instance to produce through
	 * @param array $options optional handler configuration; merged over the
	 *  class defaults, so any key that is supplied wins
	 * @param int $level The minimum logging level at which this handler will be triggered
	 * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
	 */
	public function __construct(
		Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true
	) {
		parent::__construct( $level, $bubble );
		$this->produce = $produce;
		$this->options = array_merge( self::$defaultOptions, $options );
	}

	/**
	 * Constructs the necessary support objects and returns a KafkaHandler
	 * instance.
	 *
	 * In addition to the constructor options, the following keys are
	 * understood here:
	 *  * sendTimeout: timeout in seconds (may be fractional) applied to the
	 *    client's 'SendTimeoutSec' / 'SendTimeoutUSec' stream options.
	 *  * recvTimeout: timeout in seconds (may be fractional) applied to the
	 *    client's 'RecvTimeoutSec' / 'RecvTimeoutUSec' stream options.
	 *  * logExceptions: when given as a string it is treated as a channel
	 *    name and replaced by the logger LoggerFactory returns for it.
	 *
	 * @param string[] $kafkaServers
	 * @param array $options
	 * @param int $level The minimum logging level at which this handler will be triggered
	 * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
	 * @return KafkaHandler
	 */
	public static function factory(
		$kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true
	) {
		$metadata = new MetaDataFromKafka( $kafkaServers );
		$produce = new Produce( $metadata );

		if ( isset( $options['sendTimeout'] ) ) {
			list( $sec, $usec ) = self::splitTimeout( $options['sendTimeout'] );
			$produce->getClient()->setStreamOption( 'SendTimeoutSec', $sec );
			$produce->getClient()->setStreamOption( 'SendTimeoutUSec', $usec );
		}
		if ( isset( $options['recvTimeout'] ) ) {
			list( $sec, $usec ) = self::splitTimeout( $options['recvTimeout'] );
			$produce->getClient()->setStreamOption( 'RecvTimeoutSec', $sec );
			$produce->getClient()->setStreamOption( 'RecvTimeoutUSec', $usec );
		}
		if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
			$options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
		}

		return new self( $produce, $options, $level, $bubble );
	}

	/**
	 * Splits a timeout given in seconds into whole seconds and the remaining
	 * microseconds.
	 *
	 * The total is rounded exactly as before (truncated to whole
	 * microseconds); it is merely distributed over both fields so that the
	 * microsecond part never reaches a full second.
	 *
	 * @param int|float $seconds Timeout in seconds, possibly fractional
	 * @return int[] Two element list: [ whole seconds, remaining microseconds ]
	 */
	private static function splitTimeout( $seconds ) {
		$totalUsec = intval( $seconds * 1000000 );

		return [ (int)( $totalUsec / 1000000 ), $totalUsec % 1000000 ];
	}

	/**
	 * {@inheritDoc}
	 *
	 * Queues the formatted record for the topic of its channel and sends it
	 * right away. Records whose formatted value is null are dropped.
	 */
	protected function write( array $record ) {
		if ( $record['formatted'] !== null ) {
			$this->addMessages( $record['channel'], [ $record['formatted'] ] );
			$this->send();
		}
	}

	/**
	 * {@inheritDoc}
	 *
	 * Groups the batch by channel so every channel's messages are queued
	 * with one call, then sends everything with a single request.
	 */
	public function handleBatch( array $batch ) {
		$channels = [];
		foreach ( $batch as $record ) {
			// Records below this handler's minimum level are not handled
			if ( $record['level'] < $this->level ) {
				continue;
			}
			$channels[$record['channel']][] = $this->processRecord( $record );
		}

		$formatter = $this->getFormatter();
		foreach ( $channels as $channel => $records ) {
			$messages = [];
			foreach ( $records as $record ) {
				$message = $formatter->format( $record );
				// A null result from the formatter means "nothing to send"
				if ( $message !== null ) {
					$messages[] = $message;
				}
			}
			if ( $messages ) {
				$this->addMessages( $channel, $messages );
			}
		}

		$this->send();
	}

	/**
	 * Send any records in the kafka client internal queue.
	 *
	 * @throws \Kafka\Exception When sending fails and exceptions are not
	 *  configured to be swallowed
	 */
	protected function send() {
		try {
			$this->produce->send();
		} catch ( \Kafka\Exception $e ) {
			$ignore = $this->warning(
				'Error sending records to kafka: {exception}',
				[ 'exception' => $e ] );
			if ( !$ignore ) {
				throw $e;
			}
		}
	}

	/**
	 * @param string $topic Name of topic to get partition for
	 * @return int|null The random partition to produce to for this request,
	 *  or null if a partition could not be determined.
	 * @throws \Kafka\Exception When fetching partitions fails and exceptions
	 *  are not configured to be swallowed
	 * @throws \RuntimeException When the topic has no available partitions
	 *  and exceptions are not configured to be swallowed
	 */
	protected function getRandomPartition( $topic ) {
		// array_key_exists rather than isset: a cached null ("no partitions")
		// must count as an answer too.
		if ( !array_key_exists( $topic, $this->partitions ) ) {
			try {
				$partitions = $this->produce->getAvailablePartitions( $topic );
			} catch ( \Kafka\Exception $e ) {
				$ignore = $this->warning(
					'Error getting metadata for kafka topic {topic}: {exception}',
					[ 'topic' => $topic, 'exception' => $e ] );
				if ( $ignore ) {
					// Nothing is cached here, so the next call asks again
					return null;
				}
				throw $e;
			}
			if ( $partitions ) {
				$key = array_rand( $partitions );
				$this->partitions[$topic] = $partitions[$key];
			} else {
				$details = $this->produce->getClient()->getTopicDetail( $topic );
				$ignore = $this->warning(
					'No partitions available for kafka topic {topic}',
					[ 'topic' => $topic, 'kafka' => $details ]
				);
				if ( !$ignore ) {
					throw new \RuntimeException( "No partitions available for kafka topic $topic" );
				}
				$this->partitions[$topic] = null;
			}
		}
		return $this->partitions[$topic];
	}

	/**
	 * Adds records for a channel to the Kafka client internal queue.
	 *
	 * The topic is the configured alias for the channel, or
	 * "monolog_$channel" when no alias exists. Nothing is queued when no
	 * partition could be determined for the topic.
	 *
	 * @param string $channel Name of Monolog channel records belong to
	 * @param array $records List of records to append
	 */
	protected function addMessages( $channel, array $records ) {
		if ( isset( $this->options['alias'][$channel] ) ) {
			$topic = $this->options['alias'][$channel];
		} else {
			$topic = "monolog_$channel";
		}
		$partition = $this->getRandomPartition( $topic );
		if ( $partition !== null ) {
			$this->produce->setMessages( $topic, $partition, $records );
		}
	}

	/**
	 * Reports a problem to the configured 'logExceptions' logger, if there is
	 * one, and tells the caller whether the problem should be ignored.
	 *
	 * @param string $message PSR3 compatible message string
	 * @param array $context PSR3 compatible log context
	 * @return bool true if caller should ignore warning
	 */
	protected function warning( $message, array $context = [] ) {
		if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
			$this->options['logExceptions']->warning( $message, $context );
		}
		// Cast so the documented bool contract holds for any configured value
		return (bool)$this->options['swallowExceptions'];
	}
}