threadpool.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. # -*- coding: utf-8 -*-
  2. from threading import Thread
  3. from logging import getLogger
  4. from Queue import Queue
  5. from threading import Lock
  6. logger = getLogger(__name__)
  7. class WorkerThread(Thread):
  8. def __init__(self, task_queue, *args, **kwargs):
  9. super(WorkerThread, self).__init__(*args, **kwargs)
  10. self._task_queue = task_queue
  11. self._succ_task_num = 0
  12. self._fail_task_num = 0
  13. self._ret = list()
  14. def run(self):
  15. while True:
  16. func, args, kwargs = self._task_queue.get()
  17. try:
  18. ret = func(*args, **kwargs)
  19. self._succ_task_num += 1
  20. self._ret.append(ret)
  21. except Exception as e:
  22. logger.warn(str(e))
  23. self._fail_task_num += 1
  24. self._ret.append(e)
  25. finally:
  26. self._task_queue.task_done()
  27. def get_result(self):
  28. return self._succ_task_num, self._fail_task_num, self._ret
  29. class SimpleThreadPool:
  30. def __init__(self, num_threads=5):
  31. self._num_threads = num_threads
  32. self._queue = Queue()
  33. self._lock = Lock()
  34. self._active = False
  35. self._workers = list()
  36. self._finished = False
  37. def add_task(self, func, *args, **kwargs):
  38. if not self._active:
  39. with self._lock:
  40. if not self._active:
  41. self._active = True
  42. for i in range(self._num_threads):
  43. w = WorkerThread(self._queue)
  44. self._workers.append(w)
  45. w.start()
  46. self._queue.put((func, args, kwargs))
  47. def wait_completion(self):
  48. self._queue.join()
  49. self._finished = True
  50. def get_result(self):
  51. assert self._finished
  52. detail = [worker.get_result() for worker in self._workers]
  53. succ_all = all([tp[1] == 0 for tp in detail])
  54. return {'success_all': succ_all, 'detail': detail}
  55. if __name__ == '__main__':
  56. pool = SimpleThreadPool(2)
  57. def task_sleep(x):
  58. from time import sleep
  59. sleep(x)
  60. return 'hello, sleep %d seconds' % x
  61. def raise_exception():
  62. raise ValueError("Pa! Exception!")
  63. pool.add_task(task_sleep, 5)
  64. pool.add_task(task_sleep, 2)
  65. pool.add_task(task_sleep, 3)
  66. pool.add_task(raise_exception)
  67. pool.add_task(raise_exception)
  68. pool.wait_completion()
  69. print pool.get_result()
  70. # [(1, 0, ['hello, sleep 5 seconds']), (2, 1, ['hello, sleep 2 seconds', 'hello, sleep 3 seconds', ValueError('Pa! Exception!',)])]