# -*- coding: utf-8 -*-
# NOTE: the encoding declaration only takes effect on line 1 or 2 of the file
# (PEP 263); it previously sat on line 3, after a duplicated import/engine pair.
__author__ = 'ghost'
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey

# Connect to the database (echo=True asks the engine to log the SQL it emits).
engine = create_engine(
    "mysql://root:@localhost:3306/webpy?charset=utf8",
    encoding="utf-8",
    echo=True,
)

# Metadata container that collects the table definitions below.
metadata = MetaData()

# Table definitions.
user = Table(
    'user', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
    Column('fullname', String(40)),
)
address = Table(
    'address', metadata,
    Column('id', Integer, primary_key=True),
    # Column type is passed as None; presumably it is taken from the
    # referenced user.id column -- confirm against the SQLAlchemy docs.
    Column('user_id', None, ForeignKey('user.id')),
    Column('email', String(60), nullable=False),
)

# Create the tables; tables that already exist are left alone.
metadata.create_all(engine)

# Obtain a database connection used by the examples that follow.
conn = engine.connect()
>>> i = user.insert() # 使用查询
>>> i
<sqlalchemy.sql.dml.Insert object at 0x0000000002637748>
>>> print i # 内部构件的sql语句
INSERT INTO "user" (id, name, fullname) VALUES (:id, :name, :fullname)
>>> u = dict(name='jack', fullname='jack Jone')
>>> r = conn.execute(i, **u) # 执行查询,第一个为查询对象,第二个参数为一个插入数据字典,如果插入的是多个对象,就把对象字典放在列表里面
>>> r
<sqlalchemy.engine.result.ResultProxy object at 0x0000000002EF9390>
>>> r.inserted_primary_key # 返回插入行 主键 id
[4L]
>>> addresses
[{'user_id': 1, 'email': 'jack@yahoo.com'}, {'user_id': 1, 'email': 'jack@msn.com'}, {'user_id': 2, 'email': 'www@www.org'}, {'user_id': 2, 'email': 'wendy@aol.com'}]
>>> i = address.insert()
>>> r = conn.execute(i, addresses) # 插入多条记录
>>> r
<sqlalchemy.engine.result.ResultProxy object at 0x0000000002EB5080>
>>> r.rowcount #返回影响的行数
4L
>>> i = user.insert().values(name='tom', fullname='tom Jim')
>>> i.compile()
<sqlalchemy.sql.compiler.SQLCompiler object at 0x0000000002F6F390>
>>> print i.compile()
INSERT INTO "user" (name, fullname) VALUES (:name, :fullname)
>>> print i.compile().params
{'fullname': 'tom Jim', 'name': 'tom'}
>>> r = conn.execute(i)
>>> r.rowcount
1L
>>> s = select([user]) # 查询 user表
>>> s
<sqlalchemy.sql.selectable.Select at 0x25a7748; Select object>
>>> print s
SELECT "user".id, "user".name, "user".fullname FROM "user"
>>> user.c # 表 user 的字段column对象
<sqlalchemy.sql.base.ImmutableColumnCollection object at 0x0000000002E804A8>
>>> print user.c
['user.id', 'user.name', 'user.fullname']
>>> s = select([user.c.name,user.c.fullname])
>>> r = conn.execute(s)
>>> r
<sqlalchemy.engine.result.ResultProxy object at 0x00000000025A7748>
>>> r.rowcount # 影响的行数
5L
>>> ru = r.fetchall()
>>> ru
[(u'hello', u'hello world'), (u'Jack', u'Jack Jone'), (u'Jack', u'Jack Jone'), (u'jack', u'jack Jone'), (u'tom', u'tom Jim')]
>>> r
<sqlalchemy.engine.result.ResultProxy object at 0x00000000025A7748>
>>> r.closed # 只要 r.fetchall() 之后,就会自动关闭 ResultProxy 对象
True
>>> s = select([user.c.name, address.c.user_id]).where(user.c.id==address.c.user_id) # 使用了字段和字段比较的条件
>>> s
<sqlalchemy.sql.selectable.Select at 0x2f03390; Select object>
>>> print s
SELECT "user".name, address.user_id FROM "user", address WHERE "user".id = address.user_id
>>> print user.c.id == address.c.user_id # 返回一个编译的字符串
"user".id = address.user_id
>>> print user.c.id == 7 # 编译成为带参数的sql 语句片段字符串
"user".id = :id_1
>>> print user.c.id != 7
"user".id != :id_1
>>> print user.c.id > 7
"user".id > :id_1
>>> print user.c.id == None
"user".id IS NULL
>>> print user.c.id + address.c.id # 两个整型字段相加,编译成 +
"user".id + address.id
>>> print user.c.name + address.c.email # 两个字符串字段相加,编译成 ||
"user".name || address.email
>>> print and_(
...     user.c.name.like('j%'),
...     user.c.id == address.c.user_id,
...     or_(
...         address.c.email == 'wendy@aol.com',
...         address.c.email == 'jack@yahoo.com'
...     ),
...     not_(user.c.id>5))
"user".name LIKE :name_1 AND "user".id = address.user_id AND (address.email = :email_1 OR address.email = :email_2) AND "user".id <= :id_1
>>>
>>> se_sql = [(user.c.fullname +", " + address.c.email).label('title')]
>>> wh_sql = and_(
...     user.c.id == address.c.user_id,
...     user.c.name.between('m', 'z'),
...     or_(
...         address.c.email.like('%@aol.com'),
...         address.c.email.like('%@msn.com')
...     )
... )
>>> print wh_sql
"user".id = address.user_id AND "user".name BETWEEN :name_1 AND :name_2 AND (address.email LIKE :email_1 OR address.email LIKE :email_2)
>>> s = select(se_sql).where(wh_sql)
>>> print s
SELECT "user".fullname || :fullname_1 || address.email AS title
FROM "user", address
WHERE "user".id = address.user_id AND "user".name BETWEEN :name_1 AND :name_2 AND (address.email LIKE :email_1 OR address.email LIKE :email_2)
>>> r = conn.execute(s)
>>> r.fetchall()
>>> text_sql = "SELECT id, name, fullname FROM user WHERE id=:id" # 原始sql语句,参数用( :value)表示
>>> s = text(text_sql)
>>> print s
SELECT id, name, fullname FROM user WHERE id=:id
>>> s
<sqlalchemy.sql.elements.TextClause object at 0x0000000002587668>
>>> conn.execute(s, id=3).fetchall() # id=3 传递:id参数
[(3L, u'Jack', u'Jack Jone')]
>>> print user.join(address) # 因为定义了外键,所以 join 能智能识别 on 条件
"user" JOIN address ON "user".id = address.user_id
>>> print user.join(address, address.c.user_id==user.c.id) # 手动指定 on 条件
"user" JOIN address ON address.user_id = "user".id
>>> s = select([user.c.name, address.c.email]).select_from(user.join(address, user.c.id==address.c.user_id)) # 被 join 的sql语句需要用 select_from方法配合
>>> s
<sqlalchemy.sql.selectable.Select at 0x2eb63c8; Select object>
>>> print s
SELECT "user".name, address.email FROM "user" JOIN address ON "user".id = address.user_id
>>> conn.execute(s).fetchall()
[(u'hello', u'jack@yahoo.com'), (u'hello', u'jack@msn.com'), (u'hello', u'jack@yahoo.com'), (u'hello', u'jack@msn.com'), (u'Jack', u'www@www.org'), (u'Jack', u'wendy@aol.com'), (u'Jack', u'www@www.org'), (u'Jack', u'wendy@aol.com')]
>>> s = select([user.c.name]).order_by(user.c.name) # order_by
>>> print s
SELECT "user".name FROM "user" ORDER BY "user".name
>>> s = select([user]).order_by(user.c.name.desc())
>>> print s
SELECT "user".id, "user".name, "user".fullname FROM "user" ORDER BY "user".name DESC
>>> s = select([user]).group_by(user.c.name) # group_by
>>> print s
SELECT "user".id, "user".name, "user".fullname FROM "user" GROUP BY "user".name
>>> s = select([user]).order_by(user.c.name.desc()).limit(1).offset(3) # limit(1).offset(3)
>>> print s
SELECT "user".id, "user".name, "user".fullname FROM "user" ORDER BY "user".name DESC LIMIT :param_1 OFFSET :param_2
>>> conn.execute(s).fetchall() # 执行查询,取回结果
[(4L, u'jack', u'jack Jone')]
>>> s = user.update()
>>> print s
UPDATE "user" SET id=:id, name=:name, fullname=:fullname
>>> s = user.update().values(fullname=user.c.name) # values 指定了更新的字段
>>> print s
UPDATE "user" SET fullname="user".name
>>> s = user.update().where(user.c.name == 'jack').values(name='ed') # where 进行选择过滤
>>> print s
UPDATE "user" SET name=:name WHERE "user".name = :name_1
>>> r = conn.execute(s)
>>> print r.rowcount # 影响行数
3
>>> s = user.update().where(user.c.name==bindparam('oldname')).values(name=bindparam('newname')) # oldname 与下面传入的参数进行绑定,newname也一样
>>> print s
UPDATE "user" SET name=:newname WHERE "user".name = :oldname
>>> u = [{'oldname':'hello', 'newname':'edd'},
...      {'oldname':'ed', 'newname':'mary'},
...      {'oldname':'tom', 'newname':'jake'}]
>>> r = conn.execute(s, u)
>>> r.rowcount
5L
>>> r = conn.execute(address.delete()) # 清空表
>>> print r
<sqlalchemy.engine.result.ResultProxy object at 0x0000000002EAF550>
>>> r.rowcount
8L
>>> r = conn.execute(user.delete().where(user.c.name > 'm')) # 删除记录
>>> r.rowcount
3L
pip install flask-sqlalchemy
from flask import Flask
# `flask.ext.*` was a compatibility shim that newer Flask releases removed;
# `flask_sqlalchemy` is the extension's actual module name.
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# URI format: dialect+driver://username:password@host:port/database?charset=utf8
# i.e. driver://db user:password@host:port/database?encoding
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:@localhost:3306/sqlalchemy?charset=utf8'

# Initialise the extension for this app.
db = SQLAlchemy(app)
class User(db.Model):
    """User model with three columns: id, username and email.

    The database table name is the lower-cased model name.
    """
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    email = db.Column(db.String(120), unique=True)

    def __init__(self, username, email):
        """Create a user with the given username and email."""
        self.username = username
        self.email = email

    def __repr__(self):
        return '<User %r>' % self.username

    def save(self):
        """Add this instance to the session and commit.

        If the commit fails (for example on a unique-constraint violation)
        the session is rolled back so it stays usable, and the original
        exception is re-raised to the caller.
        """
        db.session.add(self)
        try:
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise
>>> from yourapp import db, User
>>> u = User(username='admin', email='admin@example.com') # 创建实例
>>> db.session.add(u) # 添加到 session
>>> db.session.commit() # 提交事务
>>> users = User.query.all() # 查询
>>> u = User(username=u'人世间', email='rsj@example.com')
>>> u.save()
class Category(db.Model):
    """Category model with two columns: id and name."""
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50))

    def __init__(self, name):
        """Create a category with the given name."""
        self.name = name

    def __repr__(self):
        return '<Category %r>' % self.name
class Post(db.Model):
    """Post model with five columns: id, title, body, pub_date, category_id."""
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(80))
    body = db.Column(db.Text)
    pub_date = db.Column(db.String(20))
    # Foreign-key column pointing at the category table.
    category_id = db.Column(db.Integer, db.ForeignKey('category.id'))
    # Relationship object; it does not create a real database column.
    # The backref lets a Category query its posts through `post_set`.
    category = db.relationship('Category', backref=db.backref('post_set', lazy='dynamic'))

    def __init__(self, title, body, category=None, pub_date=None, category_id=None):
        """Create a post.

        :param title: post title
        :param body: post body text
        :param category: Category instance to attach (optional)
        :param pub_date: publication date; defaults to the current
            ``time.time()`` value when omitted
        :param category_id: primary key of the category, as an alternative
            to passing a Category instance
        """
        # Imported locally because no module-level `import time` is visible.
        import time

        self.title = title
        self.body = body
        if pub_date is None:
            pub_date = time.time()
        self.pub_date = pub_date
        # Assign only what the caller supplied, so neither value is
        # overwritten with None by the other.
        if category is not None:
            self.category = category
        if category_id is not None:
            self.category_id = category_id

    def __repr__(self):
        return '<Post %r>' % self.title

    def save(self):
        """Add this instance to the session and commit.

        On a failed commit the session is rolled back and the exception
        is re-raised.
        """
        db.session.add(self)
        try:
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise
>>> c = Category(name='Python')
>>> c
<Category 'Python'>
>>> c.post_set
<sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B58F60>
>>> c.post_set.all()
[]
>>> p = Post(title='hello python', body='python is cool', category=c)
>>> p.save()
>>> c.post_set
<sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B73710>
>>> c.post_set.all() # 反向查询
[<Post u'hello python'>]
>>> p
<Post u'hello python'>
>>> p.category
<Category u'Python'>
# 也可以使用category_id 字段来添加
>>> p = Post(title='hello flask', body='flask is cool', category_id=1)
>>> p.save()
# Association table linking posts and tags (many-to-many).
post_tag = db.Table(
    'post_tag',
    db.Column('post_id', db.Integer, db.ForeignKey('post.id')),
    db.Column('tag_id', db.Integer, db.ForeignKey('tag.id')),
)
class Post(db.Model):
    """Post model (abridged) showing its many-to-many link to Tag."""
    id = db.Column(db.Integer, primary_key=True)
    # ... (remaining columns omitted)
    # The backref lets a Tag query its set of posts through `post_set`.
    # NOTE(review): Tag.posts declares another relationship over the same
    # `post_tag` table -- confirm that both are intended.
    tags = db.relationship('Tag', secondary=post_tag,
                           backref=db.backref('post_set', lazy='dynamic'))
class Tag(db.Model):
    """Tag model with a unique `content` column and a link to posts."""
    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.String(10), unique=True)
    # Reverse lookup: a Post can query its tags through `tag_set`.
    posts = db.relationship('Post', secondary=post_tag,
                            backref=db.backref('tag_set', lazy='dynamic'))

    def __init__(self, content):
        """Create a tag with the given content string."""
        self.content = content

    def save(self):
        """Add this instance to the session and commit.

        On a failed commit the session is rolled back and the exception
        is re-raised.
        """
        db.session.add(self)
        try:
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise
>>> tag_list = []
>>> tags = ['python', 'flask', 'ruby', 'rails']
>>> for tag in tags:
...     t = Tag(tag)
...     tag_list.append(t)
...
>>> tag_list
[<f_sqlalchemy.Tag object at 0x0000000003B7CF28>, <f_sqlalchemy.Tag object at 0x0000000003B7CF98>, <f_sqlalchemy.Tag object at 0x0000000003B7CEB8>, <f_sqlalchemy.Tag object at 0x0000000003B7CE80>]
>>> p
<Post u'hello python'>
>>> p.tags
[]
>>> p.tags = tag_list # 添加多对多的数据
>>> p.save()
>>> p.tags
[<f_sqlalchemy.Tag object at 0x0000000003B7CF28>, <f_sqlalchemy.Tag object at 0x0000000003B7CF98>, <f_sqlalchemy.Tag object at 0x0000000003B7CEB8>, <f_sqlalchemy.Tag object at 0x0000000003B7CE80>]
>>> p.tag_set # 反向查询
<sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B7C080>
>>> p.tag_set.all()
[<f_sqlalchemy.Tag object at 0x0000000003B7CF28>, <f_sqlalchemy.Tag object at 0x0000000003B7CF98>, <f_sqlalchemy.Tag object at 0x0000000003B7CEB8>, <f_sqlalchemy.Tag object at 0x0000000003B7CE80>]
>>> t = Tag.query.all()[1]
>>> t
<f_sqlalchemy.Tag object at 0x0000000003B7CF28>
>>> t.content
u'python'
>>> t.posts
[<Post u'hello python'>]
>>> t.post_set
<sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B7C358>
>>> t.post_set.all()
[<Post u'hello python'>]
self one to one
class Category(db.Model):
    """Category model that references its own table through `pid`."""
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50))
    # Parent category id.
    pid = db.Column(db.Integer, db.ForeignKey('category.id'))
    # Parent category object; the backref exposes the child as `scategory`.
    # Both sides use uselist=False, i.e. a single object rather than a list.
    pcategory = db.relationship('Category', uselist=False, remote_side=[id],
                                backref=db.backref('scategory', uselist=False))

    def __init__(self, name, pcategory=None):
        """Create a category, optionally attached to a parent category."""
        self.name = name
        self.pcategory = pcategory

    def __repr__(self):
        return '<Category %r>' % self.name

    def save(self):
        """Add this instance to the session and commit.

        On a failed commit the session is rolled back and the exception
        is re-raised.
        """
        db.session.add(self)
        try:
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise
>>> p = Category('Python')
>>> p
<Category 'Python'>
>>> p.pid
>>> p.pcategory # 查询父栏目
>>> p.scategory # 查询子栏目
>>> f = Category('Flask', p)
>>> f.save()
>>> f
<Category u'Flask'>
>>> f.pid
1L
>>> f.pcategory # 查询父栏目
<Category u'Python'>
>>> f.scategory # 查询子栏目
>>> p.scategory # 查询子栏目
<Category u'Flask'>
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有