1. 封装 sqlite3
1.1. 依赖包引入
import sys
import datetime
import logging
import sqlite3
1.2. 封装类
class SqliteTool(object):
def __init__(self, host, database):
self._host = host
self._database = database
self._pool = None
self._maxconns = 6
print("__init__", self._database)
1.3. 连接池操作
def init_pool(self):
'''初始化连接池'''
try:
logging.info('Begin to create {0} postgresql pool on:{1}.\n'.format(self._host, datetime.datetime.now()))
pool = []
for _ in range(self._maxconns):
conn = sqlite3.connect(self._database, check_same_thread=False)
pool.append(conn)
self._pool = pool
logging.info('SUCCESS: create {0} postgresql pool success on {1}.\n'.format(self._host, datetime.datetime.now()))
except Exception as e:
logging.error('ERROR: create {0} postgresql pool failed on {1}.\n'.format(self._host, datetime.datetime.now()))
self.close_pool()
sys.exit('ERROR: create postgresql pool error caused by {0}'.format(str(e)))
def close_pool(self):
'''关闭 pool'''
if self._pool != None:
for conn in self._pool:
conn.close()
def get_conn(self):
if not self._pool:
self.init_pool()
return self._pool.pop()
def close_conn(self, conn):
if self._pool:
self._pool.append(conn)
1.4. 增删改查
def create_table(self, sql: str):
"""
创建表
:param sql: create sql语句
:return: True表示创建表成功
"""
print("create_table", sql)
result = False
try:
conn = self.get_conn()
cursor = conn.cursor()
cursor.execute(sql)
print("[create table success]")
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error {1} in create_table'.format(sql, str(e)))
sys.exit('ERROR: create table from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
self.close_conn(conn)
return result
def drop_table(self, sql: str):
"""
删除表
:param sql: drop sql语句
:return: True表示删除成功
"""
result = False
try:
conn = self.get_conn()
cursor = conn.cursor()
cursor.execute(sql)
print("[drop table success]")
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error {1} in drop_table'.format(sql, str(e)))
sys.exit('ERROR: drop table from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
self.close_conn(conn)
return result
def exec_insert(self, sql):
'''执行插入'''
result = False
try:
conn = self.get_conn()
cursor = conn.cursor()
cursor.execute(sql)
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error {1} in exec_insert'.format(sql, str(e)))
sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
self.close_conn(conn)
return result
def exec_insert_plus(self, table: str, params: dict):
'''执行插入'''
result = False
try:
key_tup = tuple(params.keys())
key_str = ",".join(key_tup)
val_str = ",".join(("?",)*len(key_tup))
sql_str = "insert into " + table + " (" + key_str + ") values (" + val_str + ")"
val_tup = ()
for item in params.values():
if type(item) == list:
val_tup += (json.dumps(item),)
elif type(item) == str:
val_tup += (item,)
elif type(item) == int:
val_tup += (item,)
else:
val_tup += (item,)
conn = self.get_conn()
cursor = conn.cursor()
cursor.execute(sql_str, val_tup)
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error {1} in exec_insert_plus'.format(table, str(e)))
sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
self.close_conn(conn)
return result
def exec_delete(self, sql):
'''执行查询'''
result = False
try:
conn = self.get_conn()
cursor = conn.cursor()
cursor.execute(sql)
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error {1} in exec_delete'.format(sql, str(e)))
sys.exit('ERROR: delete data from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
self.close_conn(conn)
return result
def exec_update(self, sql):
'''执行更新'''
result = False
try:
conn = self.get_conn()
cursor = conn.cursor()
cursor.execute(sql)
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error {1} in exec_update'.format(sql, str(e)))
sys.exit('ERROR: update data from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
self.close_conn(conn)
return result
def exec_select(self, sql):
'''执行查询'''
try:
conn = self.get_conn()
cursor = conn.cursor()
cursor.execute(sql)
result = cursor.fetchall()
except Exception as e:
logging.error('ERROR: execute {0} causes error {1} in exec_select'.format(sql, str(e)))
sys.exit('ERROR: load data from database error caused {0}'.format(str(e)))
finally:
cursor.close()
print("init_pool", self._maxconns, len(self._pool), self._pool)
self.close_conn(conn)
return result
def test_select(self, sql):
result = self.exec_select(sql)
print("test_select", result)
return result
2. 操作使用实例
from typing import Optional, List, Dict, Union
from pydantic import BaseModel, Field
class ......
if __name__ == '__main__':
dbhost = ""
dbdatabase = "./test.db"
db = SqliteTool(dbhost, dbdatabase)
class TaskInDB(BaseModel):
task_id: str
disabled: int
def create_tests_table(db):
sql_str = "create table if not exists tests("
sql_str += "task_id char(32) primary key,"
sql_str += "disabled int not null"
sql_str += ");"
return db.create_table(sql_str)
def drop_tests_table(db):
sql_str = "drop table if exists tests;"
return db.drop_table(sql_str)
def get_tests_indb(db):
sql_str = "select * from tests;"
ret_tasks = db.exec_select(sql_str)
return ret_tasks
def create_tests_indb(db, task_indb: TaskInDB):
return db.exec_insert_plus("tests", task_indb.dict())
if not drop_tests_table(db) or not create_tests_table(db):
print("ERROR")
task_indb = TaskInDB(task_id="11111111", disabled=1)
create_tests_indb(db, task_indb)
task_indb = TaskInDB(task_id="22222222", disabled=0)
create_tests_indb(db, task_indb)
key_tup = tuple(TaskInDB.__fields__.keys())
ret_tasks = get_tests_indb(db)
for ret_task in ret_tasks:
print(ret_task)
task_indb = TaskInDB(**{key: ret_task[i] for i,key in enumerate(key_tup)})
print(task_indb)
print("OK")