Code::Blocks  SVN r11506
cbthreadpool.h
Go to the documentation of this file.
1 /*
2  * This file is part of the Code::Blocks IDE and licensed under the GNU Lesser General Public License, version 3
3  * http://www.gnu.org/licenses/lgpl-3.0.html
4  */
5 
6 #ifndef CBTHREADPOOL_H
7 #define CBTHREADPOOL_H
8 
9 #include <wx/thread.h>
10 #include <wx/event.h>
11 #include <vector>
12 #include <list>
13 
14 #include "cbthreadedtask.h"
15 #include "settings.h"
16 #include "prep.h"
17 
20 {
21  public:
28  cbThreadPool(wxEvtHandler *owner, int id = -1, int concurrentThreads = -1, unsigned int stackSize = 0);
29 
31  ~cbThreadPool();
32 
38  void SetConcurrentThreads(int concurrentThreads);
39 
46  int GetConcurrentThreads() const;
47 
49  int GetId() const { return m_ID; }
50 
56  void AddTask(cbThreadedTask *task, bool autodelete = true);
57 
62  void AbortAllTasks();
63 
68  bool Done() const;
69 
78  void BatchBegin();
79 
85  void BatchEnd();
86 
87  private:
88 
90  /* class for counted reference semantics
91  * - deletes the object to which it refers when the last CountedPtr
92  * that refers to it is destroyed
93  */
94  template <typename T>
95  class CountedPtr
96  {
97  private:
98  T *ptr; // pointer to the value
99  long *count; // shared number of owners
100 
101  public:
102  // initialize pointer with existing pointer
103  // - requires that the pointer p is a return value of new
104  explicit CountedPtr(T *p = nullptr) : ptr(p), count(new long(1)) {}
105  // copy pointer (one more owner)
106  CountedPtr(const CountedPtr<T> &p) : ptr(p.ptr), count(p.count)
107  {
108  ++*count;
109  }
110 
111  // destructor (delete value if this was the last owner)
113  {
114  dispose();
115  }
117  CountedPtr<T> &operator = (const CountedPtr<T> &p)
118  {
119  if (this != &p)
120  {
121  dispose();
122  ptr = p.ptr;
123  count = p.count;
124  ++*count;
125  }
126 
127  return *this;
128  }
129 
131  T &operator * () const
132  {
133  return *ptr;
134  }
135 
136  T *operator -> () const
137  {
138  return ptr;
139  }
140 
141  private:
143  void dispose()
144  {
145  if (--*count == 0)
146  {
147  delete count;
148  delete ptr;
149  }
150  }
151  };
152 
158  class cbWorkerThread : public wxThread
159  {
160  public:
168 
170  ExitCode Entry() override;
171 
173  void Abort();
174 
179  bool Aborted() const;
180 
182  void AbortTask();
183 
184  private:
186  bool m_abort;
187 
190 
195 
198 
205  };
206 
207  typedef std::vector<cbWorkerThread *> WorkerThreadsArray;
208 
211  {
212  cbThreadedTaskElement(cbThreadedTask *_task = nullptr, bool _autodelete = false)
213  : task(_task),
214  autodelete(_autodelete)
215  {
216  // empty
217  }
218 
220  void Delete()
221  {
222  if (autodelete)
223  {
224  delete task;
225  task = nullptr; // better safe than sorry
226  }
227  }
228 
231  };
232 
233  typedef std::list<cbThreadedTaskElement> TasksQueue;
234 
235  wxEvtHandler *m_pOwner; // events notification will send to this guy
236  int m_ID; // id used to fill the ID field of the event
237  bool m_batching; // whether in batch mode of adding tasks
238 
239  // current number of concurrent threads, this is the maximum value of the m_workingThreads
240  // this variable should always be positive, 0 and -1 is not allowed.
242  unsigned int m_stackSize; // stack size for every threads
243 
244  // if we cannot apply the new value of concurrent threads, keep it here, usually the time to
245  // apply a scheduled value is when all the tasks is done.
247 
248  // the total threads(cbWorkerThread) are stored here, this contains all the threads either is
249  // currently running or in idle() mode.
250  WorkerThreadsArray m_threads;
251 
252  // the pending tasks (cbThreadedTaskElement), usually we have many tasks to run in the pool, but
253  // we have limited number of threads to run those tasks, so tasks which don't have the chance to
254  // run will be put in the queue. Once a thread finishes a task, it will fetch a new task from
255  // this task queue.
256  TasksQueue m_tasksQueue;
257 
258  // true if any task added, reset to false if all the tasks is done
260 
265 
266  mutable wxMutex m_Mutex; // we better be safe, protect the change of member variables
267 
268  // used to synchronize the Worker Threads, the counted value is that how many threads are
269  // sharing this semaphore. The semaphore's initial value is the thread number we can used to
270  // run the tasks.
271  // initial counted value = m_concurrentThreads
272  // the value of semaphore = the number of threads in Idle mode
274 
275  void _SetConcurrentThreads(int concurrentThreads); // like SetConcurrentThreads, but non-thread safe
276 
277  // awakes all threads, so they will leave from the Idle mode to working mode
278  // this is used when we are going to abort all the threads, there are two
279  // cases we need to call Broadcast(), one is the destructor, the other is the user need to
280  // change the concurrent thread numbers, so we abort all the threads, and re-create them again.
281  void Broadcast();
282 
283  // awakes only a few threads, this usually happens when we add some tasks, and there are some
284  // threads which is currently in idle mode, so we can awake these idle threads to run tasks.
285  void AwakeNeeded();
286 
287 
288  protected:
289  friend class cbWorkerThread;
290 
295  cbThreadedTaskElement GetNextTask();
296 
301  void WorkingThread();
302 
311  bool WaitingThread();
312 
317  void TaskDone(cbWorkerThread *thread);
318 };
319 
320 /* ************************************************ */
321 /* **************** INLINE MEMBERS **************** */
322 /* ************************************************ */
323 
324 inline cbThreadPool::cbThreadPool(wxEvtHandler *owner, int id, int concurrentThreads, unsigned int stackSize)
325 : m_pOwner(owner),
326  m_ID(id),
327  m_batching(false),
328  m_concurrentThreads(-1),
329  m_stackSize(stackSize),
330  m_concurrentThreadsSchedule(0),
331  m_taskAdded(false),
332  m_workingThreads(0),
333  m_semaphore(new wxSemaphore)
334 {
335  // m_concurrentThreads will be set to a positive integer value.
336  SetConcurrentThreads(concurrentThreads);
337 }
338 
340 {
341  wxMutexLocker lock(m_Mutex);
343 }
344 
345 inline bool cbThreadPool::Done() const
346 {
347  wxMutexLocker lock(m_Mutex);
348  return m_workingThreads == 0;
349 }
350 
352 {
353  wxMutexLocker lock(m_Mutex);
354  m_batching = true;
355 }
356 
358 {
359  // if m_concurrentThreads == -1, which means the pool is not initialized yet
360  if (m_concurrentThreads == -1)
361  return;
362  // let the idle(pending) worker thread to execute tasks, those worker threads are waiting for semaphore
363  for (std::size_t i = 0; i < static_cast<std::size_t>(m_concurrentThreads - m_workingThreads); ++i)
364  m_semaphore->Post();
365 }
366 
368 {
369  // if m_concurrentThreads == -1, which means the pool is not initialized yet
370  if (m_concurrentThreads == -1)
371  return;
372 
373  // the thread number to awake should be less than the idle thread number and the tasks queue's size
374  std::size_t awakeThreadNumber = std::min<std::size_t>(m_tasksQueue.size(),
376  for (std::size_t i = 0; i < awakeThreadNumber; ++i)
377  m_semaphore->Post();
378 }
379 
380 #endif //CBTHREADPOOL_H
TasksQueue m_tasksQueue
Definition: cbthreadpool.h:256
cbThreadedTask * m_pTask
a pointer to the running task
Definition: cbthreadpool.h:197
bool Done() const
Tells if the pool has finished its job.
Definition: cbthreadpool.h:345
unsigned int m_stackSize
Definition: cbthreadpool.h:242
void AwakeNeeded()
Definition: cbthreadpool.h:367
A Thread Pool implementation.
Definition: cbthreadpool.h:19
int m_concurrentThreadsSchedule
Definition: cbthreadpool.h:246
wxEvtHandler * m_pOwner
Definition: cbthreadpool.h:235
std::vector< cbWorkerThread * > WorkerThreadsArray
Definition: cbthreadpool.h:207
cbThreadedTaskElement(cbThreadedTask *_task=nullptr, bool _autodelete=false)
Definition: cbthreadpool.h:212
cbThreadPool * m_pPool
point to the pool which the thread belong to
Definition: cbthreadpool.h:189
wxMutex m_Mutex
Definition: cbthreadpool.h:266
wxSemaError Post()
WorkerThreadsArray m_threads
Definition: cbthreadpool.h:250
#define DLLIMPORT
Definition: settings.h:16
CountedPtr(const CountedPtr< T > &p)
Definition: cbthreadpool.h:106
CountedPtr< wxSemaphore > m_semaphore
Definition: cbthreadpool.h:273
A Worker Thread class.
Definition: cbthreadpool.h:158
wxMutex m_taskMutex
to protect the member variable accessing from multiply threads lock the access to the m_pTask cbWorke...
Definition: cbthreadpool.h:204
int GetConcurrentThreads() const
Gets the current number of threads in the pool.
Definition: cbthreadpool.h:339
CountedPtr< wxSemaphore > m_semaphore
a pointer to the wxSemaphore it is a counted semaphore pointer shared with all the cbWorkerThread ...
Definition: cbthreadpool.h:194
All tasks are added to one of these. It&#39;ll also save the autodelete value.
Definition: cbthreadpool.h:210
cbThreadPool(wxEvtHandler *owner, int id=-1, int concurrentThreads=-1, unsigned int stackSize=0)
cbThreadPool ctor
Definition: cbthreadpool.h:324
Josuttis&#39; implementation of CountedPtr.
Definition: cbthreadpool.h:95
void SetConcurrentThreads(int concurrentThreads)
Changes the number of threads in the pool.
void Broadcast()
Definition: cbthreadpool.h:357
void Delete()
It&#39;ll delete the task only if it was set to.
Definition: cbthreadpool.h:220
int GetId() const
return the pool ID
Definition: cbthreadpool.h:49
CountedPtr(T *p=nullptr)
Definition: cbthreadpool.h:104
This is what you have to use instead of wxThread to add tasks to the Thread Pool. ...
void BatchBegin()
Begin a batch process.
Definition: cbthreadpool.h:351
bool m_abort
whether is is aborted or not
Definition: cbthreadpool.h:186
int m_workingThreads
how many working threads are running tasks m_workingThreads + thread in Idle = m_concurrentThreads ...
Definition: cbthreadpool.h:264
std::list< cbThreadedTaskElement > TasksQueue
Definition: cbthreadpool.h:233
const wxString ptr(_T("*"))
int m_concurrentThreads
Definition: cbthreadpool.h:241
void dispose()
decrease the counter, and if it get 0, destroy both counter and value
Definition: cbthreadpool.h:143
void * ExitCode