-
Notifications
You must be signed in to change notification settings - Fork 2
/
peek.py
105 lines (85 loc) · 3.04 KB
/
peek.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import argparse
import json
import sys
import threading
import time
import requests
import websocket
from rich.live import Live
from rich.table import Table
parser = argparse.ArgumentParser(description="Peek into real-time unconfirmed Bitcoin transactions.")
parser.add_argument(
"-m", "--min-val", dest="min_val",
nargs=1, metavar="dollars", type=float,
required=False, default=[0],
help="only peek at transactions above a minimum total output (default to $0)"
)
parser.add_argument(
"-t", "--time", dest="ws_time",
nargs=1, metavar="seconds", type=float,
required=False, default=[10],
help="keep websocket open for specified time (default to 10 seconds)"
)
parser.add_argument(
"-o", "--overflow", dest="overflow",
action="store_true", required=False,
help="let table print past terminal height (not recommended)")
opts = {}
one_btc = 0
table = None
def on_message(ws, message):
message = json.loads(message)
to_amt = [str(int(output["value"]) / 100000000) for output in message["x"]["out"]]
est_aud = list(map(lambda x: float(x) * one_btc, to_amt))
if sum(est_aud) < opts["min_val"][0]:
return
timestamp = time.strftime("%Y-%m-%d\n%H:%M:%S", time.localtime(message["x"]["time"]))
hash = message["x"]["hash"]
from_address = [input["prev_out"]["addr"] for input in message["x"]["inputs"]]
from_amt = [str(int(input["prev_out"]["value"]) / 100000000) for input in message["x"]["inputs"]]
to_address = [output["addr"] for output in message["x"]["out"]]
table.add_row(
timestamp,
hash,
"\n".join(from_address),
"\n".join(from_amt),
"\n".join(to_address),
"\n".join(to_amt),
"\n".join(map(lambda x: "${:,.2f}".format(x), est_aud))
)
def on_error(ws, error):
print(error)
def on_open(ws):
ws.send('{"op":"unconfirmed_sub"}')
def close_ws(ws):
time.sleep(opts["ws_time"][0])
ws.close()
sys.exit(0)
def main():
global opts, one_btc, table
opts = vars(parser.parse_args())
# websocket.enableTrace(True)
ws = websocket.WebSocketApp(
"wss:https://ws.blockchain.info/inv",
on_message = on_message,
on_error = on_error,
on_open=on_open
)
one_btc = json.loads(requests.get("https://blockchain.info/ticker").content)["AUD"]["15m"]
# build table
table = Table(show_lines=True)
table.add_column("timestamp", overflow="fold", justify="center")
table.add_column("hash", overflow="fold", min_width=32, max_width=32) # hash length 64
table.add_column("from_addr", overflow="fold")
table.add_column("from_amt", overflow="fold")
table.add_column("to_addr", overflow="fold")
table.add_column("to_amt", overflow="fold")
table.add_column("est_aud", overflow="fold")
live = Live(table, vertical_overflow="visible" if opts["overflow"] else "ellipsis", refresh_per_second=1)
live.start()
thread = threading.Thread(target=close_ws, args=(ws,))
thread.daemon = True
thread.start()
ws.run_forever()
if __name__ == "__main__":
main()