-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
sync-edge.php
87 lines (72 loc) · 2.87 KB
/
sync-edge.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
<?php
global $cli;
global $register;
use Utopia\App;
use Utopia\Cache\Cache;
use Utopia\CLI\Console;
use Utopia\Database\Adapter\MariaDB;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Cache\Adapter\Redis as RedisCache;
use Utopia\Database\Query;
function getConsoleDatabase(): Database
{
global $register;
$attempts = 0;
do {
try {
$attempts++;
$cache = new Cache(new RedisCache($register->get('cache')));
$database = new Database(new MariaDB($register->get('db')), $cache);
$database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
$database->setNamespace('_console'); // Main DB
if (!$database->exists($database->getDefaultDatabase(), 'certificates')) {
throw new \Exception('Console project not ready');
}
break; // leave loop if successful
} catch (\Exception $e) {
Console::warning("Database not ready. Retrying connection ({$attempts})...");
if ($attempts >= DATABASE_RECONNECT_MAX_ATTEMPTS) {
throw new \Exception('Failed to connect to database: ' . $e->getMessage());
}
sleep(DATABASE_RECONNECT_SLEEP);
}
} while ($attempts < DATABASE_RECONNECT_MAX_ATTEMPTS);
return $database;
}
$cli
->task('sync-edge')
->desc('Schedules edge sync tasks')
->action(function () {
Console::title('Syncs edges V1');
Console::success(APP_NAME . ' Syncs cloud process v1 has started');
$interval = (int) App::getEnv('_APP_SYNC_EDGE_INTERVAL', '180');
Console::loop(function () use ($interval) {
$database = getConsoleDatabase();
$time = DateTime::now();
$region = App::getEnv('_APP_REGION', 'nyc1');
Console::info("[{$time}] Notifying workers with cloud tasks every {$interval} seconds");
global $register;
$time = DateTime::now();
$chunks = $database->find('syncs', [
Query::equal('region', [$region]),
Query::limit(500)
]);
if (count($chunks) > 0) {
Console::info("[{$time}] Found " . \count($chunks) . " cache key chunks to purge.");
foreach ($chunks as $chunk) {
$register
->get('syncOut')
->enqueue([
'value' => [
'region' => $chunk->getAttribute('regionDest'),
'chunk' => $chunk->getAttribute('keys')
]
]);
$database->deleteDocument('syncs', $chunk->getId());
}
} else {
Console::info("[{$time}] No cache key chunks where found.");
}
}, $interval);
});