jobqueue: make JobQueueDB stricter about broken job_params fields
authorAaron Schulz <aschulz@wikimedia.org>
Fri, 26 Apr 2019 20:11:09 +0000 (13:11 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Sat, 27 Apr 2019 00:14:12 +0000 (00:14 +0000)
Also rename throwDBException() to getDBException() and make the
callers throw the result to avoid phpstorm warnings. Remove $row
assignment that is always overridden as well.

Change-Id: I84bc4b11f10152eada6dd6f4788c4f79dcb4a2fb

includes/jobqueue/JobQueueDB.php
includes/jobqueue/JobQueueRedis.php

index 47ee588..7c78f40 100644 (file)
@@ -91,7 +91,7 @@ class JobQueueDB extends JobQueue {
                                'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
                        );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
 
                return !$found;
@@ -118,7 +118,7 @@ class JobQueueDB extends JobQueue {
                                __METHOD__
                        );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
                $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
 
@@ -150,7 +150,7 @@ class JobQueueDB extends JobQueue {
                                __METHOD__
                        );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
                $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
 
@@ -187,7 +187,7 @@ class JobQueueDB extends JobQueue {
                                __METHOD__
                        );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
 
                $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
@@ -281,7 +281,7 @@ class JobQueueDB extends JobQueue {
                                count( $rowSet ) + count( $rowList ) - count( $rows )
                        );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
                if ( $flags & self::QOS_ATOMIC ) {
                        $dbw->endAtomic( $method );
@@ -316,12 +316,7 @@ class JobQueueDB extends JobQueue {
                                $this->incrStats( 'pops', $this->type );
 
                                // Get the job object from the row...
-                               $params = self::extractBlob( $row->job_params );
-                               $params = is_array( $params ) ? $params : []; // sanity
-                               $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
-                               $job = $this->factoryJob( $row->job_cmd, $params );
-                               $job->setMetadata( 'id', $row->job_id );
-                               $job->setMetadata( 'timestamp', $row->job_timestamp );
+                               $job = $this->jobFromRow( $row );
                                break; // done
                        } while ( true );
 
@@ -331,7 +326,7 @@ class JobQueueDB extends JobQueue {
                                $this->recycleAndDeleteStaleJobs();
                        }
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
 
                return $job;
@@ -352,7 +347,6 @@ class JobQueueDB extends JobQueue {
                // Check cache to see if the queue has <= OFFSET items
                $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
 
-               $row = false; // the row acquired
                $invertedDirection = false; // whether one job_random direction was already scanned
                // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
                // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
@@ -505,7 +499,7 @@ class JobQueueDB extends JobQueue {
 
                        $this->incrStats( 'acks', $this->type );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
        }
 
@@ -560,7 +554,7 @@ class JobQueueDB extends JobQueue {
                try {
                        $dbw->delete( 'job', [ 'job_cmd' => $this->type ] );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
 
                return true;
@@ -619,22 +613,11 @@ class JobQueueDB extends JobQueue {
                        return new MappedIterator(
                                $dbr->select( 'job', self::selectFields(), $conds ),
                                function ( $row ) {
-                                       $params = strlen( $row->job_params ) ? unserialize( $row->job_params ) : [];
-                                       $params = is_array( $params ) ? $params : []; // sanity
-                                       $params += [
-                                               'namespace' => $row->job_namespace,
-                                               'title' => $row->job_title
-                                       ];
-
-                                       $job = $this->factoryJob( $row->job_cmd, $params );
-                                       $job->setMetadata( 'id', $row->job_id );
-                                       $job->setMetadata( 'timestamp', $row->job_timestamp );
-
-                                       return $job;
+                                       return $this->jobFromRow( $row );
                                }
                        );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
        }
 
@@ -764,7 +747,7 @@ class JobQueueDB extends JobQueue {
 
                        $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
                } catch ( DBError $e ) {
-                       $this->throwDBException( $e );
+                       throw $this->getDBException( $e );
                }
 
                return $count;
@@ -895,23 +878,30 @@ class JobQueueDB extends JobQueue {
        }
 
        /**
-        * @param string $blob
-        * @return bool|mixed
+        * @param stdClass $row
+        * @return RunnableJob|null
         */
-       protected static function extractBlob( $blob ) {
-               if ( (string)$blob !== '' ) {
-                       return unserialize( $blob );
-               } else {
-                       return false;
+       protected function jobFromRow( $row ) {
+               $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
+               if ( !is_array( $params ) ) { // this shouldn't happen
+                       throw new UnexpectedValueException(
+                               "Could not unserialize job with ID '{$row->job_id}'." );
                }
+
+               $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
+               $job = $this->factoryJob( $row->job_cmd, $params );
+               $job->setMetadata( 'id', $row->job_id );
+               $job->setMetadata( 'timestamp', $row->job_timestamp );
+
+               return $job;
        }
 
        /**
         * @param DBError $e
-        * @throws JobQueueError
+        * @return JobQueueError
         */
-       protected function throwDBException( DBError $e ) {
-               throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
+       protected function getDBException( DBError $e ) {
+               return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
        }
 
        /**
index 8864688..2140043 100644 (file)
@@ -639,7 +639,7 @@ LUA;
                        }
                        $item = $this->unserialize( $data );
                        if ( !is_array( $item ) ) { // this shouldn't happen
-                               throw new UnexpectedValueException( "Could not find job with ID '$uid'." );
+                               throw new UnexpectedValueException( "Could not unserialize job with ID '$uid'." );
                        }
 
                        $params = $item['params'];