Skip to content

Latest commit

 

History

History
243 lines (206 loc) · 7.99 KB

basic-example.md

File metadata and controls

243 lines (206 loc) · 7.99 KB

Example

This comprehensive example documents many of the features of phasync. The script is available in the examples/ folder of this project.

<?php
require '../vendor/autoload.php';

/**
 * Channel is an efficient method for coordinating coroutines.
 * The writer will pause after writing, allowing the reader to
 * read the message. When the reader becomes blocked again (for
 * example waiting for the next message, or because it tries to
 * read a file, the write resumes and can add a new message).
 * 
 * The Channel class supports multiple readers and multiple writers,
 * but messages will only be read once by the first available reader.
 * 
 * A channel can be buffered (via the buffer argument in the constructor),
 * which allows messages to be temporarily held allowing the writer to
 * resume working. This can be leveraged to design a queue system. 
 */
use phasync\Channel;

/**
 * Publisher is similar to Channel, but it is always buffered, and 
 * any message will be delivered in order to all of the readers.
 * 
 * This can be used to "multicast" the same data to many clients,
 * or as an event dispatcher. The readers will block whenever there
 * are no events pending. The read operation will return a null value
 * if the publisher goes away.
 */
use phasync\Publisher;

/**
 * WaitGroup is a mechanism to simplify waiting for many simultaneous
 * processes to complete. It is analogous to Promise.all([]) known from
 * promise based designs. Each process will invoke the $waitGroup->add()
 * method, and finally they must invoke $waitGroup->done() when they are
 * finished.
 * 
 * While all the simultaneous processes perform their task, you can call
 * $waitGroup->wait() to pause until the all coroutines have invoked
 * $waitGroup->done().
 * 
 * WARNING! You must ensure that the $waitGroup->done() method is invoked,
 * or the $waitGroup->wait() method will block forever.
 */
use phasync\WaitGroup;

/**
 * The library is primarily used via functions defined in the `phasync\`
 * namespace:
 */

use function phasync\run;
/** 
 * `run(Closure $coroutine, mixed ...$args): mixed`
 * 
 * This function will launch the coroutine and wait for it
 * to either throw an exception or return with a value.
 * When this function is used from inside another run() 
 * coroutine, it will block until all the coroutines that
 * were launched inside it are done.
 */

use function phasync\go;
/**
 * `go(Closure $coroutine, mixed ...$args): Fiber`
 * 
 * This function will launch a coroutine and return a value
 * that may be resolved in the future. You can wait for a
 * fiber to finish by using {@see phasync\await()}, which
 * effectively is identical to using `run()`.
 */

use function phasync\await;
/**
 * `await(Fiber $coroutine): mixed`
 * 
 * This function will block the calling fiber until the
 * provided $coroutine either fails and throws an exception,
 * or returns a value.
 */

use function phasync\defer;
/**
 * `defer(Closure $cleanupFunction): void`
 * 
 * This is a method for scheduling cleanup or other tasks to
 * run after the coroutine completes. The deferred functions
 * will run in reverse order of when they were scheduled. The
 * functions will run immediately after the coroutine finishes,
 * unless an exception occurs and then they will be run when
 * the coroutine is garbage collected.
 */

use function phasync\sleep;
/**
 * `sleep(float $seconds=0): void`
 * 
 * This method will pause the coroutine for a number of seconds.
 * By invoking `sleep()` without arguments, your coroutine will
 * yield to allow other coroutines to work, but resume immediately.
 */

use function phasync\wait_idle;
/**
 * `wait_idle(): void`
 * 
 * This function will pause the coroutine and allow it to resume only
 * when there is nothing else to do immediately.
 */

use function phasync\file_get_contents;
/**
 * `file_get_contents(string $filename): string|false`
 * 
 * This function will use non-blocking file operations to read the entire
 * file from disk. While the application is waiting for the disk to provide
 * data, other coroutines are allowed to continue working.
 */

 use function phasync\file_put_contents;
 /**
  * `file_put_contents(string $filename, mixed $data, int $flags = 0): void`
  * 
  * This function is also non-blocking but has an API identical to the native
  * `file_put_contents()` function in PHP.
  */

/**
 * Other functions not documented here, but which are designed after the native
 * PHP standard library while being non-blocking. The functions *behave* as if
 * they are blocking, but will allow other coroutines to work in the time they
 * block.
 * 
 * `stream_get_contents($stream, ?int $maxLength = null, int $offset = 0): string|false`
 * `fread($stream, int $length): string|false`
 * `fgets($stream, ?int $length = null): string|false`
 * `fgetc($stream): string|false`
 * `fgetcsv($stream, ?int $length = null, string $separator = ",", string $enclosure = "\"", string $escape = "\\"): array|false`
 * `fwrite($stream, string $data): int|false`
 * `ftruncate($stream, int $size): int|false`
 * `flock($stream, int $operation, int &$would_block = null): bool`
 */


// Launch your asynchronous application:
try {
    run(function() {

        $keep_running = true;
        $maintenance_events = new Publisher();
    
        // launch a background task
        $count = go(function() use (&$keep_running, $maintenance_events) {
            $count = 0;
    
            while ($keep_running) {
                // do some maintenance work
                $data = file_get_contents(__FILE__); // this is asynchronous
                $maintenance_events->write(md5($data) . " step $count");
                $count++;
                // wait a while before repeating
                sleep(0.7); // allows other tasks to do some work
            }
    
            return $count;
        });
    
        $wait_group = new WaitGroup();
        [$reader, $writer] = Channel::create(0);
    
        go(function() use ($reader) {
            echo "Waiting for completion messages\n";
            while ($message = $reader->read()) {
                echo "Completed: " . $message . "\n";
            }
            echo "No more completion messages\n";
        });
    
        $futureWithException = go(function() {
            throw new Exception("Just an exception");
        });
    
        // launch various workers
        for ($i = 0; $i < 3; $i++) {
            // Create a subscription for the events
            $subscription = $maintenance_events->subscribe();
            go(function() use ($i, $subscription, $wait_group, $writer) {
                // Register with the $waitGroup
                $wait_group->add();
                defer(function() use ($wait_group) {
                    $wait_group->done();
                });
    
                echo "Worker $i waiting for events...\n";
    
                // This worker will handle at most 10 events
                for ($count = 0; $count < 4; $count++) {
                    sleep(1 * $i);
                    $writer->write("Worker $i received: {$subscription->read()}");
                }

                /**
                 * If an exception is thrown here, it will appear to have been
                 * thrown from the outer coroutine while the $waitGroup->wait()
                 * function is blocking.
                 */
                
                echo "Worker $i done\n";
    
            });    
        }
    
        echo "Waitgroup waiting\n";

        // wait for all workers to complete
        $wait_group->wait();
        echo "Waitgroup done\n";
    
    
        // stop the background maintenance
        $keep_running = false;
    
        echo "A total of " . await($count) . " maintenance steps were completed\n";
    
        echo "Trying to resolve the error value:\n";
        try {
            await($futureWithException);
        } catch (Throwable $e) {
            echo "Could not resolve the value: \n$e\n";
        }
    
    });
} catch (Throwable $e) {
    echo "I successfully caught the missed exception in Worker 1:\n";
    echo " " . $e->getMessage() . "\n";
}