Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding an ability to register activities and workflows in specific workers #21

Merged
merged 1 commit into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 55 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

Make sure that your server is configured with following PHP version and extensions:

- PHP 8.0+
- Spiral framework 2.9+
- PHP 8.1+
- Spiral framework 3.0+

## Installation

Expand All @@ -34,6 +34,28 @@ protected const LOAD = [
> Note: if you are using [`spiral-packages/discoverer`](https://github.com/spiral-packages/discoverer),
> you don't need to register bootloader by yourself.

#### Configuration

The package is already configured by default, use these features only if you need to change the default configuration.
The package provides the ability to configure `address`, `namespace`, `defaultWorker`, `workers` parameters.
Create file `app/config/temporal.php` and configure options. For example:

```php
declare(strict_types=1);

use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;

return [
'address' => env('TEMPORAL_ADDRESS', 'localhost:7233'),
'namespace' => 'App\\Workflow',
'defaultWorker' => WorkerFactoryInterface::DEFAULT_TASK_QUEUE,
'workers' => [
'workerName' => WorkerOptions::new()
],
];
```

#### RoadRunner configuration

Add `temporal` plugin section in your RoadRunner `rr.yaml` config:
Expand Down Expand Up @@ -350,6 +372,37 @@ class PingController
}
```

## Running workers with different task queue

Add a `Spiral\TemporalBridge\Attribute\AssignWorker` attribute to your Workflow or Activity with the `name` of the worker.
This Workflow or Activity will be processed by the specified worker.
Example:

```php
<?php

declare(strict_types=1);

namespace App\Workflow;

use Spiral\TemporalBridge\Attribute\AssignWorker;
use Temporal\Workflow\WorkflowInterface;

#[AssignWorker(name: 'worker1')]
#[WorkflowInterface]
interface MoneyTransferWorkflowInterface
{
#[WorkflowMethod]
public function ping(...): \Generator;

#[SignalMethod]
function withdraw(): void;

#[SignalMethod]
function deposit(): void;
}
```

## Testing

```bash
Expand Down
16 changes: 16 additions & 0 deletions src/Attribute/AssignWorker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Attribute;

use Spiral\Attributes\NamedArgumentConstructor;

#[\Attribute(\Attribute::TARGET_CLASS), NamedArgumentConstructor]
final class AssignWorker
{
public function __construct(
public string $name
) {
}
}
25 changes: 24 additions & 1 deletion src/Bootloader/TemporalBridgeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
namespace Spiral\TemporalBridge\Bootloader;

use Spiral\Attributes\AttributeReader;
use Spiral\Attributes\ReaderInterface;
use Spiral\Boot\AbstractKernel;
use Spiral\Boot\Bootloader\Bootloader;
use Spiral\Boot\EnvironmentInterface;
use Spiral\Boot\FinalizerInterface;
use Spiral\Bootloader\Attributes\AttributesBootloader;
use Spiral\Config\ConfiguratorInterface;
use Spiral\Config\Patch\Append;
use Spiral\Console\Bootloader\ConsoleBootloader;
use Spiral\Core\Container;
use Spiral\Core\FactoryInterface;
use Spiral\RoadRunnerBridge\Bootloader\RoadRunnerBootloader;
use Spiral\TemporalBridge\Commands;
Expand All @@ -20,6 +22,8 @@
use Spiral\TemporalBridge\Dispatcher;
use Spiral\TemporalBridge\Preset\PresetRegistry;
use Spiral\TemporalBridge\Preset\PresetRegistryInterface;
use Spiral\TemporalBridge\WorkersRegistry;
use Spiral\TemporalBridge\WorkersRegistryInterface;
use Spiral\TemporalBridge\Workflow\WorkflowManager;
use Spiral\TemporalBridge\WorkflowManagerInterface;
use Spiral\TemporalBridge\WorkflowPresetLocator;
Expand All @@ -31,6 +35,7 @@
use Temporal\DataConverter\DataConverter;
use Temporal\Worker\Transport\Goridge;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;
use Temporal\WorkerFactory;

class TemporalBridgeBootloader extends Bootloader
Expand All @@ -41,12 +46,14 @@ class TemporalBridgeBootloader extends Bootloader
WorkerFactoryInterface::class => [self::class, 'initWorkerFactory'],
DeclarationLocatorInterface::class => [self::class, 'initDeclarationLocator'],
WorkflowClientInterface::class => [self::class, 'initWorkflowClient'],
WorkersRegistryInterface::class => [self::class, 'initWorkersRegistry'],
PresetRegistryInterface::class => PresetRegistry::class,
];

protected const DEPENDENCIES = [
ConsoleBootloader::class,
RoadRunnerBootloader::class,
AttributesBootloader::class,
];

public function __construct(
Expand All @@ -69,6 +76,11 @@ public function init(
$console->addCommand(Commands\PresetListCommand::class);
}

public function addWorkerOptions(string $worker, WorkerOptions $options): void
{
$this->config->modify(TemporalConfig::CONFIG, new Append('workers', $worker, $options));
}

private function initWorkflowPresetLocator(
FactoryInterface $factory,
ClassesInterface $classes
Expand All @@ -87,6 +99,8 @@ private function initConfig(EnvironmentInterface $env): void
[
'address' => $env->get('TEMPORAL_ADDRESS', '127.0.0.1:7233'),
'namespace' => 'App\\Workflow',
'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', WorkerFactoryInterface::DEFAULT_TASK_QUEUE),
'workers' => [],
]
);
}
Expand All @@ -113,4 +127,13 @@ private function initDeclarationLocator(ClassesInterface $classes): DeclarationL
new AttributeReader()
);
}

private function initWorkersRegistry(
WorkerFactoryInterface $workerFactory,
ReaderInterface $reader,
FinalizerInterface $finalizer,
TemporalConfig $config
): WorkersRegistryInterface {
return new WorkersRegistry($workerFactory, $reader, $finalizer, $config);
}
}
17 changes: 16 additions & 1 deletion src/Config/TemporalConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
namespace Spiral\TemporalBridge\Config;

use Spiral\Core\InjectableConfig;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;

final class TemporalConfig extends InjectableConfig
{
public const CONFIG = 'temporal';
protected array $config = [
'address' => null,
'namespace' => null
'namespace' => null,
'defaultWorker' => WorkerFactoryInterface::DEFAULT_TASK_QUEUE,
'workers' => [],
];

public function getDefaultNamespace(): string
Expand All @@ -23,4 +27,15 @@ public function getAddress(): string
{
return $this->config['address'] ?? 'localhost:7233';
}

public function getDefaultWorker(): string
{
return $this->config['defaultWorker'] ?? WorkerFactoryInterface::DEFAULT_TASK_QUEUE;
}

/** @psalm-return array<non-empty-string, WorkerOptions> */
public function getWorkers(): array
{
return $this->config['workers'] ?? [];
}
}
14 changes: 4 additions & 10 deletions src/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@

use ReflectionClass;
use Spiral\Boot\DispatcherInterface;
use Spiral\Boot\FinalizerInterface;
use Spiral\Core\Container;
use Spiral\Boot\EnvironmentInterface;
use Spiral\RoadRunnerBridge\RoadRunnerMode;
use Temporal\Activity\ActivityInterface;
use Temporal\Worker\WorkerFactoryInterface;
Expand All @@ -17,7 +15,6 @@
final class Dispatcher implements DispatcherInterface
{
public function __construct(
private readonly EnvironmentInterface $env,
private readonly RoadRunnerMode $mode,
private readonly Container $container
) {
Expand All @@ -39,15 +36,12 @@ public function serve(): void
// factory initiates and runs task queue specific activity and workflow workers
$factory = $this->container->get(WorkerFactoryInterface::class);

// Worker that listens on a task queue and hosts both workflow and activity implementations.
$worker = $factory->newWorker(
(string)$this->env->get('TEMPORAL_TASK_QUEUE', WorkerFactoryInterface::DEFAULT_TASK_QUEUE)
);

$finalizer = $this->container->get(FinalizerInterface::class);
$worker->registerActivityFinalizer(fn() => $finalizer->finalize());
$registry = $this->container->get(WorkersRegistryInterface::class);

foreach ($declarations as $type => $declaration) {
// Worker that listens on a task queue and hosts both workflow and activity implementations.
$worker = $registry->get($declaration);

if ($type === WorkflowInterface::class) {
// Workflows are stateful. So you need a type to create instances.
$worker->registerWorkflowTypes($declaration->getName());
Expand Down
57 changes: 57 additions & 0 deletions src/WorkersRegistry.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Spiral\Attributes\ReaderInterface;
use Spiral\Boot\FinalizerInterface;
use Spiral\TemporalBridge\Attribute\AssignWorker;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerInterface;
use Temporal\Worker\WorkerOptions;

final class WorkersRegistry implements WorkersRegistryInterface
{
/** @psalm-var array<non-empty-string, WorkerInterface> */
private array $workers = [];

/** @psalm-param array<non-empty-string, WorkerOptions> $options */
public function __construct(
private WorkerFactoryInterface $workerFactory,
private ReaderInterface $reader,
private FinalizerInterface $finalizer,
private TemporalConfig $config
) {
}

public function get(\ReflectionClass $declaration): WorkerInterface
{
$name = $this->resolveName($declaration);
$options = $this->config->getWorkers();

if (!$this->hasWorker($name)) {
$this->workers[$name] = $this->workerFactory->newWorker($name, $options[$name] ?? null);
$this->workers[$name]->registerActivityFinalizer(fn() => $this->finalizer->finalize());
}

return $this->workers[$name];
}

private function hasWorker(string $name): bool
{
return isset($this->workers[$name]);
}

private function resolveName(\ReflectionClass $declaration): string
{
$assignWorker = $this->reader->firstClassMetadata($declaration, AssignWorker::class);

if ($assignWorker === null) {
return $this->config->getDefaultWorker();
}

return $assignWorker->name;
}
}
12 changes: 12 additions & 0 deletions src/WorkersRegistryInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Temporal\Worker\WorkerInterface;

interface WorkersRegistryInterface
{
public function get(\ReflectionClass $declaration): WorkerInterface;
}
12 changes: 12 additions & 0 deletions tests/app/src/SomeActivity.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Tests\App;

use Spiral\TemporalBridge\Attribute\AssignWorker;

#[AssignWorker(name: 'worker1')]
class SomeActivity
{
}
12 changes: 12 additions & 0 deletions tests/app/src/SomeWorkflow.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Tests\App;

use Spiral\TemporalBridge\Attribute\AssignWorker;

#[AssignWorker(name: 'worker2')]
class SomeWorkflow
{
}
9 changes: 9 additions & 0 deletions tests/app/src/WithoutAttribute.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Tests\App;

class WithoutAttribute
{
}
30 changes: 30 additions & 0 deletions tests/src/Attribute/AssignWorkerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Tests\Attribute;

use Spiral\Attributes\Factory;
use Spiral\TemporalBridge\Attribute\AssignWorker;
use Spiral\TemporalBridge\Tests\App\SomeActivity;
use Spiral\TemporalBridge\Tests\App\SomeWorkflow;
use Spiral\TemporalBridge\Tests\App\WithoutAttribute;
use Spiral\TemporalBridge\Tests\TestCase;

final class AssignWorkerTest extends TestCase
{
/** @dataProvider assignWorkerDataProvider */
public function testAssignWorkerAttribute(\ReflectionClass $class, ?AssignWorker $expected = null): void
{
$reader = (new Factory())->create();

$this->assertEquals($expected, $reader->firstClassMetadata($class, AssignWorker::class));
}

public function assignWorkerDataProvider(): \Traversable
{
yield [new \ReflectionClass(SomeActivity::class), new AssignWorker('worker1')];
yield [new \ReflectionClass(SomeWorkflow::class), new AssignWorker('worker2')];
yield [new \ReflectionClass(WithoutAttribute::class), null];
}
}
Loading