【Python】使用pymongo库封装一个MongoDB操作类

发布时间:2023年12月29日

直接上代码不废话

# -*- coding:utf-8 _*_
"""
@Project    : Qingyuing-Server 
@File       : new_mongo_util.py 
@Author     : 晴天 
@CreateTime : 2023-12-25 17:24 
"""
from config import Config
from pymongo import MongoClient
from contextlib import contextmanager


class MongoOperator:
    def __init__(self, uri, db_name):
        """
        初始化MongoOperator类
        :param uri: MongoDB连接URI,格式为"mongodb://user:pass@host:port"
        :param db_name: 数据库名称
        """
        self.uri = uri
        self.db_name = db_name
        self.client = MongoClient(self.uri)

    @contextmanager
    def connect(self):
        """
        上下文管理器,用于安全地管理MongoDB连接
        """
        try:
            db = self.client[self.db_name]
            yield db
        finally:
            self.client.close()

    def insert_one(self, collection_name, data):
        """
        向集合中插入单条数据
        :param collection_name: 集合名称
        :param data: 要插入的数据(字典类型)
        """
        with self.connect() as db:
            collection = db[collection_name]
            result = collection.insert_one(data)
            return result.inserted_id

    def insert_many(self, collection_name, data_list):
        """
        向集合中插入多条数据
        :param collection_name: 集合名称
        :param data_list: 要插入的数据列表(列表中的每个元素都是字典类型)
        """
        with self.connect() as db:
            collection = db[collection_name]
            result = collection.insert_many(data_list)
            return result.inserted_ids

    def find_one(self, collection_name, query):
        """
        查询集合中的数据
        :param collection_name: 集合名称
        :param query: 查询条件(字典类型)
        :return: 查询结果列表
        """
        with self.connect() as db:
            collection = db[collection_name]
            return list(collection.find(query))

    def update_one(self, collection_name, query, update):
        """
        更新集合中的单条数据
        :param collection_name: 集合名称
        :param query: 查询条件(字典类型)
        :param update: 更新内容(字典类型)
        """
        with self.connect() as db:
            collection = db[collection_name]
            result = collection.update_one(query, update)
            return result.modified_count

    def update_many(self, collection_name, query, update):
        """
        更新集合中的多条数据
        :param collection_name: 集合名称
        :param query: 查询条件(字典类型)
        :param update: 更新内容(字典类型)
        """
        with self.connect() as db:
            collection = db[collection_name]
            result = collection.update_many(query, update)
            return result.modified_count

    def delete_one(self, collection_name, query):
        """
        删除集合中的单条数据
        :param collection_name: 集合名称
        :param query: 查询条件(字典类型)
        """
        with self.connect() as db:
            collection = db[collection_name]
            result = collection.delete_one(query)
            return result.deleted_count

    def delete_many(self, collection_name, query):
        """
        删除集合中的多条数据
        :param collection_name: 集合名称
        :param query: 查询条件(字典类型)
        """
        with self.connect() as db:
            collection = db[collection_name]
            result = collection.delete_many(query)
            return result.deleted_count


mongo = MongoOperator(Config.MONGODB_URL, Config.MONGODB_DB_NAME)


# 使用示例
if __name__ == '__main__':
    user_info = {
        'username': 'qingyu',
        'password': '111111',
        'email': '52111890@qq.com',
        'display_id': '52111890',
        'login': '52111890',
        'nickname': '柳神',
        'birthday': '1970-01-01',
        'gender': '女',
        'is_active': True,
        'is_deleted': False,
        'head_avatar': 'https://github.com/login.png',
        'ip_create': '192.168.11.13',
        'role_info': [],
        'fans_num': '3',
        'follows_num': '',
        'created_date': '',
        'create_time': '',
        'update_time': '',
        'created_by': 'system',
        'last_modified_by': 'system',
        'last_modified_date': '',
        'access_token': '',
        'refresh_token': '',

    }

    with mongo.connect() as db_obj:
        inserted_id = mongo.insert_one('user', user_info)
        print(f'新增数据成功: {inserted_id}')

    with mongo.connect() as delete_obj:
        results = mongo.delete_one('user', {'login': '52111890'})
        print('删除数据成功: ', results)

    with mongo.connect() as update_obj:
        results = mongo.update_one('user', {'login': '52111890'}, {'$set': {'nickname': '柳神'}})
        print('更新数据成功: ', results)

    with mongo.connect() as query_obj:
        user_info = mongo.find_one('user', {'display_id': user_info['display_id']})
        print('查询数据成功: ', user_info)

文章来源:https://blog.csdn.net/weixin_43712023/article/details/135206127
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。