在第一章中提到,Sqlalchemy提供了两套方法来访问数据库,由于Sqlalchemy 文档杂乱,对于ORM的使用步骤讲解杂乱,SqlAlchemy2.x 与j1.x版本差异也较大,很多介绍SqlAlchemy的文章上来就讲ORM,但又混杂着CoreAPI,常令初学者遇到各种问题。因此,本人建议先使用Core API来访问数据库,使用上更接近于 Sqlite3, Mysql-connector 等的方式,入门容易,而且也可以实现1套代码支持各类数据库。
因此,本教程开头3章均以Core API方式为主。
示例功能:
Step-1: 创建数据库引擎对象
DB Engine 是个全局变量,允许在其上建立多个connection访问数据库。
创建 DB Engine 实例的方法:
create_engine( db_url )
db_url参数在后面章节中详解介绍。本例使用sqlite3 内存数据库。
from sqlalchemy import create_engine
engine = create_engine("sqlite:///:memory:", echo=True)
Step-2 创建connect对象
connection 对象用于数据库操作。其支持context with语法
from sqlalchemy import text
with engine.connect() as conn:
result = conn.execute(text("select 'hello world'"))
print(result.all())
output
[('hello world',)]
Step-3 执行SQL Express 语句
text() 是SQL express 的最简单使用形式, 方便传值
创建1张表
conn.execute(text("CREATE TABLE some_table (x int, y int)"))
插入数据,
conn.execute(
text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
[ { "x": 1, "y": 1}, {"x": 2, "y": 4 } ],
)
SQL express传参语法:
:x, :y
, 参数名前加:
分号[ dict, … ]
方式给出。提交事务, 即将操作保存至数据库
conn.commit()
Step4 执行查询并获取结果
with engine.connect() as conn:
result = conn.execute(text("SELECT x, y FROM some_table"))
for row in result:
print(f"x: {row.x} y: {row.y}")
本例中,select x,y from some_table
将返回所有行
返回结果类型为 sqlalchemy.engine.cursor.CursorResult,是1个由 object 组成的可迭代对象。提供了多种方法访问结果数据:
还可以向查询语句传参:
result = conn.execute(text("SELECT x, y FROM some_table WHERE y > :y"), {"y": 2})
编写代码时1个好习惯:先写出异常与错误处理语句框架,再写正常流程部分,这样的习惯可以让代码更健壮,避免程序运行中断或出错。
虽然看似麻烦,但最终代码测试中遇到的问题更少,而且错误日志也更精准,问题定位效率更高,所以这样做将更省时间。
from sqlalchemy import create_engine
from sqlalchemy.exc import IntegrityError, ProgrammingError
engine = create_engine('mysql://username:password@localhost/mydatabase')
try:
# 执行数据库操作
connection = engine.connect()
# ...
# 这里是可能引发异常的代码
# ...
connection.close()
except IntegrityError as e:
# 处理唯一性约束违反等完整性错误
print(f'Integrity error occurred: {str(e)}')
except ProgrammingError as e:
# 处理SQL语法或参数错误
print(f'Programming error occurred: {str(e)}')
except SQLAlchemyError as e:
# 处理其他SQLAlchemy异常
print(f'An error occurred: {str(e)}')
Engine 是db连接管理类,
语法:
from sqlalchemy import create_engine
#创建引擎对象
engine = create_engine("sqlite:///:memory:", echo=True)
#连接数据库
conn = engine.connect()
Sqlalchemy.create_engine( ) 方法第1个参数是db连接表达式,格式为:
dialect[+driver]://user:password@host/dbname
上面示例是sqlite的连接表达式。 Driver是python访问数据库的DBAPI库。
e = create_engine('sqlite:///path/to/database.db')
如果是绝对地址 sqlite:usr/local/myproject/database.db
:memory 表示使用内存数据库,不保存在硬盘。
对于windows 系统,
e = create_engine('sqlite:///C:\\myapp\\db\\main.db')
Mysql 的DBAPI,常用的有PyMysql 与 mysql-connector,其连接表达式分别为:
mysql+pymysql://root:123456@192.168.99.240:3306/testdb
mysql+mysqlconnector://roprot:123456@192.168.99.240:3306/testdb
通常使用的接口库为 psycopg2
postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]
engine = create_engine(
"postgresql+psycopg2://scott:tiger@localhost/test",
isolation_level="SERIALIZABLE",
)
Ssl连接
engine = sa.create_engine(
"postgresql+psycopg2://scott:tiger@192.168.0.199:5432/test?sslmode=require"
)
engine = create_engine("mongodb:///?Server=MyServer&Port=27017&Database=test&User=test&Password=Password")
定义1个mapping类
base = declarative_base()
class restaurants(base):
__tablename__ = "restaurants"
borough = Column(String,primary_key=True)
cuisine = Column(String)
查询:
engine=create_engine("mongodb:///?Server=MyServer&Port=27017&Database=test&User=test&Password=Password")
factory = sessionmaker(bind=engine)
session = factory()
for instance in session.query(restaurants).filter_by(Name="Morris Park Bake Shop"):
print("borough: ", instance.borough)
print("cuisine: ", instance.cuisine)
print("---------")
语法:
conn = engine.connect()
如
e = create_engine('sqlite:///C:\\myapp\\db\\main.db')
conn = e.connect()
推荐使用context with 语法使用connect对象
from sqlalchemy import create_engine, text
engine = create_engine('sqlite:///C:\\myapp\\db\\main.db')
with engine.connect() as connection:
result = connection.execute(text("select username from users"))
for row in result:
print("username:", row["username"])
如果修改了数据,应调用 conn.commit() 提交transaction
Sqlalchemy 对sql进行了封装,其SQL Express语法比直接使用sql 语句更方便,优势是传参与获取返回值更省事。
text()方法是CoreAPI中最基础的方法之一,主要作用,用于封装 sql 语句
from sqlalchemy import text
t_sql = text("SELECT * FROM users")
result = connection.execute(t_sql)
传参:
t_sql = text("SELECT * FROM users WHERE id=:user_id")
result = connection.execute(t_sql, { ‘user_id’: 12 } )
如果使用r” “ ,则用 : 来表示:
也可以通过 text(sql_statement).bindparams() 直接构建完整的SQL语句
from sqlalchemy import text, bindparams
stmt = text("SELECT id, name FROM user WHERE name=:name "
"AND timestamp=:timestamp")
stmt = stmt.bindparams(name='jack',
timestamp=datetime.datetime(2012, 10, 8, 15, 12, 5)
)
result = conn.execute(stmt)
print(result.all())
bindparams()中可添加参数Type检查:
from sqlalchemy import text
stmt = text("SELECT id, name FROM user WHERE name=:name "
"AND timestamp=:timestamp")
stmt = stmt.bindparams(
bindparam('name', type_=String),
bindparam('timestamp', type_=DateTime)
)
stmt = stmt.bindparams(name='jack',
timestamp=datetime.datetime(2012, 10, 8, 15, 12, 5))
result = conn.execute(stmt)
print(result.all())
查询结果类型为 sqlalchemy.engine.Result 类,是1个由 object 组成的列表。可以用多种方法访问:
说明:
result = conn.execute(text("select x, y from some_table"))
for row in result:
print(f"Row: {row.x} {row.y}")
result = conn.execute(text("select x, y from some_table"))
for dict_row in result.mappings():
x = dict_row["x"]
y = dict_row["y"]
SqlAlchemy可以用connect对象与 session 对象来执行SQL express
connect对象是直接调用DBAPI执行SQL语句,这是使用SqlAlchemy 最简单的方式,同时支持部分Sqlalchemy 的SQL Express 封装语法,但执行的SQL语句依然还要符合各数据库的接口库要求。
Session对象则实现了同1套接口适用于所有数据库。但主要用于ORM API方式。
connect对象操作数据库的好处:可使用text()方法生成SQL语句,利用bindparams() 传值,以及做类型检查。同时支持多线程访问数据库。
创建表的方法,前面已讲过。 下面示例为 insert, update, delete 操作
# insert row
print("-"*50+"Insert operation")
stmt = text("INSERT INTO some_table VALUES(:x, :y)").bindparams(x=6,y=19)
with engine.connect() as conn:
conn.execute(stmt)
conn.commit()
result = conn.execute( text("select * from some_table") )
print(result.all())
# update row
print("-"*50+"update operation")
stmt = text("UPDATE some_table SET y=:y WHERE x=:x").bindparams(y=99,x=5)
with engine.connect() as conn:
conn.execute(stmt)
conn.commit()
result = conn.execute( text("select * from some_table") )
print(result.all())
# delete row
print("-"*50+"delete operation")
stmt = text("DELETE FROM some_table WHERE x=:x").bindparams(x=4)
with engine.connect() as conn:
conn.execute(stmt)
conn.commit()
result = conn.execute( text("select * from some_table") )
print(result.rowcount)
print(result.all())
output:
--------------------------------------------------Insert operation
2023-12-03 15:50:36,978 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-12-03 15:50:36,978 INFO sqlalchemy.engine.Engine INSERT INTO some_table VALUES(?, ?)
2023-12-03 15:50:36,978 INFO sqlalchemy.engine.Engine [generated in 0.00085s] (6, 19)
2023-12-03 15:50:36,979 INFO sqlalchemy.engine.Engine COMMIT
2023-12-03 15:50:36,980 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-12-03 15:50:36,980 INFO sqlalchemy.engine.Engine select * from some_table
2023-12-03 15:50:36,981 INFO sqlalchemy.engine.Engine [generated in 0.00132s] ()
[(1, 1), (2, 4), (3, 10), (4, 11), (5, 25), (6, 19)]
2023-12-03 15:50:36,982 INFO sqlalchemy.engine.Engine ROLLBACK
--------------------------------------------------update operation
[(1, 1), (2, 4), (3, 10), (4, 11), (5, 99), (6, 19)]
2023-12-03 15:50:36,985 INFO sqlalchemy.engine.Engine ROLLBACK
--------------------------------------------------delete operation
[(1, 1), (2, 4), (3, 10), (5, 99), (6, 19)]
2023-12-03 15:50:36,989 INFO sqlalchemy.engine.Engine ROLLBACK
Sqlalchemy 使用DBAPI处理表间关系语法是依据数据库规定, 但基本均支持标准SQL语法
CREATE TABLE tracks(
……
trackartist INTEGER, -- 外键字段
FOREIGN KEY(trackartist) REFERENCES artist(artistid)
)
辅表artist.id字段须为主键或unique index。
示例 :
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy import text
from sqlalchemy.orm import sessionmaker
engine = create_engine("sqlite:///order.db")
# create table people
with engine.connect() as conn:
conn.execute(text("drop table if exists people;"))
stmt = text("""
CREATE TABLE people(
id integer PRIMARY KEY,
name TEXT,
age INTEGER
)
""" )
conn.execute(stmt)
conn.execute(
text("INSERT INTO people (id,name, age) VALUES (:id,:name, :age)"),
[
{'id': 1, "name": 'Jack','age':30 },
{'id': 2, "name": 'Smith','age':28 },
{'id': 3, "name": 'Wang','age':35 },
]
)
conn.commit()
result = conn.execute( text("select * from people") )
print(result.rowcount)
print(result.all())
# create table order
# 创建会话(Session)
with engine.connect() as conn:
conn.execute(text("drop table if exists teams"))
stmt_1 = text("""
create table teams(
id integer PRIMARY KEY,
team_name TEXT,
pid integer,
foreign key (pid) REFERENCES people(id)
)
""")
conn.execute(stmt_1)
conn.commit()
conn.execute(
text("INSERT INTO teams (id, team_name, pid) VALUES (:id, :team_name, :pid)"),
[
{'id': 101, "team_name": 'TV product','pid':1 },
{'id': 102, "team_name": 'Software development','pid':2 },
{'id': 103, "team_name": 'Electric development','pid':2 },
]
)
conn.commit()
# 跨表查询
result = conn.execute( text("select a.id, a.team_name, b.name from teams as a left join people as b on a.pid=b.id") )
print(result.rowcount)
for row in result.mappings():
print(row['id'], row['team_name'], row['name'])
sqlalchemy的engine可做为全局变量, 将connect对象,或 session对象传入线程,实现多线程访问:
示例:
def thread_db(conn,name):
try:
result = conn.execute( text("select * from people") )
print(result.rowcount)
print(f"thread {{ name }} result: ")
print(result.all())
except Exception as e:
print("can't open connection object")
finally:
conn.close()
from threading import Thread
t1 = Thread(target=thread_db, args=(engine.connect(),"thread_a"))
t2 = Thread(target=thread_db, args=(engine.connect(),"thread_b"))
t1.start()
t2.start()
t1.join()
t2.join()
print("main thread is ended")
output:
thread { name } result:
thread { name } result:
[(1, 'Jack', 30), (2, 'Smith', 28), (3, 'Wang', 35)]
[(1, 'Jack', 30), (2, 'Smith', 28), (3, 'Wang', 35)]
main thread is ended