-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
EdgeSync.php
85 lines (70 loc) · 3.05 KB
/
EdgeSync.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
<?php
namespace Appwrite\Platform\Tasks;
use Utopia\App;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Query;
use Utopia\Platform\Action;
use Utopia\Queue\Client;
use Utopia\System\System;
/**
 * CLI task that periodically scans the `syncs` collection for documents whose
 * `sourceRegion` matches this instance's region and, for each one, either
 * cleans it up (status 200) or re-enqueues it for delivery.
 */
class EdgeSync extends Action
{
    /**
     * Maximum number of sync documents fetched per page.
     */
    private const PAGE_LIMIT = 500;

    /**
     * Name under which this task is registered.
     */
    public static function getName(): string
    {
        return 'edge-sync';
    }

    /**
     * Declares the task description, its injected dependencies and its callback.
     */
    public function __construct()
    {
        $this
            ->desc('Schedules edge sync tasks')
            ->inject('dbForConsole')
            ->inject('queueForSyncOutDelivery')
            ->callback(fn (Database $dbForConsole, Client $queueForSyncOutDelivery) => $this->action($dbForConsole, $queueForSyncOutDelivery));
    }

    /**
     * Runs the scheduling loop.
     *
     * Every `_APP_SYNC_EDGE_INTERVAL` seconds (default 180) all sync documents
     * of the current region (`_APP_REGION`, default `fra`) are paged through
     * and handed to {@see EdgeSync::processSync()}.
     *
     * @param Database $dbForConsole            Database holding the `syncs` collection.
     * @param Client   $queueForSyncOutDelivery Queue that receives the sync ids to deliver.
     */
    public function action(Database $dbForConsole, Client $queueForSyncOutDelivery): void
    {
        Console::title('Edge-sync V1');
        Console::success(APP_NAME . ' Edge-sync v1 has started');

        // Both settings are read once: they cannot change while the process runs.
        $interval = (int) System::getEnv('_APP_SYNC_EDGE_INTERVAL', '180');
        $region = System::getEnv('_APP_REGION', 'fra');

        Console::loop(function () use ($interval, $region, $dbForConsole, $queueForSyncOutDelivery) {
            $time = DateTime::now();
            Console::success("[{$time}] New task every {$interval} seconds");

            // Last document of the previous page. Without a cursor the same
            // first page would be fetched again and again whenever it is full
            // of documents that are not deleted (status other than 200).
            $cursor = null;

            do {
                $queries = [
                    Query::equal('sourceRegion', [$region]),
                    Query::limit(self::PAGE_LIMIT),
                ];

                if ($cursor !== null) {
                    $queries[] = Query::cursorAfter($cursor);
                }

                $results = $dbForConsole->find('syncs', $queries);
                $sum = count($results);

                foreach ($results as $sync) {
                    $this->processSync($sync, $time, $dbForConsole, $queueForSyncOutDelivery);
                }

                if ($sum > 0) {
                    // NOTE(review): this document may just have been deleted;
                    // presumably the cursor position is derived from the
                    // document object itself and not re-read — confirm.
                    $cursor = $results[array_key_last($results)];
                }

                // A short page means there is nothing left to fetch.
            } while ($sum === self::PAGE_LIMIT);
        }, $interval);
    }

    /**
     * Handles a single sync document.
     *
     * A document with status 200 is deleted together with its log file; any
     * other document gets its `attempts` counter incremented and its id
     * enqueued for delivery. Failures are logged and never propagate, so one
     * bad document does not stop the rest of the page.
     *
     * @param \Utopia\Database\Document $sync                    Sync document to handle.
     * @param string                    $time                    Timestamp used as log prefix.
     * @param Database                  $dbForConsole            Database holding the `syncs` collection.
     * @param Client                    $queueForSyncOutDelivery Queue that receives the sync id.
     */
    private function processSync(\Utopia\Database\Document $sync, string $time, Database $dbForConsole, Client $queueForSyncOutDelivery): void
    {
        try {
            if ($sync->getAttribute('status') === 200) {
                $dbForConsole->deleteDocument('syncs', $sync->getId());

                // The log file may already be gone; only try to remove it
                // when it exists, and report a failed removal.
                $path = APP_STORAGE_SYNCS . '/' . $sync->getAttribute('filename') . '.log';
                if (is_file($path) && !unlink($path)) {
                    Console::warning("[{$time}] Failed to remove {$path}");
                }

                Console::log("[{$time}] Deleting {$sync->getId()}");

                return;
            }

            Console::log("[{$time}] Enqueueing {$sync->getId()} to {$sync->getAttribute('destRegion')}");

            // Treat a missing counter as zero attempts so far.
            $attempts = ((int) $sync->getAttribute('attempts', 0)) + 1;
            $sync->setAttribute('attempts', $attempts);
            $dbForConsole->updateDocument('syncs', $sync->getId(), $sync);

            $queueForSyncOutDelivery->enqueue([
                'syncId' => $sync->getId(),
            ]);
        } catch (\Throwable $th) {
            Console::error("[{$time}] Error: {$th->getMessage()}");
        }
    }
}