MetaData对象用于描述表结构,SQL Express Language是DBAPI SQL的统一封装器。MetaData 与SQL Express 语句可以在Core层使用,ORM层基于MetaData, SQL Express基础上做了进一步抽象。本章将介绍在Core层如何使用MetaData与SQL Express Language语句。
from sqlalchemy import MetaData
metadata_obj = MetaData()
创建了MetaData对象后,就可以用它来声明Table对象,每个字段用Column对象来表示
user_table = Table(
'user_account',
metadata_obj,
Column('id',Integer,primary_key=True),
Column('name',String(30)),
Column('speciality',String(30)),
)
说明:
必须说明,本例用 metadata 定义表结构的方式,不是ORM 表结构定义方式。
MetaData对象column对象来表示数据库字段,其主要属性
table对象的 c
属性 , 即 table.c
所有列名被放进 table.c数组中,引用列名方式:user_table.c.name
column数据类型
Alchemy 提供了足够的column数据类型,注意类型命名采用CamelCase风格,主要有:
Boolean Integer SmallInteger BigInteger Float Double String
Text Time Date DateTime Enum LargeBinary PickleType等。
Primary Key
Column("id", Integer, primary_key=True),
Index
Column(‘Addres’, String, index=True)
Foreign key
Column("user_id", ForeignKey("user_account.id"), nullable=False),
DDL 即create 语句,用MetaData对象的 create_all(),可将该对象上的所有 Table对象转为DDL发送给数据库
metadata_obj.create_all()
此方法会先查询DB中是否存在该表,再进行创建。
MetaData的其它方法
MetaData.tables 返回所有定义的table 对象
drop_all() 删除所有表
reflect() 从database已存在表创建table
使用方法:
读取db所有表,
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/test_db?charset=utf8")
meta_obj = MetaData()
meta_obj.reflect(bind=engine)
school_table = meta_obj.tables[‘school’]
读取指定表
>>> messages = Table("messages", metadata_obj, autoload_with=engine)
>>> [c.name for c in messages.columns]
['message_id', 'message_name', 'date']
如果存在外键, load主表时,也会自动加载辅表,
shopping_cart_items 外键字段引用了shopping_cards, 也被加载了
>>> shopping_cart_items = Table("shopping_cart_items", metadata_obj, autoload_with=engine)
>>> "shopping_carts" in metadata_obj.tables
True
Insert 单条数据 :
with engine.connect() as conn:
stmt = insert(asset_table).values(name="打印机",tag="A0001",value=3000,user_id=1)
result = conn.execute(stmt)
conn.commit()
print(result.inserted_primary_key)
判断插入结果 result 是否成功,
通过检查 result.inserted_primary_key, 如果为None表示插入失败。
插入多条数据
with engine.connect() as conn:
# send many statements
rows = [
{'name':'复印机','tag': 'A0002', 'value': 29000, 'user_id': 4 },
{'name':'20吨吊车','tag': 'D0001', 'value': 240000, 'user_id': 1 },
]
conn.execute( asset_table.insert(), rows )
conn.commit()
基本使用方法
from sqlalchemy import select
stmt = select(user_table).where(user_table.c.name == "spongebob")
print(stmt)
with Session(engine) as session:
for row in session.execute(stmt):
print(row)
选择部分字段:
select(user_table.c.name, user_table.c.fullname))
修改列名,
from sqlalchemy import func, cast
stmt = select(("Username: " + user_table.c.name).label("username"),).order_by(user_table.c.name)
with engine.connect() as conn:
for row in conn.execute(stmt):
print(f"{row.username}")
output
Username: 张锋
Username: 海绵宝宝
Username: 王小乙
Where 子句
select(user_table).where(user_table.c.name == "squidward"))
>>> print(
... select(address_table.c.email_address)
... .where(user_table.c.name == "squidward")
... .where(address_table.c.user_id == user_table.c.id)
... )
相当于SQL
SELECT address.email_address
FROM address, user_account
WHERE user_account.name = :name_1 AND address.user_id = user_account.id
join 联合查询
>>> print(
... select(address_table.c.email_address)
... .select_from(user_table)
... .join(address_table, user_table.c.id == address_table.c.user_id)
... )
SELECT address.email_address
FROM user_account JOIN address ON user_account.id = address.user_id
方法: Update()
stmt = update(user_table).values(fullname="Username: " + user_table.c.name)
更新多条数据 示例 :
>>> from sqlalchemy import bindparam
>>> stmt = (
... update(user_table)
... .where(user_table.c.name == bindparam("oldname"))
... .values(name=bindparam("newname"))
... )
>>> with engine.begin() as conn:
... conn.execute(
... stmt,
... [
... {"oldname": "jack", "newname": "ed"},
... {"oldname": "wendy", "newname": "mary"},
... {"oldname": "jim", "newname": "jake"},
... ],
... )
有外键数据更新
>>> scalar_subq = (
... select(address_table.c.email_address)
... .where(address_table.c.user_id == user_table.c.id)
... .order_by(address_table.c.id)
... .limit(1)
... .scalar_subquery()
... )
>>> update_stmt = update(user_table).values(fullname=scalar_subq)
>>> print(update_stmt)
主法: delete()
示例:
from sqlalchemy import delete
stmt = (
delete(user_table).
where(user_table.c.id == 5)
)
result = conn.execute(stmt)
Delete操作返回值类型为 CursorResult,可以用 result.rowcount 查看受影响行数,以确定是否成功。
多表删除:
delete_stmt = (
delete(user_table)
.where(user_table.c.id == address_table.c.user_id)
.where(address_table.c.email_address == "patrick@aol.com")
)
from sqlalchemy.dialects import mysql
print(delete_stmt.compile(dialect=mysql.dialect()))
DELETE FROM user_account USING user_account, address
WHERE user_account.id = address.user_id AND address.email_address = %s