参考 http://www.cnblogs.com/wupeiqi/articles/8259356.html
概述
作用
1,提供简单的规则
2,自动将类转换成SQL语句并执行两种设计模型
DBfirst:手动创建数据库以及表 --> ORM框架 --> 自动生成表
CODEfirst:手动创建类、数据库 --> ORM框架 --> 自动生成表 ---> SQLAlchemy连接数据库
conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"ngine = create_engine(conn, max_overflow=5) # 连接mysql
表操作
from sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,CHAR,VARCHARfrom sqlalchemy.orm import sessionmaker, relationshipfrom sqlalchemy import create_engineBase = declarative_base()class Type(Base): __tablename__ = 'type' tid = Column(Integer, primary_key=True, autoincrement=True) tname = Column(String(32))class User(Base): __tablename__ = 'users' # 表名 id = Column(Integer, primary_key=True) # 主键 name = Column(String(32)) email = Column(String(32)) type_id = Column(Integer, ForeignKey('type.tid')) # 外键 # __table_args__ = ( # 联合 # UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一 # # Index('ix_id_name', 'name', 'extra') # 联合索引 # )conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"def create_table(conn): engine = create_engine(conn, max_overflow=5) # 连接mysql Base.metadata.create_all(engine) # 找到继承了Base的所有的类,创建表 --> 全建表def drop_table(conn): engine = create_engine(conn, max_overflow=5) # 连接mysql Base.metadata.drop_all(engine) # 找到继承了Base的所有的类,删除表 --> 全删表create_table(conn)
autuincrement = True # 自增nullable = True # 不为空default = ' ' # 默认值index = True # 索引unique = True # 唯一索引
数据行操作
增删改查
from sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,CHAR,VARCHARfrom sqlalchemy.orm import sessionmaker, relationshipfrom sqlalchemy import create_engineBase = declarative_base()class Type(Base): __tablename__ = 'type' tid = Column(Integer, primary_key=True, autoincrement=True) tname = Column(String(32))class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32)) email = Column(String(32)) type_id = Column(Integer, ForeignKey('type.tid'))conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"def create_table(conn): engine = create_engine(conn, max_overflow=5) # 连接mysql Base.metadata.create_all(engine) # 找到继承了Base的所有的类,创建表def drop_table(conn): engine = create_engine(conn, max_overflow=5) # 连接mysql Base.metadata.drop_all(engine) # 找到继承了Base的所有的类,删除表# create_table(conn)engine = create_engine(conn, max_overflow=5)Session = sessionmaker(bind=engine)session = Session() # 获取一个连接"""增加一条用户"""# obj1 = Type(tname='普通用户')# session.add(obj1)"""增加多条用户"""# objs = [# Type(tname='超级用户'),# Type(tname='白金用户'),# Type(tname='黑金用户'),# ]# session.add_all(objs)"""查全部"""# type_list = session.query(Type).all()# for row in type_list:# print(row.tid, row.tname)"""查符合条件的"""# type_list = session.query(Type).filter(Type.tid > 2)# for row in type_list:# print(row.tid, row.tname)"""查某个字段[列]"""# type_list = session.query(Type.tname).all()# for row in type_list:# print(row.tname)"""删【先查到再删】"""# session.query(Type).filter(Type.tid > 2).delete()"""普通改【先查到再改】"""# session.query(Type).filter(Type.tid > 0).update({'tname': '白金'})"""在原有基础上改【字符串类型】"""session.query(Type).filter(Type.tid > 0).update({Type.tname: Type.tname + 'X'}, synchronize_session = False)"""在原有基础上改【数字类型类型】"""# session.query(Type).filter(Type.tid > 0).update({'num': Type.num+1}, synchronize_session = 'evaluate') # 字段没有创建num字段,无法测试session.commit() # 提交session.close() # 关闭连接
- 增: """增加一条用户""" obj1 = Type(tname='普通用户') session.add(obj1) """增加多条用户""" objs = [ Type(tname='超级用户'), Type(tname='白金用户'), Type(tname='黑金用户'), ] session.add_all(objs) - 删: """删【先查到再删】""" session.query(Type).filter(Type.tid > 2).delete() - 改: """普通改【先查到再改】""" session.query(Type).filter(Type.tid > 0).update({ 'tname':'白金'}) """在原有基础上改【字符串类型】""" session.query(Type).filter(Type.tid > 0).update({Type.tname: Type.tname + 'X'}, synchronize_session = False) """在原有基础上改【数字类型类型】""" session.query(Type).filter(Type.tid > 0).update({ 'num': Type.num+1}, synchronize_session = 'evaluate') # 字段没有创建num字段,无法测试- 查: """查全部""" type_list = session.query(Type) for row in type_list: print(row.tid, row.tname) """查符合条件的""" type_list = session.query(Type).filter(Type.tid > 0 ) for row in type_list: print(row.tid, row.tname) """查某个字段[列]""" # type_list = session.query(Type.tname) for row in type_list: print(row.tname)
查的扩展
from sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,CHAR,VARCHARfrom sqlalchemy.orm import sessionmaker, relationshipfrom sqlalchemy import create_enginefrom sqlalchemy import and_, or_Base = declarative_base()class Type(Base): __tablename__ = 'type' tid = Column(Integer, primary_key=True, autoincrement=True) tname = Column(String(32))class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32)) email = Column(String(32)) type_id = Column(Integer, ForeignKey('type.tid'))conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"def create_table(conn): engine = create_engine(conn, max_overflow=5) # 连接mysql Base.metadata.create_all(engine) # 找到继承了Base的所有的类,创建表def drop_table(conn): engine = create_engine(conn, max_overflow=5) # 连接mysql Base.metadata.drop_all(engine) # 找到继承了Base的所有的类,删除表engine = create_engine(conn, max_overflow=5)Session = sessionmaker(bind=engine)session = Session() # 获取一个连接"""查全部"""# type_list = session.query(Type)# for row in type_list:# print(row.tid, row.tname)"""查符合条件的"""# type_list = session.query(Type).filter(Type.tid > 0 )# for row in type_list:# print(row.tid, row.tname)"""查某个字段[列]"""# type_list = session.query(Type.tname)# for row in type_list:# print(row.tname)"""查询符合条件的第一个"""# ret = session.query(Users).filter_by(name='alex').first()# print(ret.name, ret.email, ret.type_id)"""and"""# ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric')# for row in ret:# print(row.name)"""between"""# ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric')# for row in ret:# print(row.id, row.name)"""in"""# ret = session.query(Users).filter(Users.id.in_([1, 3, 4]))# for row in ret:# print(row.id, row.name)"""not in"""# ret = session.query(Users).filter(~Users.id.in_([1, 3, 4]))# for row in ret:# print(row.id, row.name)"""子查询"""# ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric')))# for row in ret:# print(row.id, row.name)""" and or """# from sqlalchemy import and_, or_# ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric'))# for row in ret:# print(row.id, row.name)# ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric'))# for row in ret:# print(row.id, row.name)# ret = session.query(Users).filter(# or_(# Users.id < 2,# and_(Users.name == 'eric', Users.id > 3),# Users.email != ""# ))# for row in ret:# print(row.id, row.name)"""通配符"""# ret = session.query(Users).filter(Users.name.like('e%'))# for row in ret:# print(row.id, row.name)# ret = session.query(Users).filter(~Users.name.like('e%')) # not like# for row in ret:# print(row.id, row.name)"""类似limit,但是通过切片取值"""# ret = session.query(Users)[1:3]# for row in ret:# print(row.id, row.name)"""排序"""# ret = session.query(Users).order_by(Users.id.desc())# for row in ret:# print(row.id, row.name)# ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc())# for row in ret:# print(row.id, row.name)"""分组"""from sqlalchemy.sql import func# ret = session.query(Users.name).group_by(Users.name)# for row in ret:# # print(row.name)# print(row[0])# ret = session.query(# Users.name,# func.max(Users.id),# func.sum(Users.id),# func.min(Users.id)).group_by(Users.name)# print(ret)# for row in ret: # row 是一个元祖# print(row[0], row[1], row[2], row[3])# ret = session.query(# func.max(Users.id),# func.sum(Users.id),# func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) > 2) # 对聚合函数的返回值做第二次使用# for row in ret:# print(row[0], row[1], row[2])"""连表"""# ret = session.query(Users).join(Type) # inner join# print(ret)# for row in ret:# print(row)# result = session.query(Users).join(Type, isouter=True) # left join# print(result)# for row in result:# print(row.name)"""子查询"""# 【一】 select * from user where user.id in (子查询)# q1 = session.query(Users.id).filter_by(name='eric') # 查找到名字是eric的用户id 的【子查询】# ret = session.query(Users).filter(Users.id.in_(q1))# for row in ret:# print(row.id, row.name)# 【二】 select * from (子查询)# q2 = session.query(Type).filter(Type.tid > 1).subquery() # 查找到type的tid大于1的所有信息 的【子查询】# ret = session.query(q2)# for row in ret:# print(row.tid, row.tname)# 【三*****】 select users.id,users.name,(子查询) from usersq3 = session.query(Type.tname).filter(Users.type_id == Type.tid).as_scalar() # 选出tid == type_id 的 tnameret = session.query(Users.id, Users.name, q3)for row in ret: print(row)
"""查全部"""type_list = session.query(Type)for row in type_list: print(row.tid, row.tname) """查符合条件的"""type_list = session.query(Type).filter(Type.tid > 0 )for row in type_list: print(row.tid, row.tname) """查某个字段[列]"""# type_list = session.query(Type.tname)for row in type_list: print(row.tname)"""查询符合条件的第一个"""ret = session.query(Users).filter_by(name='alex').first()print(ret.name, ret.email, ret.type_id)"""and"""ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric')for row in ret: print(row.name)"""between"""ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric')for row in ret: print(row.id, row.name)"""in"""ret = session.query(Users).filter(Users.id.in_([1, 3, 4]))for row in ret: print(row.id, row.name)"""not in"""ret = session.query(Users).filter(~Users.id.in_([1, 3, 4]))for row in ret: print(row.id, row.name)"""子查询"""ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric')))for row in ret: print(row.id, row.name)""" and or """from sqlalchemy import and_, or_ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric'))for row in ret: print(row.id, row.name)ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric'))for row in ret: print(row.id, row.name)ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.email != "" ))for row in ret: print(row.id, row.name)"""通配符"""ret = session.query(Users).filter(Users.name.like('e%'))for row in ret: print(row.id, row.name)ret = session.query(Users).filter(~Users.name.like('e%')) not likefor row in ret: print(row.id, row.name)"""类似limit,但是通过切片取值"""ret = session.query(Users)[1:3]for row in ret: print(row.id, row.name)"""排序"""ret = session.query(Users).order_by(Users.id.desc())for row in ret: print(row.id, row.name)ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc())for row in ret: print(row.id, row.name)"""分组"""from sqlalchemy.sql import funcret = session.query(Users.name).group_by(Users.name)for row in ret: print(row.name) print(row[0])ret = session.query( Users.name, func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name)print(ret)for row in ret: # row 是一个元祖 print(row[0], row[1], row[2], row[3])ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) > 2) # 对聚合函数的返回值做第二次使用for row in ret: print(row[0], row[1], row[2])"""连表"""ret = session.query(Users).join(Type) # inner joinprint(ret)for row in ret: print(row)result = session.query(Users).join(Type, isouter=True) # left joinprint(result)for row in result: print(row.name)"""子查询"""# 【一】 select * from user where user.id in (子查询)q1 = session.query(Users.id).filter_by(name='eric') # 查找到名字是eric的用户id 的【子查询】ret = session.query(Users).filter(Users.id.in_(q1))for row in ret: print(row.id, row.name)# 【二】 select * from (子查询)q2 = session.query(Type).filter(Type.tid > 1).subquery() # 查找到type的tid大于1的所有信息 的【子查询】ret = session.query(q2)for row in ret: print(row.tid, row.tname)# 【*****三】 select users.id,users.name,(子查询) from usersq3 = session.query(Type.tname).filter(Users.type_id == Type.tid).as_scalar() # 选出tid == type_id 的 tnameret = session.query(Users.id, Users.name, q3)for row in ret: print(row)
常用操作
from sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_enginefrom models import Users"""# Model结构Base = declarative_base()class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32)) depart_id = Column(Integer)"""engine = create_engine( "mysql+pymysql://root:@127.0.0.1:3306/s9day120?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) )SessionFactory = sessionmaker(bind=engine)session = SessionFactory()# 1 指定列# result =session.query(Users.id, Users.name.label('cname')).all()# print(result)# for i in result:# print(i.id, i.cname)# 2 默认条件and# ret = session.query(Users).filter(Users.id >= 1, Users.name == 'alexX').all()# print(ret)# 3 between# ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'helloX').all()# for i in ret:# print(i.id)# 4 in# ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()# ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()# for i in ret:# print(i.id)# 5 子查询# ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='helloX'))).all()# for i in ret:# print(i.id)# 6 and 和 orfrom sqlalchemy import and_, or_# ret = session.query(Users).filter(Users.id < 3, Users.name == 'helloX').all()# ret = session.query(Users).filter(and_(Users.id < 3, Users.name == 'helloX')).all()# ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'helloX')).all()# ret = session.query(Users).filter(# or_(# Users.id > 2,# and_(Users.name == 'alexX', Users.id < 3),# )).all()## for i in ret:# print(i.id)# 7 filter_by# ret = session.query(Users).filter_by(name='alexX').all()# for i in ret:# print(i.id)# 8 通配符# ret = session.query(Users).filter(Users.name.like('A%')).all() # 注意不区分大小写# ret = session.query(Users).filter(Users.name.like('%x')).all() # 注意不区分大小写# ret = session.query(Users).filter(~Users.name.like('A%')).all() # 注意不区分大小写# for i in ret:# print(i.id)# 9 切片# result = session.query(Users)[1:4] # 取第二个到第五个(不包括第五个)记录# for i in result:# print(i.id)# 10 排序# ret = session.query(Users).order_by(Users.name.desc()).all() # 从大到小# ret = session.query(Users).order_by(Users.name.asc(), Users.id.desc()).all() # 从按名字从小到大, 再按id从大到小# for i in ret:# print(i.id, i.name)# 11 group by# from sqlalchemy.sql import func# ret = session.query(# Users.depart_id,# func.count(Users.id),# ).group_by(Users.depart_id).all()## for i in ret:# print(i)# from sqlalchemy.sql import func# ret = session.query(# Users.depart_id,# func.count(Users.id),# ).group_by(Users.depart_id).having(func.count(Users.id) > 1).all()## for i in ret:# print(i)# 12.union 和 union all"""select id,name from usersUNIONselect id,name from users;"""# q1 = session.query(Users.name).filter(Users.id > 2)# q2 = session.query(Users.depart_id).filter(Users.id <= 2)# ret = q1.union(q2).all()# for i in ret:# print(i)# q1 = session.query(Users.name).filter(Users.id > 2)# q2 = session.query(Users.depart_id).filter(Users.id <= 2)# ret = q1.union_all(q2).all()# for i in ret:# print(i)
便利的连表
from sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,CHAR,VARCHARfrom sqlalchemy.orm import sessionmaker, relationshipfrom sqlalchemy import create_engineBase = declarative_base()class Type(Base): __tablename__ = 'type' tid = Column(Integer, primary_key=True, autoincrement=True) tname = Column(String(32))class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32)) email = Column(String(32)) type_id = Column(Integer, ForeignKey('type.tid')) users_type = relationship('Type', backref='type_users') # 做关联,更便利的连表conn = "mysql+pymysql://root:@127.0.0.1:3306/db6?charset=utf8"def create_table(conn): engine = create_engine(conn, max_overflow=5) # 连接mysql Base.metadata.create_all(engine) # 找到继承了Base的所有的类,创建表def drop_table(conn): engine = create_engine(conn, max_overflow=5) # 连接mysql Base.metadata.drop_all(engine) # 找到继承了Base的所有的类,删除表engine = create_engine(conn, max_overflow=5)Session = sessionmaker(bind=engine)session = Session() # 获取一个连接"""正向操作"""print('----正向操作----')result_list = session.query(Users)for row in result_list: print(row.name, row.type_id, row.users_type.tname)"""反向操作"""print('----反向操作----')result_list = session.query(Type)for row in result_list: # print(row.tid, row.tname) print('会员等级:%s' % row.tname) for i in row.type_users: print(i.id, i.name, i.type_id)
- 正向操作 result_list = session.query(Users) for row in result_list: print(row.name, row.type_id, row.users_type.tname) - 反向操作 result_list = session.query(Type) for row in result_list: # print(row.tid, row.tname) print('会员等级:%s' % row.tname) for i in row.type_users: print(i.id, i.name, i.type_id)