class BaseRateLimiter(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __init__(self, rate):
self.rate = rate
@abc.abstractmethod
def acquire(self, count):
return
class ThreadingRateLimiter(BaseRateLimiter):
def __init__(self, rate):
BaseRateLimiter.__init__(self, rate)
self.queue = Queue.Queue()
threading.Thread(target=self._clear_queue).start()
def acquire(self, count=1):
self.queue.put(1, block=False)
return self.queue.qsize() < self.rate
def _clear_queue(self):
while 1:
time.sleep(1)
self.queue.queue.clear()
class DistributeRateLimiter(BaseRateLimiter):
def __init__(self, rate, cache):
BaseRateLimiter.__init__(self, rate)
self.cache = cache
def acquire(self, count=1, expire=3, key=None, callback=None):
try:
if isinstance(self.cache, Cache):
return self.cache.fetchToken(rate=self.rate, count=count, expire=expire, key=key)
except Exception, ex:
return True
class Cache(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __init__(self):
self.key = "DEFAULT"
self.namespace = "RATELIMITER"
@abc.abstractmethod
def fetchToken(self, rate, key=None):
return
class RedisTokenCache(Cache):
def __init__(self, host, port, db=0, password=None, max_connections=None):
Cache.__init__(self)
self.redis = redis.Redis(
connection_pool=
redis.ConnectionPool(
host=host, port=port, db=db,
password=password,
max_connections=max_connections
))
def fetchToken(self, rate=100, count=1, expire=3, key=None):
date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
key = ":".join([self.namespace, key if key else self.key, date])
try:
current = self.redis.get(key)
if int(current if current else "0") > rate:
raise Exception("to many requests in current second: %s" % date)
else:
with self.redis.pipeline() as p:
p.multi()
p.incr(key, count)
p.expire(key, int(expire if expire else "3"))
p.execute()
return True
except Exception, ex:
return False
limiter = ThreadingRateLimiter(rate=10000)
def job():
while 1:
if not limiter.acquire():
print '限流'
else:
print '正常'
threads = [threading.Thread(target=job) for i in range(10)]
for thread in threads:
thread.start()
token_cache = RedisTokenCache(host='10.93.84.53', port=6379, password='bigdata123')
limiter = DistributeRateLimiter(rate=10000, cache=token_cache)
r = redis.Redis(connection_pool=redis.ConnectionPool(host='10.93.84.53', port=6379, password='bigdata123'))
def job():
while 1:
if not limiter.acquire():
print '限流'
else:
print '正常'
threads = [multiprocessing.Process(target=job) for i in range(10)]
for thread in threads:
thread.start()
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有