-
Notifications
You must be signed in to change notification settings - Fork 15
/
oid_resolver.py
136 lines (107 loc) · 3.51 KB
/
oid_resolver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""Resolve PostgreSQL OIDs to names and cache the result"""
import sys
from urllib.parse import urlparse
import psycopg2
class OIDResolver:
    """Resolve PostgreSQL relation OIDs to ``schema.relation`` names.

    A database connection is opened on construction and every
    ``pg_class`` entry is loaded into ``self.cache`` up front. Later
    lookups are answered from that cache and only fall back to a catalog
    query on a cache miss.
    """

    def __init__(self, connection_url):
        """
        Store the connection URL and connect immediately.

        ``connection_url`` is parsed with ``urlparse``; user, password,
        host, port and database name (the URL path without its leading
        slash) are taken from it.
        """
        self.connection_url = connection_url
        # Maps normalized OID (see _cache_key) -> "schema.relation"
        self.cache = {}
        self.connection = None
        self.cur = None
        self.connect()

    @staticmethod
    def _cache_key(oid):
        """
        Normalize an OID so equal OIDs share one cache entry.

        Numeric strings are converted to ``int`` so that ``"16384"`` and
        ``16384`` address the same entry; any other value is returned
        unchanged.
        """
        if isinstance(oid, str):
            try:
                return int(oid)
            except ValueError:
                return oid
        return oid

    @staticmethod
    def _redact_url(connection_url):
        """
        Return the connection URL with the password replaced by ``***``.

        URLs without a password component are returned unchanged.
        """
        parsed = urlparse(connection_url)
        if parsed.password is None:
            return connection_url

        userinfo, _, hostinfo = parsed.netloc.rpartition("@")
        user = userinfo.partition(":")[0]
        return parsed._replace(netloc=f"{user}:***@{hostinfo}").geturl()

    def connect(self):
        """
        Open the database connection

        Enables autocommit, creates the cursor used by all queries and
        warms the cache. If psycopg2 raises ``OperationalError`` the
        error is printed and the process exits with status 1.
        """
        connection_url_parsed = urlparse(self.connection_url)
        username = connection_url_parsed.username
        password = connection_url_parsed.password
        # Strip the leading "/" of the URL path to get the database name
        database = connection_url_parsed.path[1:]
        hostname = connection_url_parsed.hostname
        port = connection_url_parsed.port

        try:
            self.connection = psycopg2.connect(
                database=database,
                user=username,
                password=password,
                host=hostname,
                port=port,
            )
            self.connection.set_session(autocommit=True)
            self.cur = self.connection.cursor()

            # Warmup cache
            self.fetch_all_oids()
        except psycopg2.OperationalError as error:
            # Never echo the password contained in the URL
            safe_url = self._redact_url(self.connection_url)
            print(f"Unable to connect to the database {safe_url}")
            print(f"{error}")
            sys.exit(1)

    def disconnect(self):
        """
        Close the database connection.

        Closes the cursor and the connection if they are set and resets
        both attributes to ``None``, so repeated calls are harmless.
        """
        if self.cur:
            self.cur.close()
            self.cur = None

        if self.connection:
            self.connection.close()
            self.connection = None

    def fetch_all_oids(self):
        """
        Fetch all Oid mappings from the catalog and cache them. This
        is done because:

        (1) Oid cache lookups have to be fast and we want
            a warm cache.
        (2) Operations such as DROP delete objects from the database.
            Fetching the oid mapping afterwards is not possible.

        Existing cache entries are kept; entries for the same Oid are
        overwritten.
        """
        select_stmt = """
            SELECT n.nspname, c.relname, c.oid
            FROM pg_namespace n
            JOIN pg_class c ON n.oid = c.relnamespace
        """

        self.cur.execute(select_stmt)

        for nspname, relname, oid in self.cur.fetchall():
            self.cache[self._cache_key(oid)] = f"{nspname}.{relname}"

    def fetch_oid_from_db(self, oid):
        """
        Resolve the given OID into a name

        Returns ``"schema.relation"`` and caches it when the catalog has
        a matching row, ``"Oid <oid>"`` (not cached) when it has none,
        and an empty string when psycopg2 reports an error, which is
        printed.
        """
        select_stmt = """
            SELECT n.nspname, c.relname
            FROM pg_namespace n
            JOIN pg_class c ON n.oid = c.relnamespace
            WHERE c.oid = %s;
        """

        # The query parameter is sent as text; the cache key is
        # normalized separately so it matches resolve_oid() lookups.
        oid_text = str(oid)

        try:
            self.cur.execute(select_stmt, [oid_text])
            result_row = self.cur.fetchone()

            # Unable to get name, return Oid instead
            if result_row is None:
                return f"Oid {oid_text}"

            name = f"{result_row[0]}.{result_row[1]}"

            # Cache result
            self.cache[self._cache_key(oid)] = name

            return name
        except psycopg2.Error as error:
            print(f"Error while executing SQL statement: {error}")
            print(f"pgerror: {error.pgerror}")
            print(f"pgcode: {error.pgcode}")

        return ""

    def resolve_oid(self, oid):
        """
        Resolve the given OID into a name.

        The cache is consulted first; on a miss the catalog is queried
        through fetch_oid_from_db().
        """
        key = self._cache_key(oid)

        # OID cache hit
        if key in self.cache:
            return self.cache[key]

        # OID cache miss
        return self.fetch_oid_from_db(oid)