-
Notifications
You must be signed in to change notification settings - Fork 239
/
base.py
368 lines (277 loc) · 15.5 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# -*- coding: utf-8 -*-
"""
Copyright (C) 2023, Zato Source s.r.o. https://zato.io
Licensed under AGPLv3, see LICENSE.txt for terms and conditions.
"""
# stdlib
import os
from datetime import datetime
from logging import getLogger
from traceback import format_exc
# gevent
from gevent import sleep
# Watchdog
from watchdog.events import DirCreatedEvent, DirModifiedEvent, FileCreatedEvent, FileModifiedEvent
# Zato
from zato.common.api import FILE_TRANSFER
from zato.common.util.api import spawn_greenlet
from zato.common.util.file_transfer import path_string_list_to_list
from zato.server.file_transfer.snapshot import default_interval, DirSnapshotDiff
# ################################################################################################################################
# The condition below is never true, so none of these imports runs at runtime.
# The imported names are referenced in this module only inside string annotations
# and `# type:` comments.
if 0:
from bunch import Bunch
from zato.common.typing_ import any_, anylist, anytuple, callable_
from zato.server.file_transfer.api import FileTransferAPI
from zato.server.file_transfer.snapshot import BaseRemoteSnapshotMaker
# NOTE(review): the self-references below presumably exist only to stop linters
# from reporting the imports above as unused - confirm before removing them.
Bunch = Bunch
BaseRemoteSnapshotMaker
FileTransferAPI = FileTransferAPI
# ################################################################################################################################
# ################################################################################################################################
logger = getLogger(__name__)
# ################################################################################################################################
# ################################################################################################################################
class PathCreatedEvent:
    """ A plain holder for two facts about a path: where it is and whether it is a directory.
    """

    def __init__(self, src_path:'str', is_dir:'bool') -> 'None':
        # Both values are stored exactly as given on input, no validation is applied.
        self.src_path, self.is_dir = src_path, is_dir
# ################################################################################################################################
# ################################################################################################################################
class BaseObserver:
    """ A base class for file transfer observers. It keeps the configuration of a single channel,
    spawns one greenlet per valid path and provides a snapshot-based main loop. Path-level
    and file-level operations raise NotImplementedError here and are left to subclasses.
    """

    # Type hints
    _observe_func: 'callable_'
    event_handler: 'any_'

    # Placeholders that subclasses are expected to overwrite. Note that the title below
    # is computed once, from the placeholder name, when this class body is executed.
    observer_type_impl = '<observer-type-impl-not-set>'
    observer_type_name = '<observer-type-name-not-set>'
    observer_type_name_title = observer_type_name.upper()
    should_wait_for_deleted_paths = False

    def __init__(self, manager:'FileTransferAPI', channel_config:'Bunch') -> 'None':
        """ Copies the channel's configuration to attributes and establishes how long
        the snapshot-based main loop sleeps between its iterations.
        """
        self.manager = manager
        self.channel_config = channel_config
        self.channel_id = channel_config.id
        self.source_type = channel_config.source_type
        self.is_local = self.source_type == FILE_TRANSFER.SOURCE_TYPE.LOCAL.id
        self.is_notify = self.observer_type_impl == FILE_TRANSFER.SOURCE_TYPE_IMPL.LOCAL_INOTIFY
        self.name = channel_config.name
        self.is_active = channel_config.is_active

        # These are placeholders until self.set_up is called
        self.path_list = ['<initial-observer>']
        self.is_recursive = False

        # The loops in this class run for as long as this flag is True
        self.keep_running = True

        # Either of the two environment variables, if set and non-empty, takes precedence over the default interval
        if pickup_interval := (os.environ.get('Zato_Hot_Deploy_Interval') or os.environ.get('Zato_Hot_Deployment_Interval')):
            pickup_interval = int(pickup_interval)
        else:
            pickup_interval = default_interval

        self.sleep_time = pickup_interval

# ################################################################################################################################

    def set_up(self, event_handler:'any_', path_list:'anylist', recursive:'bool') -> 'None':
        """ Assigns the event handler, the list of paths to observe and the recursion flag.
        """
        self.event_handler = event_handler
        self.path_list = path_string_list_to_list('.', path_list)
        self.is_recursive = recursive

# ################################################################################################################################

    def start(self, observer_start_args:'anytuple') -> 'None':
        """ Starts the observer in a new greenlet unless the channel is inactive,
        in which case the fact is only logged.
        """
        if self.is_active:
            _ = spawn_greenlet(self._start, observer_start_args)
        else:
            logger.info('Skipping an inactive file transfer channel `%s` (%s)', self.name, self.path_list)

# ################################################################################################################################

    def stop(self, needs_log:'bool'=True) -> 'None':
        """ Tells all the loops of this observer to stop by clearing self.keep_running.
        """
        if needs_log:
            logger.info('Stopping %s file transfer observer `%s`', self.observer_type_name, self.name)
        self.keep_running = False

# ################################################################################################################################

    def _start(self, observer_start_args:'any_') -> 'None':
        """ Spawns one greenlet running self._observe_func for each valid path from self.path_list.
        """
        for path in self.path_list:

            # Start only for paths that are valid - all invalid ones
            # are handled by a background path inspector.
            if self.is_path_valid(path):
                logger.info('Starting %s file observer `%s` for `%s` (%s)',
                    self.observer_type_name, path, self.name, self.observer_type_impl)
                _ = spawn_greenlet(self._observe_func, path, observer_start_args)
            else:
                logger.info('Skipping invalid path `%s` for `%s` (%s)', path, self.name, self.observer_type_impl)

# ################################################################################################################################

    def is_path_valid(self, *args:'any_', **kwargs:'any_') -> 'bool':
        """ Returns True if path can be used as a source for file transfer (e.g. it exists and it is a directory).
        """
        raise NotImplementedError('Must be implemented by subclasses')

# ################################################################################################################################

    def path_exists(self, path:'str', snapshot_maker:'BaseRemoteSnapshotMaker | None'=None) -> 'bool':
        """ Returns True if path exists, False otherwise.
        """
        raise NotImplementedError('Must be implemented by subclasses')

# ################################################################################################################################

    def path_is_directory(self, path:'str', snapshot_maker:'BaseRemoteSnapshotMaker | None'=None) -> 'bool':
        """ Returns True if path is a directory, False otherwise.
        """
        raise NotImplementedError('Must be implemented by subclasses')

# ################################################################################################################################

    def get_dir_snapshot(self, path, is_recursive:'bool') -> 'str':
        """ Returns an implementation-specific snapshot of a directory.
        """
        # Note that this is an instance method, like all the other abstract ones, so that calling it
        # on an instance raises NotImplementedError rather than TypeError about the number of arguments.
        raise NotImplementedError()

# ################################################################################################################################

    def move_file(self, path_from:'str', path_to:'str', event:'any_', snapshot_maker:'BaseRemoteSnapshotMaker') -> 'None':
        """ Moves a file to a selected directory.
        """
        raise NotImplementedError()

# ################################################################################################################################

    def delete_file(self, path:'str', snapshot_maker:'BaseRemoteSnapshotMaker') -> 'None':
        """ Deletes a file pointed to by path.
        """
        raise NotImplementedError()

# ################################################################################################################################

    def wait_for_path(self, path:'str', observer_start_args:'anytuple') -> 'None':
        """ Waits, checking every 5 seconds, until path exists and is a directory and then starts the observer.
        Returns without starting anything if self.keep_running becomes False in the meantime.
        """

        # Local aliases
        utcnow = datetime.utcnow

        # How many times we have tried to find the correct path and since when
        idx = 0
        start = utcnow()

        # Problems are logged on the first attempt and then on every log_every-th one
        log_every = 2

        # A flag indicating if path currently exists
        is_ok = False

        # This becomes True only if we learn that there is something wrong with path
        error_found = False

        # Wait until the directory exists (possibly it does already but we do not know it yet)
        while not is_ok:

            idx += 1

            # Honour the main loop's status
            if not self.keep_running:
                logger.info('Stopped `%s` path lookup function for %s file transfer observer `%s` (not found) (%s)',
                    path, self.observer_type_name, self.name, self.observer_type_impl)
                return

            if self.path_exists(path):

                if self.path_is_directory(path):
                    is_ok = True
                else:
                    # Indicate that there was an error with path
                    error_found = True

                    if idx == 1 or (idx % log_every == 0):
                        logger.info('%s transfer path `%s` is not a directory (%s) (c:%s d:%s t:%s)',
                            self.observer_type_name_title,
                            path,
                            self.name,
                            idx,
                            utcnow() - start,
                            self.observer_type_impl
                        )
            else:
                # Indicate that there was an error with path
                error_found = True

                if idx == 1 or (idx % log_every == 0):
                    logger.info('%s transfer path `%r` does not exist (%s) (c:%s d:%s t:%s)',
                        self.observer_type_name_title,
                        path,
                        self.name,
                        idx,
                        utcnow() - start,
                        self.observer_type_impl
                    )

            if is_ok:

                # Log only if had an error previously, otherwise it would emit too much to logs ..
                if error_found:
                    logger.info('%s file transfer path `%s` found successfully (%s) (c:%s d:%s t:%s)',
                        self.observer_type_name_title,
                        path,
                        self.name,
                        idx,
                        utcnow() - start,
                        self.observer_type_impl
                    )

                # .. and start the observer now.
                self.start(observer_start_args)

            else:
                sleep(5)

# ################################################################################################################################

    def observe_with_snapshots(
        self,
        snapshot_maker,      # type: BaseRemoteSnapshotMaker
        path,                # type: str
        max_iters,           # type: int
        log_stop_event=True, # type: bool
        *args,               # type: any_
        **kwargs             # type: any_
    ) -> 'None':
        """ An observer's main loop that uses snapshots. Each iteration compares the previous snapshot of path
        with a new one and invokes the event handler's on_created callback for each created or modified entry.
        The loop ends when self.keep_running is False, when max_iters iterations were completed
        or when FileNotFoundError is caught, in which case self.manager is told to wait for the path.
        """

        # How many iterations were completed so far - this is set before the try block
        # so that the final log message below can always make use of it.
        current_iter = 0

        try:

            # Local aliases to avoid namespace lookups in self
            timeout = self.sleep_time
            handler_func = self.event_handler.on_created
            is_recursive = self.is_recursive

            # Take an initial snapshot
            snapshot = snapshot_maker.get_snapshot(path, is_recursive, True, True)

            while self.keep_running:

                # Run the loop either max_iters times or, if that value is never reached, essentially, infinitely.
                if current_iter == max_iters:
                    break

                try:

                    # The latest snapshot ..
                    new_snapshot = snapshot_maker.get_snapshot(path, is_recursive, False, False)

                    # .. difference between the old and new will return, in particular, new or modified files ..
                    diff = DirSnapshotDiff(snapshot, new_snapshot) # type: ignore

                    for path_created in diff.files_created:

                        # .. ignore Python's own directories ..
                        if '__pycache__' in path_created:
                            continue

                        if os.path.isdir(path_created):
                            class_ = DirCreatedEvent
                        else:
                            class_ = FileCreatedEvent

                        event = class_(path_created)
                        handler_func(event, self, snapshot_maker)

                    # .. note that modified paths go to the same callback that created ones do ..
                    for path_modified in diff.files_modified:

                        # .. ignore Python's own directories ..
                        if '__pycache__' in path_modified:
                            continue

                        if os.path.isdir(path_modified):
                            class_ = DirModifiedEvent
                        else:
                            class_ = FileModifiedEvent

                        event = class_(path_modified)
                        handler_func(event, self, snapshot_maker)

                    # .. a new snapshot which will be treated as the old one in the next iteration
                    snapshot = snapshot_maker.get_snapshot(path, is_recursive, False, True)

                # Note that this will be caught only with local files not with FTP, SFTP etc.
                except FileNotFoundError:

                    # Log the error ..
                    logger.warning('Path not found caught in %s file observer main loop (%s) `%s` (%s t:%s)',
                        self.observer_type_name, path, format_exc(), self.name, self.observer_type_impl)

                    # .. start a background inspector which will wait for the path to become available ..
                    self.manager.wait_for_deleted_path(path)

                    # .. and end the main loop (the finally block below still runs first).
                    return

                except Exception as e:
                    logger.warning('Exception %s in %s file observer main loop `%s` e:`%s (%s t:%s)',
                        type(e), self.observer_type_name, path, format_exc(), self.name, self.observer_type_impl)

                finally:

                    # Update loop counter after we completed current iteration
                    current_iter += 1

                    # Sleep for a while but only if we are a local observer because any other
                    # will be triggered from the scheduler and we treat the scheduler job's interval
                    # as the sleep time.
                    if self.is_local:
                        sleep(timeout)

        except Exception:
            logger.warning('Exception in %s file observer `%s` e:`%s (%s t:%s)',
                self.observer_type_name, path, format_exc(), self.name, self.observer_type_impl)

        if log_stop_event:
            logger.warning('Stopped %s file transfer observer `%s` for `%s` (snapshot:%s/%s)',
                self.observer_type_name, self.name, path, current_iter, max_iters)
# ################################################################################################################################
# ################################################################################################################################
class BackgroundPathInspector:
    """ Pairs a path with an observer and, on start, runs the observer's wait_for_path
    function for that path in a new greenlet.
    """

    def __init__(
        self,
        path,                    # type: str
        observer,                # type: BaseObserver
        observer_start_args=None # type: anytuple | None
    ) -> 'None':
        # Everything is stored as-is and used only later, in self.start
        self.observer_start_args = observer_start_args
        self.observer = observer
        self.path = path

    def start(self):
        """ Spawns the greenlet that waits for self.path - this is a no-op for inactive observers.
        """
        # There is nothing to wait for if the observer is not active
        if not self.observer.is_active:
            return

        wait_func = self.observer.wait_for_path
        _ = spawn_greenlet(wait_func, self.path, self.observer_start_args)
# ################################################################################################################################
# ################################################################################################################################