Neos Flow package that allows for asynchronous and distributed execution of tasks.
- Quickstart
- Introduction
- Message Queue
- Job Queue
- Command Line Interface
- Signals & Slots
- License
- Contributions
- Install this package using composer:
composer require flowpack/jobqueue-common
(or by adding the dependency to the composer manifest of an installed package)
- Configure a basic queue by adding the following to your
Settings.yaml
:
Flowpack:
JobQueue:
Common:
queues:
'some-queue':
className: 'Flowpack\JobQueue\Common\Queue\FakeQueue'
- Initialize the queue (if required)
With
./flow queue:setup some-queue
you can setup the queue and/or verify its configuration.
In the case of the FakeQueue
that step is not required.
Note: The queue:setup
command won't remove any existing messages, there is no harm in calling it multiple times
- Annotate any public method you want to be executed asynchronously:
use Flowpack\JobQueue\Common\Annotations as Job;
class SomeClass {
/**
* @Job\Defer(queueName="some-queue")
*/
public function sendEmail($emailAddress)
{
// send some email to $emailAddress
}
}
or use attributes instead of annotations (PHP 8.0 and later):
use Flowpack\JobQueue\Common\Annotations as Job;
class SomeClass {
#[Job\Defer(queueName: "some-queue")]
public function sendEmail($emailAddress)
{
// send some email to $emailAddress
}
}
Note: The method needs to be public and it must not return anything
- Start the worker (if required)
With the above code in place, whenever the method SomeClass::sendEmail()
is about to be called that method call is converted into a job that is executed asynchronously[1].
Unless you use the FakeQueue
like in the example, a so called worker
has to be started, to listen for new jobs and execute them::
./flow flowpack.jobqueue.common:job:work some-queue --verbose
To get started let's first define some terms:
- Message
-
A piece of information passed between programs or systems, sometimes also referred to as "Event".
In the JobQueue packages we use messages to transmit `Jobs`. - Message Queue
-
According to Wikipedia "message queues [...] are software-engineering components used for inter-process communication (IPC), or for inter-thread communication within the same process".
In the context of the JobQueue packages we refer to "Message Queue" as a FIFO buffer that distributes messages to one or more consumers, so that every message is only processed once. - Job
-
A unit of work to be executed (asynchronously).
In the JobQueue packages we use the Message Queue to store serialized jobs, so it acts as a "Job stream". - Job Manager
- Central authority allowing adding and fetching jobs to/from the Message Queue.
- Worker
-
The worker watches a queue and triggers the job execution.
This package comes with a `job:work` command that does this (see below) - submit
- New messages are *submitted* to a queue to be processed by a worker
- reserve
-
Before a message can be processed it has to be *reserved*.
The queue guarantees that a single message can never be reserved by two workers (unless it has been released again) - release
-
A reserved message can be *released* to the queue to be processed at a later time.
The *JobManager* does this if Job execution failed and the `maximumNumberOfReleases` setting for the queue is greater than zero - abort
-
If a message could not be processed successfully it is *aborted* marking it *failed* in the respective queue so that it can't be reserved again.
The *JobManager* aborts a message if Job execution failed and the message can't be released (again) - finish
-
If a message was processed successfully it is marked *finished*.
The *JobManager* finishes a message if Job execution succeeded.
The Flowpack.JobQueue.Common
package comes with a very basic Message Queue implementation Flowpack\JobQueue\Common\Queue\FakeQueue
that allows for execution of Jobs using sub requests.
It doesn't need any 3rd party tools or server loops and works for basic scenarios. But it has a couple of limitations to be aware of:
-
It is not actually a queue, but dispatches jobs immediately as they are queued. So it's not possible to distribute the work to multiple workers
-
The
JobManager
is not involved in processing of jobs so the jobs need to take care of error handling themselves. -
For the same reason Signals are not emitted for the
FakeQueue
. -
With Flow 3.3+ The
FakeQueue
supports a flagasync
. Without that flag set, executing jobs block the main thread!
For advanced usage it is recommended to use one of the implementing packages like one of the following:
This is the simplest configuration for a queue:
Flowpack:
JobQueue:
Common:
queues:
'test':
className: 'Flowpack\JobQueue\Common\Queue\FakeQueue'
With this a queue named test
will be available.
Note: For reusable packages you should consider adding a vendor specific prefixes to avoid collisions. We recommend to use a classname or the package name with the function name (e.g. Flowpack.ElasticSearch.ContentRepositoryQueueIndexer.
The following parameters are supported by all queues:
Parameter | Type | Default | Description |
---|---|---|---|
className | string | - | FQN of the class implementing the queue |
maximumNumberOfReleases | integer | 3 | Max. number of times a message is re- released to the queue if a job failed |
executeIsolated | boolean | FALSE | If TRUE jobs for this queue are executed in a separate Thread. This makes sense in order to avoid memory leaks and side-effects |
queueNamePrefix | string | - | Optional prefix for the internal queue name, allowing to re-use the same backend over multiple installations |
options | array | - | Options for the queue. Implementation specific (see corresponding package) |
releaseOptions | array | - | Options that will be passed to release() when a job failed.Implementation specific (see corresponding package) |
A more complex example could look something like:
Flowpack:
JobQueue:
Common:
queues:
'email':
className: 'Flowpack\JobQueue\Beanstalkd\Queue\BeanstalkdQueue'
maximumNumberOfReleases: 5
executeIsolated: true
queueNamePrefix: 'staging-'
options:
client:
host: 127.0.0.11
port: 11301
defaultTimeout: 50
releaseOptions:
priority: 512
delay: 120
'log':
className: 'Flowpack\JobQueue\Redis\Queue\RedisQueue'
options:
defaultTimeout: 10
As you can see, you can have multiple queues in one installations. That allows you to use different backends/options for queues depending on the requirements.
If multiple queries share common configuration presets can be used to ease readability and maintainability:
Flowpack:
JobQueue:
Common:
presets:
'staging-default':
className: 'Flowpack\JobQueue\Doctrine\Queue\DoctrineQueue'
queueNamePrefix: 'staging-'
options:
pollInterval: 2
queues:
'email':
preset: 'staging-default'
options:
tableName: 'queue_email' # default table name would be "flowpack_jobqueue_messages_email"
'log':
preset: 'staging-default'
options:
pollInterval: 1 # overrides "pollInterval" of the preset
This will configure two DoctrineQueue
s "email" and "log" with some common options but different table names and poll intervals.
The job is an arbitrary class implementing Flowpack\JobQueue\Common\Job\JobInterface
.
This package comes with one implementation StaticMethodCallJob
that allows for invoking a public method (see Quickstart)
but often it makes sense to create a custom Job:
<?php
use Flowpack\JobQueue\Common\Job\JobInterface;
use Flowpack\JobQueue\Common\Queue\Message;
use Flowpack\JobQueue\Common\Queue\QueueInterface;
class SendEmailJob implements JobInterface
{
protected $emailAddress;
public function __construct($emailAddress)
{
$this->emailAddress = $emailAddress;
}
public function execute(QueueInterface $queue, Message $message)
{
// TODO: send the email to $this->emailAddress
return true;
}
public function getIdentifier()
{
return 'SendEmailJob';
}
public function getLabel()
{
return sprintf('SendEmailJob (email: "%S")', $this->emailAddress);
}
}
Note: It's crucial that the execute()
method returns TRUE on success, otherwise the corresponding message will be released again and/or marked failed.
With that in place, the new job can be added to a queue like this:
use Flowpack\JobQueue\Common\Job\JobInterface;
use Flowpack\JobQueue\Common\Job\JobManager;
use Neos\Flow\Annotations as Flow;
class SomeClass {
/**
* @Flow\Inject
* @var JobManager
*/
protected $jobManager;
/**
* @return void
*/
public function queueJob()
{
$job = new SendEmailJob('[email protected]');
$this->jobManager->queue('queue-name', $job);
}
}
Use the flowpack.jobqueue.common:queue:*
and flowpack.jobqueue.common:job:*
commands to interact with the job queues:
Command | Description |
---|---|
queue:list | List configured queues |
queue:describe | Shows details for a given queue (settings, ..) |
queue:setup | Initialize a queue (i.e. create required db tables, check connection, ...) |
queue:flush | Remove all messages from a queue (requires --force flag) |
queue:submit | Submit a message to a queue (mainly for testing) |
job:work | Work on a queue and execute jobs |
job:list | List queued jobs |
When working with JobQueues proper monitoring is crucial as failures might not be visible immediately.
The JobManager
emits signals for all relevant events, namely:
- messageSubmitted
- messageTimeout
- messageReserved
- messageFinished
- messageReleased
- messageFailed
Those can be used to implement some more sophisticated logging for example:
<?php
namespace Your\Package;
use Flowpack\JobQueue\Common\Job\JobManager;
use Flowpack\JobQueue\Common\Queue\Message;
use Flowpack\JobQueue\Common\Queue\QueueInterface;
use Neos\Flow\Core\Bootstrap;
use Neos\Flow\Log\SystemLoggerInterface;
use Neos\Flow\Package\Package as BasePackage;
class Package extends BasePackage
{
/**
* @param Bootstrap $bootstrap
* @return void
*/
public function boot(Bootstrap $bootstrap)
{
$dispatcher = $bootstrap->getSignalSlotDispatcher();
$dispatcher->connect(
JobManager::class, 'messageFailed',
function(QueueInterface $queue, Message $message, \Exception $jobExecutionException = null) use ($bootstrap) {
$additionalData = [
'queue' => $queue->getName(),
'message' => $message->getIdentifier()
];
if ($jobExecutionException !== null) {
$additionalData['exception'] = $jobExecutionException->getMessage();
}
$bootstrap->getObjectManager()->get(SystemLoggerInterface::class)->log('Job failed', LOG_ERR, $additionalData);
}
);
}
}
This would log every failed message to the system log.
This package is licensed under the MIT license
Pull-Requests are more than welcome. Make sure to read the Code Of Conduct.
[1] The FakeQueue
actually executes Jobs synchronously unless the async
flag is set (requires Flow 3.3+)