145 std::unique_lock<std::mutex> lock(
jobsAccess, std::defer_lock);
146 std::unique_lock<std::mutex> workersLock(
workersAccess, std::defer_lock);
177 bool useGpuForCurrentTask =
false;
190 exceptions.push_back(std::current_exception());
196 useGpuForCurrentTask = (
gpu !=
nullptr);
204 "A task requires GPU, but GPU init is failed" :
205 "A task requiring GPU may only be run in the main pool"
212 exceptions.push_back(std::current_exception());
235 workersLock.unlock();
254 exceptions.push_back(std::current_exception());
279 exceptions.push_back(std::current_exception());
283 workersLock.unlock();
286 if (useGpuForCurrentTask &&
gpu) {
291 bool internalRepeatFlag =
false;
326 std::this_thread::yield();
341 if (!
job.task->process(thread)) {
352 exceptions.push_back(std::current_exception());
378 std::unique_lock<std::mutex> lock(
synchro);
415 std::deque<std::exception_ptr>
427 std::condition_variable
480 workers[t]->isTerminating =
true;
502 std::exception_ptr pointer(
exceptions.front());
504 std::rethrow_exception(pointer);
517 std::unique_lock<std::mutex> jobsLock(
jobsAccess);
519 while (!
jobs.empty())
525 workers[t]->isTerminating =
true;
528 workersLock.unlock();
553 workersLock.unlock();
562 std::unique_lock<std::mutex> lock(
jobsAccess);
580 std::unique_lock<std::mutex> lock(
jobsAccess);
587 return jobs.front().id;
592 if (_.task == &
task) {
615 std::unique_lock<std::mutex> lock(
jobsAccess);
618 class BlockingOnPersistentJob :
public Exception {
620 BlockingOnPersistentJob():
Exception(
"Waiting for a persistent job to finish: potential deadlock") {}
622 throw BlockingOnPersistentJob();
649 std::unique_lock<std::mutex> lock(
jobsAccess);
659 for (
auto it =
jobs.begin(); it !=
jobs.end(); it++)
673 std::unique_lock<std::mutex> lock(
jobsAccess);
674 while (!
jobs.empty())
684 return !
jobs.empty();
692 unsigned int N = std::thread::hardware_concurrency();
710 return gpu !=
nullptr;
Task: an operation that can be executed by multiple threads in parallel.
TaskDeviceRequirement
Specifies which device (CPU and/or GPU) is used to run the task.
@ CPU_ONLY
this task does not use GPU
@ GPU_ONLY
this task requires GPU, otherwise it cannot run
virtual TaskDeviceRequirement getUsedDevices() const
Communicates devices (CPU and/or GPU) the task is run on.
virtual void beforeProcessing(ThreadIndex threadCount, ProcessingTarget target, GraphicPipeline *gpu)
Instruction called before the task is executed.
virtual bool processOnGPU(GraphicPipeline &gpu, TaskThread &thread)
Executes the task on GPU.
virtual ThreadIndex getMaxThreads() const
Gives the upper limit on the number of threads the task may be performed by.
virtual void afterProcessing(ThreadIndex threadCount, GraphicPipeline *gpu, bool aborted)
Instruction called after the task is executed.
virtual bool process(TaskThread &thread)=0
Executes the task on CPU within a given thread.
Base class for all exceptions.
Internal low-level GPU control API.
void flush()
Waits until all operations submitted to GPU are finished.
static void check(const bool condition, const std::string &message)
virtual bool taskDone(PoolIndex pool, AbstractTask &task, bool aborted)
Callback function called when a task is done or cancelled; if returns true, the task will be repeated...
virtual void taskFail(PoolIndex pool, AbstractTask &task, const std::exception_ptr exPtr)
Callback function called when an exception is thrown.
virtual void threadCreated(PoolIndex pool)
Callback function called when a new worker thread is created.
virtual void gpuInitFail(PoolIndex pool, const std::exception_ptr exPtr)
Callback function called when the GPU cannot start.
virtual void threadTerminating(PoolIndex pool)
Callback function called when a worker thread is terminating.
bool isTerminating
if true, the thread is requested to terminate
virtual ~TaskThreadImpl()
void synchronize()
Blocks until all the other threads running the same task reach the same point.
bool isRunning
if not, the thread sleeps
ThreadIndex index
current thread index
std::thread internalThread
worker thread
ThreadIndex numThreads() const
TaskThreadImpl(ThreadIndex index, ThreadPool &pool)
bool isTaskAborted() const
Returns true if the task is asked to stop from outside.
A pool of threads running tasks. ThreadPool runs AbstractTasks, possibly in multiple threads.
Job submitTask(AbstractTask &task, const TaskExecutionMode mode)
Adds a new task to the jobs queue.
ThreadPool(const PoolIndex index, const ThreadIndex limitThreadCount, EventListener &listener)
void resize(ThreadIndex newThreadCount)
Resizes the pool.
std::mutex jobsAccess
jobs queue access control
TaskExecutionMode
The way the task should be run in the pool.
@ PERSISTENT
persistent task, run process() until it returns false
@ NORMAL
normal task, should be run once
ThreadIndex threadCount
actual number of workers
void waitForJob(Job job)
Blocks until a given job finishes, if it has not finished yet.
std::mutex workersAccess
workers lifecycle access control
bool abortInternally
if true, the task aborts itself: beforeProcessing(), process() or processOnGPU() returned false
bool failFlag
communicates to all the threads that the current task is to be skipped because of a problem
std::condition_variable synchroCvar
gets notified about workers synchronization
std::deque< JobContext > jobs
jobs queue
void managingThreadFunc(TaskThreadImpl &thread)
Managing (#0) worker thread function.
const PoolIndex myIndex
the index of the current pool
ThreadPool(const ThreadPool &)=delete
TaskThreadImpl ** workers
workers instances
Job repeatTask(AbstractTask &task, bool abortCurrent)
Ensures a given task is executed at least once.
ThreadIndex currentWorkerCount
ThreadIndex getThreadCount() const
bool abortJob(Job job)
Aborts a given submitted job.
bool repeatFlag
if true, the current task is asked to be repeated
JobContext currentJob
job being run at the moment
int syncHitsCount
number of times synchronization is hit so far by all workers together
bool busy()
Checks whether the pool has jobs.
std::mutex synchro
workers synchronization control within the current task
void synchronizeThread(TaskThreadImpl &thread)
static ThreadIndex hardwareConcurrency()
Returns optimal number of threads depending on the hardware capabilities.
bool isGpuTested
if true, there was an attempt to warm up the GPU
int syncHitsBound
last sync hits count when all the workers were synchronized
bool isManagingThread() const
std::mutex exceptionsAccess
exceptions queue access control
EventListener & eventListener
std::deque< std::exception_ptr > exceptions
exceptions thrown in this pool
bool abortExternally
if true, the task is aborted externally
void workerThreadFunc(TaskThreadImpl &thread)
Ordinary worker thread function.
GraphicPipeline * gpu
THE graphic pipeline to run tasks on GPU.
std::condition_variable jobsCvar
gets notified about the jobs queue updates
void check()
Checks if the thread pool is doing great.
bool isGpuQueried() const
ThreadIndex remainingWorkers
number of workers performing the current task right now
std::condition_variable workersCvar
gets notified about workers lifecycle updates
void wait()
Blocks until all the submitted jobs are executed.
unsigned char ThreadIndex
number of threads / thread index
static const ThreadIndex MAX_THREAD_INDEX
maximum possible thread index value
unsigned char PoolIndex
number of thread pools or a pool index
CustomPoint< numeric > min(const CustomPoint< numeric > &a, const CustomPoint< numeric > &b)
JNIEnv jlong jint jint job
Beatmup::Context::EventListener * listener
Beatmup::NNets::InferenceTask * task