Source code for flask_mongo_drf.mongo_models

# mongo_models.py
from bson import ObjectId
from datetime import datetime
from pymongo.collection import Collection
from typing import Dict, Any, List, Optional, Iterator
from pymongo.results import InsertOneResult, UpdateResult, DeleteResult


[docs] class MongoBaseModel: """ MongoDB 基础模型类。 通过依赖注入方式传入 PyMongo Collection 对象,与框架解耦。 """
[docs] def __init__(self, collection: Collection): """ :param collection: PyMongo Collection 实例 """ self.collection = collection
[docs] def count_documents(self, query: Dict[str, Any]) -> int: """统计文档数量""" return self.collection.count_documents(query)
[docs] def find_all(self, query: Dict[str, Any], sort_by: str = None, sort_order: int = -1) -> Iterator[Dict[str, Any]]: """ 返回游标,避免一次性加载所有文档。 若需要排序,传入 sort_by 字段名和 sort_order(1 升序,-1 降序)。 """ cursor = self.collection.find(query) if sort_by: cursor = cursor.sort(sort_by, sort_order) return cursor
[docs] def find_paginated( self, query: Dict[str, Any], skip: int, limit: int, sort_by: str = "_id", sort_order: int = -1 ) -> List[Dict[str, Any]]: """分页查询,返回文档列表""" cursor = self.collection.find(query).sort(sort_by, sort_order).skip(skip).limit(limit) return list(cursor)
[docs] def insert_one(self, data: Dict[str, Any]) -> InsertOneResult: """插入单条文档,自动添加 create_time 和 update_time""" data.setdefault("create_time", datetime.now()) data["update_time"] = datetime.now() return self.collection.insert_one(data)
[docs] def update_one_by_id(self, doc_id: str, data: Dict[str, Any]) -> UpdateResult: """根据 _id 部分更新文档(PATCH)""" mongo_id = ObjectId(doc_id) if isinstance(doc_id, str) else doc_id data['update_time'] = datetime.now() return self.collection.update_one({"_id": mongo_id}, {"$set": data})
[docs] def replace_by_id(self, doc_id: str, new_doc: Dict[str, Any]) -> UpdateResult: """根据 _id 全量替换文档(PUT)""" mongo_id = ObjectId(doc_id) if isinstance(doc_id, str) else doc_id new_doc['update_time'] = datetime.now() new_doc['_id'] = mongo_id return self.collection.replace_one({"_id": mongo_id}, new_doc)
[docs] def delete_by_id(self, doc_id: str) -> DeleteResult: """根据 _id 删除文档""" mongo_id = ObjectId(doc_id) if isinstance(doc_id, str) else doc_id return self.collection.delete_one({"_id": mongo_id})
[docs] def find_one(self, query: Dict[str, Any]) -> Optional[Dict[str, Any]]: """查询单条文档""" return self.collection.find_one(query)
[docs] def aggregate(self, pipeline: List[Dict[str, Any]]): """执行聚合管道,返回游标""" return self.collection.aggregate(pipeline)