Sqlalchemy Python ORM(Object Relational Mapping) 使用介绍
介绍
Sqlalchemy API 分新旧两种:
1.3 及之前的老 API
1.4 及之后的 2.0 新 API
| 概念 |
对应数据库 |
说明 |
| Engine |
连接 |
驱动引擎 |
| Session |
连接池,事务 |
由此开始查询 |
| Model |
表 |
类定义 |
| Column |
列 |
|
| Query |
若干行 |
可以链式添加多个条件 |
| 数据类型 |
数据库数据类型 |
python 数据类型 |
说明 |
| Integer |
int |
int |
整形,32 位 |
| String |
varchar |
string |
字符串 |
| Text |
text |
string |
长字符串 |
| Float |
float |
float |
浮点型 |
| Boolean |
tinyint |
bool |
True / False |
| Date |
date |
datetime.date |
存储时间年月日 |
| DateTime |
datetime |
datetime.datetime |
存储年月日时分秒毫秒等 |
| Time |
time |
datetime.datetime |
存储时分秒 |
使用步骤
- 导入 SQLAlchemy
- 创建类,定义表名和字段名
- 初始化数据库链接:
create_engine('数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名')
- 创建 session:
sessionmaker(bind=engine)
session.add(对象)
session.commit()
session.close()
安装
pip install sqlalchemy
create_engine
from sqlalchemy import create_engine
engine = create_engine(
DATABASE_URI,
# connect_args={"ssl": {"key": SQLALCHEMY_DATABASE_PEM}},
echo=True,
pool_size=8,
pool_recycle=60*30,)
说明:
echo 为 True 时打印 sql 语句,一般与 debug 配合使用
pool_size 连接池的大小,默认为 5 个,设置为 0 时表示连接无限制
pool_recycle 设置数据库多久没连接自动断开
模型
class User(Base):
"""User account."""
__tablename__ = "user"
id = Column(Integer, primary_key=True, autoincrement="auto")
username = Column(String(255), unique=True, nullable=False)
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, onupdate=func.now())
def __repr__(self):
return "<User %r>" % self.username
__repr__ 的作用
__repr__ 的主要目的是为开发人员提供调试和日志记录的便利。它的输出应该尽可能清晰、无歧义,并且理想情况下,是一个有效的 Python 表达式,你可以直接将其复制粘贴到代码中来重新创建该对象。
当你在交互式会话中打印一个对象,或者在列表、字典等容器中查看对象时,Python 默认会调用对象的 __repr__ 方法来获取其字符串表示。
与 __str__ 的区别
__repr__ 常常与另一个特殊方法 __str__ 进行比较。
__repr__:用于开发人员。它返回一个精确、明确的字符串,其目标是无歧义。
__str__:用于普通用户。它返回一个可读性高、友好的字符串,其目标是可读性。
举个例子:
import datetime
now = datetime.datetime.now()
# __repr__ 的输出:适合开发人员,精确到微秒,格式严谨
print(repr(now))
# 例如: datetime.datetime(2016, 05, 07, 11, 42, 6, 888777)
# __str__ 的输出:适合普通用户,更易读
print(str(now))
# 例如: 2016-05-07 11:42:06.888777
如果一个类没有定义 __str__ 方法,但定义了 __repr__ 方法,那么 str() 函数和 print() 函数会退而求其次地使用 __repr__ 的输出来作为字符串表示。
生成数据表
Base.metadata.create_all(engine)
操作数据
from sqlalchemy.orm import sessionmaker
# Create database session
Session = sessionmaker(bind=engine)
session = Session()
session 的常见操作:
-
flush 预提交,未写入数据库中
-
commit 提交事务
-
rollback 回滚事务
-
close 关闭事务
-
增加
add_user = Users("test")
session.add(add_user)
session.commit()
| filter |
filter_by |
| 比较运算符,相等比较用== |
只能使用"=","!=“和”><" |
| 类名.属性名 |
属性名 |
| 类名.属性名 |
参数是**kwargs,支持组合查询 |
| 支持 and,or 和 in 等 |
|
existing_user = (
session.query(User).filter(User.username == user.username).first()
)
session.query(Users).filter_by(id=1).update({'username': "xianbin"})
# 或
users = session.query(Users).filter_by(username="test").first()
users.username = "xianbin"
session.add(users)
session.delete(user) # Delete the user
session.commit() # Commit the change
# 或
session.query(Users).filter(Users.username == "test").delete()
session.commit()
源码
原生 SQL
from sqlalchemy import create_engine
from sqlalchemy import text
engine = create_engine('sqlite:///:memory:')
result = engine.execute(
text("SELECT * FROM users WHERE age >= :age"), {'age': 21})
for row in result:
print(row)
from sqlalchemy.orm import Session
session = Session(bind=engine)
result = session.execute(text("SELECT * FROM users WHERE age >= :age"), {'age': 21})
for row in result:
print(row)
with session.begin():
session.execute(
text("UPDATE users SET age = :new_age WHERE age = :old_age"),
{'new_age': 30, 'old_age': 20}
)
F&Q
ModuleNotFoundError: No module named ‘MySQLdb’
在 python2.x 中用 mysqldb,python3.x 中使用 pymysql 代替
import pymysql
pymysql.install_as_MySQLdb()
TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30
修改 pool_size 和 max_overflow 的大小
from sqlalchemy import create_engine
engine = create_engine(
"mysql://" + loadConfigVar("user") + ":" + loadConfigVar("password") + "@" + loadConfigVar("host") + "/" + loadConfigVar("schema"),
echo=False,
pool_size=20,
pool_recycle=60*30,
max_overflow=50)
Can’t reconnect until invalid transaction is rolled back. (Background on this error at: http://sqlalche.me/e/14/8s2b)
- 原因:连接断开后,事务没有回滚,残留的锁导致后续的查询报错
- 解决:在所有的数据库操作的时候捕捉异常进行事务的回滚
from models import OrderInfo
from sqlalchemy.exc import InvalidRequestError
try:
order = OrderInfo.query.filter_by(task_id=user_dict.get('task_id')).first()
order.status = 'COMPLETE'
db.session.commit()
except InvalidRequestError:
db.session.rollback()
except Exception as e:
print(e)