我有一个这样的模型:
class Company(db.Model):
    """Company row stored in ``my_table``; ``name`` is unique and required."""

    __tablename__ = "my_table"

    id = db.Column(db.Integer(), primary_key=True)
    name = db.Column(db.String(128), unique=True, nullable=False)
    # … (remaining columns were elided in the original snippet)
一般来说,你可以用 try/except “捕获”这个错误,把它记录下来,然后向用户返回自定义的错误信息。像这样:
try/except
c = Company("Test", "test") try: db.session.add(c) db.session.commit() return "Added!" except Exception as e: db.session.rollback() return f"failed to insert company: {e.__class__.__name__}"
这似乎是一个快速的答案,而不是正确的答案。相反,我会在尝试插入之前添加一些验证:
c = Company("Test", "test") # note this is pseudo code if Company.find.get("Test"): try: db.session.add(c) db.session.commit() return "Added!" except Exception as e: db.session.rollback() return f"failed to insert company: {e.__class__.__name__}" else: return f"company {c.id} already exists"
这样你就不会触发错误,而是由你的应用程序先校验自己的数据,而不是依赖插入时的报错。
你可以导入这个 exception,然后自行处理:
exception
# Import from the public package path; going through ``sqlite3.__init__``
# loads the package's __init__ a second time under a different module name.
from sqlite3 import IntegrityError
这样就导入了该异常类的名称,然后你可以这样写:
except IntegrityError : db.session.rollback() return f"duplicate data has been used!"
或者按你需要的任何方式来处理这个异常。
但请记住,这样只能捕获 sqlite 包抛出的错误,而不是 sqlalchemy 抛出的错误;所以,如果你以后在某处更换了 db engine,就可能无法再处理这个 exception。
sqlite
sqlalchemy
db engine
from sqlalchemy.exc import IntegrityError
上面这一行导入的才是处理 sqlalchemy 抛出的异常时所需要的异常 class。
class
SQLAlchemy 提供了一种机制,允许通过 handle_error 事件钩子自定义对 DBAPI 错误的处理。我在 OpenStack 的 oslo.db 中使用过这个 API,可以在这个文件中看到: https://github.com/openstack/oslo.db/blob/master/oslo_db/sqlalchemy/exc_filters.py 。
由于 Stack Overflow 不提倡只贴代码链接,下面是基于上述链接中方法的 POC:
import collections
import re

from sqlalchemy import event
from sqlalchemy import exc as sqla_exc


class DuplicateKeyError(Exception):
    """Duplicate entry at unique column error.

    Attributes:
        columns: Column names extracted from the database error message
            (an empty list when none could be extracted).
        value: The offending value when the matched pattern captured one,
            otherwise ``None``.
        inner_exception: The exception that was translated into this one.
    """

    def __init__(self, columns=None, inner_exception=None, value=None):
        self.columns = columns or []
        self.value = value
        self.inner_exception = inner_exception

    def __str__(self):
        return "Duplicate key for columns %s" % (
            self.columns,
        )


# dialect name -> exception class -> list of (handler function, compiled regex)
_registry = collections.defaultdict(lambda: collections.defaultdict(list))


def filters(ame, exception_type, regex):
    """Mark a function as receiving a filtered exception.

    Args:
        ame: Dialect name the filter applies to (``"*"`` is consulted for
            every dialect by ``handler``). The parameter name is kept as-is
            so existing callers keep working.
        exception_type: Exception class the filter is registered under.
        regex: A single pattern string or a tuple of pattern strings.

    Returns:
        A decorator that registers the function and returns it unchanged.
    """
    patterns = regex if isinstance(regex, tuple) else (regex,)

    def _receive(fn):
        _registry[ame][exception_type].extend(
            (fn, re.compile(pattern)) for pattern in patterns
        )
        return fn

    return _receive


# each @filters() lists a database name, a SQLAlchemy exception to catch,
# and a list of regular expressions that will be matched.  If all the
# conditions match, the handler is called which then raises a nicer
# error message.


@filters(
    "sqlite",
    sqla_exc.IntegrityError,
    (
        r"^.*columns?(?P<columns>[^)]+)(is|are)\s+not\s+unique$",
        r"^.*UNIQUE\s+constraint\s+failed:\s+(?P<columns>.+)$",
        r"^.*PRIMARY\s+KEY\s+must\s+be\s+unique.*$",
    ),
)
def _sqlite_dupe_key_error(integrity_error, match, engine_name, is_disconnect):
    """Translate a SQLite uniqueness failure into ``DuplicateKeyError``.

    Raises:
        DuplicateKeyError: always.
    """
    columns = []
    try:
        columns = match.group("columns")
        # "my_table.slug, my_table.name" -> ["slug", "name"]
        columns = [c.split(".")[-1] for c in columns.strip().split(", ")]
    except IndexError:
        # The PRIMARY KEY pattern has no "columns" group; report no columns.
        pass

    raise DuplicateKeyError(columns, integrity_error)


@filters(
    "mysql",
    sqla_exc.IntegrityError,
    r"^.*\b1062\b.*Duplicate entry '(?P<value>.*)'"
    r" for key '(?P<columns>[^']+)'.*$",
)
@filters(
    "postgresql",
    sqla_exc.IntegrityError,
    (
        r'^.*duplicate\s+key.*"(?P<columns>[^"]+)"\s*\n.*'
        r"Key\s+\((?P<key>.*)\)=\((?P<value>.*)\)\s+already\s+exists.*$",
        r"^.*duplicate\s+key.*\"(?P<columns>[^\"]+)\"\s*\n.*$",
    ),
)
def _default_dupe_key_error(
    integrity_error, match, engine_name, is_disconnect
):
    """Translate a MySQL/PostgreSQL duplicate-key failure.

    The ``columns`` group holds the constraint/key name reported by the
    database; column names are derived from it.

    Raises:
        DuplicateKeyError: always.
    """
    columns = match.group("columns")

    uniqbase = "uniq_"
    if not columns.startswith(uniqbase):
        if engine_name == "postgresql":
            # Keep the text between the first and the last underscore of
            # the constraint name.
            columns = [columns[columns.index("_") + 1 : columns.rindex("_")]]
        else:
            columns = [columns]
    else:
        # "uniq_<a>0<b>0<c>" -> ["<b>", "<c>"]: drop the prefix, split on
        # "0" and discard the first piece.
        columns = columns[len(uniqbase) :].split("0")[1:]

    # Not every pattern defines a "value" group.
    value = match.groupdict().get("value")

    raise DuplicateKeyError(columns, integrity_error, value)


def handler(context):
    """Iterate through available filters and invoke those which match.

    The first one which raises wins.

    Args:
        context: The exception context object SQLAlchemy passes to
            ``handle_error`` listeners.
    """

    def _dialect_registries(engine):
        if engine.dialect.name in _registry:
            yield _registry[engine.dialect.name]
        if "*" in _registry:
            yield _registry["*"]

    dialect_name = context.engine.dialect.name
    for per_dialect in _dialect_registries(context.engine):
        for exc in (context.sqlalchemy_exception, context.original_exception):
            # sqlalchemy_exception may be None, and an exception may carry
            # no arguments or a non-string first argument; none of those
            # can be matched against a pattern, so skip them.
            if exc is None or not exc.args:
                continue
            message = exc.args[0]
            if not isinstance(message, str):
                continue

            for super_ in exc.__class__.__mro__:
                if super_ not in per_dialect:
                    continue
                for fn, regexp in per_dialect[super_]:
                    match = regexp.match(message)
                    if match:
                        fn(
                            exc,
                            match,
                            dialect_name,
                            context.is_disconnect,
                        )


if __name__ == "__main__":
    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import Session

    try:
        # Preferred location since SQLAlchemy 1.4.
        from sqlalchemy.orm import declarative_base
    except ImportError:
        from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class Company(Base):
        __tablename__ = "my_table"

        id = Column(Integer(), primary_key=True)
        name = Column(String(128), unique=True, nullable=False)
        slug = Column(String(128), unique=True, nullable=False)

        def __init__(self, name, slug):
            self.name = name
            self.slug = slug

    e = create_engine("sqlite://", echo=True)
    Base.metadata.create_all(e)
    event.listen(e, "handle_error", handler)

    s = Session(e)
    try:
        s.add(Company("Test", "test"))
        s.commit()

        # Second insert violates the unique constraints; the listener turns
        # the IntegrityError into DuplicateKeyError, which is deliberately
        # left uncaught so the demo prints the traceback.
        s.add(Company("Test", "test"))
        s.commit()
    finally:
        s.close()
运行它,我们看到:
2019-03-13 09:44:51,701 INFO sqlalchemy.engine.base.Engine INSERT INTO my_table (name, slug) VALUES (?, ?) 2019-03-13 09:44:51,701 INFO sqlalchemy.engine.base.Engine ('Test', 'test') 2019-03-13 09:44:53,387 INFO sqlalchemy.engine.base.Engine ROLLBACK Traceback (most recent call last): # ... sqlite3.IntegrityError: UNIQUE constraint failed: my_table.slug The above exception was the direct cause of the following exception: Traceback (most recent call last): # ... __main__.DuplicateKeyError: Duplicate key for columns ['slug']