class ThreadPool:
    """A thread pool, distributing work requests and collecting results.

    See the module docstring for more information.

    This block is an outline of the class interface only: every method
    body below is an empty placeholder that returns ``None``.
    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """Placeholder for pool construction; does nothing in this outline."""

    def createWorkers(self, num_workers, poll_timeout=5):
        """Placeholder for adding workers; does nothing in this outline."""

    def dismissWorkers(self, num_workers, do_join=False):
        """Placeholder for dismissing workers; does nothing in this outline."""

    def joinAllDismissedWorkers(self):
        """Placeholder for joining workers; does nothing in this outline."""

    def putRequest(self, request, block=True, timeout=None):
        """Placeholder for queueing a request; does nothing in this outline."""

    def poll(self, block=False):
        """Placeholder for processing results; does nothing in this outline."""

    def wait(self):
        """Placeholder for waiting on results; does nothing in this outline."""
# Usage example: build a pool, passing ``num_works`` as the first positional
# argument (named ``num_workers`` in the ``ThreadPool.__init__`` signature
# shown in this file -- assuming ``threadpool.ThreadPool`` is that class).
task_pool = threadpool.ThreadPool(num_works)
def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
    """Set up the thread pool and start num_workers worker threads.

    ``num_workers`` is the number of worker threads to start initially.

    If ``q_size > 0`` the size of the work *request queue* is limited and
    the thread pool blocks when the queue is full and it tries to put
    more work requests in it (see ``putRequest`` method), unless you also
    use a positive ``timeout`` value for ``putRequest``.

    If ``resq_size > 0`` the size of the *results queue* is limited and the
    worker threads will block when the queue is full and they try to put
    new results in it.

    ``poll_timeout`` is handed unchanged to ``createWorkers``.

    .. warning::
        If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
        the possibility of a deadlock, when the results queue is not pulled
        regularly and too many jobs are put in the work requests queue.
        To prevent this, always set ``timeout > 0`` when calling
        ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.
    """
    # Queue of work requests waiting to be processed; ``putRequest`` puts
    # requests into it.
    self._requests_queue = Queue.Queue(q_size)
    # Queue (not a dict) into which results of processed requests are put.
    self._results_queue = Queue.Queue(resq_size)
    # Worker threads currently in the pool; filled by ``createWorkers``.
    self.workers = []
    # Workers that were told to quit but have not been joined yet.
    self.dismissedWorkers = []
    # Work requests whose results have not been collected yet, keyed by
    # their ``requestID``.
    self.workRequests = {}
    self.createWorkers(num_workers, poll_timeout)
def createWorkers(self, num_workers, poll_timeout=5):
    """Add num_workers worker threads to the pool.

    ``poll_timeout`` sets the interval in seconds (int or float) for how
    often threads should check whether they are dismissed, while waiting
    for requests.
    """
    for _ in range(num_workers):
        # Every new worker shares the pool's request and result queues.
        new_worker = WorkerThread(
            self._requests_queue,
            self._results_queue,
            poll_timeout=poll_timeout,
        )
        self.workers.append(new_worker)
class WorkerThread(threading.Thread):
    """Background thread connected to the requests/results queues.

    A worker thread sits in the background and picks up work requests from
    one queue and puts the results in another until it is dismissed.
    """

    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Set up thread in daemonic mode and start it immediately.

        ``requests_queue`` and ``results_queue`` are instances of
        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a
        new worker thread.

        ``poll_timeout`` is the number of seconds ``run`` waits for a work
        request before re-checking whether the thread was dismissed.

        Any further keyword arguments are passed on to
        ``threading.Thread.__init__``.
        """
        threading.Thread.__init__(self, **kwds)
        # Attribute form of the deprecated ``setDaemon(1)`` call.
        self.daemon = True
        # Queue the work requests are taken from.
        self._requests_queue = requests_queue
        # Queue the results are put into.
        self._results_queue = results_queue
        # Timeout for each ``get`` on the requests queue in ``run``; when it
        # expires the loop simply starts over.
        self._poll_timeout = poll_timeout
        # Once this event is set, ``run`` breaks out of its loop and the
        # thread ends.
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Repeatedly process the job queue until told to exit."""
        while True:
            if self._dismissed.is_set():
                # we are dismissed, break out of loop
                break
            # get next work request. If we don't get a new request from the
            # queue after self._poll_timeout seconds, we jump to the start of
            # the while loop again, to give the thread a chance to exit.
            try:
                request = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                # nothing arrived within the timeout, try again
                continue
            else:
                if self._dismissed.is_set():
                    # we are dismissed, put back request in queue and exit
                    # loop
                    self._requests_queue.put(request)
                    break
                try:
                    # not dismissed: run the request's callable and put its
                    # return value into the results queue
                    result = request.callable(*request.args, **request.kwds)
                    self._results_queue.put((request, result))
                except:
                    # Bare ``except`` kept as in the original: whatever is
                    # raised here is reported through the results queue as a
                    # ``sys.exc_info()`` tuple instead of ending the thread.
                    request.exception = True
                    self._results_queue.put((request, sys.exc_info()))

    def dismiss(self):
        """Sets a flag to tell the thread to exit when done with current job.
        """
        self._dismissed.set()
# utility functions
def makeRequests(callable_, args_list, callback=None,
                 exc_callback=_handle_thread_exception):
    """Build one work request per entry of ``args_list`` for ``callable_``.

    Convenience function for the common case where the same callable is to
    be invoked several times, each time with different arguments.

    ``args_list`` holds the parameters for each invocation. An entry that is
    a tuple is read as ``(positional_args, keyword_args)``: its first item
    is passed on as the positional arguments and its second item as the
    keyword arguments. Any other entry is treated as a single positional
    argument.

    See docstring for ``WorkRequest`` for info on ``callback`` and
    ``exc_callback``.

    Returns the list of created ``WorkRequest`` objects, in the order of
    ``args_list``.
    """
    work_requests = []
    for entry in args_list:
        if isinstance(entry, tuple):
            positional, keywords = entry[0], entry[1]
        else:
            # A bare value becomes the only positional argument.
            positional, keywords = [entry], None
        work_requests.append(
            WorkRequest(callable_, positional, keywords, callback=callback,
                        exc_callback=exc_callback)
        )
    return work_requests
class WorkRequest:
    """A request to execute a callable for putting in the request queue later.

    See the module function ``makeRequests`` for the common case
    where you want to build several ``WorkRequest`` objects for the same
    callable but with different arguments for each call.
    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
                 callback=None, exc_callback=_handle_thread_exception):
        """Create a work request for a callable and attach callbacks.

        A work request consists of a callable to be executed by a worker
        thread, a list of positional arguments and a dictionary of keyword
        arguments. ``args`` and ``kwds`` fall back to an empty list and an
        empty dictionary when they are not given (or are empty).

        A ``callback`` function can be specified, that is called when the
        results of the request are picked up from the result queue. It must
        accept two anonymous arguments, the ``WorkRequest`` object and the
        results of the callable, in that order. If you want to pass
        additional information to the callback, just stick it on the request
        object.

        You can also give a custom callback for when an exception occurs
        with the ``exc_callback`` keyword parameter. It should also accept
        two anonymous arguments, the ``WorkRequest`` and a tuple with the
        exception details as returned by ``sys.exc_info()``. If you want no
        exception handler callback, just pass in ``None``.

        ``requestID``, if given, must be hashable; its hash value is what
        gets stored as ``self.requestID``. When omitted, ``id(self)`` is
        used instead.

        Raises ``TypeError`` if ``requestID`` is not hashable.
        """
        if requestID is not None:
            try:
                self.requestID = hash(requestID)
            except TypeError:
                raise TypeError("requestID must be hashable.")
        else:
            self.requestID = id(self)
        self.callable = callable_
        self.args = args or []
        self.kwds = kwds or {}
        self.callback = callback
        self.exc_callback = exc_callback
        # Starts out False; the worker loop sets it to True when the
        # callable raised.
        self.exception = False

    def __str__(self):
        """Return a short description with id, arguments and exception flag."""
        details = (self.requestID, self.args, self.kwds, self.exception)
        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % details
def putRequest(self, request, block=True, timeout=None):
    """Put work request into work queue and save its id for later.

    ``request`` must be a ``WorkRequest`` whose ``exception`` attribute is
    not set. ``block`` and ``timeout`` are passed on unchanged to the
    ``put`` call on the requests queue.
    """
    assert isinstance(request, WorkRequest)
    # don't reuse old work requests
    assert not getattr(request, 'exception', None)
    self._requests_queue.put(request, block, timeout)
    # Remember the request under its ID until its result is collected.
    self.workRequests[request.requestID] = request
try: request = self._requests_queue.get(True, self._poll_timeout) except Queue.Empty:#尝从任务 队列self._requests_queue 中get任务,如果队列为空,则continue continue
def run(self):
    """Repeatedly process the job queue until told to exit.

    Each iteration waits up to ``self._poll_timeout`` seconds for a work
    request, runs its callable and puts ``(request, result)`` into the
    results queue. If the callable raises, ``request.exception`` is set to
    ``True`` and ``(request, sys.exc_info())`` is put there instead.
    """
    while True:
        if self._dismissed.is_set():
            # we are dismissed, break out of loop
            break
        # get next work request. If we don't get a new request from the
        # queue after self._poll_timeout seconds, we jump to the start of
        # the while loop again, to give the thread a chance to exit.
        try:
            request = self._requests_queue.get(True, self._poll_timeout)
        except Queue.Empty:
            # nothing arrived within the timeout, try again
            continue
        else:
            if self._dismissed.is_set():
                # we are dismissed, put back request in queue and exit loop
                self._requests_queue.put(request)
                break
            try:
                # not dismissed: run the request's callable and put its
                # return value into the results queue
                result = request.callable(*request.args, **request.kwds)
                self._results_queue.put((request, result))
            except:
                # Bare ``except`` kept as in the original: whatever is
                # raised here is reported through the results queue as a
                # ``sys.exc_info()`` tuple instead of ending the loop.
                request.exception = True
                self._results_queue.put((request, sys.exc_info()))
def wait(self):
    """Wait for results, blocking until all have arrived.

    Calls ``self.poll(True)`` over and over and returns once it raises
    ``NoResultsPending``; any other exception propagates to the caller.
    """
    try:
        while True:
            self.poll(True)
    except NoResultsPending:
        return
def poll(self, block=False):
    """Process any new results in the queue.

    Takes ``(request, result)`` pairs from the results queue and hands each
    one to ``request.exc_callback`` (when the request is flagged as failed
    and has such a callback) or otherwise to ``request.callback`` (if any).
    The request is then removed from ``self.workRequests``.

    ``block`` is passed to the ``get`` call on the results queue. When it is
    false, the method returns as soon as the results queue is empty.

    Raises ``NoResultsPending`` when ``self.workRequests`` is empty, and
    ``NoWorkersAvailable`` when ``block`` is true and ``self.workers`` is
    empty. Exceptions raised by a callback propagate to the caller.
    """
    while True:
        # still results pending?
        if not self.workRequests:
            raise NoResultsPending
        # are there still workers to process remaining requests?
        elif block and not self.workers:
            raise NoWorkersAvailable
        # Only the ``get`` is guarded, so a ``Queue.Empty`` raised by a
        # callback is not mistaken for an empty results queue.
        try:
            # get back next results
            request, result = self._results_queue.get(block=block)
        except Queue.Empty:
            break
        try:
            # has an exception occurred?
            if request.exception and request.exc_callback:
                request.exc_callback(request, result)
            # hand results to callback, if any
            if request.callback and not \
                    (request.exception and request.exc_callback):
                request.callback(request, result)
        finally:
            # The result has already been consumed from the queue, so the
            # request must stop counting as pending even if a callback
            # raised; otherwise a later ``wait()`` would never see
            # ``workRequests`` become empty.
            del self.workRequests[request.requestID]
def dismissWorkers(self, num_workers, do_join=False):
    """Tell num_workers worker threads to quit after their current task.

    At most ``len(self.workers)`` workers are dismissed. They are taken
    from the end of ``self.workers``.

    If ``do_join`` is true, each dismissed worker is joined before this
    method returns. Otherwise the dismissed workers are appended to
    ``self.dismissedWorkers`` so they can be joined later.
    """
    dismiss_list = []
    for _ in range(min(num_workers, len(self.workers))):
        worker = self.workers.pop()
        worker.dismiss()
        dismiss_list.append(worker)
    if do_join:
        for worker in dismiss_list:
            worker.join()
    else:
        self.dismissedWorkers.extend(dismiss_list)
def joinAllDismissedWorkers(self):
    """Perform Thread.join() on all worker threads that have been dismissed.

    Joins every worker in ``self.dismissedWorkers`` in order and then
    resets that list to empty.
    """
    for worker in self.dismissedWorkers:
        worker.join()
    self.dismissedWorkers = []
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有