Code::Blocks  SVN r11506
cbthreadpool.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of the Code::Blocks IDE and licensed under the GNU Lesser General Public License, version 3
3  * http://www.gnu.org/licenses/lgpl-3.0.html
4  *
5  * $Revision: 10605 $
6  * $Id: cbthreadpool.cpp 10605 2015-12-05 15:07:00Z ollydbg $
7  * $HeadURL: https://svn.code.sf.net/p/codeblocks/code/trunk/src/sdk/cbthreadpool.cpp $
8  */
9 
10 #include "sdk_precomp.h"
11 
12 #ifndef CB_PRECOMP
13  #include "sdk_events.h"
14  #include "manager.h"
15  #include "logmanager.h"
16 #endif
17 
18 #include "cbthreadpool.h"
19 #include <algorithm>
20 #include <functional>
21 
23 {
24  wxMutexLocker lock(m_Mutex);
25 
26  std::for_each(m_threads.begin(), m_threads.end(), std::mem_fun(&cbWorkerThread::Abort));
27  Broadcast(); // make every waiting thread realise it's time to die
28 
29  std::for_each(m_tasksQueue.begin(), m_tasksQueue.end(), std::mem_fun_ref(&cbThreadedTaskElement::Delete));
30 }
31 
32 void cbThreadPool::SetConcurrentThreads(int concurrentThreads)
33 {
34  // m_concurrentThreads is set here, it should always be a positive integer
35  if (concurrentThreads <= 0)
36  {
37  concurrentThreads = wxThread::GetCPUCount(); // GetCPUCount will return -1 if it failed
38  if (concurrentThreads == -1)
39  m_concurrentThreads = 1; // as a fallback, we set the value to 1
40  }
41 
42  if (concurrentThreads == m_concurrentThreads)
43  {
45  return;
46  }
47 
48  wxMutexLocker lock(m_Mutex);
49  _SetConcurrentThreads(concurrentThreads);
50 }
51 // this function is already wrappered by a mutex
52 void cbThreadPool::_SetConcurrentThreads(int concurrentThreads)
53 {
54  if (!m_workingThreads)// if pool is not running (no thread is running)
55  {
56  std::for_each(m_threads.begin(), m_threads.end(), std::mem_fun(&cbWorkerThread::Abort));
57  Broadcast();
58  m_threads.clear();
59 
60  // set a new Semaphore for the new threads, note the max value is the concurrentThreads
61  m_semaphore = CountedPtr<wxSemaphore>(new wxSemaphore(0, concurrentThreads));
62 
63  m_concurrentThreads = concurrentThreads;
65 
66  for (std::size_t i = 0; i < static_cast<std::size_t>(m_concurrentThreads); ++i)
67  {
68  m_threads.push_back(new cbWorkerThread(this, m_semaphore));
69  m_threads.back()->Create(m_stackSize);
70  m_threads.back()->Run(); // this will run cbWorkerThread::Entry()
71  }
72 
73 // Manager::Get()->GetLogManager()->DebugLog(_T("Concurrent threads for pool set to %d"), m_concurrentThreads);
74  }
75  else
76  m_concurrentThreadsSchedule = concurrentThreads;
77 }
78 
79 void cbThreadPool::AddTask(cbThreadedTask *task, bool autodelete)
80 {
81  if (!task)
82  return;
83 
84  wxMutexLocker lock(m_Mutex);
85 
86  m_tasksQueue.push_back(cbThreadedTaskElement(task, autodelete));
87  m_taskAdded = true;
88 
89  // we are in batch mode, so no need to awake the idle thread
90  // m_workingThreads < m_concurrentThreads means there are some threads in idle mode (no task assigned)
92  AwakeNeeded();
93 }
94 
96 {
97  wxMutexLocker lock(m_Mutex);
98 
99  std::for_each(m_threads.begin(), m_threads.end(), std::mem_fun(&cbWorkerThread::AbortTask));
100  std::for_each(m_tasksQueue.begin(), m_tasksQueue.end(), std::mem_fun_ref(&cbThreadedTaskElement::Delete));
101  m_tasksQueue.clear();
102 }
103 
105 {
106  wxMutexLocker lock(m_Mutex);
107  m_batching = false;
108 
109  AwakeNeeded();
110 }
111 
113 {
114  wxMutexLocker lock(m_Mutex);
115 
116  if (m_tasksQueue.empty())
117  return cbThreadedTaskElement();
118 
119  cbThreadedTaskElement element = m_tasksQueue.front();
120  m_tasksQueue.pop_front();
121 
122  return element;
123 }
124 // a thread is leaving from idle mode, and run a task
126 {
127  wxMutexLocker lock(m_Mutex);
129 }
130 // this thread is finishing the task, and is going to be idle.
131 // if there is no task left, and the total threads is running is 0, then we have all task done
132 // otherwise, just put me to the idle mode by m_semaphore->Post()
134 {
135  wxMutexLocker lock(m_Mutex);
137 
138  if (m_workingThreads <= 0 && m_tasksQueue.empty())
139  {
140  if (m_taskAdded)
141  {
142  // notify the owner that all tasks are done
144  wxPostEvent(m_pOwner, evt);
145  m_taskAdded = false;
146  }
147 
148  // The last active thread is now waiting and there's a pending new number of threads to assign...
150  {
152 
153  return false; // the thread must abort
154  }
155  }
156  else
157  m_semaphore->Post(); // return the resource back to pool
158 
159  return true;
160 }
161 
162 void cbThreadPool::TaskDone(cb_unused cbWorkerThread* thread)
163 {
164  // notify the owner that the task has ended
166  wxPostEvent(m_pOwner, evt);
167 }
168 
169 /* *********************************************** */
170 /* ******** cbWorkerThread IMPLEMENTATION ******** */
171 /* *********************************************** */
172 
174 : m_abort(false),
175  m_pPool(pool),
176  m_semaphore(semaphore),
177  m_pTask(nullptr)
178 {
179  // empty
180 }
181 
183 {
184  bool workingThread = false; // keeps the state of the thread so it knows better what to do
185  // a working thread = true means it is running a task, false means it is waiting a task
186 
187  while (!Aborted())
188  {
189  if (workingThread)
190  {
191  workingThread = false;
192 
193  // normally if pool own some resource, it will release one
194  // If a call to WaitingThread returns false, we must abort
195  if (!m_pPool->WaitingThread())
196  break;
197  // if there are still some tasks in the queue, WaitingThread() function will Post the
198  // semaphore, and we don't delay much here for the Wait() function.
199  // if there are no tasks to do, then WaitingThread() does not Post the semaphore, then
200  // we are going to Idle mode now... Until some one release the resource
201 
202  m_semaphore->Wait(); // nothing to do... so just wait until it get the resource
203  }
204 
205  if (Aborted())
206  break;
207 
208  if (!workingThread)
209  {
210  m_pPool->WorkingThread(); // time to work! thread status from idle to running
211  workingThread = true;
212  }
213 
214  // fetch a task from the task queue
216 
217  {
219  m_pTask = element.task;
220  }
221 
222  // are we done with all tasks?
223  if (!m_pTask)
224  continue;
225 
226  if (!Aborted())
227  {
228  m_pTask->Execute(); // run task's job here
229 
230  {
232  m_pTask = nullptr;
233  element.Delete();
234  }
235 
236  m_pPool->TaskDone(this); // send an notification event that one task is done.
237  }
238  }
239 
240  if (workingThread)
242 
243  return nullptr;
244 }
245 
247 {
248  m_abort = true;
249  AbortTask();
250 }
251 
253 {
254  return m_abort;
255 }
256 
258 {
260 
261  if (m_pTask)
262  m_pTask->Abort();
263 }
TasksQueue m_tasksQueue
Definition: cbthreadpool.h:256
cbThreadedTask * m_pTask
a pointer to the running task
Definition: cbthreadpool.h:197
unsigned int m_stackSize
Definition: cbthreadpool.h:242
bool Aborted() const
Tells whether we should abort or not.
void AwakeNeeded()
Definition: cbthreadpool.h:367
virtual int Execute()=0
Override this function with the task&#39;s job Return value doesn&#39;t matter.
A Thread Pool implementation.
Definition: cbthreadpool.h:19
int m_concurrentThreadsSchedule
Definition: cbthreadpool.h:246
wxEvtHandler * m_pOwner
Definition: cbthreadpool.h:235
cbThreadPool * m_pPool
point to the pool which the thread belong to
Definition: cbthreadpool.h:189
~cbThreadPool()
cbThreadPool dtor
bool WaitingThread()
Mechanism for the threads to tell the Pool they&#39;re done and will go to idle, so we can assign another...
void _SetConcurrentThreads(int concurrentThreads)
EVTIMPORT const wxEventType cbEVT_THREADTASK_ALLDONE
Definition: sdk_events.cpp:127
wxMutex m_Mutex
Definition: cbthreadpool.h:266
A generic Code::Blocks event.
Definition: sdk_events.h:20
wxSemaError Post()
WorkerThreadsArray m_threads
Definition: cbthreadpool.h:250
void wxPostEvent(wxEvtHandler *dest, const wxEvent &event)
cbThreadedTaskElement GetNextTask()
Returns the next task in the queue to run.
CountedPtr< wxSemaphore > m_semaphore
Definition: cbthreadpool.h:273
A Worker Thread class.
Definition: cbthreadpool.h:158
void BatchEnd()
End a batch process.
wxMutex m_taskMutex
to protect the member variable accessing from multiply threads lock the access to the m_pTask cbWorke...
Definition: cbthreadpool.h:204
null_pointer_t nullptr
Definition: nullptr.cpp:16
CountedPtr< wxSemaphore > m_semaphore
a pointer to the wxSemaphore it is a counted semaphore pointer shared with all the cbWorkerThread ...
Definition: cbthreadpool.h:194
All tasks are added to one of these. It&#39;ll also save the autodelete value.
Definition: cbthreadpool.h:210
void SetConcurrentThreads(int concurrentThreads)
Changes the number of threads in the pool.
EVTIMPORT const wxEventType cbEVT_THREADTASK_ENDED
Definition: sdk_events.cpp:126
void Broadcast()
Definition: cbthreadpool.h:357
ExitCode Entry() override
Entry point of this thread. The magic happens here.
void Delete()
It&#39;ll delete the task only if it was set to.
Definition: cbthreadpool.h:220
void Abort()
Tell the thread to abort. It will also tell the task to abort (if any)
void Abort()
This function is called to tell the task to abort (check cbThreadPool::AbortAllTasks) ...
void TaskDone(cbWorkerThread *thread)
Called by a Worker Thread to inform a single task has finished, this will send a cbEVT_THREADTASK_END...
This is what you have to use instead of wxThread to add tasks to the Thread Pool. ...
static int GetCPUCount()
void AbortTask()
Aborts the running task (if any)
cbWorkerThread(cbThreadPool *pool, CountedPtr< wxSemaphore > &semaphore)
cbWorkerThread ctor
void AddTask(cbThreadedTask *task, bool autodelete=true)
Adds a new task to the pool.
bool m_abort
whether is is aborted or not
Definition: cbthreadpool.h:186
int m_workingThreads
how many working threads are running tasks m_workingThreads + thread in Idle = m_concurrentThreads ...
Definition: cbthreadpool.h:264
wxSemaError Wait()
friend class cbWorkerThread
Definition: cbthreadpool.h:289
int m_concurrentThreads
Definition: cbthreadpool.h:241
void WorkingThread()
Mechanism for the threads to tell the Pool they&#39;re running, a thread is switch from the idle mode to ...
void * ExitCode
void AbortAllTasks()
Aborts all running and pending tasks.