skins: Remove 'usemsgcache' and deprecate getDynamicStylesheetQuery
[lhc/web/wiklou.git] / includes / debug / logger / monolog / KafkaHandler.php
1 <?php
2 /**
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
17 *
18 * @file
19 */
20
21 namespace MediaWiki\Logger\Monolog;
22
23 use Kafka\MetaDataFromKafka;
24 use Kafka\Produce;
25 use Kafka\Protocol\Decoder;
26 use MediaWiki\Logger\LoggerFactory;
27 use Monolog\Handler\AbstractProcessingHandler;
28 use Monolog\Logger;
29 use Psr\Log\LoggerInterface;
30
31 /**
32 * Log handler sends log events to a kafka server.
33 *
34 * Constructor options array arguments:
35 * * alias: map from monolog channel to kafka topic name. When no
36 * alias exists the topic "monolog_$channel" will be used.
37 * * swallowExceptions: Swallow exceptions that occur while talking to
38 * kafka. Defaults to false.
39 * * logExceptions: Log exceptions talking to kafka here. Either null,
40 * the name of a channel to log to, or an object implementing
41 * FormatterInterface. Defaults to null.
42 *
43 * Requires the nmred/kafka-php library, version >= 1.3.0
44 *
45 * @since 1.26
46 * @author Erik Bernhardson <ebernhardson@wikimedia.org>
47 * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
48 */
49 class KafkaHandler extends AbstractProcessingHandler {
50 /**
51 * @var Produce Sends requests to kafka
52 */
53 protected $produce;
54
55 /**
56 * @var array Optional handler configuration
57 */
58 protected $options;
59
60 /**
61 * @var array Map from topic name to partition this request produces to
62 */
63 protected $partitions = [];
64
65 /**
66 * @var array defaults for constructor options
67 */
68 private static $defaultOptions = [
69 'alias' => [], // map from monolog channel to kafka topic
70 'swallowExceptions' => false, // swallow exceptions sending records
71 'logExceptions' => null, // A PSR3 logger to inform about errors
72 'requireAck' => 0,
73 ];
74
75 /**
76 * @param Produce $produce Kafka instance to produce through
77 * @param array $options optional handler configuration
78 * @param int $level The minimum logging level at which this handler will be triggered
79 * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
80 */
81 public function __construct(
82 Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true
83 ) {
84 parent::__construct( $level, $bubble );
85 $this->produce = $produce;
86 $this->options = array_merge( self::$defaultOptions, $options );
87 }
88
89 /**
90 * Constructs the necessary support objects and returns a KafkaHandler
91 * instance.
92 *
93 * @param string[] $kafkaServers
94 * @param array $options
95 * @param int $level The minimum logging level at which this handle will be triggered
96 * @param bool $bubble Whether the messages that are handled can bubble the stack or not
97 * @return KafkaHandler
98 */
99 public static function factory(
100 $kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true
101 ) {
102 $metadata = new MetaDataFromKafka( $kafkaServers );
103 $produce = new Produce( $metadata );
104
105 if ( isset( $options['sendTimeout'] ) ) {
106 $timeOut = $options['sendTimeout'];
107 $produce->getClient()->setStreamOption( 'SendTimeoutSec', 0 );
108 $produce->getClient()->setStreamOption( 'SendTimeoutUSec',
109 intval( $timeOut * 1000000 )
110 );
111 }
112 if ( isset( $options['recvTimeout'] ) ) {
113 $timeOut = $options['recvTimeout'];
114 $produce->getClient()->setStreamOption( 'RecvTimeoutSec', 0 );
115 $produce->getClient()->setStreamOption( 'RecvTimeoutUSec',
116 intval( $timeOut * 1000000 )
117 );
118 }
119 if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
120 $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
121 }
122
123 if ( isset( $options['requireAck'] ) ) {
124 $produce->setRequireAck( $options['requireAck'] );
125 }
126
127 return new self( $produce, $options, $level, $bubble );
128 }
129
130 /**
131 * @inheritDoc
132 */
133 protected function write( array $record ) {
134 if ( $record['formatted'] !== null ) {
135 $this->addMessages( $record['channel'], [ $record['formatted'] ] );
136 $this->send();
137 }
138 }
139
140 /**
141 * @inheritDoc
142 */
143 public function handleBatch( array $batch ) {
144 $channels = [];
145 foreach ( $batch as $record ) {
146 if ( $record['level'] < $this->level ) {
147 continue;
148 }
149 $channels[$record['channel']][] = $this->processRecord( $record );
150 }
151
152 $formatter = $this->getFormatter();
153 foreach ( $channels as $channel => $records ) {
154 $messages = [];
155 foreach ( $records as $idx => $record ) {
156 $message = $formatter->format( $record );
157 if ( $message !== null ) {
158 $messages[] = $message;
159 }
160 }
161 if ( $messages ) {
162 $this->addMessages( $channel, $messages );
163 }
164 }
165
166 $this->send();
167 }
168
169 /**
170 * Send any records in the kafka client internal queue.
171 */
172 protected function send() {
173 try {
174 $response = $this->produce->send();
175 } catch ( \Kafka\Exception $e ) {
176 $ignore = $this->warning(
177 'Error sending records to kafka: {exception}',
178 [ 'exception' => $e ] );
179 if ( !$ignore ) {
180 throw $e;
181 } else {
182 return;
183 }
184 }
185
186 if ( is_bool( $response ) ) {
187 return;
188 }
189
190 $errors = [];
191 foreach ( $response as $topicName => $partitionResponse ) {
192 foreach ( $partitionResponse as $partition => $info ) {
193 if ( $info['errCode'] === 0 ) {
194 // no error
195 continue;
196 }
197 $errors[] = sprintf(
198 'Error producing to %s (errno %d): %s',
199 $topicName,
200 $info['errCode'],
201 Decoder::getError( $info['errCode'] )
202 );
203 }
204 }
205
206 if ( $errors ) {
207 $error = implode( "\n", $errors );
208 if ( !$this->warning( $error ) ) {
209 throw new \RuntimeException( $error );
210 }
211 }
212 }
213
214 /**
215 * @param string $topic Name of topic to get partition for
216 * @return int|null The random partition to produce to for this request,
217 * or null if a partition could not be determined.
218 */
219 protected function getRandomPartition( $topic ) {
220 if ( !array_key_exists( $topic, $this->partitions ) ) {
221 try {
222 $partitions = $this->produce->getAvailablePartitions( $topic );
223 } catch ( \Kafka\Exception $e ) {
224 $ignore = $this->warning(
225 'Error getting metadata for kafka topic {topic}: {exception}',
226 [ 'topic' => $topic, 'exception' => $e ] );
227 if ( $ignore ) {
228 return null;
229 }
230 throw $e;
231 }
232 if ( $partitions ) {
233 $key = array_rand( $partitions );
234 $this->partitions[$topic] = $partitions[$key];
235 } else {
236 $details = $this->produce->getClient()->getTopicDetail( $topic );
237 $ignore = $this->warning(
238 'No partitions available for kafka topic {topic}',
239 [ 'topic' => $topic, 'kafka' => $details ]
240 );
241 if ( !$ignore ) {
242 throw new \RuntimeException( "No partitions available for kafka topic $topic" );
243 }
244 $this->partitions[$topic] = null;
245 }
246 }
247 return $this->partitions[$topic];
248 }
249
250 /**
251 * Adds records for a channel to the Kafka client internal queue.
252 *
253 * @param string $channel Name of Monolog channel records belong to
254 * @param array $records List of records to append
255 */
256 protected function addMessages( $channel, array $records ) {
257 if ( isset( $this->options['alias'][$channel] ) ) {
258 $topic = $this->options['alias'][$channel];
259 } else {
260 $topic = "monolog_$channel";
261 }
262 $partition = $this->getRandomPartition( $topic );
263 if ( $partition !== null ) {
264 $this->produce->setMessages( $topic, $partition, $records );
265 }
266 }
267
268 /**
269 * @param string $message PSR3 compatible message string
270 * @param array $context PSR3 compatible log context
271 * @return bool true if caller should ignore warning
272 */
273 protected function warning( $message, array $context = [] ) {
274 if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
275 $this->options['logExceptions']->warning( $message, $context );
276 }
277 return $this->options['swallowExceptions'];
278 }
279 }