| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- # -*- coding: utf-8 -*-
- from threading import Thread
- from logging import getLogger
- from Queue import Queue
- from threading import Lock
- logger = getLogger(__name__)
- class WorkerThread(Thread):
- def __init__(self, task_queue, *args, **kwargs):
- super(WorkerThread, self).__init__(*args, **kwargs)
- self._task_queue = task_queue
- self._succ_task_num = 0
- self._fail_task_num = 0
- self._ret = list()
- def run(self):
- while True:
- func, args, kwargs = self._task_queue.get()
- try:
- ret = func(*args, **kwargs)
- self._succ_task_num += 1
- self._ret.append(ret)
- except Exception as e:
- logger.warn(str(e))
- self._fail_task_num += 1
- self._ret.append(e)
- finally:
- self._task_queue.task_done()
- def get_result(self):
- return self._succ_task_num, self._fail_task_num, self._ret
- class SimpleThreadPool:
- def __init__(self, num_threads=5):
- self._num_threads = num_threads
- self._queue = Queue()
- self._lock = Lock()
- self._active = False
- self._workers = list()
- self._finished = False
- def add_task(self, func, *args, **kwargs):
- if not self._active:
- with self._lock:
- if not self._active:
- self._active = True
- for i in range(self._num_threads):
- w = WorkerThread(self._queue)
- self._workers.append(w)
- w.start()
- self._queue.put((func, args, kwargs))
- def wait_completion(self):
- self._queue.join()
- self._finished = True
- def get_result(self):
- assert self._finished
- detail = [worker.get_result() for worker in self._workers]
- succ_all = all([tp[1] == 0 for tp in detail])
- return {'success_all': succ_all, 'detail': detail}
- if __name__ == '__main__':
- pool = SimpleThreadPool(2)
- def task_sleep(x):
- from time import sleep
- sleep(x)
- return 'hello, sleep %d seconds' % x
- def raise_exception():
- raise ValueError("Pa! Exception!")
- pool.add_task(task_sleep, 5)
- pool.add_task(task_sleep, 2)
- pool.add_task(task_sleep, 3)
- pool.add_task(raise_exception)
- pool.add_task(raise_exception)
- pool.wait_completion()
- print pool.get_result()
- # [(1, 0, ['hello, sleep 5 seconds']), (2, 1, ['hello, sleep 2 seconds', 'hello, sleep 3 seconds', ValueError('Pa! Exception!',)])]
|