Beatmup
Beatmup::ThreadPool Class Reference

A pool of threads running tasks ThreadPool runs AbstractTasks, possibly in multiple threads. More...

#include <thread_pool.hpp>

Classes

class  AnotherThreadFailed
 
class  EventListener
 Event listener class. More...
 
struct  JobContext
 
class  TaskThreadImpl
 

Public Types

enum class  TaskExecutionMode { NORMAL , PERSISTENT }
 Way how to the task should be run in the pool. More...
 

Public Member Functions

 ThreadPool (const PoolIndex index, const ThreadIndex limitThreadCount, EventListener &listener)
 
 ~ThreadPool ()
 
void check ()
 Checks if the thread pool is doing great. More...
 
void resize (ThreadIndex newThreadCount)
 Resizes the pool. More...
 
Job submitTask (AbstractTask &task, const TaskExecutionMode mode)
 Adds a new task to the jobs queue. More...
 
Job repeatTask (AbstractTask &task, bool abortCurrent)
 Ensures a given task executed at least once. More...
 
void waitForJob (Job job)
 Blocks until a given job finishes if not yet. More...
 
bool abortJob (Job job)
 Aborts a given submitted job. More...
 
void wait ()
 Blocks until all the submitted jobs are executed. More...
 
bool busy ()
 Checks whether the pool has jobs. More...
 
bool isGpuQueried () const
 
bool isGpuReady () const
 
bool isManagingThread () const
 
ThreadIndex getThreadCount () const
 

Static Public Member Functions

static ThreadIndex hardwareConcurrency ()
 Returns optimal number of threads depending on the hardware capabilities. More...
 

Public Attributes

const PoolIndex myIndex
 the index of the current pool More...
 

Private Member Functions

 ThreadPool (const ThreadPool &)=delete
 
void managingThreadFunc (TaskThreadImpl &thread)
 Managing (#0) worker thread function. More...
 
void workerThreadFunc (TaskThreadImpl &thread)
 Ordinary worker thread function. More...
 
void synchronizeThread (TaskThreadImpl &thread)
 

Private Attributes

TaskThreadImpl ** workers
 workers instances More...
 
GraphicPipelinegpu
 THE graphic pipeline to run tasks on GPU. More...
 
std::deque< JobContextjobs
 jobs queue More...
 
std::deque< std::exception_ptr > exceptions
 exceptions thrown in this pool More...
 
JobContext currentJob
 job being run at the moment More...
 
Job jobCounter
 
ThreadIndex threadCount
 actual number of workers More...
 
ThreadIndex currentWorkerCount
 
ThreadIndex remainingWorkers
 number of workers performing the current task right now More...
 
std::condition_variable synchroCvar
 gets notified about workers synchronization More...
 
std::condition_variable jobsCvar
 gets notified about the jobs queue updates More...
 
std::condition_variable workersCvar
 gets notified about workers lifecycle updates More...
 
std::mutex synchro
 workers synchronization control within the current task More...
 
std::mutex workersAccess
 workers lifecycle access control More...
 
std::mutex jobsAccess
 jobs queue access control More...
 
std::mutex exceptionsAccess
 exceptions queue access control More...
 
int syncHitsCount
 number of times synchronization is hit so far by all workers together More...
 
int syncHitsBound
 last sync hits count when all the workers were synchronized More...
 
bool isGpuTested
 if true, there was an attempt to warm up the GPU More...
 
bool abortExternally
 if true, the task is aborted externally More...
 
bool abortInternally
 if true, the task aborts itself: beforeProcessing(), process() or processOnGPU() returned false More...
 
bool failFlag
 communicates to all the threads that the current task is to skip because of a problem More...
 
bool repeatFlag
 if true, the current task is asked to be repeated More...
 
EventListenereventListener
 

Detailed Description

A pool of threads running tasks ThreadPool runs AbstractTasks, possibly in multiple threads.

Every time a task is submitted to the thread pool, it is scheduled for execution and gets attributed a corresponding Job number. Jobs are used to cancel the scheduled runs and track whether they have been conducted or not. Exceptions thrown during the tasks execution in different threads are rethrown in the caller threads.

Definition at line 36 of file thread_pool.hpp.

Member Enumeration Documentation

◆ TaskExecutionMode

Way how to the task should be run in the pool.

Enumerator
NORMAL 

normal task, should be run it once

PERSISTENT 

persistent task, run process() until it returns false

Definition at line 78 of file thread_pool.hpp.

78  {
79  NORMAL, //!< normal task, should be run it once
80  PERSISTENT //!< persistent task, run process() until it returns `false`
81  };

Constructor & Destructor Documentation

◆ ThreadPool() [1/2]

Beatmup::ThreadPool::ThreadPool ( const ThreadPool )
privatedelete

◆ ThreadPool() [2/2]

Beatmup::ThreadPool::ThreadPool ( const PoolIndex  index,
const ThreadIndex  limitThreadCount,
EventListener listener 
)
inline

Definition at line 454 of file thread_pool.hpp.

454  :
455  gpu(nullptr),
456  currentJob{0, nullptr},
457  jobCounter(1),
458  threadCount(limitThreadCount),
461  isGpuTested(false),
462  abortExternally(false), abortInternally(false),
463  failFlag(false), repeatFlag(false),
465  myIndex(index)
466  {
467  workers = new TaskThreadImpl*[threadCount];
468  // spawning workers
469  for (ThreadIndex t = 0; t < threadCount; t++)
470  workers[t] = new TaskThreadImpl(t, *this);
471  }
ThreadIndex threadCount
actual number of workers
bool abortInternally
if true, the task aborts itself: beforeProcessing(), process() or processOnGPU() returned false
bool failFlag
communicates to all the threads that the current task is to skip because of a problem
const PoolIndex myIndex
the index of the current pool
TaskThreadImpl ** workers
workers instances
ThreadIndex currentWorkerCount
bool repeatFlag
if true, the current task is asked to be repeated
JobContext currentJob
job being run at the moment
int syncHitsCount
number of times synchronization is hit so far by all workers together
bool isGpuTested
if true, there was an attempt to warm up the GPU
int syncHitsBound
last sync hits count when all the workers were synchronized
EventListener & eventListener
bool abortExternally
if true, the task is aborted externally
GraphicPipeline * gpu
THE graphic pipeline to run tasks on GPU.
ThreadIndex remainingWorkers
number of workers performing the current task right now
unsigned char ThreadIndex
number of threads / thread index
Definition: parallelism.h:68
jlong jint index
Beatmup::Context::EventListener * listener

◆ ~ThreadPool()

Beatmup::ThreadPool::~ThreadPool ( )
inline

Definition at line 474 of file thread_pool.hpp.

474  {
475  // set termination flags
476  {
477  std::lock_guard<std::mutex> lockJobs(jobsAccess), lockWorkers(workersAccess), lockSync(synchro);
478  abortExternally = true;
479  for (ThreadIndex t = 0; t < threadCount; t++)
480  workers[t]->isTerminating = true;
481  }
482  synchroCvar.notify_all();
483  jobsCvar.notify_all();
484  workersCvar.notify_all();
485  // no wait here!
486  for (ThreadIndex t = 0; t < threadCount; t++) {
487  workers[t]->internalThread.join();
488  delete workers[t];
489  }
490  delete[] workers;
491  }
std::thread internalThread
worker thread
Definition: thread_pool.hpp:70
std::mutex jobsAccess
jobs queue access control
std::mutex workersAccess
workers lifecycle access control
std::condition_variable synchroCvar
gets notified about workers synchronization
std::mutex synchro
workers synchronization control within the current task
std::condition_variable jobsCvar
gets notified about the jobs queue updates
std::condition_variable workersCvar
gets notified about workers lifecycle updates

Member Function Documentation

◆ managingThreadFunc()

void Beatmup::ThreadPool::managingThreadFunc ( TaskThreadImpl thread)
inlineprivate

Managing (#0) worker thread function.

Definition at line 141 of file thread_pool.hpp.

141  {
143 
144  // locks
145  std::unique_lock<std::mutex> lock(jobsAccess, std::defer_lock);
146  std::unique_lock<std::mutex> workersLock(workersAccess, std::defer_lock);
147 
148  while (!thread.isTerminating) {
149  // wait while a task is got
150  lock.lock();
151  while (jobs.empty() && !thread.isTerminating)
152  jobsCvar.wait(lock);
153 
154  if (thread.isTerminating) {
155  lock.unlock();
156  break;
157  }
158 
159  // fetch a task
160  syncHitsCount = 0;
161  syncHitsBound = 0;
163  failFlag = false;
164  repeatFlag = false;
165  if (!jobs.empty()) {
166  currentJob = jobs.front();
168  }
169  else
170  currentJob.task = nullptr;
171 
172  // release queue access
173  lock.unlock();
174 
175  // test execution mode
177  bool useGpuForCurrentTask = false;
178  if (currentJob.task) {
179  exTarget = currentJob.task->getUsedDevices();
180 
181  if (exTarget != AbstractTask::TaskDeviceRequirement::CPU_ONLY && myIndex == 0) {
182  // test GPU if not yet
183  if (!isGpuTested) {
184  try {
185  gpu = new GraphicPipeline();
186  }
187  catch (...) {
188  eventListener.gpuInitFail(myIndex, std::current_exception());
189  std::lock_guard<std::mutex> lock(exceptionsAccess);
190  exceptions.push_back(std::current_exception());
191  gpu = nullptr;
192  }
193  isGpuTested = true;
194  }
195 
196  useGpuForCurrentTask = (gpu != nullptr);
197  }
198 
199  // run beforeProcessing
200  try {
201  if (!useGpuForCurrentTask && exTarget == AbstractTask::TaskDeviceRequirement::GPU_ONLY)
202  throw Beatmup::RuntimeError(
203  myIndex == 0 ?
204  "A task requires GPU, but GPU init is failed" :
205  "A task requiring GPU may only be run in the main pool"
206  );
208  }
209  catch (...) {
210  eventListener.taskFail(myIndex, *currentJob.task, std::current_exception());
211  std::lock_guard<std::mutex> lock(exceptionsAccess);
212  exceptions.push_back(std::current_exception());
213  failFlag = true;
214  }
215 
216  // drop the task if failed
217  if (failFlag) {
218  lock.lock();
219  jobs.pop_front();
220  lock.unlock();
221  }
222  }
223 
224  // if failed, go to next iteration
225  if (failFlag) {
226  // send a signal to threads waiting for the task to finish
227  jobsCvar.notify_all();
228  continue;
229  }
230 
231  // wake up workers
232  workersLock.lock();
233  for (ThreadIndex t = 0; t < threadCount; t++)
234  workers[t]->isRunning = true;
235  workersLock.unlock();
236  workersCvar.notify_all(); // go!
237 
238  // do the job
239  if (currentJob.task)
240  try {
241  do {
242  bool result = useGpuForCurrentTask ? currentJob.task->processOnGPU(*gpu, thread) : currentJob.task->process(thread);
243  if (!result) {
244  abortInternally = true;
245  }
247  }
248  catch (AnotherThreadFailed) {
249  // nothing special to do here
250  }
251  catch (...) {
252  eventListener.taskFail(myIndex, *currentJob.task, std::current_exception());
253  std::lock_guard<std::mutex> lock(exceptionsAccess);
254  exceptions.push_back(std::current_exception());
255  failFlag = true;
256  }
257 
258  // finalizing
259  workersLock.lock();
260 
261  // decrease remaining workers count
263 
264  // notify other workers if they're waiting for synchro
265  synchroCvar.notify_all();
266 
267  // wait until all the workers stop
268  while (remainingWorkers > 0 && !thread.isTerminating)
269  workersCvar.wait(workersLock);
270 
271  // call afterProcessing
272  if (currentJob.task)
273  try {
274  currentJob.task->afterProcessing(currentWorkerCount, useGpuForCurrentTask ? gpu : nullptr, abortExternally);
275  }
276  catch (...) {
277  eventListener.taskFail(myIndex, *currentJob.task, std::current_exception());
278  std::lock_guard<std::mutex> lock(exceptionsAccess);
279  exceptions.push_back(std::current_exception());
280  failFlag = true;
281  }
282 
283  workersLock.unlock();
284 
285  // unlock graphic pipeline, if used
286  if (useGpuForCurrentTask && gpu) {
287  gpu->flush();
288  }
289 
290  // call taskDone, ask if want to repeat
291  bool internalRepeatFlag = false;
292  if (!failFlag)
293  internalRepeatFlag = eventListener.taskDone(myIndex, *currentJob.task, abortExternally);
294 
295  // drop the task
296  lock.lock();
297  if (!(repeatFlag || internalRepeatFlag) || failFlag) {
298  jobs.pop_front();
299 
300  // send a signal to threads waiting for the task to finish
301  jobsCvar.notify_all();
302  }
303 
304  lock.unlock();
305  }
307 
308  // deleting graphic pipeline instance
309  if (gpu && myIndex == 0)
310  delete gpu;
311  }
TaskDeviceRequirement
Specifies which device (CPU and/or GPU) is used to run the task.
Definition: parallelism.h:95
@ CPU_ONLY
this task does not use GPU
@ GPU_ONLY
this task requires GPU, otherwise it cannot run
virtual TaskDeviceRequirement getUsedDevices() const
Communicates devices (CPU and/or GPU) the task is run on.
Definition: parallelism.cpp:54
virtual void beforeProcessing(ThreadIndex threadCount, ProcessingTarget target, GraphicPipeline *gpu)
Instruction called before the task is executed.
Definition: parallelism.cpp:24
virtual bool processOnGPU(GraphicPipeline &gpu, TaskThread &thread)
Executes the task on GPU.
Definition: parallelism.cpp:34
virtual ThreadIndex getMaxThreads() const
Gives the upper limint on the number of threads the task may be performed by.
Definition: parallelism.cpp:40
virtual void afterProcessing(ThreadIndex threadCount, GraphicPipeline *gpu, bool aborted)
Instruction called after the task is executed.
Definition: parallelism.cpp:29
virtual bool process(TaskThread &thread)=0
Executes the task on CPU within a given thread.
void flush()
Waits until all operations submitted to GPU are finished.
Definition: pipeline.cpp:931
virtual bool taskDone(PoolIndex pool, AbstractTask &task, bool aborted)
Callback function called when a task is done or cancelled; if returns true, the task will be repeated...
virtual void taskFail(PoolIndex pool, AbstractTask &task, const std::exception_ptr exPtr)
Callback function called when an exception is thrown.
virtual void threadCreated(PoolIndex pool)
Callback function called when a new worker thread is created.
Definition: thread_pool.hpp:92
virtual void gpuInitFail(PoolIndex pool, const std::exception_ptr exPtr)
Callback function called when the GPU cannot start.
virtual void threadTerminating(PoolIndex pool)
Callback function called when a worker thread is terminating.
Definition: thread_pool.hpp:98
@ PERSISTENT
persistent task, run process() until it returns false
std::deque< JobContext > jobs
jobs queue
std::mutex exceptionsAccess
exceptions queue access control
std::deque< std::exception_ptr > exceptions
exceptions thrown in this pool
CustomPoint< numeric > min(const CustomPoint< numeric > &a, const CustomPoint< numeric > &b)
Definition: geometry.h:724
Beatmup::IntPoint result

◆ workerThreadFunc()

void Beatmup::ThreadPool::workerThreadFunc ( TaskThreadImpl thread)
inlineprivate

Ordinary worker thread function.

Definition at line 317 of file thread_pool.hpp.

317  {
319  std::unique_lock<std::mutex> lock(workersAccess);
320  Job myLastJob = (Job)-1;
321  while (!thread.isTerminating) {
322  // wait for a job
323  while ((!thread.isRunning || thread.index >= currentWorkerCount || myLastJob - currentJob.id >= 0)
324  && !thread.isTerminating)
325  {
326  std::this_thread::yield();
327  workersCvar.wait(lock);
328  }
329  if (thread.isTerminating) {
330  lock.unlock();
331  break;
332  }
333 
334  // do the job
335  JobContext job = currentJob;
336  lock.unlock();
337 
338  /* UNLOCKED SECTION */
339  try {
340  do {
341  if (!job.task->process(thread)) {
342  abortInternally = true;
343  }
344  } while (job.mode == TaskExecutionMode::PERSISTENT && !abortInternally && !abortExternally && !thread.isTerminating);
345  }
346  catch (AnotherThreadFailed) {
347  // nothing special to do here
348  }
349  catch (...) {
350  eventListener.taskFail(myIndex, *job.task, std::current_exception());
351  std::lock_guard<std::mutex> lock(exceptionsAccess);
352  exceptions.push_back(std::current_exception());
353  failFlag = true;
354  }
355 
356  // finalizing
357  lock.lock();
358 
359  // decrease remaining workers count
360  myLastJob = job.id;
362 
363  // notify other workers if they're waiting for synchro
364  synchroCvar.notify_all();
365 
366  // stop
367  thread.isRunning = false;
368 
369  // send a signal to other workers
370  workersCvar.notify_all();
371  }
372 
374  }
int Job
Definition: parallelism.h:69
JNIEnv jlong jint jint job

◆ synchronizeThread()

void Beatmup::ThreadPool::synchronizeThread ( TaskThreadImpl thread)
inlineprivate

Definition at line 377 of file thread_pool.hpp.

377  {
378  std::unique_lock<std::mutex> lock(synchro); //<------- LOCKING HERE
379  syncHitsCount++;
380 
381  // check if this thread is the last one passing the synchronization point
384  lock.unlock(); //<------- UNLOCKING HERE
385  // do not block, wake up the others
386  synchroCvar.notify_all();
387  }
388 
389  else {
390  // Wait while other threads reach this synchronization point or the remaining number of workers drops
391  // (at the end of the task), or pool is terminating.
392  // Do not check if the task aborted here to keep threads synchronized.
393  const int myBound = syncHitsBound;
394  while (!thread.isTerminating && !failFlag && myBound + remainingWorkers - syncHitsCount > 0)
395  synchroCvar.wait(lock);
396 
397  lock.unlock(); //<------- UNLOCKING HERE
398  }
399 
400  if (failFlag)
401  throw AnotherThreadFailed();
402  }

◆ check()

void Beatmup::ThreadPool::check ( )
inline

Checks if the thread pool is doing great.

If not, i.e., if there was an exception, rethrows the exception. There might be multiple exceptions in the queue. They are thrown one by one every time this function is called.

Definition at line 499 of file thread_pool.hpp.

499  {
500  std::lock_guard<std::mutex> lock(exceptionsAccess);
501  if (!exceptions.empty()) {
502  std::exception_ptr pointer(exceptions.front());
503  exceptions.pop_front();
504  std::rethrow_exception(pointer);
505  }
506  }

◆ resize()

void Beatmup::ThreadPool::resize ( ThreadIndex  newThreadCount)
inline

Resizes the pool.

Blocking if there is a task running.

Parameters
newThreadCountThe new number of worker threads

Definition at line 514 of file thread_pool.hpp.

514  {
515  if (newThreadCount == threadCount)
516  return;
517  std::unique_lock<std::mutex> jobsLock(jobsAccess);
518  // wait for task, if any
519  while (!jobs.empty())
520  jobsCvar.wait(jobsLock);
521 
522  std::unique_lock<std::mutex> workersLock(workersAccess);
523  // set termination flags for threads to be stopped, if any
524  for (ThreadIndex t = newThreadCount; t < threadCount; t++)
525  workers[t]->isTerminating = true;
526 
527  // unlock and notify
528  workersLock.unlock();
529  synchroCvar.notify_all();
530  jobsCvar.notify_all();
531  workersCvar.notify_all();
532 
533  // join
534  for (ThreadIndex t = newThreadCount; t < threadCount; t++) {
535  workers[t]->internalThread.join();
536  delete workers[t];
537  }
538 
539  // spawn new threads if needed
540  workersLock.lock();
541  if (threadCount < newThreadCount) {
542  TaskThreadImpl** newWorkers = new TaskThreadImpl*[newThreadCount];
543  for (ThreadIndex t = 0; t < threadCount; t++)
544  newWorkers[t] = workers[t];
545  for (ThreadIndex t = threadCount; t < newThreadCount; t++)
546  newWorkers[t] = new TaskThreadImpl(t, *this);
547  delete[] workers;
548  workers = newWorkers;
549  }
550  // update thread count
551  threadCount = newThreadCount;
552 
553  workersLock.unlock();
554  jobsLock.unlock();
555  }

◆ submitTask()

Job Beatmup::ThreadPool::submitTask ( AbstractTask task,
const TaskExecutionMode  mode 
)
inline

Adds a new task to the jobs queue.

Definition at line 561 of file thread_pool.hpp.

561  {
562  std::unique_lock<std::mutex> lock(jobsAccess);
563  const Job job = jobCounter++;
564 
565  // add new job
566  jobs.emplace_back(JobContext{job, &task, mode});
567 
568  lock.unlock();
569  jobsCvar.notify_all();
570  return job;
571  }
JNIEnv jlong jint mode
Beatmup::NNets::InferenceTask * task

◆ repeatTask()

Job Beatmup::ThreadPool::repeatTask ( AbstractTask task,
bool  abortCurrent 
)
inline

Ensures a given task executed at least once.

Parameters
taskThe task
abortCurrentif true and the task is currently running, abort signal is sent.

Definition at line 579 of file thread_pool.hpp.

579  {
580  std::unique_lock<std::mutex> lock(jobsAccess);
581 
582  // check if the rask is running now, ask for repeat if it is
583  if (!jobs.empty() && jobs.front().task == &task) {
584  repeatFlag = true;
585  if (abortCurrent)
586  abortExternally = true;
587  return jobs.front().id;
588  }
589 
590  // check whether it is in the queue
591  for (const JobContext& _ : jobs)
592  if (_.task == &task) {
593  return _.id;
594  }
595 
596  // otherwise submit the task
597  const Job job = jobCounter++;
598  jobs.emplace_back(JobContext{
599  job,
600  &task,
602  });
603 
604  // unlock the jobs access, notify workers
605  lock.unlock();
606  jobsCvar.notify_all();
607  return job;
608  }
@ NORMAL
normal task, should be run it once

◆ waitForJob()

void Beatmup::ThreadPool::waitForJob ( Job  job)
inline

Blocks until a given job finishes if not yet.

Definition at line 614 of file thread_pool.hpp.

614  {
615  std::unique_lock<std::mutex> lock(jobsAccess);
616 #ifdef BEATMUP_DEBUG
618  class BlockingOnPersistentJob : public Exception {
619  public:
620  BlockingOnPersistentJob(): Exception("Waiting for a persistent job to finish: potential deadlock") {}
621  };
622  throw BlockingOnPersistentJob();
623  }
624 #endif
625  while (true) {
626  // check if the job is in the queue
627  bool found = false;
628  for (const JobContext& _ : jobs)
629  if (_.id == job) {
630  found = true;
631  break;
632  }
633 
634  // if not, done
635  if (!found)
636  return;
637 
638  // otherwise wait a round and check again
639  jobsCvar.wait(lock);
640  }
641  }

◆ abortJob()

bool Beatmup::ThreadPool::abortJob ( Job  job)
inline

Aborts a given submitted job.

Returns
true if the job was interrupted while running.

Definition at line 648 of file thread_pool.hpp.

648  {
649  std::unique_lock<std::mutex> lock(jobsAccess);
650 
651  // check if the rask is running now, abort if it is
652  if (currentJob.task && currentJob.id == job) {
653  abortExternally = true;
654  while (!jobs.empty() && currentJob.task && currentJob.id == job)
655  jobsCvar.wait(lock);
656  return true;
657  }
658 
659  for (auto it = jobs.begin(); it != jobs.end(); it++)
660  if (it->id == job) {
661  jobs.erase(it);
662  return false;
663  }
664 
665  return false;
666  }

◆ wait()

void Beatmup::ThreadPool::wait ( )
inline

Blocks until all the submitted jobs are executed.

Definition at line 672 of file thread_pool.hpp.

672  {
673  std::unique_lock<std::mutex> lock(jobsAccess);
674  while (!jobs.empty())
675  jobsCvar.wait(lock);
676  }

◆ busy()

bool Beatmup::ThreadPool::busy ( )
inline

Checks whether the pool has jobs.

Definition at line 682 of file thread_pool.hpp.

682  {
683  std::lock_guard<std::mutex> lock(jobsAccess);
684  return !jobs.empty();
685  }

◆ hardwareConcurrency()

static ThreadIndex Beatmup::ThreadPool::hardwareConcurrency ( )
inlinestatic

Returns optimal number of threads depending on the hardware capabilities.

Definition at line 691 of file thread_pool.hpp.

691  {
692  unsigned int N = std::thread::hardware_concurrency();
693  RuntimeError::check(N > 0, "Unable to determine hardware concurrency capabilities.");
695  }
static void check(const bool condition, const std::string &message)
Definition: exception.h:64
static const ThreadIndex MAX_THREAD_INDEX
maximum possible thread index value
Definition: parallelism.h:71

◆ isGpuQueried()

bool Beatmup::ThreadPool::isGpuQueried ( ) const
inline
Returns
true if GPU was queried.

Definition at line 701 of file thread_pool.hpp.

701  {
702  return isGpuTested;
703  }

◆ isGpuReady()

bool Beatmup::ThreadPool::isGpuReady ( ) const
inline
Returns
true if GPU is ready to use.

Definition at line 709 of file thread_pool.hpp.

709  {
710  return gpu != nullptr;
711  }

◆ isManagingThread()

bool Beatmup::ThreadPool::isManagingThread ( ) const
inline
Returns
true if invoked from the manager thread

Definition at line 716 of file thread_pool.hpp.

716  {
717  if (!workers)
718  return false;
719  return std::this_thread::get_id() == workers[0]->internalThread.get_id();
720  }

◆ getThreadCount()

ThreadIndex Beatmup::ThreadPool::getThreadCount ( ) const
inline

Definition at line 722 of file thread_pool.hpp.

722 { return threadCount; }

Member Data Documentation

◆ workers

TaskThreadImpl** Beatmup::ThreadPool::workers
private

workers instances

Definition at line 410 of file thread_pool.hpp.

◆ gpu

GraphicPipeline* Beatmup::ThreadPool::gpu
private

THE graphic pipeline to run tasks on GPU.

Definition at line 412 of file thread_pool.hpp.

◆ jobs

std::deque<JobContext> Beatmup::ThreadPool::jobs
private

jobs queue

Definition at line 414 of file thread_pool.hpp.

◆ exceptions

std::deque<std::exception_ptr> Beatmup::ThreadPool::exceptions
private

exceptions thrown in this pool

Definition at line 416 of file thread_pool.hpp.

◆ currentJob

JobContext Beatmup::ThreadPool::currentJob
private

job being run at the moment

Definition at line 418 of file thread_pool.hpp.

◆ jobCounter

Job Beatmup::ThreadPool::jobCounter
private

Definition at line 419 of file thread_pool.hpp.

◆ threadCount

ThreadIndex Beatmup::ThreadPool::threadCount
private

actual number of workers

Definition at line 421 of file thread_pool.hpp.

◆ currentWorkerCount

ThreadIndex Beatmup::ThreadPool::currentWorkerCount
private

Definition at line 424 of file thread_pool.hpp.

◆ remainingWorkers

ThreadIndex Beatmup::ThreadPool::remainingWorkers
private

number of workers performing the current task right now

Definition at line 425 of file thread_pool.hpp.

◆ synchroCvar

std::condition_variable Beatmup::ThreadPool::synchroCvar
private

gets notified about workers synchronization

Definition at line 428 of file thread_pool.hpp.

◆ jobsCvar

std::condition_variable Beatmup::ThreadPool::jobsCvar
private

gets notified about the jobs queue updates

Definition at line 429 of file thread_pool.hpp.

◆ workersCvar

std::condition_variable Beatmup::ThreadPool::workersCvar
private

gets notified about workers lifecycle updates

Definition at line 430 of file thread_pool.hpp.

◆ synchro

std::mutex Beatmup::ThreadPool::synchro
private

workers synchronization control within the current task

Definition at line 433 of file thread_pool.hpp.

◆ workersAccess

std::mutex Beatmup::ThreadPool::workersAccess
private

workers lifecycle access control

Definition at line 434 of file thread_pool.hpp.

◆ jobsAccess

std::mutex Beatmup::ThreadPool::jobsAccess
private

jobs queue access control

Definition at line 435 of file thread_pool.hpp.

◆ exceptionsAccess

std::mutex Beatmup::ThreadPool::exceptionsAccess
private

exceptions queue access control

Definition at line 436 of file thread_pool.hpp.

◆ syncHitsCount

int Beatmup::ThreadPool::syncHitsCount
private

number of times synchronization is hit so far by all workers together

Definition at line 439 of file thread_pool.hpp.

◆ syncHitsBound

int Beatmup::ThreadPool::syncHitsBound
private

last sync hits count when all the workers were synchronized

Definition at line 440 of file thread_pool.hpp.

◆ isGpuTested

bool Beatmup::ThreadPool::isGpuTested
private

if true, there was an attempt to warm up the GPU

Definition at line 443 of file thread_pool.hpp.

◆ abortExternally

bool Beatmup::ThreadPool::abortExternally
private

if true, the task is aborted externally

Definition at line 444 of file thread_pool.hpp.

◆ abortInternally

bool Beatmup::ThreadPool::abortInternally
private

if true, the task aborts itself: beforeProcessing(), process() or processOnGPU() returned false

Definition at line 445 of file thread_pool.hpp.

◆ failFlag

bool Beatmup::ThreadPool::failFlag
private

communicates to all the threads that the current task is to skip because of a problem

Definition at line 446 of file thread_pool.hpp.

◆ repeatFlag

bool Beatmup::ThreadPool::repeatFlag
private

if true, the current task is asked to be repeated

Definition at line 447 of file thread_pool.hpp.

◆ eventListener

EventListener& Beatmup::ThreadPool::eventListener
private

Definition at line 449 of file thread_pool.hpp.

◆ myIndex

const PoolIndex Beatmup::ThreadPool::myIndex

the index of the current pool

Definition at line 452 of file thread_pool.hpp.


The documentation for this class was generated from the following file: