-
Notifications
You must be signed in to change notification settings - Fork 5
/
phasync.php
1192 lines (1096 loc) · 41.7 KB
/
phasync.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
<?php
use phasync\CancelledException;
use phasync\Context\ContextInterface;
use phasync\Context\DefaultContext;
use phasync\Debug;
use phasync\Drivers\DriverInterface;
use phasync\Drivers\StreamSelectDriver;
use phasync\Internal\AsyncStream;
use phasync\Internal\ChannelBuffered;
use phasync\Internal\ChannelUnbuffered;
use phasync\Internal\ExceptionTool;
use phasync\Internal\ReadChannel;
use phasync\Internal\Selector;
use phasync\Internal\Subscribers;
use phasync\Internal\WriteChannel;
use phasync\IOException;
use phasync\ReadChannelInterface;
use phasync\SelectableInterface;
use phasync\SubscribersInterface;
use phasync\TimeoutException;
use phasync\Util\WaitGroup;
use phasync\WriteChannelInterface;
/**
* This class defines the essential API for all coroutine based applications.
* This basic API enables implementing all forms of asynchronous programming,
* including asynchronous CURL and database connections efficiently via the
* use of flag signals {@see phasync::raiseFlag()} and {@see phasync::awaitFlag()}.
*
* The essential functions are:
*
* - {@see phasync::sleep()} to pause the coroutine and avoid wasting CPU cycles
* if there is nothing to do.
* - {@see phasync::stream()} to pause the coroutine until a stream resource becomes
* readable, writable or both.
* - {@see phasync::raiseFlag()} and {@see phasync::awaitFlag()} to pause the coroutine
* until a trigger occurs.
*
* It is bad practice for any advanced functionality to check for external events
* on every tick, so it should use sleep(), stream() or raiseFlag()/awaitFlag() to
* block between each poll.
*
* For example, to monitor curl handles using multi_curl, a separate coroutine would be
* launched using {@see phasync::go()} which will invoke curl_multi_exec(). It should
* invoke {@see phasync::sleep(0.1)} or so, to avoid busy loops and ideally a single
* such service coroutine manages all the curl handles across the application. Fibers
* that need notification would invoke phasync::awaitFlag($curlHandle) and the manager
* coroutine would invoke phasync::raiseFlag($curlHandle) when the $curlHandle is done.
*/
final class phasync
{
/**
* Block the coroutine until the stream becomes readable.
* {@see phasync::stream()}.
*/
public const READABLE = DriverInterface::STREAM_READ;
/**
* Block the coroutine until the stream becomes writable.
* {@see phasync::stream()}.
*/
public const WRITABLE = DriverInterface::STREAM_WRITE;
/**
* Block the coroutine until the stream has an except state
* (out-of-band data etc.); {@see \stream_select()} for more
* details.
*
* {@see phasync::stream()}
*/
public const EXCEPT = DriverInterface::STREAM_EXCEPT;
/**
* The default timeout in seconds used throughout the library,
* unless another timeout is configured via
* {@see phasync::setDefaultTimeout()}.
*/
public const DEFAULT_TIMEOUT = 30.0;
/**
* The default time slice that a coroutine can run before it is
* *voluntarily* preempted by invoking the
* {@see phasync::preempt()} function. When the coroutine has
* run for longer than this, the phasync::preempt() function
* will suspend the coroutine and allow other coroutines to run.
*
* The value is in nanoseconds, measured using \hrtime(true); the
* default is 50 ms.
*/
public const DEFAULT_PREEMPT_INTERVAL = 50000000;
/**
* The recursion depth of run statements that are active.
* Incremented on entry to {@see phasync::run()} and decremented
* in its `finally` clause.
*/
private static int $runDepth = 0;
/**
* The currently configured timeout in seconds. Starts out at the
* same value as {@see phasync::DEFAULT_TIMEOUT}.
*/
private static float $timeout = 30;
/**
* The currently set driver.
*/
private static ?DriverInterface $driver = null;
/**
* A function that sets an onFulfilled and/or an onRejected callback on
* a promise. When null, {@see phasync::getPromiseHandler()} installs a
* default handler on first use.
*
* @var null|Closure{object, ?Closure{mixed}, ?Closure{mixed}, false}
*/
private static ?Closure $promiseHandlerFunction = null;
/**
* The configurable preempt interval that can be set using the
* {@see phasync::setPreemptInterval()} function.
*
* @var int number of nanoseconds
*/
private static int $preemptInterval = self::DEFAULT_PREEMPT_INTERVAL;
/**
* The last time that {@see phasync::preempt()} recorded a timestamp.
* While this is 0, the first call to phasync::preempt() made from
* inside a coroutine only records the current time and does not
* suspend; later calls suspend once the preempt interval has elapsed.
*
* @var int number in nanoseconds from \hrtime(true)
*/
private static int $lastPreemptTime = 0;
/**
* Callbacks registered via {@see phasync::onEnter()}; invoked by
* {@see phasync::run()} when the outermost run() call starts.
*
* @var Closure[]
*/
private static array $onEnterCallbacks = [];
/**
* Callbacks registered via {@see phasync::onExit()}; invoked by
* {@see phasync::run()} when the outermost run() call finishes.
*
* @var Closure[]
*/
private static array $onExitCallbacks = [];
/**
* Register a coroutine/Fiber to run in the event loop and await the result.
* Running a coroutine this way also ensures that the event loop will run
* until all nested coroutines have completed. If you want to create a coroutine
* inside this context, and leave it running after - the coroutine must be
* created from within another coroutine outside of the context, for example by
* using a Channel.
*
* @throws FiberError
* @throws Throwable
*/
public static function run(Closure $fn, ?array $args = [], ?ContextInterface $context = null): mixed
{
    $driver = self::getDriver();
    // Declared before the try so the CancelledException handler below can
    // tell whether a root coroutine was ever created. Previously the handler
    // dereferenced an undefined variable when creation failed or when the
    // cancellation arrived before the coroutine existed.
    $fiber = null;
    try {
        $runDepth = self::$runDepth++;
        if (0 === $runDepth) {
            // Garbage collection is switched off for the lifetime of the
            // outermost run() call and switched back on in the finally clause.
            \gc_disable();
            // Run hooks when the outermost async context is entered
            foreach (self::$onEnterCallbacks as $enterCallback) {
                $enterCallback();
            }
        }
        if (null === $context) {
            $context = new DefaultContext();
        }
        $exception = null;
        try {
            $fiber = $driver->create($fn, $args, $context);
        } catch (Throwable $e) {
            // Remember the failure, but still drain the loop below so that
            // coroutines which were already started get to finish.
            $exception = $e;
        }
        if (0 === $runDepth) {
            // Outermost run(): drive the event loop until the driver is empty.
            while ($driver->count() > 0) {
                $driver->tick();
            }
        } else {
            // Nested run(): wait until this context has no fibers left.
            while ($context->getFibers()->count() > 0) {
                self::yield();
            }
        }
        if (null !== $exception) {
            throw $exception;
        }
        $result = self::await($fiber);
        if ($exception = $context->getContextException()) {
            throw $exception;
        }

        return $result;
    } catch (CancelledException $e) {
        // Nothing left to cancel: either no coroutine was created or it has
        // already finished, so the cancellation is passed on to the caller.
        if (null === $fiber || $fiber->isTerminated()) {
            throw $e;
        }
        // Forward the cancellation to the root coroutine and wait for it.
        self::cancel($fiber);
        $result = self::await($fiber);
        if ($exception = $context->getContextException()) {
            throw $exception;
        }

        return $result;
    } finally {
        if (0 === --self::$runDepth) {
            \gc_enable();
            // Run hooks when the outermost async context is exited
            foreach (self::$onExitCallbacks as $exitCallback) {
                $exitCallback();
            }
        }
    }
}
/**
* Creates a normal coroutine and starts running it. The coroutine will be associated
* with the current context, and will block the current coroutine from completing
* until it is done by returning or throwing.
*
* If parameter `$concurrent` is greater than 1, the returned coroutine will resolve
* into an array of return values or exceptions from each instance of the coroutine.
*
* @param Closure $fn The function to run as a coroutine
* @param array $args The arguments to pass to the function
* @param int $concurrent Run the coroutine multiple times
* @param ContextInterface|null $context Set a new context interface
* @param bool $run If true, the coroutine will be run in an event loop context
*
* @throws LogicException
*/
public static function go(Closure $fn, array $args = [], int $concurrent = 1, ?ContextInterface $context = null, bool $run = false): Fiber
{
    if ($concurrent > 1) {
        if (0 === self::$runDepth && null !== $context) {
            throw new LogicException("Can't create concurrent root coroutines sharing a context");
        }
        if ($run) {
            throw new LogicException("Can't combine `run=true` with multiple concurrency.");
        }

        // A single supervising coroutine launches every instance and then
        // collects each return value (or the exception it threw) in launch order.
        $supervisor = static function ($fn, $args, $concurrent) {
            $instances = [];
            while (\count($instances) < $concurrent) {
                $instances[] = self::go($fn, $args);
            }

            $outcomes = [];
            foreach ($instances as $instance) {
                try {
                    $outcomes[] = self::await($instance);
                } catch (Throwable $failure) {
                    $outcomes[] = $failure;
                }
            }

            return $outcomes;
        };

        // NOTE(review): $context is not forwarded to the supervisor here — confirm this is intended.
        return self::go(fn: $supervisor, args: [$fn, $args, $concurrent]);
    }

    $driver = self::getDriver();

    if ($driver->getCurrentFiber()) {
        $coroutine = $driver->create($fn, $args, $context);
        // Coroutines start running as soon as they are created, so a caller
        // launching many of them could monopolise the loop. Preempting here
        // lets the other coroutines make progress in between.
        self::preempt();

        return $coroutine;
    }

    // From here on we are outside of any coroutine.
    if (!$run) {
        throw ExceptionTool::popTrace(new LogicException("Can't create a coroutine outside of a context. Use `phasync::run()` to launch a context."));
    }

    // Run the function to completion in its own event loop and hand back a
    // Fiber object wrapping the computed result.
    // NOTE(review): the returned Fiber is never started — confirm callers can consume it.
    $value = phasync::run($fn, $args, $context);

    return new Fiber(static fn () => $value);
}
/**
* Launches a service coroutine independently of the context scope.
* This service will be permitted to continue but MUST stop running
* when it is no longer providing services to other fibers. Failing
* to do so will cause the topmost run() context to keep running.
*/
public static function service(Closure $coroutine): void
{
    $driver = self::getDriver();
    $caller = $driver->getCurrentFiber();

    // A service may only be launched by a fiber that the driver knows a
    // context for; the context lookup is skipped when there is no fiber.
    $calledFromCoroutine = null !== $caller && null !== $driver->getContext($caller);
    if (!$calledFromCoroutine) {
        throw new LogicException('Services must be started on-demand inside a coroutine.');
    }

    $driver->runService($coroutine);
}
/**
* Wait for a coroutine or promise to complete and return the result.
* If exceptions are thrown in the coroutine, they will be thrown here.
*
* @param float $timeout the number of seconds to wait at most
*
* @throws TimeoutException if the timeout is reached
* @throws Throwable
*/
public static function await(object $fiberOrPromise, ?float $timeout = null): mixed
{
    $timeout ??= self::getDefaultTimeout();
    $startTime = \microtime(true);
    $driver = self::getDriver();
    $currentFiber = $driver->getCurrentFiber();

    if ($fiberOrPromise instanceof Fiber) {
        // Fast path: the fiber has already finished, so hand back its
        // return value or rethrow the exception the driver recorded for it.
        if ($fiberOrPromise->isTerminated()) {
            try {
                return $fiberOrPromise->getReturn();
            } catch (FiberError) {
                throw $driver->getException($fiberOrPromise);
            }
        }
        if (!$driver->getContext($fiberOrPromise)) {
            throw new LogicException("Can't await a coroutine not from phasync");
        }
        $fiber = $fiberOrPromise;
    } else {
        // Anything else is treated as promise-like and adapted into a
        // coroutine that settles when the promise does.
        $fiber = self::go(static function () use ($fiberOrPromise) {
            // null = pending, true = fulfilled, false = rejected
            $status = null;
            $result = null;

            $onFulfilled = static function (mixed $value) use (&$status, &$result) {
                if (null !== $status) {
                    throw new LogicException('Promise resolved or rejected twice');
                }
                $status = true;
                $result = $value;
            };
            $onRejected = static function (mixed $error) use (&$status, &$result) {
                if (null !== $status) {
                    throw new LogicException('Promise resolved or rejected twice');
                }
                $status = false;
                $result = $error;
            };

            if (!self::handlePromise($fiberOrPromise, $onFulfilled, $onRejected)) {
                throw new InvalidArgumentException('The awaited object must be a Fiber or a promise-like object');
            }

            // Give the promise-like object a chance to settle
            while (null === $status) {
                self::yield();
            }

            if ($status) {
                return $result;
            }
            if ($result instanceof Throwable) {
                throw $result;
            }
            throw new Exception((string) $result);
        });
    }

    if ($currentFiber) {
        // Inside a coroutine: park on the awaited fiber's flag until it is
        // raised or the remaining time runs out, then re-check.
        while (!$fiber->isTerminated()) {
            $remaining = $timeout - (\microtime(true) - $startTime);
            if ($remaining < 0) {
                throw new TimeoutException('The coroutine did not complete in time');
            }
            $driver->whenFlagged($fiber, $remaining, $currentFiber);
            self::suspend();
        }
    } else {
        /*
         * Outside of any coroutine: drive the event loop ourselves.
         *
         * @todo Move this to the phasync::run() method.
         */
        while (!$fiber->isTerminated()) {
            $remaining = $timeout - (\microtime(true) - $startTime);
            if ($remaining < 0) {
                throw new TimeoutException('The coroutine ('.Debug::getDebugInfo($fiber).') did not complete in time');
            }
            $driver->tick();
        }
    }

    $exception = $driver->getException($fiber);
    if (null !== $exception) {
        throw $exception;
    }

    return $fiber->getReturn();
}
/**
* Block until one of the selectable objects, closures, resources or fibers terminate. Note that
* this statement can be used as part of a {@see match} statement:
*
* ```php
* match(phasync::select([$a, $b, $c])) {
* $a => function() {},
* $b => function() {},
* default => function() {}
* }
* ```
*
* @param (Fiber|Closure|SelectableInterface)[] $selectables
* @param resource[] $read Wait for stream resources to become readable
* @param resource[] $write Wait for stream resources to become writable
*
* @return SelectableInterface|resource|Fiber
*
* @throws LogicException
* @throws FiberError
* @throws Throwable
*/
public static function select(array $selectables, ?float $timeout = null, ?array $read = null, ?array $write = null): mixed
{
    if (null === self::getDriver()->getCurrentFiber()) {
        throw new LogicException("Can't use phasync::select() outside of phasync. Use `phasync::run()` to launch a context.");
    }
    /**
     * Start one watcher coroutine per selectable. The first watcher to
     * finish stores its subject in $selected and raises $flag, which wakes
     * up this function. Later finishers see $selected already set and do
     * nothing.
     */
    $flag = new stdClass();
    $cos = [];
    $selected = null;
    try {
        foreach ($selectables as $selectable) {
            if ($selectable instanceof Fiber) {
                $cos[] = self::go(static function () use ($selectable, $flag, &$selected) {
                    try {
                        self::await($selectable);
                    } catch (Throwable) {
                        // A fiber that failed still counts as terminated.
                    } finally {
                        if ($selected === null) {
                            $selected = $selectable;
                            self::raiseFlag($flag);
                        }
                    }
                });
            } elseif ($selectable instanceof SelectableInterface) {
                $cos[] = self::go(static function () use ($selectable, $flag, &$selected) {
                    try {
                        while (!$selectable->isReady()) {
                            $selectable->await();
                        }
                    } catch (Throwable) {
                        // Errors while waiting are treated as "ready".
                    } finally {
                        if ($selected === null) {
                            $selected = $selectable;
                            self::raiseFlag($flag);
                        }
                    }
                });
            } elseif (null !== ($selector = Selector::create($selectable))) {
                $cos[] = self::go(static function () use ($selector, $flag, &$selected) {
                    try {
                        while (!$selector->isReady()) {
                            $selector->await();
                        }
                    } catch (Throwable) {
                        // Errors while waiting are treated as "ready".
                    } finally {
                        if ($selected === null) {
                            $selected = $selector->getSelected();
                            self::raiseFlag($flag);
                            // NOTE(review): the selector is only returned to the pool
                            // by the winning watcher — confirm that losers need not be.
                            $selector->returnToPool();
                        }
                    }
                });
            } else {
                throw new InvalidArgumentException('Unsupported selectable '.\get_debug_type($selectable));
            }
        }
        if ($read !== null) {
            foreach ($read as $resource) {
                $cos[] = self::go(static function () use ($resource, $flag, &$selected) {
                    self::readable($resource, \PHP_FLOAT_MAX);
                    if ($selected === null) {
                        $selected = $resource;
                        self::raiseFlag($flag);
                    }
                });
            }
        }
        if ($write !== null) {
            foreach ($write as $resource) {
                $cos[] = self::go(static function () use ($resource, $flag, &$selected) {
                    // Streams in $write must be watched for writability;
                    // this previously waited for readability by mistake.
                    self::writable($resource, \PHP_FLOAT_MAX);
                    if ($selected === null) {
                        $selected = $resource;
                        self::raiseFlag($flag);
                    }
                });
            }
        }
        if (empty($cos)) {
            // Nothing to wait for
            return null;
        }
        if ($selected !== null) {
            // A watcher already finished while the others were being started
            return $selected;
        }
        try {
            self::awaitFlag($flag, $timeout);
        } catch (TimeoutException) {
            return null;
        }

        return $selected;
    } finally {
        // The watcher coroutines are no longer needed, whatever the outcome.
        $driver = self::getDriver();
        foreach ($cos as $coroutine) {
            $driver->discard($coroutine);
        }
    }
}
/**
* Schedule a closure to run when the current coroutine completes. This function
* is intended to be used when a coroutine uses a resource that must be cleaned
* up when the coroutine finishes. Note that it may be more efficient to use a
* try {} finally {} statement.
*/
public static function finally(Closure $fn): void
{
    /**
     * Per-fiber stacks of cleanup callbacks. Keeping them in a WeakMap
     * lets repeated calls from the same coroutine append to one stack.
     *
     * @var WeakMap<Fiber, Closure[]>|null
     */
    static $queues = null;
    $queues ??= new WeakMap();

    $fiber = self::getFiber();
    if (!isset($queues[$fiber])) {
        $queues[$fiber] = [];
    }

    if (empty($queues[$fiber])) {
        // No callbacks are pending for this fiber, so start a watcher that
        // waits for it to finish and then runs the callbacks, newest first.
        self::go(static function () use ($fiber, $queues) {
            try {
                self::await($fiber);
            } catch (Throwable) {
                // Cleanup runs regardless of how the fiber ended.
            }
            while (!empty($queues[$fiber])) {
                $callback = \array_pop($queues[$fiber]);
                $callback();
            }
            unset($queues[$fiber]);
        });
    }

    $queues[$fiber][] = $fn;
}
/**
* Cancel a suspended coroutine. This will throw an exception inside the
* coroutine. If the coroutine handles the exception, it has the opportunity
* to clean up any resources it is using. The coroutine MUST be suspended
* using either {@see phasync::await()}, {@see phasync::sleep()}, {@see phasync::stream()}
* or {@see phasync::awaitFlag()}.
*
* @throws InvalidArgumentException if the fiber has already terminated
* @throws RuntimeException if the fiber is not currently blocked
*/
public static function cancel(Fiber $fiber, ?Throwable $exception = null): void
{
    // Only a fiber that is still alive can be cancelled; the actual
    // cancellation is delegated to the driver.
    if (!$fiber->isTerminated()) {
        self::getDriver()->cancel($fiber, $exception);

        return;
    }

    throw new InvalidArgumentException('Fiber is already terminated');
}
/**
* Suspend the coroutine when it has been running for a configurable number of
* microseconds. This function is designed to be invoked from within busy loops,
* to allow other tasks to be performed. Use it at strategic places in library
* functions that do not naturally suspend - and on strategic places in slow
* calculations (avoiding invoking it on every iteration if possible).
*
* This function is highly optimized, but it benefits a lot from JIT because it
* seems to be inlined.
*/
public static function preempt(): void
{
    $now = \hrtime(true);
    if ($now - self::$lastPreemptTime <= self::$preemptInterval) {
        // Hot path: the current time slice has not been used up yet.
        return;
    }

    $driver = self::getDriver();
    $fiber = $driver->getCurrentFiber();
    if (null === $fiber) {
        // Minimize cost of calling this outside of phasync
        return;
    }

    $firstCall = 0 === self::$lastPreemptTime;
    self::$lastPreemptTime = $now;
    if ($firstCall) {
        // No timestamp had been recorded yet, so only start the clock here
        // and let a later call decide whether to suspend.
        return;
    }

    // Queue this coroutine to be resumed and hand control back to the loop.
    // Exceptions raised while resuming are deliberately NOT caught here:
    // swallowing them would hide a cancellation aimed at a coroutine that
    // happened to be suspended inside preempt().
    $driver->enqueue($fiber);
    self::suspend();
}
/**
* Yield time so that other coroutines can continue processing. Note that
* if you intend to wait for something to happen in other coroutines, you
* should use {@see phasync::yield()}, which will suspend the coroutine until
* after any other fibers have done some work.
*
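* @param float $seconds Number of seconds to sleep. If zero or negative, the coroutine is simply re-enqueued instead of waiting on a timer (and outside of a coroutine the call returns immediately).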
*
* @throws RuntimeException
*/
public static function sleep(float $seconds = 0): void
{
    $driver = self::getDriver();
    $fiber = $driver->getCurrentFiber();
    $noDelay = $seconds <= 0;

    if (null === $fiber) {
        // Outside of a coroutine there is nothing to yield to, so either
        // return at once or block the whole process for the duration.
        if (!$noDelay) {
            \usleep((int) (1000000 * $seconds));
        }

        return;
    }

    if ($noDelay) {
        // Re-queue the coroutine without a timer
        $driver->enqueue($fiber);
    } else {
        // Ask the driver to resume the coroutine after the delay
        $driver->whenTimeElapsed($seconds, $fiber);
    }
    self::suspend();
}
/**
* Suspend the fiber until immediately after some other fiber has performed
* work. Suspending a fiber this way will not cause a busy loop. If you intend
* to perform work actively, you should use {@see phasync::sleep(0)}
* instead. Outside of a coroutine this function returns immediately.
*/
public static function yield(): void
{
    $driver = self::getDriver();

    // Does nothing when called outside of a coroutine.
    if (null !== ($fiber = $driver->getCurrentFiber())) {
        $driver->afterNext($fiber);
        self::suspend();
    }
}
/**
* Suspend the current fiber until the event loop becomes empty or would sleep while
* waiting for future events. Outside of a coroutine this function returns immediately.
*/
public static function idle(?float $timeout = null): void
{
    $driver = self::getDriver();
    $fiber = $driver->getCurrentFiber();

    // Only a running coroutine can wait for the loop to go idle; the
    // default timeout is looked up only when it is actually needed.
    if (null !== $fiber) {
        $driver->whenIdle($timeout ?? self::getDefaultTimeout(), $fiber);
        self::suspend();
    }
}
/**
* Make any stream resource context switch between coroutines when
* they would block.
*
* @return false|resource
*/
public static function io($resource)
{
    // Only genuine stream resources are wrapped; anything else (including
    // `false` from a failed open call) is handed back untouched.
    $isStream = \is_resource($resource) && 'stream' === \get_resource_type($resource);

    return $isStream ? AsyncStream::wrap($resource) : $resource;
}
/**
* Utility function to suspend the current fiber until a stream resource becomes readable,
* by wrapping `phasync::stream($resource, phasync::READABLE, $timeout)`.
*
* @param resource $resource
*
* @return resource Returns the same resource for convenience
*
* @throws FiberError
* @throws Throwable
*/
public static function readable(mixed $resource, ?float $timeout = null): mixed
{
    self::stream($resource, mode: self::READABLE, timeout: $timeout);

    // Validity is checked after waiting, so a value that is not (or is no
    // longer) a resource at that point is reported as an I/O error.
    if (\is_resource($resource)) {
        return $resource;
    }

    throw new IOException('Not a valid stream resource');
}
/**
* Utility function to suspend the current fiber until a stream resource becomes writable,
* by wrapping `phasync::stream($resource, phasync::WRITABLE, $timeout)`.
*
* @param resource $resource
*
* @return resource Returns the same resource for convenience
*
* @throws FiberError
* @throws Throwable
*/
public static function writable(mixed $resource, ?float $timeout = null): mixed
{
    self::stream($resource, mode: self::WRITABLE, timeout: $timeout);

    // Validity is checked after waiting, so a value that is not (or is no
    // longer) a resource at that point is reported as an I/O error.
    if (\is_resource($resource)) {
        return $resource;
    }

    throw new IOException('Not a valid stream resource');
}
/**
* Block the coroutine until the stream resource becomes readable, writable or raises
* an exception or any combination of these.
*
* The bitmaps use self::READABLE, self::WRITABLE and self::EXCEPT.
*
* @param int $mode a bitmap indicating which events on the resource that should resume the coroutine
*
* @return int A bitmap indicating which events on the resource that was raised
*/
public static function stream(mixed $resource, int $mode = self::READABLE | self::WRITABLE, ?float $timeout = null): int
{
    // Anything that is not an open stream resource yields "no events".
    if (!\is_resource($resource) || 'stream' !== \get_resource_type($resource)) {
        return 0;
    }
    if (0 === self::$runDepth) {
        $metadata = \stream_get_meta_data($resource);
        if ($metadata['blocked'] ?? false) {
            // No point in blocking here; instead the fwrite/fread call will block
            return $mode & (self::READABLE | self::WRITABLE);
        }
    } else {
        // While an event loop is running, the stream is switched to non-blocking mode.
        \stream_set_blocking($resource, false);
    }
    $driver = self::getDriver();
    if ($fiber = $driver->getCurrentFiber()) {
        // check using the event loop
        $timeout = $timeout ?? self::getDefaultTimeout();
        $result = null;
        try {
            $driver->whenResourceActivity($resource, $mode, $timeout, $fiber);
            self::suspend();

            return $result = $driver->getLastResourceState($fiber);
        } finally {
            if ($result === null) {
                // We left via an exception, so the state was never read above;
                // read it here anyway.
                // NOTE(review): presumably this clears the driver's per-fiber state — confirm.
                $driver->getLastResourceState($fiber);
            }
        }
    } else {
        // Not inside the event loop, so check directly
        // The functionality should work on non-blocking resources even outside of phasync
        $stopTime = \microtime(true) + ($timeout ?? self::getDefaultTimeout());
        do {
            // stream_select() rewrites these arrays in place, so they have
            // to be rebuilt for every poll.
            $r = $w = $e = [];
            if ($mode & self::READABLE) {
                $r[] = $resource;
            }
            if ($mode & self::WRITABLE) {
                $w[] = $resource;
            }
            if ($mode & self::EXCEPT) {
                $e[] = $resource;
            }
            // Poll for at most one second at a time
            $count = \stream_select($r, $w, $e, 0, 1000000);
            if (\is_int($count) && $count > 0) {
                $result = 0;
                if (!empty($r)) {
                    $result |= self::READABLE;
                }
                if (!empty($w)) {
                    $result |= self::WRITABLE;
                }
                if (!empty($e)) {
                    $result |= self::EXCEPT;
                }

                return $result;
            }
            // Keep polling until the deadline has passed. The comparison used
            // to be inverted, which ended the loop after a single poll.
        } while (\microtime(true) < $stopTime);
        throw ExceptionTool::popTrace(new TimeoutException('Timeout'));
    }
}
/**
* Creates a channel pair which can be used to communicate between multiple
* coroutines. Channels should be used to pass serializable data, to support
* passing channels to worker processes, but it is possible to pass more
* complex data if you are certain the data will not be passed to other
* processes.
*
* Both arguments are by-reference output parameters: on return, `$read`
* holds the ReadChannelInterface and `$write` holds the WriteChannelInterface
* of the new channel. A `$bufferSize` of 0 creates an unbuffered channel.
*/
public static function channel(?ReadChannelInterface &$read, ?WriteChannelInterface &$write, int $bufferSize = 0): void
{
    // A buffer size of exactly 0 selects the unbuffered implementation;
    // any other value is passed on to the buffered one.
    $channel = 0 === $bufferSize
        ? new ChannelUnbuffered()
        : new ChannelBuffered($bufferSize);

    // Both ends are returned to the caller through the reference parameters.
    $read = new ReadChannel($channel);
    $write = new WriteChannel($channel);
}
/**
* A publisher works like channels, but supports many subscribing coroutines
* concurrently.
*/
public static function publisher(?SubscribersInterface &$subscribers, ?WriteChannelInterface &$publisher): void
{
    // The publisher is the write end of an unbuffered channel; the read
    // end stays internal and is handed to the Subscribers object.
    $source = null;
    self::channel($source, $publisher, 0);

    $subscribers = new Subscribers($source);
}
/**
* Wait groups are used to coordinate multiple coroutines. A coroutine can add work
* to the wait group when the coroutine begins processing the task, and then notify
* the wait group that the work is done.
*
* @deprecated it's generally better to just construct `new WaitGroup()`
*/
public static function waitGroup(): WaitGroup
{
    // Plain factory; equivalent to constructing the WaitGroup directly.
    $group = new WaitGroup();

    return $group;
}
/**
* Signal all coroutines that are waiting for an event represented
* by the object $signal to resume.
*
* @return int the number of resumed fibers
*/
public static function raiseFlag(object $signal): int
{
    // Delegates to the driver and passes its return value straight through.
    $driver = self::getDriver();

    return $driver->raiseFlag($signal);
}
/**
* Pause execution of the current coroutine until an event is signalled
* represented by the object $signal. If the timeout is reached, this function
* throws TimeoutException.
*
* @throws TimeoutException if the timeout is reached
* @throws Throwable
*/
public static function awaitFlag(object $signal, ?float $timeout = null): void
{
    $driver = self::getDriver();
    $fiber = $driver->getCurrentFiber();

    if (null !== $fiber) {
        // Register interest in the flag (with the default timeout when none
        // was given) and hand control back to the event loop.
        $driver->whenFlagged($signal, $timeout ?? self::getDefaultTimeout(), $fiber);
        self::suspend();

        return;
    }

    throw ExceptionTool::popTrace(new LogicException('Can only await flags from within a coroutine'));
}
/**
* Returns true when called from within a coroutine context.
*/
public static function isRunning(): bool
{
    // At least one phasync::run() call is currently active.
    return self::$runDepth >= 1;
}
/**
* Get the currently running coroutine. If there is no currently
* running coroutine, throws LogicException.
*
* @throws LogicException
*/
public static function getFiber(): Fiber
{
    $current = self::getDriver()->getCurrentFiber();
    if ($current) {
        return $current;
    }

    // The driver reported no current fiber.
    throw new LogicException('This function can not be used outside of a coroutine');
}
/**
* Get the context of the currently running coroutine. If there is no
* currently running coroutine, throws LogicException.
*
* @throws LogicException
*/
public static function getContext(): ContextInterface
{
    // getFiber() throws on its own when called outside of a coroutine.
    $current = self::getFiber();
    $context = self::getDriver()->getContext($current);
    if ($context) {
        return $context;
    }

    // The driver has no context on record for this fiber.
    throw new LogicException('This function can only be used inside a `phasync` coroutine');
}
/**
* Register a callback to be invoked whenever an application enters the event
* loop via the top level `phasync::run()` call.
*
* @see phasync::onExit()
*/
public static function onEnter(Closure $enterCallback): void
{
    // Callbacks are kept in registration order.
    \array_push(self::$onEnterCallbacks, $enterCallback);
}
/**
* Register a callback to be invoked whenever an application exits the event
* loop after a `phasync::run()` call.
*
* @see phasync::onEnter()
*/
public static function onExit(Closure $exitCallback): void
{
    // Callbacks are kept in registration order.
    \array_push(self::$onExitCallbacks, $exitCallback);
}
/**
* Set the interval between every time the {@see phasync::preempt()}
* function will cause the coroutine to suspend running.
*/
public static function setPreemptInterval(int $microseconds): void
{
    // The interval is stored in nanoseconds to match \hrtime(true);
    // negative input is clamped to zero.
    $nanoseconds = $microseconds * 1000;
    self::$preemptInterval = $nanoseconds > 0 ? $nanoseconds : 0;
}
/**
* Configures handling of promises from other frameworks. The
* `$promiseHandlerFunction` returns `false` if the value in
* the first argument is not a promise. If it is a promise,
* it attaches the `onFulfilled` and/or `onRejected` callbacks
* from the second and third argument and returns true.
*
* @param Closure{mixed, Closure?, Closure?, bool} $promiseHandlerFunction
*/
public static function setPromiseHandler(Closure $promiseHandlerFunction): void
{
// Replaces whatever handler was installed before; callers wanting to extend
// the existing one can fetch it first with getPromiseHandler().
self::$promiseHandlerFunction = $promiseHandlerFunction;
}
/**
* Returns the current promise handler function. This enables extending
* the functionality of the existing promise handler without losing the
* other integrations. {@see phasync::setPromiseHandler()} for documentation
* on the function signature.
*/
public static function getPromiseHandler(): Closure
{
if (null === self::$promiseHandlerFunction) {
self::$promiseHandlerFunction = static function (mixed $promiseLike, ?Closure $onFulfilled = null, ?Closure $onRejected = null): bool {
if (!\is_object($promiseLike) || !\method_exists($promiseLike, 'then')) {
return false;
}
$rm = new ReflectionMethod($promiseLike, 'then');
if ($rm->isStatic()) {
return false;
}
$onRejectedHandled = false;
foreach ($rm->getParameters() as $index => $rp) {
if ($rp->hasType()) {
$rt = $rp->getType();
if ($rt instanceof ReflectionNamedType) {
if (
'mixed' !== $rt->getName()
&& 'callable' !== $rt->getName()
&& Closure::class !== $rt->getName()
) {
return false;
}
}
// mixed type apparently
}
if ($rp->isVariadic()) {