之前,以前分享过两篇python操作mysql数据库的文章:
《python操作mysql数据库的精美实用模块》
《python操作mysql数据库的精美实用模块(升级版)》
好了,今天继续升级,二次升级。
为何又升级呢?缘起是,在实际项目开发中,当前的模块都是基于列表来操作,当面对复杂表格时就会很不方便,严重影响开发效率和开发体验。
所以,非常有必要,将当前的模块,从基于列表转为基于字典,来进行操作。
然后,网上资料一顿查,终于成功升级了。来吧,不啰嗦,分享给大家!
import pymysql
from dbutils.pooled_db import PooledDB
#############################################################################################
# 连接mysql数据库-----使用数据库连接池技术----dbutils库
#############################################################################################
db_pool = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=800, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=8, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root'
password='root',
database='mydbdemo',
charset='utf8'
)
class DBManager():
# 创建数据库连接
def __init__(self):
self.db = db_pool.connection()
# 关闭数据库连接
def closeDB(self):
self.db.close()
#连接数据库,执行插入SQl
def executeQueryID(self,sqlstring):
try:
cursor = self.db.cursor()
cursor.execute(sqlstring)
self.db.commit()
the_id = int(cursor.lastrowid)
#the_id = int(db.insert_id())
return the_id
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
# 连接数据库,执行SQl,无返回
def executeQueryNO(self, sqlstring):
try:
cursor = self.db.cursor()
cursor.execute(sqlstring)
self.db.commit()
return True
except Exception:
self.db.rollback()
return False
finally:
self.db.close() # 关闭数据库连接
#连接数据库,执行SQL,返回单条数据
def executeQueryone(self, sqlstring):
try:
cursor = self.db.cursor()
cursor.execute(sqlstring)
dataone = cursor.fetchone()
return dataone
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
#连接数据库,执行SQL,返回多条数据
def executeQueryall(self, sqlstring):
try:
cursor = self.db.cursor()
cursor.execute(sqlstring)
dataall = cursor.fetchall()
return dataall
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
#######二次升级版###############################################
# 查询单条数据
def select_dataone_dict(self,sqlstring):
try:
cursor = self.db.cursor(cursor = DictCursor)
cursor.execute(sqlstring)
dataone = cursor.fetchone()
return dataone
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
# 查询前n行数据
def select_datamany_dict(self,sqlstring,manyncount):
try:
cursor = self.db.cursor(cursor = DictCursor)
cursor.execute(sqlstring)
datamany = cursor.fetchmany(manyncount)
return datamany
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
# 查询所有数据
def select_dataall_dict(self,sqlstring):
try:
cursor = self.db.cursor(cursor = DictCursor)
cursor.execute(sqlstring)
dataall = cursor.fetchall()
return dataall
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
# 插入单条数据
def insert_dataone_dict(self,table,dataone):
try:
sqlstring = self.generate_sqlstring_insert(table, dataone)
cursor = self.db.cursor()
cursor.execute(sqlstring,dataone)
self.db.commit()
the_id = int(cursor.lastrowid)
return the_id
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
# 插入多条数据
def insert_datamany_dict(self,table,datamany):
try:
sqlstring = self.generate_sqlstring_insert(table, datamany[0])
cursor = self.db.cursor()
cursor.executemany(sqlstring,datamany)
self.db.commit()
return True
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
# 更新数据
def update_data_dict(self,table,data,where):
try:
sqlstring = self.generate_sqlstring_update(table, data,where)
cursor = self.db.cursor()
cursor.execute(sqlstring,data)
self.db.commit()
return True
except Exception:
self.db.rollback()
return False
finally:
self.db.close() # 关闭数据库连接
# 删除数据
def delete_data_dict(self,table,where):
try:
sqlstring = self.generate_sqlstring_delete(table, where)
cursor = self.db.cursor()
cursor.execute(sqlstring)
self.db.commit()
return True
except Exception:
self.db.rollback()
return False
finally:
self.db.close() # 关闭数据库连接
# 生成sql语句-----插入
def generate_sqlstring_insert(self, table, data):
cols_name = ", ".join(f'`{key}`' for key in data.keys())
cols_value = ', '.join(f'%({key})s' for key in data.keys())
sqlstring = f"INSERT INTO {table} ({cols_name}) VALUES ({cols_value})"
return sqlstring
# 生成sql语句----更新
def generate_sqlstring_update(self, table, data,where):
cols = ", ".join(f'`{key}`=%({key})s' for key in data.keys())
sqlstring = f"UPDATE {table} SET {cols} {where}"
return sqlstring
# 生成sql语句----删除
def generate_sqlstring_delete(self, table ,where):
sqlstring = f"DELETE FROM {table} {where}"
return sqlstring
##############################################################################################
# 连接mysql数据库
#############################################################################################
class DBManager_Old():
# 创建数据库连接
def __init__(self):
db_config = ConfigManager.get_db_config()
self.db = pymysql.Connect(
host='127.0.0.1',
port=3306,
user='root',
passwd='root',
db='mydbdemo',
charset='utf8')
# 关闭数据库连接
def closeDB(self):
self.db.close()
#连接数据库,执行插入SQl
def executeQueryID(self,sqlstring):
try:
cursor = self.db.cursor()
cursor.execute(sqlstring)
self.db.commit()
the_id = int(cursor.lastrowid)
#the_id = int(db.insert_id())
return the_id
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
# 连接数据库,执行SQl,无返回
def executeQueryNO(self, sqlstring):
try:
cursor = self.db.cursor()
cursor.execute(sqlstring)
self.db.commit()
return True
except Exception:
self.db.rollback()
return False
finally:
self.db.close() # 关闭数据库连接
#连接数据库,执行SQL,返回单条数据
def executeQueryone(self, sqlstring):
try:
cursor = self.db.cursor()
cursor.execute(sqlstring)
dataone = cursor.fetchone()
return dataone
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
#连接数据库,执行SQL,返回多条数据
def executeQueryall(self, sqlstring):
try:
cursor = self.db.cursor()
cursor.execute(sqlstring)
dataall = cursor.fetchall()
return dataall
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
#######二次升级版###############################################
# 查询单条数据
def select_dataone_dict(self,sqlstring):
try:
cursor = self.db.cursor(cursor = DictCursor)
cursor.execute(sqlstring)
dataone = cursor.fetchone()
return dataone
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
# 查询前n行数据
def select_datamany_dict(self,sqlstring,manyncount):
try:
cursor = self.db.cursor(cursor = DictCursor)
cursor.execute(sqlstring)
datamany = cursor.fetchmany(manyncount)
return datamany
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
# 查询所有数据
def select_dataall_dict(self,sqlstring):
try:
cursor = self.db.cursor(cursor = DictCursor)
cursor.execute(sqlstring)
dataall = cursor.fetchall()
return dataall
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
# 插入单条数据
def insert_dataone_dict(self,table,dataone):
try:
sqlstring = self.generate_sqlstring_insert(table, dataone)
cursor = self.db.cursor()
cursor.execute(sqlstring,dataone)
self.db.commit()
the_id = int(cursor.lastrowid)
return the_id
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
# 插入多条数据
def insert_datamany_dict(self,table,datamany):
try:
sqlstring = self.generate_sqlstring_insert(table, datamany[0])
cursor = self.db.cursor()
cursor.executemany(sqlstring,datamany)
self.db.commit()
return True
except Exception:
return False
finally:
self.db.close() # 关闭数据库连接
# 更新数据
def update_data_dict(self,table,data,where):
try:
sqlstring = self.generate_sqlstring_update(table, data,where)
cursor = self.db.cursor()
cursor.execute(sqlstring,data)
self.db.commit()
return True
except Exception:
self.db.rollback()
return False
finally:
self.db.close() # 关闭数据库连接
# 删除数据
def delete_data_dict(self,table,where):
try:
sqlstring = self.generate_sqlstring_delete(table, where)
cursor = self.db.cursor()
cursor.execute(sqlstring)
self.db.commit()
return True
except Exception:
self.db.rollback()
return False
finally:
self.db.close() # 关闭数据库连接
# 生成sql语句-----插入
def generate_sqlstring_insert(self, table, data):
cols_name = ", ".join(f'`{key}`' for key in data.keys())
cols_value = ', '.join(f'%({key})s' for key in data.keys())
sqlstring = f"INSERT INTO {table} ({cols_name}) VALUES ({cols_value})"
return sqlstring
# 生成sql语句----更新
def generate_sqlstring_update(self, table, data,where):
cols = ", ".join(f'`{key}`=%({key})s' for key in data.keys())
sqlstring = f"UPDATE {table} SET {cols} {where}"
return sqlstring
# 生成sql语句----删除
def generate_sqlstring_delete(self, table ,where):
sqlstring = f"DELETE FROM {table} {where}"
return sqlstring