-
Notifications
You must be signed in to change notification settings - Fork 0
/
db_util.py
119 lines (99 loc) · 3.27 KB
/
db_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Utility functions related to PostgreSQL database
@author: kyleguan
"""
import psycopg2
import sys
from psycopg2 import sql
def create_database(db_name, tb_name):
"""
Create a PostgeSQL table
param: db_name: database name
tb_name: database table name
return: none
"""
con = None
connect_str= "host='localhost'"+" dbname='"+db_name+"' user='data_engineer' password='kcguan'"
try:
con = psycopg2.connect(connect_str)
cur = con.cursor()
cur.execute(sql.SQL("CREATE TABLE IF NOT EXISTS {}(\
id INTEGER PRIMARY KEY, \
image_id VARCHAR(20), \
img_w INTEGER,\
img_h INTEGER,\
model_name VARCHAR(20), \
model_ver VARCHAR(20),\
ts VARCHAR(30),\
det INT[],\
label INT[],\
iou REAL,\
conf REAL,\
color VARCHAR(20), \
succ BOOLEAN)").format(sql.Identifier(tb_name)))
con.commit()
except psycopg2.DatabaseError as e:
if con:
con.rollback()
print('Error %s' % e)
sys.exit(1)
finally:
if con:
con.close()
def insert_database(db_name, tb_name, row):
"""
Insert a row into a PostgeSQL table
param: db_name: database name
tb_name: database table name
row: a row of a table
return: none
"""
con = None
connect_str= "host='localhost'"+" dbname='"+db_name+"' user='data_engineer' password='kcguan'"
try:
con = psycopg2.connect(connect_str)
cur = con.cursor()
cur.execute(sql.SQL("insert into {} \
values (%s, %s, %s, \
%s, %s, %s,\
%s, %s, %s,\
%s, %s, %s, %s) on conflict(id) do nothing;")
.format(sql.Identifier(tb_name)), row)
con.commit()
except psycopg2.DatabaseError as e:
if con:
con.rollback()
print('Error %s' % e)
sys.exit(1)
finally:
if con:
con.close()
def print_database(db_name, tb_name):
"""
Print all the rows of a table
param: db_name: database name
tb_name: database table name
return: none
"""
con = None
connect_str= "host='localhost'"+" dbname='"+db_name+"' user='data_engineer' password='kcguan'"
try:
con = psycopg2.connect(connect_str)
cur = con.cursor()
cur.execute(sql.SQL("select * from {}").format(sql.Identifier(tb_name)))
while True:
row = cur.fetchone()
if row == None:
break
print(row)
con.commit()
except psycopg2.DatabaseError as e:
if con:
con.rollback()
print('Error %s' % e)
sys.exit(1)
finally:
if con:
con.close()