flask-sqlalchemy中的基本sql操作

发布时间:2024年01月12日

更新

# 批量更新某表多条数据的某些字段
HeTongTaiZhangLaoWu.query.filter_by(xiang_mu_id=xiang_mu_id).update({"is_delete": True})

# 批量更新某表不同数据不同字段,更新数据中需要有主键
update_data = [
    {'id': 1, 'is_delete': True, 'name': '张三'},
    {'id': 2, 'is_delete': False, 'age': 20}
]
db.session.bulk_update_mappings(RenYuanXinXi, update_data)

# 更新单条数据
ren_yuan_xin_xi.name = '张三'  # ren_yuan_xin_xi是数据库查询的实例化对象

新增

# 创建一条
db.session.add(RenYuanXinXi(name='张三'))

# 批量创建数据,同表数据
create_data = [
    {'name': '张三', 'age': 20},
    {'name': '李四', 'age': 22},
]
db.session.bulk_insert_mappings(RenYuanXinXi, create_data)

# 批量创建数据,不同表数据
create_data = []
create_data.append(RenYuanXinXi(name='张三'))
create_data.append(XiangMuYunXingZhiBiaoCanShuShiJianSheZhi(suo_shu_zu_zhi=1))
db.session.add_all(create_data)

查询

基本查询

# 查询符合条件的第一条, 没有返回None
ManYiDuDiaoChaShiJianSheZhi.query.filter(ManYiDuDiaoChaShiJianSheZhi.suo_shu_zu_zhi == zu_zhi_id, ManYiDuDiaoChaShiJianSheZhi.is_delete == false()).first()

# 查询符合条件的所有, 没有返回空的查询集
ManYiDuDiaoChaShiJianSheZhi.query.filter(ManYiDuDiaoChaShiJianSheZhi.suo_shu_zu_zhi == zu_zhi_id, ManYiDuDiaoChaShiJianSheZhi.is_delete == false()).all()

# 预加载数据
ManYiDuDiaoChaShiJianSheZhi.query.options(selectinload(ManYiDuDiaoChaShiJianSheZhi.can_shus)).filter(ManYiDuDiaoChaShiJianSheZhi.id == subquery).first()

查询id最大的一条数据

shi_jian = ManYiDuDiaoChaShiJianSheZhi.query.filter(ManYiDuDiaoChaShiJianSheZhi.suo_shu_zu_zhi == zu_zhi_id, ManYiDuDiaoChaShiJianSheZhi.is_delete == false()).order_by(ManYiDuDiaoChaShiJianSheZhi.id.desc()).first()

查询关联表id最大的所有数据

                    subquery = db.session.query(db.func.max(XiangMuYunXingZhiBiaoCanShuShiJianSheZhi.id)).filter(
                        XiangMuYunXingZhiBiaoCanShuShiJianSheZhi.suo_shu_xiang_mu == zu_zhi_id).scalar_subquery()
                    db_data = db_class.query.options(selectinload(db_class.xiang_mu_shi_jian_she_zhi)).filter(
                        db_class.suo_shu_xiang_mu_ == zu_zhi_id, db_class.shi_jian_she_zhi_id == subquery).all()

连接两表查询

                    subquery = db.session.query(db.func.max(XiangMuYunXingZhiBiaoCanShuShiJianSheZhi.id)).filter(
                        XiangMuYunXingZhiBiaoCanShuShiJianSheZhi.suo_shu_xiang_mu == zu_zhi_id).scalar_subquery()
                    db_data = db_class.query.join(XiangMuYunXingZhiBiaoCanShu).options(
                        selectinload(db_class.xiang_mu_shi_jian_she_zhi),
                        selectinload(db_class.xiang_mu_yun_xing_zhi_biao_can_shu)).filter(
                            db_class.suo_shu_xiang_mu_ == zu_zhi_id, db_class.shi_jian_she_zhi_id == subquery,
                            XiangMuYunXingZhiBiaoCanShu.zhi_biao_type == zhi_biao).all()

查询年龄在30以下、30-40、40-50、50以上的人数,人员按照身份证号码去重

        nian_ling_ren_shu = (
            db.session.query(
                db.func.count().label("count"),
                db.case(
                    (FenBaoShangRenYuanZhuCeBiao.nian_ling < 30, "30岁以下"),
                    (FenBaoShangRenYuanZhuCeBiao.nian_ling.between(30, 50), "30-50岁"),
                    (FenBaoShangRenYuanZhuCeBiao.nian_ling > 50, "50岁以上"),
                    else_="其他",
                ).label("nian_ling"),
            )
            .distinct(FenBaoShangRenYuanZhuCeBiao.shen_fen_zheng_hao_ma)
            .group_by("nian_ling")
            .all()
        )

子查询构建指定数组内的数据

        tong_ji_yue_fen_subquery = (
            db.session.query(db.func.distinct(model.tong_ji_yue_fen))
            .filter(
                model.zu_zhi_id == 5,
                model.tong_ji_yue_fen >= start_time,
                model.tong_ji_yue_fen < end_time,
            )
            .scalar_subquery()
        )
        if data_type == "last":
            filters.append(model.tong_ji_yue_fen.in_(tong_ji_yue_fen_subquery))

枚举

根据枚举值获取枚举常量

class ShiYongDanWeiType(enum.Enum):
    lao_wu_fen_bao = '劳务分包'
    zhuan_ye_fen_bao = '专业分包'
    zong_bao_dan_wei = '总包单位'
    qi_ta_dan_wei = '其他单位'
    
# 得到枚举常量
ShiYongDanWeiType('总包单位') # 通过value获得
ShiYongDanWeiType.zong_bao_dan_wei
ShiYongDanWeiType["zong_bao_dan_wei"] # 通过name获得

# 获取枚举key
ShiYongDanWeiType.lao_wu_fen_bao.name  # 返回 lao_wu_fen_bao 字符串

# 获取枚举value
ShiYongDanWeiType.lao_wu_fen_bao.value  # 返回 劳务分包 字符串

# 枚举类查询
class ZhaoTouBiaoWenJianType(enum.Enum):
    zhao_biao_wen_jian = '招标文件'
    tou_biao_wen_jian = '投标文件'
ZhaoTouBiaoWenJian.query.filter(ZhaoTouBiaoWenJian.wen_jian_lei_xing == 'zhao_biao_wen_jian').all()

打印

.__dict__

# db.session.query方式查询出来的数据打印
._asdict()

# 打印 SQL 语句
print(db.session.query(*querys).filter(*filters).group_by(*group_by).statement)

# 打印查询参数
print(db.session.query(*querys).filter(*filters).group_by(*group_by).params)

创建或更新后立即获取新数据的相关属性

    # 创建
    record=[]
    dan_xiang_gong_chengs=[
        {
            "suo_shu_xiang_mu": 1,
        },
        {
            "suo_shu_xiang_mu": 2,
        }
    ]
    for obj in dan_xiang_gong_chengs:
        record.append(DanXiangGongCheng(**obj))
    db.session.add_all(record)
    db.session.commit()
    for obj in record:
        print('+++++++++++++++',obj.id)
    # 更新
    db_objs = DanXiangGongCheng.query.filter(DanXiangGongCheng.id.in_([4,5])).all()
    for obj in db_objs:
        # ㎡
        obj.gui_mo_dan_wei='m'
    db.session.commit()
    for obj in db_objs:
        print(obj.gui_mo_dan_wei)

当需要获取日期类型的属性时,上述方式有一个需要注意的点:

    # 创建
    record = []
    dan_xiang_gong_chengs = [
        {
            "suo_shu_xiang_mu": 1,
            "created_at": "2021-01-01",
        },
        {
            "suo_shu_xiang_mu": 2,
        },
    ]
    for obj in dan_xiang_gong_chengs:
        record.append(DanXiangGongCheng(**obj))
    db.session.add_all(record)
    db.session.flush()
    for obj in record:
        print("+++++++++++++++", type(obj.created_at))

这种情况下,第一次会输出<class 'str'>,第二次会输出<class 'datetime.datetime'>。也就是说,手动创建时间类型的数据时,flush之后,自己创建时是什么类型,获取到的还是什么类型;数据库默认创建的时间类型是datetime类型的。

文章来源:https://blog.csdn.net/weixin_45605541/article/details/135545957
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。