thread_pool.hpp
/*
    Beatmup image and signal processing library
    Copyright (C) 2019, lnstadrum

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#pragma once
#include "parallelism.h"
#include <algorithm>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>


namespace Beatmup {
    class ThreadPool;
}


/**
    A pool of threads running tasks.
    ThreadPool runs AbstractTasks, possibly in multiple threads. Every time a task is submitted to the thread pool,
    it is scheduled for execution and gets attributed a corresponding Job number. Jobs are used to cancel the
    scheduled runs and to track whether they have been conducted or not.
    Exceptions thrown during task execution in different threads are rethrown in the calling threads.
    A usage sketch is given at the end of this file.
*/
class Beatmup::ThreadPool {

    class TaskThreadImpl : public TaskThread {
    private:
        inline void threadFunc() {
            index == 0 ? pool.managingThreadFunc(*this) : pool.workerThreadFunc(*this);
        }

    public:
        inline TaskThreadImpl(ThreadIndex index, ThreadPool& pool) :
            pool(pool), index(index), isRunning(false), isTerminating(false),
            internalThread(&TaskThreadImpl::threadFunc, this)
        {}

        virtual inline ~TaskThreadImpl() {}

        ThreadIndex numThreads() const {
            return pool.currentWorkerCount;
        }

        bool isTaskAborted() const {
            return pool.abortExternally;
        }

        void synchronize() {
            if (numThreads() > 1)
                pool.synchronizeThread(*this);
        }

        ThreadPool& pool;               //!< the pool the thread belongs to
        ThreadIndex index;              //!< current thread index
        bool isRunning;                 //!< if not, the thread sleeps
        bool isTerminating;             //!< if `true`, the thread is requested to terminate
        std::thread internalThread;     //!< worker thread
    };


public:
    /**
        The way a task should be run in the pool
    */
    enum class TaskExecutionMode {
        NORMAL,         //!< normal task, run once
        PERSISTENT      //!< persistent task, run process() until it returns `false`
    };
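
    /*
        A minimal sketch of how the two modes look from a task's point of view (the task class below is
        hypothetical, written for illustration only): in NORMAL mode process() is called once per worker,
        in PERSISTENT mode it is called repeatedly until it returns `false`.

        \code
        class CountdownTask : public Beatmup::AbstractTask {
        private:
            int countdown = 5;
        public:
            bool process(Beatmup::TaskThread& thread) override {
                // returning `false` stops a PERSISTENT task; a NORMAL task runs process() only once anyway
                return --countdown > 0;
            }
            Beatmup::ThreadIndex getMaxThreads() const override { return 1; }
        };
        \endcode
    */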

    /**
        Event listener class
    */
    class EventListener {
    public:
        /**
            Callback function called when a new worker thread is created
            \param pool Thread pool number in the context
        */
        virtual inline void threadCreated(PoolIndex pool) {};

        /**
            Callback function called when a worker thread is terminating
            \param pool Thread pool number in the context
        */
        virtual inline void threadTerminating(PoolIndex pool) {};

        /**
            Callback function called when a task is done or cancelled; if it returns `true`, the task will be repeated
            \param pool Thread pool number in the context
            \param task The task
            \param aborted If `true`, the task was aborted
        */
        virtual inline bool taskDone(PoolIndex pool, AbstractTask& task, bool aborted) {
            return false;
        }

        /**
            Callback function called when an exception is thrown
            \param pool Thread pool number in the context
            \param task The task that was running when the exception was thrown
            \param exPtr Exception pointer
        */
        virtual inline void taskFail(PoolIndex pool, AbstractTask& task, const std::exception_ptr exPtr) {};

        /**
            Callback function called when the GPU cannot start
            \param pool Thread pool number in the context
            \param exPtr Exception pointer pointing to the exception thrown when starting up the GPU
        */
        virtual inline void gpuInitFail(PoolIndex pool, const std::exception_ptr exPtr) {};
    };
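
    /*
        A minimal sketch of a custom listener (hypothetical, for illustration only): it never asks the pool to
        repeat a finished task and leaves error handling to ThreadPool::check(). The pool stores a reference to
        the listener passed to its constructor, so the listener must outlive the pool.

        \code
        class QuietListener : public Beatmup::ThreadPool::EventListener {
        public:
            bool taskDone(Beatmup::PoolIndex pool, Beatmup::AbstractTask& task, bool aborted) override {
                return false;    // do not repeat the task
            }
            void taskFail(Beatmup::PoolIndex pool, Beatmup::AbstractTask& task, const std::exception_ptr exPtr) override {
                // the exception is also queued in the pool and rethrown later by ThreadPool::check()
            }
        };
        \endcode
    */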

private:
    ThreadPool(const ThreadPool&) = delete;

    /**
        \internal
        Thrown in a thread when detecting that another thread failed while running a task, causing the current thread to stop.
    */
    class AnotherThreadFailed : public std::exception {
    public:
    };

    /**
        Managing (#0) worker thread function
    */
    inline void managingThreadFunc(TaskThreadImpl& thread) {
        eventListener.threadCreated(myIndex);

        // locks
        std::unique_lock<std::mutex> lock(jobsAccess, std::defer_lock);
        std::unique_lock<std::mutex> workersLock(workersAccess, std::defer_lock);

        while (!thread.isTerminating) {
            // wait until a job is submitted
            lock.lock();
            while (jobs.empty() && !thread.isTerminating)
                jobsCvar.wait(lock);

            if (thread.isTerminating) {
                lock.unlock();
                break;
            }

            // fetch a task
            syncHitsCount = 0;
            syncHitsBound = 0;
            abortExternally = abortInternally = false;
            failFlag = false;
            repeatFlag = false;
            if (!jobs.empty()) {
                currentJob = jobs.front();
                remainingWorkers = currentWorkerCount = std::min(threadCount, currentJob.task->getMaxThreads());
            }
            else
                currentJob.task = nullptr;

            // release queue access
            lock.unlock();

            // determine the execution target
            AbstractTask::TaskDeviceRequirement exTarget;
            bool useGpuForCurrentTask = false;
            if (currentJob.task) {
                exTarget = currentJob.task->getUsedDevices();

                if (exTarget != AbstractTask::TaskDeviceRequirement::CPU_ONLY && myIndex == 0) {
                    // test the GPU if not yet done
                    if (!isGpuTested) {
                        try {
                            gpu = new GraphicPipeline();
                        }
                        catch (...) {
                            eventListener.gpuInitFail(myIndex, std::current_exception());
                            std::lock_guard<std::mutex> lock(exceptionsAccess);
                            exceptions.push_back(std::current_exception());
                            gpu = nullptr;
                        }
                        isGpuTested = true;
                    }

                    useGpuForCurrentTask = (gpu != nullptr);
                }

                // run beforeProcessing
                try {
                    if (!useGpuForCurrentTask && exTarget == AbstractTask::TaskDeviceRequirement::GPU_ONLY)
                        throw Beatmup::RuntimeError(
                            myIndex == 0 ?
                            "A task requires GPU, but GPU initialization failed" :
                            "A task requiring GPU may only be run in the main pool"
                        );
                    currentJob.task->beforeProcessing(currentWorkerCount, useGpuForCurrentTask ? ProcessingTarget::GPU : ProcessingTarget::CPU, useGpuForCurrentTask ? gpu : nullptr);
                }
                catch (...) {
                    eventListener.taskFail(myIndex, *currentJob.task, std::current_exception());
                    std::lock_guard<std::mutex> lock(exceptionsAccess);
                    exceptions.push_back(std::current_exception());
                    failFlag = true;
                }

                // drop the task if failed
                if (failFlag) {
                    lock.lock();
                    jobs.pop_front();
                    lock.unlock();
                }
            }

            // if failed, go to the next iteration
            if (failFlag) {
                // send a signal to threads waiting for the task to finish
                jobsCvar.notify_all();
                continue;
            }

            // wake up workers
            workersLock.lock();
            for (ThreadIndex t = 0; t < threadCount; t++)
                workers[t]->isRunning = true;
            workersLock.unlock();
            workersCvar.notify_all();    // go!

            // do the job
            if (currentJob.task)
                try {
                    do {
                        bool result = useGpuForCurrentTask ? currentJob.task->processOnGPU(*gpu, thread) : currentJob.task->process(thread);
                        if (!result) {
                            abortInternally = true;
                        }
                    } while (currentJob.mode == TaskExecutionMode::PERSISTENT && !abortInternally && !abortExternally);
                }
                catch (const AnotherThreadFailed&) {
                    // nothing special to do here
                }
                catch (...) {
                    eventListener.taskFail(myIndex, *currentJob.task, std::current_exception());
                    std::lock_guard<std::mutex> lock(exceptionsAccess);
                    exceptions.push_back(std::current_exception());
                    failFlag = true;
                }

            // finalizing
            workersLock.lock();

            // decrease the remaining workers count
            remainingWorkers--;

            // notify other workers if they're waiting for synchro
            synchroCvar.notify_all();

            // wait until all the workers stop
            while (remainingWorkers > 0 && !thread.isTerminating)
                workersCvar.wait(workersLock);

            // call afterProcessing
            if (currentJob.task)
                try {
                    currentJob.task->afterProcessing(currentWorkerCount, useGpuForCurrentTask ? gpu : nullptr, abortExternally);
                }
                catch (...) {
                    eventListener.taskFail(myIndex, *currentJob.task, std::current_exception());
                    std::lock_guard<std::mutex> lock(exceptionsAccess);
                    exceptions.push_back(std::current_exception());
                    failFlag = true;
                }

            workersLock.unlock();

            // wait for the GPU to finish, if used
            if (useGpuForCurrentTask && gpu) {
                gpu->flush();
            }

            // call taskDone, ask whether to repeat
            bool internalRepeatFlag = false;
            if (!failFlag)
                internalRepeatFlag = eventListener.taskDone(myIndex, *currentJob.task, abortExternally);

            // drop the task
            lock.lock();
            if (!(repeatFlag || internalRepeatFlag) || failFlag) {
                jobs.pop_front();

                // send a signal to threads waiting for the task to finish
                jobsCvar.notify_all();
            }

            lock.unlock();
        }
        eventListener.threadTerminating(myIndex);

        // delete the graphic pipeline instance
        if (gpu && myIndex == 0)
            delete gpu;
    }


    /**
        Ordinary worker thread function
    */
    inline void workerThreadFunc(TaskThreadImpl& thread) {
        eventListener.threadCreated(myIndex);

        std::unique_lock<std::mutex> lock(workersAccess);
        Job myLastJob = (Job)-1;
        while (!thread.isTerminating) {
            // wait for a job
            while ((!thread.isRunning || thread.index >= currentWorkerCount || myLastJob - currentJob.id >= 0)
                   && !thread.isTerminating)
            {
                std::this_thread::yield();
                workersCvar.wait(lock);
            }
            if (thread.isTerminating) {
                lock.unlock();
                break;
            }

            // do the job
            JobContext job = currentJob;
            lock.unlock();

            /* UNLOCKED SECTION */
            try {
                do {
                    if (!job.task->process(thread)) {
                        abortInternally = true;
                    }
                } while (job.mode == TaskExecutionMode::PERSISTENT && !abortInternally && !abortExternally);
            }
            catch (const AnotherThreadFailed&) {
                // nothing special to do here
            }
            catch (...) {
                eventListener.taskFail(myIndex, *job.task, std::current_exception());
                std::lock_guard<std::mutex> lock(exceptionsAccess);
                exceptions.push_back(std::current_exception());
                failFlag = true;
            }

            // finalizing
            lock.lock();

            // decrease the remaining workers count
            myLastJob = job.id;
            remainingWorkers--;

            // notify other workers if they're waiting for synchro
            synchroCvar.notify_all();

            // stop
            thread.isRunning = false;

            // send a signal to other workers
            workersCvar.notify_all();
        }

        eventListener.threadTerminating(myIndex);
    }


    inline void synchronizeThread(TaskThreadImpl& thread) {
        std::unique_lock<std::mutex> lock(synchro);    //<------- LOCKING HERE
        syncHitsCount++;

        // check if this thread is the last one passing the synchronization point
        if (syncHitsCount - syncHitsBound >= remainingWorkers) {
            syncHitsBound = syncHitsCount;
            lock.unlock();    //<------- UNLOCKING HERE
            // do not block, wake up the others
            synchroCvar.notify_all();
        }

        else {
            // Wait while other threads reach this synchronization point or the remaining number of workers drops
            // (at the end of the task), or the pool is terminating.
            // Do not check here whether the task is aborted, to keep the threads synchronized.
            const int myBound = syncHitsBound;
            while (!thread.isTerminating && !failFlag && myBound + remainingWorkers - syncHitsCount > 0)
                synchroCvar.wait(lock);

            lock.unlock();    //<------- UNLOCKING HERE
        }

        if (failFlag)
            throw AnotherThreadFailed();
    }
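
    /*
        A sketch of how a task's process() may rely on the synchronization point implemented above (the task and
        the helper functions are hypothetical): TaskThread::synchronize() blocks until every worker running the
        same task reaches the same call, which ends up in synchronizeThread().

        \code
        bool MyParallelTask::process(Beatmup::TaskThread& thread) {
            // phase 1: every worker produces its own part of a shared intermediate result
            producePart(thread);

            // block until all the workers running this task finish phase 1
            thread.synchronize();

            // phase 2: it is now safe to read the parts produced by the other workers
            consumeParts(thread);
            return true;
        }
        \endcode
    */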

    typedef struct {
        Job id;
        AbstractTask* task;
        TaskExecutionMode mode;
    } JobContext;

    TaskThreadImpl** workers;           //!< workers instances

    GraphicPipeline* gpu;               //!< THE graphic pipeline to run tasks on GPU

    std::deque<JobContext> jobs;        //!< jobs queue
    std::deque<std::exception_ptr>
        exceptions;                     //!< exceptions thrown in this pool

    JobContext currentJob;              //!< job being run at the moment
    Job jobCounter;                     //!< counter used to assign job numbers

    ThreadIndex threadCount;            //!< actual number of workers

    ThreadIndex
        currentWorkerCount,             //!< number of workers engaged in the current task
        remainingWorkers;               //!< number of workers performing the current task right now

    std::condition_variable
        synchroCvar,        //!< gets notified about workers synchronization
        jobsCvar,           //!< gets notified about the jobs queue updates
        workersCvar;        //!< gets notified about workers lifecycle updates

    std::mutex
        synchro,            //!< workers synchronization control within the current task
        workersAccess,      //!< workers lifecycle access control
        jobsAccess,         //!< jobs queue access control
        exceptionsAccess;   //!< exceptions queue access control

    int
        syncHitsCount,      //!< number of times synchronization is hit so far by all workers together
        syncHitsBound;      //!< last sync hits count when all the workers were synchronized

    bool
        isGpuTested,        //!< if `true`, there was an attempt to warm up the GPU
        abortExternally,    //!< if `true`, the task is aborted externally
        abortInternally,    //!< if `true`, the task aborts itself: beforeProcessing(), process() or processOnGPU() returned `false`
        failFlag,           //!< communicates to all the threads that the current task is to be skipped because of a problem
        repeatFlag;         //!< if `true`, the current task is asked to be repeated

    EventListener& eventListener;

public:
    const PoolIndex myIndex;            //!< the index of the current pool

    inline ThreadPool(const PoolIndex index, const ThreadIndex limitThreadCount, EventListener& listener) :
        gpu(nullptr),
        currentJob{0, nullptr},
        jobCounter(1),
        threadCount(limitThreadCount),
        currentWorkerCount(limitThreadCount), remainingWorkers(0),
        syncHitsCount(0), syncHitsBound(0),
        isGpuTested(false),
        abortExternally(false), abortInternally(false),
        failFlag(false), repeatFlag(false),
        eventListener(listener),
        myIndex(index)
    {
        workers = new TaskThreadImpl*[threadCount];
        // spawning workers
        for (ThreadIndex t = 0; t < threadCount; t++)
            workers[t] = new TaskThreadImpl(t, *this);
    }


    inline ~ThreadPool() {
        // set termination flags
        {
            std::lock_guard<std::mutex> lockJobs(jobsAccess), lockWorkers(workersAccess), lockSync(synchro);
            abortExternally = true;
            for (ThreadIndex t = 0; t < threadCount; t++)
                workers[t]->isTerminating = true;
        }
        synchroCvar.notify_all();
        jobsCvar.notify_all();
        workersCvar.notify_all();
        // no wait here!
        for (ThreadIndex t = 0; t < threadCount; t++) {
            workers[t]->internalThread.join();
            delete workers[t];
        }
        delete[] workers;
    }


    /**
        Checks if the thread pool is in a healthy state.
        If not, i.e., if an exception has been thrown in one of its threads, this function rethrows it.
        There might be multiple exceptions in the queue. They are thrown one by one every time this function is called.
    */
    inline void check() {
        std::lock_guard<std::mutex> lock(exceptionsAccess);
        if (!exceptions.empty()) {
            std::exception_ptr pointer(exceptions.front());
            exceptions.pop_front();
            std::rethrow_exception(pointer);
        }
    }
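
    /*
        A typical use of check() (sketch; `pool` and `task` are illustrative): exceptions raised inside worker
        threads are queued by the pool and only surface in the calling thread when check() is invoked.

        \code
        Beatmup::Job job = pool.submitTask(task, Beatmup::ThreadPool::TaskExecutionMode::NORMAL);
        pool.waitForJob(job);
        try {
            pool.check();    // rethrows an exception thrown by the task, if any
        }
        catch (const std::exception& ex) {
            // report or handle the error
        }
        \endcode
    */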


    /**
        Resizes the pool.
        Blocking if there is a task running.
        \param newThreadCount The new number of worker threads
    */
    inline void resize(ThreadIndex newThreadCount) {
        if (newThreadCount == threadCount)
            return;
        std::unique_lock<std::mutex> jobsLock(jobsAccess);
        // wait for task, if any
        while (!jobs.empty())
            jobsCvar.wait(jobsLock);

        std::unique_lock<std::mutex> workersLock(workersAccess);
        // set termination flags for threads to be stopped, if any
        for (ThreadIndex t = newThreadCount; t < threadCount; t++)
            workers[t]->isTerminating = true;

        // unlock and notify
        workersLock.unlock();
        synchroCvar.notify_all();
        jobsCvar.notify_all();
        workersCvar.notify_all();

        // join
        for (ThreadIndex t = newThreadCount; t < threadCount; t++) {
            workers[t]->internalThread.join();
            delete workers[t];
        }

        // spawn new threads if needed
        workersLock.lock();
        if (threadCount < newThreadCount) {
            TaskThreadImpl** newWorkers = new TaskThreadImpl*[newThreadCount];
            for (ThreadIndex t = 0; t < threadCount; t++)
                newWorkers[t] = workers[t];
            for (ThreadIndex t = threadCount; t < newThreadCount; t++)
                newWorkers[t] = new TaskThreadImpl(t, *this);
            delete[] workers;
            workers = newWorkers;
        }
        // update thread count
        threadCount = newThreadCount;

        workersLock.unlock();
        jobsLock.unlock();
    }


    /**
        Adds a new task to the jobs queue.
        \return the job number assigned to the task.
    */
    inline Job submitTask(AbstractTask& task, const TaskExecutionMode mode) {
        std::unique_lock<std::mutex> lock(jobsAccess);
        const Job job = jobCounter++;

        // add new job
        jobs.emplace_back(JobContext{job, &task, mode});

        lock.unlock();
        jobsCvar.notify_all();
        return job;
    }


    /**
        Ensures that a given task is executed at least once after this call.
        If the task is already in the queue, no new job is submitted.
        \param task The task
        \param abortCurrent If `true` and the task is currently running, an abort signal is sent to it.
    */
    inline Job repeatTask(AbstractTask& task, bool abortCurrent) {
        std::unique_lock<std::mutex> lock(jobsAccess);

        // check if the task is running now; ask for a repeat if it is
        if (!jobs.empty() && jobs.front().task == &task) {
            repeatFlag = true;
            if (abortCurrent)
                abortExternally = true;
            return jobs.front().id;
        }

        // check whether it is in the queue
        for (const JobContext& _ : jobs)
            if (_.task == &task) {
                return _.id;
            }

        // otherwise submit the task
        const Job job = jobCounter++;
        jobs.emplace_back(JobContext{
            job,
            &task,
            TaskExecutionMode::NORMAL
        });

        // unlock the jobs access, notify workers
        lock.unlock();
        jobsCvar.notify_all();
        return job;
    }
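
    /*
        A sketch of repeatTask() usage (`pool` and the rendering task are illustrative): this is convenient in
        "render the latest state" scenarios, where queueing several identical jobs would be pointless.

        \code
        // make sure `renderingTask` runs (again); if it is already queued, no new job is added, and since
        // abortCurrent == true, a currently running instance is asked to stop before being repeated
        Beatmup::Job job = pool.repeatTask(renderingTask, true);
        pool.waitForJob(job);
        \endcode
    */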


    /**
        Blocks until a given job finishes, if not done yet.
    */
    inline void waitForJob(Job job) {
        std::unique_lock<std::mutex> lock(jobsAccess);
#ifdef BEATMUP_DEBUG
        if (currentJob.task && currentJob.id == job && currentJob.mode == TaskExecutionMode::PERSISTENT) {
            class BlockingOnPersistentJob : public Exception {
            public:
                BlockingOnPersistentJob(): Exception("Waiting for a persistent job to finish: potential deadlock") {}
            };
            throw BlockingOnPersistentJob();
        }
#endif
        while (true) {
            // check if the job is in the queue
            bool found = false;
            for (const JobContext& _ : jobs)
                if (_.id == job) {
                    found = true;
                    break;
                }

            // if not, done
            if (!found)
                return;

            // otherwise wait a round and check again
            jobsCvar.wait(lock);
        }
    }


    /**
        Aborts a given submitted job.
        \return `true` if the job was interrupted while running.
    */
    bool abortJob(Job job) {
        std::unique_lock<std::mutex> lock(jobsAccess);

        // check if the task is running now; abort it if it is
        if (currentJob.task && currentJob.id == job) {
            abortExternally = true;
            while (!jobs.empty() && currentJob.task && currentJob.id == job)
                jobsCvar.wait(lock);
            return true;
        }

        for (auto it = jobs.begin(); it != jobs.end(); it++)
            if (it->id == job) {
                jobs.erase(it);
                return false;
            }

        return false;
    }
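
    /*
        A sketch of abortJob() usage (`pool` and `task` are illustrative): the return value tells whether the job
        was interrupted while running or simply dropped from the queue before starting.

        \code
        Beatmup::Job job = pool.submitTask(task, Beatmup::ThreadPool::TaskExecutionMode::NORMAL);
        // ... later on, the result is no longer needed:
        bool wasRunning = pool.abortJob(job);    // if the job is running, this blocks until it actually stops
        \endcode
    */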


    /**
        Blocks until all the submitted jobs are executed.
    */
    void wait() {
        std::unique_lock<std::mutex> lock(jobsAccess);
        while (!jobs.empty())
            jobsCvar.wait(lock);
    }


    /**
        Checks whether the pool has pending or running jobs.
    */
    inline bool busy() {
        std::lock_guard<std::mutex> lock(jobsAccess);
        return !jobs.empty();
    }


    /**
        Returns the optimal number of threads for the current hardware.
    */
    inline static ThreadIndex hardwareConcurrency() {
        unsigned int N = std::thread::hardware_concurrency();
        RuntimeError::check(N > 0, "Unable to determine hardware concurrency capabilities.");
        return (ThreadIndex)std::min<unsigned int>(N, MAX_THREAD_INDEX);
    }


    /**
        \return `true` if the GPU was queried.
    */
    inline bool isGpuQueried() const {
        return isGpuTested;
    }


    /**
        \return `true` if the GPU is ready to use.
    */
    inline bool isGpuReady() const {
        return gpu != nullptr;
    }

    /**
        \return `true` if invoked from the managing thread of this pool.
    */
    inline bool isManagingThread() const {
        if (!workers)
            return false;
        return std::this_thread::get_id() == workers[0]->internalThread.get_id();
    }

    inline ThreadIndex getThreadCount() const { return threadCount; }
};
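
/*
    A usage sketch putting the pieces together (referenced from the class documentation above). The task and the
    thread count are illustrative; the necessary Beatmup headers and <vector> are assumed to be included.

    \code
    class AddOneTask : public Beatmup::AbstractTask {
    public:
        std::vector<float> data;

        // keep the example single-threaded; by default a task may be split among several workers
        Beatmup::ThreadIndex getMaxThreads() const override { return 1; }

        bool process(Beatmup::TaskThread& thread) override {
            for (float& x : data)
                x += 1;
            return true;
        }
    };

    void runExample() {
        Beatmup::ThreadPool::EventListener listener;    // default no-op listener
        Beatmup::ThreadPool pool(0, Beatmup::ThreadPool::hardwareConcurrency(), listener);

        AddOneTask task;
        task.data.resize(1000);

        const Beatmup::Job job = pool.submitTask(task, Beatmup::ThreadPool::TaskExecutionMode::NORMAL);
        pool.waitForJob(job);
        pool.check();    // rethrows here if the task failed in a worker thread
    }
    \endcode
*/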