-
Notifications
You must be signed in to change notification settings - Fork 0
/
planet_diffs.py
110 lines (82 loc) · 4.04 KB
/
planet_diffs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import gzip
import re
from collections.abc import Sequence
from datetime import UTC, datetime
from itertools import chain
import xmltodict
from anyio import create_task_group, fail_after
from httpx import AsyncClient
from sentry_sdk import start_span, trace
from config import AED_REBUILD_THRESHOLD, PLANET_DIFF_TIMEOUT, PLANET_REPLICA_URL
from utils import get_http_client, retry_exponential
from xmltodict_postprocessor import xmltodict_postprocessor
@trace
async def get_planet_diffs(last_update: float) -> tuple[Sequence[dict], float]:
    """Fetch node actions from all planet replication diffs newer than ``last_update``.

    Walks the replication state files backwards, starting from the top-level
    ``state.txt``, until it reaches a sequence whose timestamp is at or before
    ``last_update``. It then downloads the newer ``.osc.gz`` diffs concurrently
    and extracts their node actions. The whole operation runs under
    ``fail_after(PLANET_DIFF_TIMEOUT)``.

    Args:
        last_update: Unix timestamp of the last replication state already processed.

    Returns:
        A pair ``(actions, timestamp)``:

        - ``actions`` holds the node actions of every newer diff, concatenated
          in ascending sequence-number order.
        - ``timestamp`` is the timestamp of the newest sequence seen.

        When no sequence is newer than ``last_update``, returns
        ``((), last_update)``.
    """
    with fail_after(PLANET_DIFF_TIMEOUT.total_seconds()):
        async with get_http_client(PLANET_REPLICA_URL) as http:
            # Collected newest-first: index 0 is the latest sequence (from state.txt).
            sequence_numbers: list[int] = []
            sequence_timestamps: list[float] = []

            while True:
                # None requests the top-level state.txt; afterwards step back one sequence.
                next_sequence_number = sequence_numbers[-1] - 1 if sequence_numbers else None
                sequence_number, sequence_timestamp = await _get_state(http, next_sequence_number)
                if sequence_timestamp <= last_update:
                    break
                sequence_numbers.append(sequence_number)
                sequence_timestamps.append(sequence_timestamp)

            if not sequence_numbers:
                return (), last_update

            result: list[tuple[int, list[dict]]] = []

            with start_span(description=f'Processing {len(sequence_numbers)} planet diffs'):

                @retry_exponential(AED_REBUILD_THRESHOLD)
                async def _get_planet_diff(sequence_number: int) -> None:
                    r = await http.get(f'{_format_sequence_number(sequence_number)}.osc.gz')
                    r.raise_for_status()
                    xml = gzip.decompress(r.content).decode()
                    # Append only after a fully successful parse, so a retried
                    # attempt never leaves a partial entry behind.
                    result.append((sequence_number, _parse_node_actions(xml)))

                async with create_task_group() as tg:
                    for sequence_number in sequence_numbers:
                        tg.start_soon(_get_planet_diff, sequence_number)

    # Tasks complete in arbitrary order; sort by sequence number in ascending order.
    result.sort(key=lambda x: x[0])

    data = tuple(chain.from_iterable(actions for _, actions in result))
    data_timestamp = sequence_timestamps[0]
    return data, data_timestamp


def _parse_node_actions(xml: str) -> list[dict]:
    """Parse an osmChange document into its node-only actions.

    The ``<create>``/``<modify>``/``<delete>`` elements are first normalized by
    ``_format_actions`` into ``<action type="...">`` and then parsed. Way and
    relation children are dropped, and actions without a node are skipped.

    A document with no action elements yields an empty list instead of raising.
    A document whose root is not ``osmChange`` still raises ``KeyError``.
    """
    parsed = xmltodict.parse(
        _format_actions(xml),
        postprocessor=xmltodict_postprocessor,
        force_list=('action', 'node', 'way', 'relation', 'member', 'tag', 'nd'),
    )

    # force_list only affects keys that are present. A diff with no action
    # elements therefore has no 'action' key, and a completely empty
    # <osmChange></osmChange> parses to None.
    osm_change = parsed['osmChange'] or {}
    actions: list[dict] = osm_change.get('action', [])

    node_actions: list[dict] = []
    for action in actions:
        # ignore everything that is not a node
        if 'node' in action:
            action.pop('way', None)
            action.pop('relation', None)
            node_actions.append(action)
    return node_actions
@retry_exponential(AED_REBUILD_THRESHOLD)
@trace
async def _get_state(http: AsyncClient, sequence_number: int | None) -> tuple[int, float]:
    """Download one replication state file and read its sequence number and time.

    Args:
        http: Client used for the request. Paths are relative, so it is
            expected to be rooted at the replication directory.
        sequence_number: Sequence whose ``NNN/NNN/NNN.state.txt`` is read, or
            ``None`` to read the top-level ``state.txt``.

    Returns:
        ``(sequence_number, timestamp)``, where ``timestamp`` is the Unix time of
        the file's ``timestamp`` field, interpreted as UTC.
    """
    path = 'state.txt' if sequence_number is None else f'{_format_sequence_number(sequence_number)}.state.txt'
    response = await http.get(path)
    response.raise_for_status()

    # Colons in the state file are escaped as "\:"; unescape before matching.
    body = response.text.replace('\\:', ':')

    parsed_number = int(re.search(r'sequenceNumber=(\d+)', body).group(1))
    stamp = re.search(r'timestamp=(\S+)', body).group(1)
    moment = datetime.strptime(stamp, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=UTC)
    return parsed_number, moment.timestamp()
def _format_sequence_number(sequence_number: int) -> str:
result = f'{sequence_number:09d}'
result = '/'.join(result[i : i + 3] for i in range(0, 9, 3))
return result
def _format_actions(xml: str) -> str:
# <create> -> <action type="create">
# </create> -> </action>
# etc.
xml = re.sub(r'<(create|modify|delete)>', r'<action type="\1">', xml)
xml = re.sub(r'</(create|modify|delete)>', r'</action>', xml)
return xml