-
Notifications
You must be signed in to change notification settings - Fork 23
/
sse.py
40 lines (28 loc) · 1.07 KB
/
sse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from pytonapi import AsyncTonapi
from pytonapi.schema.events import TransactionEventData
# Enter your API key
API_KEY = "" # noqa
# List of TON blockchain accounts to monitor
ACCOUNTS = [
"EQBDanbCeUqI4_v-xrnAN0_I2wRvEIaLg1Qg2ZN5c6Zl1KOh", # noqa
"UQCtiv7PrMJImWiF2L5oJCgPnzp-VML2CAt5cbn1VsKAxOVB", # noqa
]
async def handler(event: TransactionEventData, tonapi: AsyncTonapi) -> None:
"""
Handle SSEvent for transactions.
:param event: The SSEvent object containing transaction details.
:param tonapi: An instance of AsyncTonapi for interacting with TON API.
"""
trace = await tonapi.traces.get_trace(event.tx_hash)
# If the transaction is successful, print the trace
if trace.transaction.success:
print(trace.model_dump())
async def main() -> None:
tonapi = AsyncTonapi(api_key=API_KEY)
# Subscribe to transaction events for the specified accounts
await tonapi.sse.subscribe_to_transactions(
handler, accounts=ACCOUNTS, args=(tonapi,)
)
if __name__ == "__main__":
import asyncio
asyncio.run(main())