Beatmup
custom_pipeline.cpp
Go to the documentation of this file.
1 /*
2  Beatmup image and signal processing library
3  Copyright (C) 2019, lnstadrum
4 
5  This program is free software: you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU General Public License for more details.
14 
15  You should have received a copy of the GNU General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18 
19 #include "../exception.h"
20 #include "custom_pipeline.h"
21 #include <algorithm>
22 #include <chrono>
23 
24 using namespace Beatmup;
25 
27 private:
28  std::vector<TaskHolder*>::iterator currentTask;
29  std::vector<TaskHolder*> tasks; //!< the list of tasks
30  std::mutex tasksAccess; //!< task list access control
35  bool measured; //!< if `true`, the execution mode and the thread count are determined
36  bool abort; //!< if `true`, one of threads executing the current task caused its aborting
37 
38 public:
39  Impl():
40  measured(false)
41  {}
42 
    /**
     * Destroys the implementation together with all the tasks it holds.
     * The pipeline owns its TaskHolder instances (removeTask() deletes them
     * too), so they are released here with plain `delete`.
     */
    virtual ~Impl() {
        // destroying taskholders
        for (auto task : tasks)
            delete task;
    }
48 
50  return **currentTask;
51  }
52 
    /**
     * Returns the task currently pointed at (read-only access).
     * Only meaningful during a processing session: `currentTask` is set in
     * process() and advanced by goToNextTask().
     */
    const TaskHolder& getCurrentTask() const {
        return **currentTask;
    }
56 
    /**
     * Executes the currently pointed task and measures its run time.
     * NOTE(review): several lines of this function are missing from this
     * extract (the declaration of `task` — presumably a reference obtained
     * from getCurrentTask(), the GPU-vs-CPU `if` condition, and the
     * thread synchronization calls). Hedged notes mark the gaps; confirm
     * against the original source.
     */
    void runTask() {
        auto startTime = std::chrono::high_resolution_clock::now();
        // NOTE(review): a line declaring `task` appears to be elided here.
        task.getTask().beforeProcessing(
            task.threadCount,
            gpu
        );

        // wait for other workers
        // NOTE(review): a thread->synchronize() call is likely elided here.

        // perform the task
        bool result;
        // NOTE(review): the `if` condition selecting GPU execution is elided.
            result = task.getTask().processOnGPU(*gpu, *thread);
        else
            result = task.getTask().process(*thread);

        // a failed task aborts the whole pipeline run
        if (!result)
            abort = true;

        // wait for other workers
        // NOTE(review): a thread->synchronize() call is likely elided here.

        task.getTask().afterProcessing(
            task.threadCount,
            task.executionMode != AbstractTask::TaskDeviceRequirement::CPU_ONLY && gpu ? gpu : nullptr,
            !abort
        );

        // record the task execution time in milliseconds
        auto endTime = std::chrono::high_resolution_clock::now();
        task.time = std::chrono::duration<float, std::milli>(endTime - startTime).count();
    }
91 
92  void goToNextTask() {
93  currentTask++;
94  }
95 
96  bool allTasksDone() const {
97  return currentTask >= tasks.end();
98  }
99 
    /**
     * Returns `true` if the current session is aborted: a task signalled
     * failure (the `abort` flag) AND the worker thread reports the task as
     * asked to stop from outside.
     */
    bool allTasksAborted() const {
        return abort && thread->isTaskAborted();
    }
103 
104 
105  int getTaskCount() {
106  std::lock_guard<std::mutex> lock(tasksAccess);
107  return (int) tasks.size();
108  }
109 
    // NOTE(review): the signature of this accessor is missing from this
    // extract; judging by the body and the reference section it is
    // `TaskHolder* getTask(int index)`: returns the task holder at a given
    // position (thread-safe, debug-asserted bounds).
        std::lock_guard<std::mutex> lock(tasksAccess);
        BEATMUP_ASSERT_DEBUG(0 <= index && index < tasks.size());
        return tasks[index];
    }
115 
116  int getTaskIndex(const TaskHolder* holder) {
117  std::lock_guard<std::mutex> lock(tasksAccess);
118  const auto& it = std::find(tasks.cbegin(), tasks.cend(), holder);
119  if (it == tasks.cend())
120  return -1;
121  return (int) (it - tasks.cbegin());
122  }
123 
    // NOTE(review): the signature is missing from this extract; per the
    // reference section it is `void addTask(TaskHolder* taskHolder)`:
    // appends a holder to the end of the pipeline (thread-safe).
        std::lock_guard<std::mutex> lock(tasksAccess);
        measured = false;    // the task list changed: the pipeline must be measured again
        tasks.push_back(taskHolder);
    }
129 
130  void insertTask(TaskHolder* newbie, const TaskHolder* before) {
131  std::lock_guard<std::mutex> lock(tasksAccess);
132  const auto& nextHolder = std::find(tasks.cbegin(), tasks.cend(), before);
133  if (nextHolder == tasks.cend())
134  throw RuntimeError("Reference task holder is not found in the task list");
135  measured = false;
136  tasks.insert(nextHolder , newbie);
137  }
138 
139  bool removeTask(const TaskHolder* target) {
140  std::lock_guard<std::mutex> lock(tasksAccess);
141  const auto& pointer = std::find(tasks.cbegin(), tasks.cend(), target);
142  if (pointer == tasks.cend())
143  return false;
144  delete *pointer;
145  tasks.erase(pointer);
146  return true;
147  }
148 
    /**
     * Determining execution mode (GPU or CPU) and thread count for each task
     * NOTE(review): this extract elides the `case` labels of the switch and
     * at least one line before the loop (presumably initializing the
     * pipeline-wide `executionMode` from the per-task device requirements
     * — confirm against the original source).
     */
    void measure() {
        std::lock_guard<std::mutex> lock(tasksAccess);
        maxThreadCount = 0;
        for (auto& it : tasks) {
            // stores each task's device requirement and dispatches on it
            switch (it->executionMode = it->getTask().getUsedDevices()) {
            // NOTE(review): case labels (likely GPU_ONLY / GPU_OR_CPU) elided here.
                break;
                break;
            default:
                break;
            }
            // the pipeline needs as many threads as its most demanding task
            it->threadCount = it->getTask().getMaxThreads();
            maxThreadCount = std::max(maxThreadCount, it->threadCount);
        }
        measured = true;
    }
173 
    // NOTE(review): the signature is missing from this extract; per the
    // reference section it is
    // `AbstractTask::TaskDeviceRequirement getUsedDevices() const`.
    // Valid only after measure() has been called.
        if (!measured)
            throw PipelineNotReady("Pipeline not measured; call measure() first.");
        return executionMode;
    }
179 
    // NOTE(review): the signature is missing from this extract; per the
    // reference section it is `ThreadIndex getMaxThreads() const`.
    // Valid only after measure() has been called.
        if (!measured)
            throw PipelineNotReady("Pipeline not measured; call measure() first.");
        return maxThreadCount;
    }
185 
    // NOTE(review): the signature is missing from this extract; per the
    // reference section it is `void beforeProcessing(GraphicPipeline* gpu)`.
    // Stores the GPU handle for the upcoming run and resets the abort flag.
        this->gpu = gpu;
        abort = false;
    }
190 
    /**
     * Processing entry point
     * NOTE(review): the signature is missing from this extract; per the
     * reference section it is
     * `bool process(GraphicPipeline* gpu, TaskThread& thread, CustomPipeline& pipeline)`.
     * The managing thread walks the task list via pipeline.route(); the
     * secondary workers spin on the current task until all tasks are done
     * or aborted.
     */
        // managing worker thread
        if (thread.isManaging()) {
            this->thread = &thread;
            std::lock_guard<std::mutex> lock(tasksAccess);
            currentTask = tasks.begin();
            pipeline.route(*this);
        }

        // secondary worker thread
        else {
            do {
                // NOTE(review): a thread.synchronize() call appears to be
                // elided before and after this step — confirm against the
                // original source.
                // only threads within the task's declared thread count take part
                if (!allTasksDone() && !allTasksAborted() && thread.currentThread() < (*currentTask)->threadCount)
                    if (!(*currentTask)->getTask().process(thread))
                        abort = true;
            } while (!allTasksDone() && !allTasksAborted());
        }

        // the run succeeded if nothing raised the abort flag
        return !abort;
    }
216 };
217 
218 
220  return impl->getUsedDevices();
221 }
222 
224  return impl->getMaxThreads();
225 }
226 
228  impl->beforeProcessing(gpu);
229 }
230 
/**
 * Instruction called after the task is executed: simply forwards to the
 * base AbstractTask implementation (no pipeline-specific cleanup here).
 */
void CustomPipeline::afterProcessing(ThreadIndex threadCount, GraphicPipeline* gpu, bool aborted) {
    AbstractTask::afterProcessing(threadCount, gpu, aborted);
}
234 
236  return impl->process(nullptr, thread, *this);
237 }
238 
240  return impl->process(&gpu, thread, *this);
241 }
242 
244  impl(new Impl())
245 {}
246 
248  delete impl;
249 }
250 
252  return impl->getTaskCount();
253 }
254 
256  return *impl->getTask(index);
257 }
258 
260  return impl->getTaskIndex(&task);
261 }
262 
264  TaskHolder* holder = createTaskHolder(task);
265  impl->addTask(holder);
266  return *holder;
267 }
268 
270  TaskHolder* holder = createTaskHolder(task);
271  impl->insertTask(holder, &before);
272  return *holder;
273 }
274 
276  return impl->removeTask(&task);
277 }
278 
280  impl->measure();
281 }
282 
284  task(holder.task),
285  executionMode(holder.executionMode),
286  threadCount(holder.threadCount),
287  time(0)
288 {}
289 
290 
292  return &left == &right;
293 }
Task: an operation that can be executed by multiple threads in parallel.
Definition: parallelism.h:90
TaskDeviceRequirement
Specifies which device (CPU and/or GPU) is used to run the task.
Definition: parallelism.h:95
@ GPU_OR_CPU
this task uses GPU if it is available, but CPU fallback is possible
@ CPU_ONLY
this task does not use GPU
@ GPU_ONLY
this task requires GPU, otherwise it cannot run
virtual void afterProcessing(ThreadIndex threadCount, GraphicPipeline *gpu, bool aborted)
Instruction called after the task is executed.
Definition: parallelism.cpp:29
A task within a pipeline.
Custom pipeline: a sequence of tasks to be executed as a whole.
bool removeTask(const TaskHolder &task)
Removes a task from the pipeline.
TaskDeviceRequirement getUsedDevices() const
Communicates devices (CPU and/or GPU) the task is run on.
void afterProcessing(ThreadIndex threadCount, GraphicPipeline *gpu, bool aborted)
Instruction called after the task is executed.
TaskHolder & getTask(int) const
Retrieves a task by its index.
int getTaskIndex(const TaskHolder &)
Retrieves task index if it is in the pipeline; returns -1 otherwise.
TaskHolder & insertTask(AbstractTask &task, const TaskHolder &before)
Inserts a task in a specified position of the pipeline before another task.
TaskHolder & addTask(AbstractTask &)
Adds a new task to the end of the pipeline.
bool processOnGPU(GraphicPipeline &gpu, TaskThread &thread)
Executes the task on GPU.
bool process(TaskThread &thread)
Executes the task on CPU within a given thread.
ThreadIndex getMaxThreads() const
Gives the upper limit on the number of threads the task may be performed by.
void measure()
Determines pipeline execution mode and required thread count.
virtual TaskHolder * createTaskHolder(AbstractTask &task)=0
void beforeProcessing(ThreadIndex threadCount, ProcessingTarget target, GraphicPipeline *gpu)
Instruction called before the task is executed.
Internal low-level GPU control API.
Definition: pipeline.h:33
void afterProcessing(ThreadIndex threadCount, GraphicPipeline *gpu, bool aborted) override
Instruction called after the task is executed.
bool process(TaskThread &thread) override
Executes the task on CPU within a given thread.
void beforeProcessing(ThreadIndex threadCount, ProcessingTarget target, GraphicPipeline *gpu) override
Instruction called before the task is executed.
bool processOnGPU(GraphicPipeline &gpu, TaskThread &thread) override
Executes the task on GPU.
Interface managing the execution of a sequence of tasks.
Thread executing tasks.
Definition: parallelism.h:154
bool isManaging() const
Definition: parallelism.h:172
virtual void synchronize()=0
Blocks until all the other threads running the same task reach the same point.
ThreadIndex currentThread() const
Definition: parallelism.h:165
virtual bool isTaskAborted() const =0
Returns true if the task is asked to stop from outside.
const TaskHolder & getCurrentTask() const
std::vector< TaskHolder * > tasks
the list of tasks
AbstractTask::TaskDeviceRequirement getUsedDevices() const
void insertTask(TaskHolder *newbie, const TaskHolder *before)
std::mutex tasksAccess
task list access control
bool measured
if true, the execution mode and the thread count are determined
void addTask(TaskHolder *taskHolder)
void beforeProcessing(GraphicPipeline *gpu)
bool allTasksAborted() const
Returns true if the current session is aborted.
bool removeTask(const TaskHolder *target)
void runTask()
Executes the pointed task.
void goToNextTask()
Goes to the next task in the list.
std::vector< TaskHolder * >::iterator currentTask
bool allTasksDone() const
Returns true if all tasks are done.
AbstractTask::TaskDeviceRequirement executionMode
TaskHolder & getCurrentTask()
Returns currently pointed task.
TaskHolder * getTask(int index)
void measure()
Determining execution mode (GPU or CPU) and thread count for each task.
int getTaskIndex(const TaskHolder *holder)
bool abort
if true, one of threads executing the current task caused its aborting
ThreadIndex getMaxThreads() const
bool process(GraphicPipeline *gpu, TaskThread &thread, CustomPipeline &pipeline)
Processing entry point.
#define BEATMUP_ASSERT_DEBUG(C)
Definition: exception.h:163
bool operator==(const CustomRectangle< numeric > &lhs, const CustomRectangle< numeric > &rhs)
Checks whether two rectangles are actually the same.
Definition: geometry.h:622
unsigned char ThreadIndex
number of threads / thread index
Definition: parallelism.h:68
ProcessingTarget
Definition: basic_types.h:55
CustomPoint< numeric > max(const CustomPoint< numeric > &a, const CustomPoint< numeric > &b)
Definition: geometry.h:728
jlong jint index
return pipeline getTaskIndex * taskHolder
Beatmup::CustomPipeline::TaskHolder * newbie
return $pool getJavaReference & pipeline(index)
Beatmup::IntPoint result
jlong jlong jint time
jlong jlong jint jint jint jint jint left
Beatmup::NNets::InferenceTask * task