// jobqueue.cpp (1.8 KB)
  1. #include "utils.h"
  2. #include "lfqueue.h"
  3. #include "jobqueue.h"
  4. #include "pipequeue.h"
  5. #include <util/thread/factory.h>
  6. #include <util/generic/singleton.h>
  7. #include <util/system/thread.h>
  8. using namespace NNeh;
  9. namespace {
  10. class TExecThread: public IThreadFactory::IThreadAble, public IJob {
  11. public:
  12. TExecThread()
  13. : T_(SystemThreadFactory()->Run(this))
  14. {
  15. }
  16. ~TExecThread() override {
  17. Enqueue(this);
  18. T_->Join();
  19. }
  20. inline void Enqueue(IJob* job) {
  21. Q_.Enqueue(job);
  22. }
  23. private:
  24. void DoRun(TCont* c) override {
  25. c->Executor()->Abort();
  26. }
  27. void DoExecute() override {
  28. SetHighestThreadPriority();
  29. TContExecutor e(RealStackSize(20000));
  30. e.Execute<TExecThread, &TExecThread::Dispatcher>(this);
  31. }
  32. inline void Dispatcher(TCont* c) {
  33. IJob* job;
  34. while ((job = Q_.Dequeue(c))) {
  35. try {
  36. c->Executor()->Create(*job, "job");
  37. } catch (...) {
  38. (*job)(c);
  39. }
  40. }
  41. }
  42. typedef TAutoPtr<IThreadFactory::IThread> IThreadRef;
  43. TOneConsumerPipeQueue<IJob> Q_;
  44. IThreadRef T_;
  45. };
  46. class TJobScatter: public IJobQueue {
  47. public:
  48. inline TJobScatter() {
  49. for (size_t i = 0; i < 2; ++i) {
  50. E_.push_back(new TExecThread());
  51. }
  52. }
  53. void ScheduleImpl(IJob* job) override {
  54. E_[TThread::CurrentThreadId() % E_.size()]->Enqueue(job);
  55. }
  56. private:
  57. typedef TAutoPtr<TExecThread> TExecThreadRef;
  58. TVector<TExecThreadRef> E_;
  59. };
  60. }
  61. IJobQueue* NNeh::JobQueue() {
  62. return Singleton<TJobScatter>();
  63. }