cx_oracle的简单使用

最近接触使用了oracle数据库,也是首次使用oracle。对于oracle的自动化接口,需要使用到cx_oracle。简单写了一下读取操作的相关代码。

一、安装cx_oracle

  • 1、使用pip安装
    1
    pip install cx_oracle
  • 2、使用编辑器(例如Pycharm-File-Settings-Project- Python Interpreter)安装

二、开始使用

  • 1、导入cx_oracle

    1
    import cx_Oracle
  • 2、初始化链接。这里当时被sid卡住了,后面发现使用connect_data可以解决这个问题

    1
    2
    3
    4
    5
    6
    7
    class Oracle:
    def __init__(self, user, password, host, port, sid):
    self.connect_data = cx_Oracle.makedsn(host, port, sid=sid)
    # 链接数据库
    self.connection = cx_Oracle.connect(user, password, self.connect_data, encoding='UTF-8')
    # 创建游标对象
    self.cur = self.connection.cursor()
  • 3、主要方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    def curs(self, sql):
    # 执行sql语句
    self.cur.execute(sql)
    # fetchone()每次只取出一条记录,fetchall()一次性取出所有结果
    res = self.cur.fetchall()
    # 遍历所有返回结果,返回的结果每一行都为set类型
    for row in res:
    # 把每一行set转为list
    list_row = list(row)
    # 拿到表描述
    table_description = self.cur.description
    # 将表描述遍历,取出遍历后的第一个描述,即为列名,加入到表头list中
    table_head = [item[0] for item in table_description]
    # 将表头和数据用zip函数打包成字典
    des = dict(zip(table_head, list_row))
    print(des)
    # 使用完毕,关闭游标
    self.cur.close()

这里存在一个问题,时间类型的字段,返回值为datetime.datetime(2022, 8, 3, 15, 15, 2, 740000),如果用来做assert断言非常的不友好。这里需要再单独写一个方法用来处理datetime类型数据,也就是格式化时间。

  • 4、对返回值datetime类型的格式化处理

    大体思路是这样的:拿到表描述,将表头遍历,取出datetime类型的列索引,对该索引下的数据进行格式化处理。将最终处理完成的数据替换并返回

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    def timestamp2char(self, description, row):
    # 获取DB_TYPE_NUMBER索引值
    timestamp_index_list = []
    index = 0
    cx_Oracle_table_type = [item[1] for item in description]
    for value in cx_Oracle_table_type:
    # 拿到TIMESTAMP字段的索引,存到列表中
    if value.name == 'DB_TYPE_TIMESTAMP':
    timestamp_index_list.append(index)
    index += 1
    # 将datetime.datetime更新为str
    for i in timestamp_index_list:
    # 跳过GMT时间为空的数据
    if row[i] is not None:
    # 重新格式化GMT时间并赋值
    row[i] = row[i].strftime("%Y-%m-%d %H:%M:%S.%f")
    return row

    这样处理后,返回的数据就变为2022-08-03 15:15:02.740000

  • 5、完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import cx_Oracle


class Oracle:
def __init__(self, user, password, host, port, sid):
self.connect_data = cx_Oracle.makedsn(host, port, sid=sid)
# 链接数据库
self.connection = cx_Oracle.connect(user, password, self.connect_data, encoding='UTF-8')
# 创建游标对象
self.cur = self.connection.cursor()

def timestamp2char(self, description, row):
# 获取DB_TYPE_NUMBER索引值
timestamp_index_list = []
index = 0
cx_Oracle_table_type = [item[1] for item in description]
for value in cx_Oracle_table_type:
# 拿到TIMESTAMP字段的索引,存到列表中
if value.name == 'DB_TYPE_TIMESTAMP':
timestamp_index_list.append(index)
index += 1
# 将datetime.datetime更新为str
for i in timestamp_index_list:
# 跳过GMT时间为空的数据
if row[i] is not None:
# 重新格式化GMT时间并赋值
row[i] = row[i].strftime("%Y-%m-%d %H:%M:%S.%f")
return row

def curs(self, sql):
# 执行sql语句
self.cur.execute(sql)
# fetchone()每次只取出一条记录,fetchall()一次性取出所有结果
res = self.cur.fetchall()
# self.connection.commit()
for row in res:
# 把每一行set转为list
list_row = list(row)
# 拿到表描述
table_description = self.cur.description
new_row = self.timestamp2char(table_description, list_row)
# 将表描述遍历,取出遍历后的第一个描述,即为列名,加入到表头list
table_head = [item[0] for item in table_description]
# 将表头和数据用zip函数打包成字典
des = dict(zip(table_head, new_row))
print(des)
self.cur.close()

def close(self):
self.connection.close()


orcale = Oracle('uesr', 'password', 'address', '1521', 'sid')
orcale.curs("SELECT * FROM Table ORDER BY GMT_MODIFIED DESC")
orcale.close()