from peewee import *
# Module-level database handle backed by the SQLite file 'people.db'.
db = SqliteDatabase('people.db')
class BaseModel(Model):
    """Shared base class whose inner ``Meta`` points at the module-level ``db``."""

    class Meta:
        database = db  # This model uses the "people.db" database.
class Person(BaseModel):
    """A person row with a name, a birthday and an ``is_relative`` flag."""

    name = CharField()
    birthday = DateField()
    is_relative = BooleanField()
from playhouse.db_url import connect
from dock.common import config
# Example of the URL format read from the configuration:
# url: mysql+pool://root:root@127.0.0.1:3306/appmanage?max_connections=300&stale_timeout=300
# NOTE(review): `config_dict` is not defined in the visible code (only
# `config` is imported above) — confirm where it comes from.
# NOTE(review): if these are plain dicts, a missing 'config' or 'mysql' key
# makes .get() return None and the next .get() raise AttributeError.
mysql_config_url = config_dict.get('config').get('mysql').get('url')
# Build the database object from the URL; this rebinds the module-level `db`.
db = connect(url=mysql_config_url)
def _connect(self, *args, **kwargs):
    """Check a connection out of the pool, opening a new one if needed.

    Written as a method of ``PooledDatabase`` (it calls
    ``super(PooledDatabase, self)._connect``).  Idle connections live in the
    ``self._connections`` heap as ``(timestamp, connection)`` tuples, so each
    pop yields the entry with the smallest timestamp.

    Args:
        *args: Forwarded unchanged to the parent class's ``_connect``.
        **kwargs: Forwarded unchanged to the parent class's ``_connect``.

    Returns:
        A connection taken from the pool, or a freshly opened one when no
        usable pooled connection remains.

    Raises:
        ValueError: If a new connection is required but
            ``len(self._in_use)`` has reached ``self.max_connections``.
    """
    while True:
        try:
            # Remove the oldest connection from the heap.
            # (_connections is the list, i.e. the pool, of connection
            # instances.)
            ts, conn = heapq.heappop(self._connections)
            key = self.conn_key(conn)
        except IndexError:
            ts = conn = None
            logger.debug('No connection available in pool.')
            break
        else:
            if self._is_closed(key, conn):
                # This connection was closed, but since it was not stale
                # it got added back to the queue of available conns. We
                # then closed it and marked it as explicitly closed, so
                # it's safe to throw it away now.
                # (Because Database.close() calls Database._close()).
                logger.debug('Connection %s was closed.', key)
                ts = conn = None
                self._closed.discard(key)
            elif self.stale_timeout and self._is_stale(ts):
                # If we are attempting to check out a stale connection,
                # then close it. We don't need to mark it in the "closed"
                # set, because it is not in the list of available conns
                # anymore.
                logger.debug('Connection %s was stale, closing.', key)
                self._close(conn, True)
                self._closed.discard(key)
                ts = conn = None
            else:
                # Usable pooled connection found.
                break

    if conn is None:
        # Nothing reusable was found: open a new connection, unless the
        # configured limit on in-use connections has been reached.
        if self.max_connections and (
                len(self._in_use) >= self.max_connections):
            raise ValueError('Exceeded maximum connections.')
        conn = super(PooledDatabase, self)._connect(*args, **kwargs)
        ts = time.time()
        key = self.conn_key(conn)
        logger.debug('Created new connection %s.', key)

    # _in_use is the dict of connections currently checked out, keyed by
    # connection key and holding the connection's timestamp.
    self._in_use[key] = ts
    return conn
def _close(self, conn, close_conn=False):
    """Close a connection for real, or hand it back to the pool.

    Written as a method of ``PooledDatabase`` (it calls
    ``super(PooledDatabase, self)._close``).

    Args:
        conn: The connection being released.
        close_conn: When true, record the connection's key in
            ``self._closed`` and close the connection through the parent
            class.  When false, a connection found in ``self._in_use`` is
            removed from it and then either closed (if stale) or pushed
            back onto the ``self._connections`` heap with its timestamp.

    Returns:
        None.  A connection that is not in ``self._in_use`` is ignored when
        ``close_conn`` is false.
    """
    key = self.conn_key(conn)
    if close_conn:
        self._closed.add(key)
        # The parent implementation closes the database connection.
        super(PooledDatabase, self)._close(conn)
    elif key in self._in_use:
        ts = self._in_use.pop(key)
        # Staleness of an in-use connection is only checked here, at the
        # moment it is released.
        if self.stale_timeout and self._is_stale(ts):
            logger.debug('Closing stale connection %s.', key)
            # Stale: close the database connection.
            super(PooledDatabase, self)._close(conn)
        else:
            logger.debug('Returning %s to pool.', key)
            # Not stale: put it back into the pool.
            heapq.heappush(self._connections, (ts, conn))
def send_rule():
    """Call ``Rule(user_id).slack_rule(index)`` for every user.

    All of the work happens inside a single ``db.execution_context()``
    block.
    """
    # NOTE(review): execution_context() appears to be the peewee 2.x API —
    # confirm it exists in the installed peewee version.
    with db.execution_context():
        # A new connection will be opened or, if using a connection pool,
        # pulled from the pool of available connections. Additionally, a
        # transaction will be started.
        for user in get_all_user():
            user_id = user['id']
            rule = Rule(user_id)
            # NOTE(review): `index` is not defined in the visible code —
            # confirm where it comes from.
            rule_dict = rule.slack_rule(index)
            # ..... do something .....
@app.before_request
def _db_connect():
    """Open the database connection; registered as a before-request hook."""
    # NOTE(review): `database` is not defined in the visible code (other
    # snippets use `db`) — confirm which object is intended.
    database.connect()
#
# This hook ensures that the connection is closed when we've finished
# processing the request.
@app.teardown_request
def _db_close(exc):
    """Close the database connection if it is still open.

    Registered as a teardown-request hook.

    Args:
        exc: Value supplied by the teardown hook; not used here.
    """
    # Closing is skipped when the connection is already closed.
    if not database.is_closed():
        database.close()
#
#
# A more elegant approach:
from playhouse.flask_utils import FlaskDB
from dock_fastgear.model.base import db
#
# NOTE(review): `Flask` is not imported in the visible code — confirm that
# `from flask import Flask` exists elsewhere.
app = Flask(__name__)
FlaskDB(app, db) # This automatically does what the hooks above do (for the implementation see http://docs.peewee-orm.com/en/latest/peewee/playhouse.html?highlight=Flask%20DB#flask-utils)
from playhouse.shortcuts import model_to_dict
from model import HelloGitHub
def read_from_db(input_vol):
    """Return every HelloGitHub row of one volume as a list of dicts.

    Rows are ordered by ``category`` first and by ``create_time`` within
    each category.

    Args:
        input_vol: Value compared against ``HelloGitHub.vol``.

    Returns:
        list: One ``model_to_dict`` result per matching row; empty when no
        row matches.
    """
    # A single query sorted by (category, create_time) yields the same
    # sequence as first selecting the distinct categories and then querying
    # each category in turn, without one extra query per category.
    rows = (HelloGitHub
            .select()
            .where(HelloGitHub.vol == input_vol)
            .order_by(HelloGitHub.category, HelloGitHub.create_time))
    return [model_to_dict(row) for row in rows]
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有