SQLAlchemy 是否有与 Django 的 get_or_create 等效的功能?

2025-02-18 09:24:00
admin
原创
35
摘要:问题描述:如果对象已存在,则我想从数据库中获取它(基于提供的参数),如果对象不存在,则创建它。Django get_or_create(或源) 就是这样做的。SQLAlchemy 中是否有等效的快捷方式?我目前正在明确地写出如下内容:def get_or_create_instrument(session, ...

问题描述:

如果对象已存在,则我想从数据库中获取它(基于提供的参数),如果对象不存在,则创建它。

Django get_or_create(或源) 就是这样做的。SQLAlchemy 中是否有等效的快捷方式?

我目前正在明确地写出如下内容:

def get_or_create_instrument(session, serial_number):
    instrument = session.query(Instrument).filter_by(serial_number=serial_number).first()
    if instrument:
        return instrument
    else:
        instrument = Instrument(serial_number)
        session.add(instrument)
        return instrument

解决方案 1:

这基本上就是做事的方法,据我所知,没有现成的捷径。

当然你可以概括它:

def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).one_or_none()
    if instance:
        return instance, False
    else:
        params = {k: v for k, v in kwargs.items() if not isinstance(v, ClauseElement)}
        params.update(defaults or {})
        instance = model(**params)
        try:
            session.add(instance)
            session.commit()
        except Exception:  # The actual exception depends on the specific database so we catch all exceptions. This is similar to the official documentation: https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html
            session.rollback()
            instance = session.query(model).filter_by(**kwargs).one()
            return instance, False
        else:
            return instance, True

2020 更新(仅限 Python 3.9+)

这是使用 Python 3.9 的新字典联合运算符 (|=) 的更简洁版本

def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).one_or_none()
    if instance:
        return instance, False
    else:
        kwargs |= defaults or {}
        instance = model(**kwargs)
        try:
            session.add(instance)
            session.commit()
        except Exception:  # The actual exception depends on the specific database so we catch all exceptions. This is similar to the official documentation: https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html
            session.rollback()
            instance = session.query(model).filter_by(**kwargs).one()
            return instance, False
        else:
            return instance, True

笔记:

与 Django 版本类似,这将捕获重复键约束和类似错误。如果您的 get 或 create 不能保证返回单个结果,它仍然可能导致竞争条件。

为了缓解部分问题,您需要one_or_none()在 之后立即添加另一个样式提取session.commit()。除非您还使用 或可序列化事务模式,否则这仍然不能 100% 保证不会出现竞争条件with_for_update()

解决方案 2:

按照@WoLpH 的解决方案,这是对我有用的代码(简单版本):

def get_or_create(session, model, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance
    else:
        instance = model(**kwargs)
        session.add(instance)
        session.commit()
        return instance

有了这个,我就能获取或创建我的模型的任何对象。

假设我的模型对象是:

class Country(Base):
    __tablename__ = 'countries'
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)

为了获取或创建我的对象,我写道:

myCountry = get_or_create(session, Country, name=countryName)

解决方案 3:

我一直在研究这个问题并最终找到了一个相当可靠的解决方案:

def get_one_or_create(session,
                      model,
                      create_method='',
                      create_method_kwargs=None,
                      **kwargs):
    try:
        return session.query(model).filter_by(**kwargs).one(), False
    except NoResultFound:
        kwargs.update(create_method_kwargs or {})
        created = getattr(model, create_method, model)(**kwargs)
        try:
            session.add(created)
            session.flush()
            return created, True
        except IntegrityError:
            session.rollback()
            return session.query(model).filter_by(**kwargs).one(), False

我刚刚写了一篇关于所有细节的相当详尽的博客文章,但也有一些关于我为什么使用它的想法。

  1. 它解压为一个元组,告诉你该对象是否存在。这在你的工作流程中通常很有用。

  2. 该函数可以与@classmethod装饰的创建者函数(及其特定属性)一起使用。

  3. 当有多个进程连接到数据存储时,该解决方案可以防止竞争条件。

编辑:我已更改session.commit()为如本博客文章session.flush()中所述。请注意,这些决定特定于所使用的数据存储(在本例中为 Postgres)。

编辑 2:我已更新使用 {} 作为函数中的默认值,因为这是典型的 Python 陷阱。感谢 Nigel 的评论!如果您对此陷阱感到好奇,请查看此 StackOverflow 问题和此博客文章。

解决方案 4:

erik 优秀答案的修改版本

def get_one_or_create(session,
                      model,
                      create_method='',
                      create_method_kwargs=None,
                      **kwargs):
    try:
        return session.query(model).filter_by(**kwargs).one(), True
    except NoResultFound:
        kwargs.update(create_method_kwargs or {})
        try:
            with session.begin_nested():
                created = getattr(model, create_method, model)(**kwargs)
                session.add(created)
            return created, False
        except IntegrityError:
            return session.query(model).filter_by(**kwargs).one(), True
  • 使用嵌套事务仅回滚新项目的添加,而不是回滚所有内容(请参阅此答案以使用 SQLite 的嵌套事务)

  • 移动create_method。如果创建的对象具有关系,并且通过这些关系为其分配了成员,则会自动将其添加到会话中。例如,创建一个book具有user_iduser作为对应关系的 ,然后book.user=<user object>在 内执行create_method会将其添加book到会话中。这意味着create_method必须在 内with才能从最终回滚中受益。请注意 会begin_nested自动触发刷新。

请注意,如果使用 MySQL,则必须将事务隔离级别设置为READ COMMITTED而不是REPEATABLE READ才能使其正常工作。Django 的get_or_create (以及此处) 使用相同的策略,另请参阅 Django文档。

解决方案 5:

这个 SQLALchemy 配方可以出色且优雅地完成工作。

要做的第一件事是定义一个函数,该函数被赋予一个 Session 来处理,并将一个字典与 Session() 关联起来,以跟踪当前的唯一键。

def _unique(session, cls, hashfunc, queryfunc, constructor, arg, kw):
    cache = getattr(session, '_unique_cache', None)
    if cache is None:
        session._unique_cache = cache = {}

    key = (cls, hashfunc(*arg, **kw))
    if key in cache:
        return cache[key]
    else:
        with session.no_autoflush:
            q = session.query(cls)
            q = queryfunc(q, *arg, **kw)
            obj = q.first()
            if not obj:
                obj = constructor(*arg, **kw)
                session.add(obj)
        cache[key] = obj
        return obj

使用此功能的一个示例是在 mixin 中:

class UniqueMixin(object):
    @classmethod
    def unique_hash(cls, *arg, **kw):
        raise NotImplementedError()

    @classmethod
    def unique_filter(cls, query, *arg, **kw):
        raise NotImplementedError()

    @classmethod
    def as_unique(cls, session, *arg, **kw):
        return _unique(
                    session,
                    cls,
                    cls.unique_hash,
                    cls.unique_filter,
                    cls,
                    arg, kw
            )

最后创建独特的 get_or_create 模型:

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

engine = create_engine('sqlite://', echo=True)

Session = sessionmaker(bind=engine)

class Widget(UniqueMixin, Base):
    __tablename__ = 'widget'

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True, nullable=False)

    @classmethod
    def unique_hash(cls, name):
        return name

    @classmethod
    def unique_filter(cls, query, name):
        return query.filter(Widget.name == name)

Base.metadata.create_all(engine)

session = Session()

w1, w2, w3 = Widget.as_unique(session, name='w1'), \n                Widget.as_unique(session, name='w2'), \n                Widget.as_unique(session, name='w3')
w1b = Widget.as_unique(session, name='w1')

assert w1 is w1b
assert w2 is not w3
assert w2 is not w1

session.commit()

这个菜谱对这个想法进行了更深入的阐述,并提供了不同的方法,但我已经使用了这个方法,并取得了巨大的成功。

解决方案 6:

语义上最接近的可能是:

def get_or_create(model, **kwargs):
    """SqlAlchemy implementation of Django's get_or_create.
    """
    session = Session()
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance, False
    else:
        instance = model(**kwargs)
        session.add(instance)
        session.commit()
        return instance, True

不确定依赖Sessionsqlalchemy 中全局定义是否正确,但是 Django 版本不需要连接,所以......

返回的元组包含实例和一个布尔值,指示实例是否已创建(即,如果我们从数据库读取实例,则为 False)。

Djangoget_or_create经常用于确保全局数据的可用性,因此我尽可能早地提交。

解决方案 7:

我稍微简化了@Kevin。解决方案以避免将整个函数包装在if/else语句中。这样就只有一个return,我觉得更干净:

def get_or_create(session, model, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()

    if not instance:
        instance = model(**kwargs)
        session.add(instance)

    return instance

解决方案 8:

有一个 Python 包包含@erik 的解决方案以及一个版本update_or_create()。https ://github.com/enricobarzetti/sqlalchemy_get_or_create

解决方案 9:

根据您采用的隔离级别,上述任何解决方案都不起作用。我发现的最佳解决方案是采用以下形式的 RAW SQL:

INSERT INTO table(f1, f2, unique_f3) 
SELECT 'v1', 'v2', 'v3' 
WHERE NOT EXISTS (SELECT 1 FROM table WHERE f3 = 'v3')

无论隔离级别和并行度如何,这在事务上都是安全的。

注意:为了提高效率,为唯一列建立索引是明智之举。

解决方案 10:

我经常遇到的一个问题是,当一个字段具有最大长度(例如STRING(40)),并且您想get or create对长度很长的字符串执行时,上述解决方案将会失败。

基于上述解决方案,我的方法如下:

from sqlalchemy import Column, String

def get_or_create(self, add=True, flush=True, commit=False, **kwargs):
    """

    Get the an entity based on the kwargs or create an entity with those kwargs.

    Params:
        add: (default True) should the instance be added to the session?
        flush: (default True) flush the instance to the session?
        commit: (default False) commit the session?
        kwargs: key, value pairs of parameters to lookup/create.

    Ex: SocialPlatform.get_or_create(**{'name':'facebook'})
        returns --> existing record or, will create a new record

    ---------

    NOTE: I like to add this as a classmethod in the base class of my tables, so that
    all data models inherit the base class --> functionality is transmitted across
    all orm defined models.

    """


    # Truncate values if necessary
    for key, value in kwargs.items():

        # Only use strings
        if not isinstance(value, str):
            continue

        # Only use if it's a column
        my_col = getattr(self.__table__.columns, key)

        if not isinstance(my_col, Column):
            continue

        # Skip non strings again here
        if not isinstance(my_col.type, String):
            continue

        # Get the max length
        max_len = my_col.type.length

        if value and max_len and len(value) > max_len:

            # Update the value
            value = value[:max_len]
            kwargs[key] = value

    # -------------------------------------------------

    # Make the query...
    instance = session.query(self).filter_by(**kwargs).first()

    if instance:
        return instance

    else:
        # Max length isn't accounted for here.
        # The assumption is that auto-truncation will happen on the child-model
        # Or directtly in the db
        instance = self(**kwargs)

    # You'll usually want to add to the session
    if add:
        session.add(instance)

    # Navigate these with caution
    if add and commit:
        try:
            session.commit()
        except IntegrityError:
            session.rollback()

    elif add and flush:
        session.flush()


    return instance
相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1325  
  IPD(Integrated Product Development)流程作为一种先进的产品开发管理模式,在众多企业中得到了广泛应用。它涵盖了从产品概念产生到产品退市的整个生命周期,通过整合跨部门团队、优化流程等方式,显著提升产品开发的效率和质量,进而为项目的成功奠定坚实基础。深入探究IPD流程的五个阶段与项目成功之间...
IPD流程分为几个阶段   4  
  华为作为全球知名的科技企业,其成功背后的管理体系备受关注。IPD(集成产品开发)流程作为华为核心的产品开发管理模式,其中的创新管理与实践更是蕴含着丰富的经验和深刻的智慧,对众多企业具有重要的借鉴意义。IPD流程的核心架构IPD流程旨在打破部门墙,实现跨部门的高效协作,将产品开发视为一个整体的流程。它涵盖了从市场需求分析...
华为IPD是什么   3  
  IPD(Integrated Product Development)研发管理体系作为一种先进的产品开发模式,在众多企业的发展历程中发挥了至关重要的作用。它不仅仅是一套流程,更是一种理念,一种能够全方位提升企业竞争力,推动企业持续发展的有效工具。深入探究IPD研发管理体系如何助力企业持续发展,对于众多渴望在市场中立足并...
IPD管理流程   3  
  IPD(Integrated Product Development)流程管理旨在通过整合产品开发流程、团队和资源,实现产品的快速、高质量交付。在这一过程中,有效降低成本是企业提升竞争力的关键。通过优化IPD流程管理中的各个环节,可以在不牺牲产品质量和性能的前提下,实现成本的显著降低,为企业创造更大的价值。优化产品规划...
IPD流程分为几个阶段   4  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用