Skip to content

Commit

Permalink
Added usage examples of AMQPExchangeListener and AMQPMessageProducer …
Browse files Browse the repository at this point in the history
…on the README.md
  • Loading branch information
Christian Soronellas committed Nov 23, 2015
1 parent 537fc29 commit c45eaba
Showing 1 changed file with 169 additions and 0 deletions.
169 changes: 169 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,172 @@ For the Doctrine Transactional Session, pass the EntityManager instance.
As you can see, the use case creation and execution is the same as the non transactional, the only difference is the decoration with the Transactional Application Service.

As a collateral benefit, the Doctrine Session manages internally the ```flush``` method, so you don't need to add a ```flush``` in your Domain neither your infrastructure.

## Asynchronous AMQP listeners

This library is capable to support asynchronous messaging in order to make Bounded Context capable to listen to other Bounded Context's events in an efficient way. The base for this is the both the **``[amqp](https://pecl.php.net/package/amqp)```** and the **```[react's event loop](https://github.com/reactphp/event-loop)```**. In addition, to support more efficient event loopings, we recommend the installation of one of this extensions

* **[libevent](http:https://php.net/manual/es/book.libevent.php)**
* **[libev](http:https://php.net/manual/es/intro.ev.php)**
* **[event](http:https://php.net/manual/es/book.event.php)**

The usage of any of this extensions is handled by ReactPHP's *event-loop* in a totally transparent way.

### Example

Supose we need to listen to the ```Acme\Billing\DomainModel\Order\OrderWasCreated``` event triggered via messaging from another bounded context. The following, is an example of a AMQP exchange listener that listents to the ```Acme\Billing\DomainModel\Order\OrderWasCreated``` event.

```php
<?php

namespace Acme\Inventory\Infrastructure\Messaging\Amqp;

use stdClass;
use AMQPQueue;
use JMS\Serializer\Serializer;
use League\Tactician\CommandBus;
use React\EventLoop\LoopInterface;
use Ddd\Infrastructure\Application\Notification\AmqpExchangeListener;

class OrderWasCreatedListener extends AmqpExchangeListener
{
private $commandBus;

public function __construct(AMQPQueue $queue, LoopInterface $loop, Serializer $serializer, CommandBus $commandBus)
{
$this->commandBus = $commandBus;

parent::construct($queue, $loop, $serializer);
}

/**
* This method will be responsible to decide whether this listener listens to an specific
* event type or not, given an event type name
*
* @param string $typeName
*
* @return bool
*/
protected function listensTo($typeName)
{
return 'Acme\Billing\DomainModel\Order\OrderWasCreated' === $typeName;
}

/**
* The action to perform
*
* @param stdClass $event
*
* @return void
*/
protected function handle($event)
{
$this->commandBus->handle(new CreateOrder(
$event->order_id,
// ...
));
}
}
```

And this is a possible command to create AMQP workers

```php
<?php

namespace AppBundle\Command;

use AMQPConnection;
use AMQPChannel;
use React\EventLoop\Factory;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use JMS\Serializer\Serializer;
use League\Tactician\CommandBus;

class OrderWasCreatedWorkerCommand extends Command
{
private $serializer;
private $commandBus;

public function __construct(Serializer $serializer, CommandBus $commandBus)
{
$this->serializer = $serializer;
$this->commandBus = $commandBus;

parent::__construct();
}

public function execute(InputInterface $input, OutputInterface $output)
{
$connection = new AMQPConnection([
'host' => 'example.host',
'vhost' => '/',
'port' => 5763,
'login' => 'user',
'password' => 'password'
]);
$connection->connect();

$queue = new AMQPQueue(new AMQPChannel($connection));
$queue->setName('events');
$queue->setFlags(AMQP_NOPARAM);
$queue->declareQueue();

$loop = Factory::create();

$listener = new OrderWasCreatedListener(
$queue,
$loop,
$serializer,
$this->commandBus
);

$loop->run();
}
}
```

## AMQP Message producer

The intention of the AMQP message producer is to be composed in some other class. The following is an example of the usage of the AMQP message producer.

```php
<?php

use Doctrine\ORM\Tools\Setup;
use Doctrine\ORM\EntityManager;
use Ddd\Infrastructure\Application\Notification\AmqpMessageProducer;
use Ddd\Application\Notification\NotificationService;

$connection = new AMQPConnection([
'host' => 'example.host',
'vhost' => '/',
'port' => 5763,
'login' => 'user',
'password' => 'password'
]);
$connection->connect();

$exchange = new AMQPExchange(new AMQPChannel($connection));
$exchange->setName('events');
$exchange->declare();

$config = Setup::createYAMLMetadataConfiguration([__DIR__."/src/Infrastructure/Application/Persistence/Doctrine/Config"], false);
$entityManager = EntityManager::create(['driver' => 'pdo_sqlite', 'path' => __DIR__ . '/db.sqlite'], $config);

$eventStore = $entityManager->getRepository('Ddd\Domain\Event\StoredEvent');
$publishedMessageTracker = $entityManager->getRepository('Ddd\Domain\Event\PublishedMessage');
$messageProducer = new AmqpMessageProducer($exchange);

$notificationService = new NotificationService(
$eventStore,
$publishedMessageTracker,
$messageProducer
);

$notificationService->publish(/** ... **/);
```

0 comments on commit c45eaba

Please sign in to comment.