博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MySQL4-SQLAlchemy框架实现
阅读量:4678 次
发布时间:2019-06-09

本文共 18436 字,大约阅读时间需要 61 分钟。

参考         http://www.cnblogs.com/wupeiqi/articles/8259356.html

概述

作用

1,提供简单的规则

2,自动将类转换成SQL语句并执行

两种设计模型

DBfirst:手动创建数据库以及表 --> ORM框架 --> 自动生成类

CODEfirst:手动创建类、数据库 --> ORM框架 --> 自动生成表  --->  SQLAlchemy

连接数据库

# SQLAlchemy URL: MySQL through the PyMySQL driver, user "root" with an empty
# password, database "db6" on 127.0.0.1:3306, utf8 charset.
conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"
# max_overflow=5: up to 5 connections may be opened beyond the pool size.
engine = create_engine(conn, max_overflow=5)

表操作

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index, CHAR, VARCHAR
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

# Declarative base: every model class below registers its table on Base.metadata.
Base = declarative_base()


class Type(Base):
    """User category, mapped to the ``type`` table."""

    __tablename__ = 'type'

    tid = Column(Integer, primary_key=True, autoincrement=True)
    tname = Column(String(32))


class User(Base):
    """User record, mapped to the ``users`` table."""

    __tablename__ = 'users'                                     # table name

    id = Column(Integer, primary_key=True)                      # primary key
    name = Column(String(32))
    email = Column(String(32))
    type_id = Column(Integer, ForeignKey('type.tid'))           # foreign key -> type.tid

    # Composite constraints / indexes are declared through __table_args__:
    # __table_args__ = (
    #     UniqueConstraint('id', 'name', name='uix_id_name'),   # composite unique constraint
    #
    #     Index('ix_id_name', 'name', 'extra')                  # composite index
    # )
    # NOTE(review): 'extra' is not a column of this model; pick existing
    # columns before enabling the Index above.


conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"


def create_table(conn):
    """Create the tables of every model that inherits from ``Base``.

    ``conn`` is the SQLAlchemy database URL.
    """
    db_engine = create_engine(conn, max_overflow=5)
    Base.metadata.create_all(db_engine)     # creates all mapped tables


def drop_table(conn):
    """Drop the tables of every model that inherits from ``Base``.

    ``conn`` is the SQLAlchemy database URL.
    """
    db_engine = create_engine(conn, max_overflow=5)
    Base.metadata.drop_all(db_engine)       # drops all mapped tables


create_table(conn)
SQLAlchemy操作表.py
autoincrement = True    # 自增
nullable = False        # 不为空(nullable=True 表示允许为空)
default = '  '          # 默认值
index = True            # 索引
unique = True           # 唯一索引
Column可传参数

数据行操作

增删改查

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index, CHAR, VARCHAR
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

Base = declarative_base()


class Type(Base):
    """User category, mapped to the ``type`` table."""

    __tablename__ = 'type'

    tid = Column(Integer, primary_key=True, autoincrement=True)
    tname = Column(String(32))


class User(Base):
    """User record, mapped to the ``users`` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    email = Column(String(32))
    type_id = Column(Integer, ForeignKey('type.tid'))


conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"


def create_table(conn):
    """Create the tables of every model that inherits from ``Base``."""
    engine = create_engine(conn, max_overflow=5)
    Base.metadata.create_all(engine)


def drop_table(conn):
    """Drop the tables of every model that inherits from ``Base``."""
    engine = create_engine(conn, max_overflow=5)
    Base.metadata.drop_all(engine)


# create_table(conn)

engine = create_engine(conn, max_overflow=5)
Session = sessionmaker(bind=engine)
session = Session()     # open a session

try:
    # --- Insert one user type ---
    # obj1 = Type(tname='普通用户')
    # session.add(obj1)

    # --- Insert several user types at once ---
    # objs = [
    #     Type(tname='超级用户'),
    #     Type(tname='白金用户'),
    #     Type(tname='黑金用户'),
    # ]
    # session.add_all(objs)

    # --- Select all rows ---
    # type_list = session.query(Type).all()
    # for row in type_list:
    #     print(row.tid, row.tname)

    # --- Select rows matching a condition ---
    # type_list = session.query(Type).filter(Type.tid > 2)
    # for row in type_list:
    #     print(row.tid, row.tname)

    # --- Select a single column ---
    # type_list = session.query(Type.tname).all()
    # for row in type_list:
    #     print(row.tname)

    # --- Delete (filter the rows, then delete them) ---
    # session.query(Type).filter(Type.tid > 2).delete()

    # --- Plain update (filter the rows, then update them) ---
    # session.query(Type).filter(Type.tid > 0).update({'tname': '白金'})

    # --- Update relative to the current value (string column) ---
    session.query(Type).filter(Type.tid > 0).update(
        {Type.tname: Type.tname + 'X'}, synchronize_session=False)

    # --- Update relative to the current value (numeric column) ---
    # Not runnable as-is: the Type model defines no `num` column.
    # session.query(Type).filter(Type.tid > 0).update({'num': Type.num+1}, synchronize_session='evaluate')

    session.commit()    # persist the changes
except Exception:
    # Undo the partial transaction, then let the error propagate.
    session.rollback()
    raise
finally:
    session.close()     # always release the session
具体代码实现.py
# Fragment: relies on `session` and the `Type` model from the full script.

# - Create
# Insert one user type
obj1 = Type(tname='普通用户')
session.add(obj1)

# Insert several user types at once
objs = [
    Type(tname='超级用户'),
    Type(tname='白金用户'),
    Type(tname='黑金用户'),
]
session.add_all(objs)

# - Delete (filter the rows, then delete them)
session.query(Type).filter(Type.tid > 2).delete()

# - Update
# Plain update (filter the rows, then update them)
session.query(Type).filter(Type.tid > 0).update({'tname': '白金'})

# Update relative to the current value (string column)
session.query(Type).filter(Type.tid > 0).update(
    {Type.tname: Type.tname + 'X'}, synchronize_session=False)

# Update relative to the current value (numeric column).
# Left commented out: the Type model defines no `num` column, so evaluating
# `Type.num` would raise AttributeError.
# session.query(Type).filter(Type.tid > 0).update({'num': Type.num+1}, synchronize_session='evaluate')

# - Read
# Select all rows
type_list = session.query(Type)
for row in type_list:
    print(row.tid, row.tname)

# Select rows matching a condition
type_list = session.query(Type).filter(Type.tid > 0)
for row in type_list:
    print(row.tid, row.tname)

# Select a single column
type_list = session.query(Type.tname)
for row in type_list:
    print(row.tname)
分解代码

查的扩展

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index, CHAR, VARCHAR
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy import and_, or_
from sqlalchemy.sql import func

Base = declarative_base()


class Type(Base):
    """User category, mapped to the ``type`` table."""

    __tablename__ = 'type'

    tid = Column(Integer, primary_key=True, autoincrement=True)
    tname = Column(String(32))


class Users(Base):
    """User record, mapped to the ``users`` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    email = Column(String(32))
    type_id = Column(Integer, ForeignKey('type.tid'))


conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"


def create_table(conn):
    """Create the tables of every model that inherits from ``Base``."""
    engine = create_engine(conn, max_overflow=5)
    Base.metadata.create_all(engine)


def drop_table(conn):
    """Drop the tables of every model that inherits from ``Base``."""
    engine = create_engine(conn, max_overflow=5)
    Base.metadata.drop_all(engine)


engine = create_engine(conn, max_overflow=5)
Session = sessionmaker(bind=engine)
session = Session()     # open a session

# --- Select all rows ---
# type_list = session.query(Type)
# for row in type_list:
#     print(row.tid, row.tname)

# --- Select rows matching a condition ---
# type_list = session.query(Type).filter(Type.tid > 0 )
# for row in type_list:
#     print(row.tid, row.tname)

# --- Select a single column ---
# type_list = session.query(Type.tname)
# for row in type_list:
#     print(row.tname)

# --- First row matching a condition ---
# ret = session.query(Users).filter_by(name='alex').first()
# print(ret.name, ret.email, ret.type_id)

# --- AND (several criteria passed to filter) ---
# ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric')
# for row in ret:
#     print(row.name)

# --- BETWEEN ---
# ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric')
# for row in ret:
#     print(row.id, row.name)

# --- IN ---
# ret = session.query(Users).filter(Users.id.in_([1, 3, 4]))
# for row in ret:
#     print(row.id, row.name)

# --- NOT IN ---
# ret = session.query(Users).filter(~Users.id.in_([1, 3, 4]))
# for row in ret:
#     print(row.id, row.name)

# --- Subquery inside IN ---
# ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric')))
# for row in ret:
#     print(row.id, row.name)

# --- Explicit and_ / or_ ---
# from sqlalchemy import and_, or_
# ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric'))
# for row in ret:
#     print(row.id, row.name)
# ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric'))
# for row in ret:
#     print(row.id, row.name)
# ret = session.query(Users).filter(
#     or_(
#         Users.id < 2,
#         and_(Users.name == 'eric', Users.id > 3),
#         Users.email != ""
#     ))
# for row in ret:
#     print(row.id, row.name)

# --- Wildcards (LIKE) ---
# ret = session.query(Users).filter(Users.name.like('e%'))
# for row in ret:
#     print(row.id, row.name)
# ret = session.query(Users).filter(~Users.name.like('e%'))     # not like
# for row in ret:
#     print(row.id, row.name)

# --- LIMIT-like behaviour through slicing ---
# ret = session.query(Users)[1:3]
# for row in ret:
#     print(row.id, row.name)

# --- Ordering ---
# ret = session.query(Users).order_by(Users.id.desc())
# for row in ret:
#     print(row.id, row.name)
# ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc())
# for row in ret:
#     print(row.id, row.name)

# --- Grouping ---
# ret = session.query(Users.name).group_by(Users.name)
# for row in ret:
#     # print(row.name)
#     print(row[0])
# ret = session.query(
#     Users.name,
#     func.max(Users.id),
#     func.sum(Users.id),
#     func.min(Users.id)).group_by(Users.name)
# print(ret)
# for row in ret:   # each row is a tuple
#     print(row[0], row[1], row[2], row[3])
# ret = session.query(
#     func.max(Users.id),
#     func.sum(Users.id),
#     func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) > 2)   # filter on the aggregate result
# for row in ret:
#     print(row[0], row[1], row[2])

# --- Joins ---
# ret = session.query(Users).join(Type)                         # inner join
# print(ret)
# for row in ret:
#     print(row)
# result = session.query(Users).join(Type, isouter=True)        # left join
# print(result)
# for row in result:
#     print(row.name)

# --- Subqueries ---
# [1] select * from user where user.id in (subquery)
# q1 = session.query(Users.id).filter_by(name='eric')     # subquery: ids of the users named eric
# ret = session.query(Users).filter(Users.id.in_(q1))
# for row in ret:
#     print(row.id, row.name)
# [2] select * from (subquery)
# q2 = session.query(Type).filter(Type.tid > 1).subquery()    # subquery: every type row with tid > 1
# ret = session.query(q2)
# for row in ret:
#     print(row.tid, row.tname)

# [3, the important one] select users.id, users.name, (subquery) from users
try:
    # Scalar subquery: the tname whose tid equals the user's type_id.
    q3 = session.query(Type.tname).filter(Users.type_id == Type.tid).as_scalar()
    ret = session.query(Users.id, Users.name, q3)
    for row in ret:
        print(row)
finally:
    session.close()     # always release the session
查的扩展.py
"""查全部"""type_list = session.query(Type)for row in type_list:    print(row.tid, row.tname)    """查符合条件的"""type_list = session.query(Type).filter(Type.tid > 0 )for row in type_list:    print(row.tid, row.tname)    """查某个字段[列]"""# type_list = session.query(Type.tname)for row in type_list:    print(row.tname)"""查询符合条件的第一个"""ret = session.query(Users).filter_by(name='alex').first()print(ret.name, ret.email, ret.type_id)"""and"""ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric')for row in ret:    print(row.name)"""between"""ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric')for row in ret:    print(row.id, row.name)"""in"""ret = session.query(Users).filter(Users.id.in_([1, 3, 4]))for row in ret:    print(row.id, row.name)"""not in"""ret = session.query(Users).filter(~Users.id.in_([1, 3, 4]))for row in ret:    print(row.id, row.name)"""子查询"""ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric')))for row in ret:    print(row.id, row.name)""" and or """from sqlalchemy import and_, or_ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric'))for row in ret:    print(row.id, row.name)ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric'))for row in ret:    print(row.id, row.name)ret = session.query(Users).filter(    or_(        Users.id < 2,        and_(Users.name == 'eric', Users.id > 3),        Users.email != ""    ))for row in ret:    print(row.id, row.name)"""通配符"""ret = session.query(Users).filter(Users.name.like('e%'))for row in ret:    print(row.id, row.name)ret = session.query(Users).filter(~Users.name.like('e%'))     not likefor row in ret:    print(row.id, row.name)"""类似limit,但是通过切片取值"""ret = session.query(Users)[1:3]for row in ret:    print(row.id, row.name)"""排序"""ret = session.query(Users).order_by(Users.id.desc())for row in ret:    print(row.id, row.name)ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc())for row in ret:   
 print(row.id, row.name)"""分组"""from sqlalchemy.sql import funcret = session.query(Users.name).group_by(Users.name)for row in ret:    print(row.name)    print(row[0])ret = session.query(    Users.name,    func.max(Users.id),    func.sum(Users.id),    func.min(Users.id)).group_by(Users.name)print(ret)for row in ret:   # row 是一个元祖    print(row[0], row[1], row[2], row[3])ret = session.query(    func.max(Users.id),    func.sum(Users.id),    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) > 2)   # 对聚合函数的返回值做第二次使用for row in ret:    print(row[0], row[1], row[2])"""连表"""ret = session.query(Users).join(Type)                         # inner joinprint(ret)for row in ret:    print(row)result = session.query(Users).join(Type, isouter=True)        # left joinprint(result)for row in result:    print(row.name)"""子查询"""# 【一】 select * from user where user.id in (子查询)q1 = session.query(Users.id).filter_by(name='eric')     # 查找到名字是eric的用户id 的【子查询】ret = session.query(Users).filter(Users.id.in_(q1))for row in ret:    print(row.id, row.name)# 【二】 select * from (子查询)q2 = session.query(Type).filter(Type.tid > 1).subquery()    # 查找到type的tid大于1的所有信息 的【子查询】ret = session.query(q2)for row in ret:    print(row.tid, row.tname)# 【*****三】 select users.id,users.name,(子查询) from usersq3 = session.query(Type.tname).filter(Users.type_id == Type.tid).as_scalar()   # 选出tid == type_id 的 tnameret = session.query(Users.id, Users.name, q3)for row in ret:    print(row)
分解代码

常用操作

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy import and_, or_
from models import Users

# Structure of the imported model, for reference:
#
# Base = declarative_base()
#
# class Users(Base):
#     __tablename__ = 'users'
#     id = Column(Integer, primary_key=True)
#     name = Column(String(32))
#     depart_id = Column(Integer)

engine = create_engine(
    "mysql+pymysql://root:@127.0.0.1:3306/s9day120?charset=utf8",
    max_overflow=0,   # max connections opened beyond the pool size
    pool_size=5,      # connection pool size
    pool_timeout=30,  # max seconds to wait for a free pooled connection before an error is raised
    pool_recycle=-1   # seconds after which a pooled connection is recycled (reset); -1 = never
)
SessionFactory = sessionmaker(bind=engine)
session = SessionFactory()

# 1 Select specific columns (with a label)
# result =session.query(Users.id, Users.name.label('cname')).all()
# print(result)
# for i in result:
#     print(i.id, i.cname)

# 2 Multiple criteria are combined with AND by default
# ret = session.query(Users).filter(Users.id >= 1, Users.name == 'alexX').all()
# print(ret)

# 3 BETWEEN
# ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'helloX').all()
# for i in ret:
#     print(i.id)

# 4 IN / NOT IN
# ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
# ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
# for i in ret:
#     print(i.id)

# 5 Subquery
# ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='helloX'))).all()
# for i in ret:
#     print(i.id)

# 6 and_ / or_
# ret = session.query(Users).filter(Users.id < 3, Users.name == 'helloX').all()
# ret = session.query(Users).filter(and_(Users.id < 3, Users.name == 'helloX')).all()
# ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'helloX')).all()
# ret = session.query(Users).filter(
#     or_(
#         Users.id > 2,
#         and_(Users.name == 'alexX', Users.id < 3),
#     )).all()
#
# for i in ret:
#     print(i.id)

# 7 filter_by
# ret = session.query(Users).filter_by(name='alexX').all()
# for i in ret:
#     print(i.id)

# 8 Wildcards (LIKE)
# NOTE(review): the original notes say matching is case-insensitive; that
# presumably depends on the column collation -- confirm for your schema.
# ret = session.query(Users).filter(Users.name.like('A%')).all()
# ret = session.query(Users).filter(Users.name.like('%x')).all()
# ret = session.query(Users).filter(~Users.name.like('A%')).all()
# for i in ret:
#     print(i.id)

# 9 Slicing
# result = session.query(Users)[1:4]  # rows 2 through 4 (the 5th is excluded)
# for i in result:
#     print(i.id)

# 10 Ordering
# ret = session.query(Users).order_by(Users.name.desc()).all()    # descending
# ret = session.query(Users).order_by(Users.name.asc(), Users.id.desc()).all()    # name ascending, then id descending
# for i in ret:
#     print(i.id, i.name)

# 11 GROUP BY
# from sqlalchemy.sql import func
# ret = session.query(
#         Users.depart_id,
#         func.count(Users.id),
# ).group_by(Users.depart_id).all()
#
# for i in ret:
#     print(i)

# from sqlalchemy.sql import func
# ret = session.query(
#         Users.depart_id,
#         func.count(Users.id),
# ).group_by(Users.depart_id).having(func.count(Users.id) > 1).all()
#
# for i in ret:
#     print(i)

# 12 UNION and UNION ALL
# SQL shape being illustrated:
#     select id,name from users
#     UNION
#     select id,name from users;
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Users.depart_id).filter(Users.id <= 2)
# ret = q1.union(q2).all()
# for i in ret:
#     print(i)

# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Users.depart_id).filter(Users.id <= 2)
# ret = q1.union_all(q2).all()
# for i in ret:
#     print(i)

session.close()     # release the session once the examples are done
常用操作

便利的连表

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index, CHAR, VARCHAR
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

Base = declarative_base()


class Type(Base):
    """User category, mapped to the ``type`` table."""

    __tablename__ = 'type'

    tid = Column(Integer, primary_key=True, autoincrement=True)
    tname = Column(String(32))


class Users(Base):
    """User record, mapped to the ``users`` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    email = Column(String(32))
    type_id = Column(Integer, ForeignKey('type.tid'))

    # Relationship for convenient joins: Users.users_type gives the Type row,
    # and the backref adds Type.type_users listing the related users.
    users_type = relationship('Type', backref='type_users')


conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"


def create_table(conn):
    """Create the tables of every model that inherits from ``Base``."""
    engine = create_engine(conn, max_overflow=5)
    Base.metadata.create_all(engine)


def drop_table(conn):
    """Drop the tables of every model that inherits from ``Base``."""
    engine = create_engine(conn, max_overflow=5)
    Base.metadata.drop_all(engine)


engine = create_engine(conn, max_overflow=5)
Session = sessionmaker(bind=engine)
session = Session()     # open a session

try:
    # Forward direction: from a user to its type.
    print('----正向操作----')
    result_list = session.query(Users)
    for row in result_list:
        # type_id is nullable, so a user may have no related Type row.
        related_type = row.users_type
        type_name = related_type.tname if related_type is not None else None
        print(row.name, row.type_id, type_name)

    # Reverse direction: from a type to all of its users (via the backref).
    print('----反向操作----')
    result_list = session.query(Type)
    for row in result_list:
        # print(row.tid, row.tname)
        print('会员等级:%s' % row.tname)
        for i in row.type_users:
            print(i.id, i.name, i.type_id)
finally:
    session.close()     # always release the session
便利的连表.py
# Fragment: relies on `session` and the `Users` / `Type` models (with the
# `users_type` relationship and its `type_users` backref) from the full script.

# - Forward direction: from a user to its type
result_list = session.query(Users)
for row in result_list:
    # type_id is nullable, so a user may have no related Type row.
    related_type = row.users_type
    type_name = related_type.tname if related_type is not None else None
    print(row.name, row.type_id, type_name)

# - Reverse direction: from a type to all of its users
result_list = session.query(Type)
for row in result_list:
    # print(row.tid, row.tname)
    print('会员等级:%s' % row.tname)
    for i in row.type_users:
        print(i.id, i.name, i.type_id)
代码分解

 

转载于:https://www.cnblogs.com/sunch/p/9598641.html

你可能感兴趣的文章
hall wrong behavior
查看>>
Collection集合
查看>>
【C++】const在不同位置修饰指针变量
查看>>
github新项目挂历模式
查看>>
编写jquery插件
查看>>
敏捷开发笔记
查看>>
学前班
查看>>
关于自关联1
查看>>
hdu-1814(2-sat)
查看>>
谷歌浏览器,添加默认搜索引擎的搜索地址
查看>>
数据结构化与保存
查看>>
为什么需要Docker?
查看>>
国内5家云服务厂商 HTTPS 安全性测试横向对比
查看>>
how to control project
查看>>
转 python新手容易犯的6个错误
查看>>
第四节 -- 列表
查看>>
决策树
查看>>
团队作业
查看>>
如何避免在简单业务逻辑上面的细节上面出错
查看>>
大型网站高并发的架构演变图-摘自网络
查看>>