Skip to content

Commit

Permalink
Merge pull request #21 from msmakouz/feature/workers
Browse files Browse the repository at this point in the history
Adding an ability to register activities and workflows in specific workers
  • Loading branch information
butschster committed Jul 13, 2022
2 parents 1d63301 + e0dd9d9 commit 8be3759
Show file tree
Hide file tree
Showing 14 changed files with 396 additions and 14 deletions.
57 changes: 55 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

Make sure that your server is configured with following PHP version and extensions:

- PHP 8.0+
- Spiral framework 2.9+
- PHP 8.1+
- Spiral framework 3.0+

## Installation

Expand All @@ -34,6 +34,28 @@ protected const LOAD = [
> Note: if you are using [`spiral-packages/discoverer`](https://github.com/spiral-packages/discoverer),
> you don't need to register bootloader by yourself.
#### Configuration

The package is already configured by default, use these features only if you need to change the default configuration.
The package provides the ability to configure `address`, `namespace`, `defaultWorker`, `workers` parameters.
Create file `app/config/temporal.php` and configure options. For example:

```php
declare(strict_types=1);

use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;

return [
'address' => env('TEMPORAL_ADDRESS', 'localhost:7233'),
'namespace' => 'App\\Workflow',
'defaultWorker' => WorkerFactoryInterface::DEFAULT_TASK_QUEUE,
'workers' => [
'workerName' => WorkerOptions::new()
],
];
```

#### RoadRunner configuration

Add `temporal` plugin section in your RoadRunner `rr.yaml` config:
Expand Down Expand Up @@ -350,6 +372,37 @@ class PingController
}
```

## Running workers with different task queue

Add a `Spiral\TemporalBridge\Attribute\AssignWorker` attribute to your Workflow or Activity with the `name` of the worker.
This Workflow or Activity will be processed by the specified worker.
Example:

```php
<?php

declare(strict_types=1);

namespace App\Workflow;

use Spiral\TemporalBridge\Attribute\AssignWorker;
use Temporal\Workflow\WorkflowInterface;

#[AssignWorker(name: 'worker1')]
#[WorkflowInterface]
interface MoneyTransferWorkflowInterface
{
#[WorkflowMethod]
public function ping(...): \Generator;

#[SignalMethod]
function withdraw(): void;

#[SignalMethod]
function deposit(): void;
}
```

## Testing

```bash
Expand Down
16 changes: 16 additions & 0 deletions src/Attribute/AssignWorker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Attribute;

use Spiral\Attributes\NamedArgumentConstructor;

#[\Attribute(\Attribute::TARGET_CLASS), NamedArgumentConstructor]
final class AssignWorker
{
public function __construct(
public string $name
) {
}
}
25 changes: 24 additions & 1 deletion src/Bootloader/TemporalBridgeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
namespace Spiral\TemporalBridge\Bootloader;

use Spiral\Attributes\AttributeReader;
use Spiral\Attributes\ReaderInterface;
use Spiral\Boot\AbstractKernel;
use Spiral\Boot\Bootloader\Bootloader;
use Spiral\Boot\EnvironmentInterface;
use Spiral\Boot\FinalizerInterface;
use Spiral\Bootloader\Attributes\AttributesBootloader;
use Spiral\Config\ConfiguratorInterface;
use Spiral\Config\Patch\Append;
use Spiral\Console\Bootloader\ConsoleBootloader;
use Spiral\Core\Container;
use Spiral\Core\FactoryInterface;
use Spiral\RoadRunnerBridge\Bootloader\RoadRunnerBootloader;
use Spiral\TemporalBridge\Commands;
Expand All @@ -20,6 +22,8 @@
use Spiral\TemporalBridge\Dispatcher;
use Spiral\TemporalBridge\Preset\PresetRegistry;
use Spiral\TemporalBridge\Preset\PresetRegistryInterface;
use Spiral\TemporalBridge\WorkersRegistry;
use Spiral\TemporalBridge\WorkersRegistryInterface;
use Spiral\TemporalBridge\Workflow\WorkflowManager;
use Spiral\TemporalBridge\WorkflowManagerInterface;
use Spiral\TemporalBridge\WorkflowPresetLocator;
Expand All @@ -31,6 +35,7 @@
use Temporal\DataConverter\DataConverter;
use Temporal\Worker\Transport\Goridge;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;
use Temporal\WorkerFactory;

class TemporalBridgeBootloader extends Bootloader
Expand All @@ -41,12 +46,14 @@ class TemporalBridgeBootloader extends Bootloader
WorkerFactoryInterface::class => [self::class, 'initWorkerFactory'],
DeclarationLocatorInterface::class => [self::class, 'initDeclarationLocator'],
WorkflowClientInterface::class => [self::class, 'initWorkflowClient'],
WorkersRegistryInterface::class => [self::class, 'initWorkersRegistry'],
PresetRegistryInterface::class => PresetRegistry::class,
];

protected const DEPENDENCIES = [
ConsoleBootloader::class,
RoadRunnerBootloader::class,
AttributesBootloader::class,
];

public function __construct(
Expand All @@ -69,6 +76,11 @@ public function init(
$console->addCommand(Commands\PresetListCommand::class);
}

public function addWorkerOptions(string $worker, WorkerOptions $options): void
{
$this->config->modify(TemporalConfig::CONFIG, new Append('workers', $worker, $options));
}

private function initWorkflowPresetLocator(
FactoryInterface $factory,
ClassesInterface $classes
Expand All @@ -87,6 +99,8 @@ private function initConfig(EnvironmentInterface $env): void
[
'address' => $env->get('TEMPORAL_ADDRESS', '127.0.0.1:7233'),
'namespace' => 'App\\Workflow',
'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', WorkerFactoryInterface::DEFAULT_TASK_QUEUE),
'workers' => [],
]
);
}
Expand All @@ -113,4 +127,13 @@ private function initDeclarationLocator(ClassesInterface $classes): DeclarationL
new AttributeReader()
);
}

private function initWorkersRegistry(
WorkerFactoryInterface $workerFactory,
ReaderInterface $reader,
FinalizerInterface $finalizer,
TemporalConfig $config
): WorkersRegistryInterface {
return new WorkersRegistry($workerFactory, $reader, $finalizer, $config);
}
}
17 changes: 16 additions & 1 deletion src/Config/TemporalConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
namespace Spiral\TemporalBridge\Config;

use Spiral\Core\InjectableConfig;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;

final class TemporalConfig extends InjectableConfig
{
public const CONFIG = 'temporal';
protected array $config = [
'address' => null,
'namespace' => null
'namespace' => null,
'defaultWorker' => WorkerFactoryInterface::DEFAULT_TASK_QUEUE,
'workers' => [],
];

public function getDefaultNamespace(): string
Expand All @@ -23,4 +27,15 @@ public function getAddress(): string
{
return $this->config['address'] ?? 'localhost:7233';
}

public function getDefaultWorker(): string
{
return $this->config['defaultWorker'] ?? WorkerFactoryInterface::DEFAULT_TASK_QUEUE;
}

/** @psalm-return array<non-empty-string, WorkerOptions> */
public function getWorkers(): array
{
return $this->config['workers'] ?? [];
}
}
14 changes: 4 additions & 10 deletions src/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@

use ReflectionClass;
use Spiral\Boot\DispatcherInterface;
use Spiral\Boot\FinalizerInterface;
use Spiral\Core\Container;
use Spiral\Boot\EnvironmentInterface;
use Spiral\RoadRunnerBridge\RoadRunnerMode;
use Temporal\Activity\ActivityInterface;
use Temporal\Worker\WorkerFactoryInterface;
Expand All @@ -17,7 +15,6 @@
final class Dispatcher implements DispatcherInterface
{
public function __construct(
private readonly EnvironmentInterface $env,
private readonly RoadRunnerMode $mode,
private readonly Container $container
) {
Expand All @@ -39,15 +36,12 @@ public function serve(): void
// factory initiates and runs task queue specific activity and workflow workers
$factory = $this->container->get(WorkerFactoryInterface::class);

// Worker that listens on a task queue and hosts both workflow and activity implementations.
$worker = $factory->newWorker(
(string)$this->env->get('TEMPORAL_TASK_QUEUE', WorkerFactoryInterface::DEFAULT_TASK_QUEUE)
);

$finalizer = $this->container->get(FinalizerInterface::class);
$worker->registerActivityFinalizer(fn() => $finalizer->finalize());
$registry = $this->container->get(WorkersRegistryInterface::class);

foreach ($declarations as $type => $declaration) {
// Worker that listens on a task queue and hosts both workflow and activity implementations.
$worker = $registry->get($declaration);

if ($type === WorkflowInterface::class) {
// Workflows are stateful. So you need a type to create instances.
$worker->registerWorkflowTypes($declaration->getName());
Expand Down
57 changes: 57 additions & 0 deletions src/WorkersRegistry.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Spiral\Attributes\ReaderInterface;
use Spiral\Boot\FinalizerInterface;
use Spiral\TemporalBridge\Attribute\AssignWorker;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerInterface;
use Temporal\Worker\WorkerOptions;

final class WorkersRegistry implements WorkersRegistryInterface
{
/** @psalm-var array<non-empty-string, WorkerInterface> */
private array $workers = [];

/** @psalm-param array<non-empty-string, WorkerOptions> $options */
public function __construct(
private WorkerFactoryInterface $workerFactory,
private ReaderInterface $reader,
private FinalizerInterface $finalizer,
private TemporalConfig $config
) {
}

public function get(\ReflectionClass $declaration): WorkerInterface
{
$name = $this->resolveName($declaration);
$options = $this->config->getWorkers();

if (!$this->hasWorker($name)) {
$this->workers[$name] = $this->workerFactory->newWorker($name, $options[$name] ?? null);
$this->workers[$name]->registerActivityFinalizer(fn() => $this->finalizer->finalize());
}

return $this->workers[$name];
}

private function hasWorker(string $name): bool
{
return isset($this->workers[$name]);
}

private function resolveName(\ReflectionClass $declaration): string
{
$assignWorker = $this->reader->firstClassMetadata($declaration, AssignWorker::class);

if ($assignWorker === null) {
return $this->config->getDefaultWorker();
}

return $assignWorker->name;
}
}
12 changes: 12 additions & 0 deletions src/WorkersRegistryInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Temporal\Worker\WorkerInterface;

interface WorkersRegistryInterface
{
public function get(\ReflectionClass $declaration): WorkerInterface;
}
12 changes: 12 additions & 0 deletions tests/app/src/SomeActivity.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Tests\App;

use Spiral\TemporalBridge\Attribute\AssignWorker;

#[AssignWorker(name: 'worker1')]
class SomeActivity
{
}
12 changes: 12 additions & 0 deletions tests/app/src/SomeWorkflow.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Tests\App;

use Spiral\TemporalBridge\Attribute\AssignWorker;

#[AssignWorker(name: 'worker2')]
class SomeWorkflow
{
}
9 changes: 9 additions & 0 deletions tests/app/src/WithoutAttribute.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Tests\App;

class WithoutAttribute
{
}
30 changes: 30 additions & 0 deletions tests/src/Attribute/AssignWorkerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Tests\Attribute;

use Spiral\Attributes\Factory;
use Spiral\TemporalBridge\Attribute\AssignWorker;
use Spiral\TemporalBridge\Tests\App\SomeActivity;
use Spiral\TemporalBridge\Tests\App\SomeWorkflow;
use Spiral\TemporalBridge\Tests\App\WithoutAttribute;
use Spiral\TemporalBridge\Tests\TestCase;

final class AssignWorkerTest extends TestCase
{
/** @dataProvider assignWorkerDataProvider */
public function testAssignWorkerAttribute(\ReflectionClass $class, ?AssignWorker $expected = null): void
{
$reader = (new Factory())->create();

$this->assertEquals($expected, $reader->firstClassMetadata($class, AssignWorker::class));
}

public function assignWorkerDataProvider(): \Traversable
{
yield [new \ReflectionClass(SomeActivity::class), new AssignWorker('worker1')];
yield [new \ReflectionClass(SomeWorkflow::class), new AssignWorker('worker2')];
yield [new \ReflectionClass(WithoutAttribute::class), null];
}
}
Loading

0 comments on commit 8be3759

Please sign in to comment.