Doug Binks - 22 Aug 2015 - edited 28 Jun 2019
This is the first in a series of articles detailing the inner workings and evolution of enkiTS, a permissively licensed open source multithreading task scheduler for C and C++ (including C++11). The interface has evolved somewhat since this article was written, and whilst I've made minor changes to update it, keeping it fully up to date is difficult, so please check out the full code on github.
enkiTS - Code and basic examples for the task scheduler described in this article.
enkiTSExamples - Further enkiTS examples adding profiling instrumentation.
If you're writing compute-intensive software for consumer hardware, and you want to use as much of the system's resources as possible, then you'll need to consider multithreading on the CPU. There are a number of ways to approach this, but the current gold standard for developers who want both simplicity and control is a task scheduler which can handle data-parallelism (in the games industry a task is often referred to as a job). Task parallelism allows you to run different types of computation at the same time, whilst data-parallelism enables you to run the same computation over a set of data across different threads at the same time. Note that I won't consider Single Instruction Multiple Data (SIMD) parallelism here, but if you're doing heavy computation you probably should. Additionally, I won't cover multithreading methods for hiding large latencies, such as waiting on hard disk or socket transactions.
enkiTS in our game, Avoyd, being profiled using the microprofile and Dear ImGui integration available in enkiTSExamples. This is an old image of the game; the current codebase uses newer enkiTS API features such as priorities and pinned tasks, making for a much more complex task graph.
In February of 2015, I extracted the tasking system I implemented for Avoyd from its dependencies on my own code-base, and made it open source under a permissive zlib license - calling it enkiTS (enkisoftware's Task Scheduler). I'd written my own task scheduler because I wanted something lightweight with a simple API. If neither of these is a concern for you, then do check out Intel's Threaded Building Blocks as an excellent solution.
Why simple & lightweight?
Simple API - easy to use, doesn't require reading through reams of documentation, examples or header files to figure out how you're supposed to achieve something with the library.
Lightweight - easy to integrate, understand, modify and port.
Any piece of software which has seen real world use eventually accumulates extra features, system workarounds and legacy API handling - sometimes referred to as technical debt, but really just technical grease. The goal is to keep these in check as much as possible.
The other goals for enkiTS are:
Fast, then scalable - any approach to multithreading sacrifices some single threaded efficiency to gain scalability on a parallel architecture. enkiTS is designed for consumer systems so concentrates on relatively low numbers of threads whilst remaining as scalable as possible.
Braided parallelism - the task scheduler can issue tasks from another task as well as from the thread which created the Task System.
Zero allocations during scheduling - the task scheduler is up-front allocation friendly, with all task and scheduler allocations able to be made during initialization.
The fundamental minimum requirements for a task are a function pointer and the address of a running flag which can be used to find out whether the task has finished - the task scheduler sets the flag to 1 when the task is started, and 0 when it's finished. Wrapping these up in a simple structure simplifies the interface, and using C++ single inheritance makes development easier. A virtual function interface adds only a very slight overhead over a plain function pointer but makes usage easier (one extra indirection, and hence a potential cache miss, but this is very small compared to the scheduler's own overheads).
This task interface would look like:
struct ITask
{
    virtual void Execute() = 0;
    std::atomic<int32_t> m_RunningFlag;
};
Here the running flag uses the C++11 std::atomic template to ensure any thread checking this value will see a change made to it from another thread with certain order guarantees.
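For illustration, here's a minimal sketch of that release/acquire pairing (RunTask and GetIsDone are hypothetical helpers for this example, not enkiTS internals):

// Sketch: the thread running the task publishes flag changes with
// release stores, and any observing thread uses an acquire load, so
// results written in Execute() are visible to a thread which sees
// the flag become 0.
void RunTask( ITask* pTask )
{
    pTask->m_RunningFlag.store( 1, std::memory_order_release ); // started
    pTask->Execute();
    pTask->m_RunningFlag.store( 0, std::memory_order_release ); // finished
}

bool GetIsDone( const ITask* pTask )
{
    return 0 == pTask->m_RunningFlag.load( std::memory_order_acquire );
}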
For data-parallel tasks I need to specify the size of the data set. Whilst this could be multi-dimensional, a single size value is enough to enable the task scheduler to divide up the data set. Our task function now needs to know where it should start processing, and where it should end, and the running flag is now a count. Thus I have:
struct TaskSetPartition
{
    uint32_t start;
    uint32_t end;
};

struct ITaskSet
{
    virtual void Execute( TaskSetPartition range ) = 0;
    uint32_t             m_SetSize;
    std::atomic<int32_t> m_RunningCount;
};
For example if I wanted to square the values in an input array and output into an output array I could write:
struct ArraySquareTask : ITaskSet
{
    float m_In[1024];
    float m_Out[1024];

    ArraySquareTask()
    {
        m_SetSize = 1024; // can be set via ctor in enkiTS ITaskSet
    }

    virtual void Execute( TaskSetPartition range )
    {
        for( uint32_t i = range.start; i < range.end; ++i )
        {
            m_Out[i] = m_In[i] * m_In[i];
        }
    }
};
With this interface I can still write non-data parallel tasks by ignoring the set size and range:
struct ComplexTask : ITaskSet
{
    virtual void Execute( TaskSetPartition range )
    {
        DoSomethingComplex();
    }
};
In addition to this task interface, enkiTS has support for pinned tasks which can be run on a given thread rather than distributed. We won't discuss these here, but they change our ITaskSet interface: completion testing moves into a base class, which looks something like:
class ICompletable
{
public:
    ICompletable() : m_RunningCount(0) {}
    virtual ~ICompletable() {}

    bool GetIsComplete() const
    {
        return 0 == m_RunningCount.load( std::memory_order_acquire );
    }

private:
    friend class TaskScheduler;
    std::atomic<int32_t> m_RunningCount;
};
More complex code may require synchronization to output data, and in some cases this can be avoided if the task knows which thread it is running on and writes to an array of output structures sized to the number of threads the scheduler is using. So our final interface for the task set becomes (with the constructors I left out earlier):
struct ITaskSet : public ICompletable
{
    ITaskSet() : m_SetSize(1) {}
    ITaskSet( uint32_t setSize_ ) : m_SetSize( setSize_ ) {}

    virtual void Execute( TaskSetPartition range, uint32_t threadnum ) = 0;

    uint32_t m_SetSize;
};
This interface is called ITaskSet as it's the interface to a task which may have a set of values to iterate over. I'll use the term task to refer to it.
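As a sketch of the per-thread output pattern mentioned above (SumTask and its members are illustrative, not part of enkiTS):

// Sketch: summing an array without synchronization by giving each
// thread its own accumulator slot, indexed by threadnum.
struct SumTask : public ITaskSet
{
    SumTask( uint32_t setSize_, uint32_t numThreads_ )
        : ITaskSet( setSize_ ), m_In( setSize_, 1.0f ), m_Sums( numThreads_, 0.0f ) {}

    virtual void Execute( TaskSetPartition range, uint32_t threadnum )
    {
        for( uint32_t i = range.start; i < range.end; ++i )
        {
            m_Sums[ threadnum ] += m_In[ i ]; // no locking needed
        }
    }

    std::vector<float> m_In;
    std::vector<float> m_Sums; // one accumulator per scheduler thread
};
// once the task is complete, the caller sums m_Sums to get the total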
Tasks are fairly useless interfaces without something to run them. The task scheduler does this, and its basic interface is:
class TaskScheduler
{
public:
    void AddTaskSetToPipe( ITaskSet* pTaskSet_ );
    void WaitforTask( const ITaskSet* pTaskSet_ );
};
Here AddTaskSetToPipe attempts to add the task set to a task queue, but if the queue is full it runs the task instead. The WaitforTask function checks the completion of the task and, if it's not complete, runs an available task, looping until the task being waited on is complete.
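Putting this together, usage looks something like the following sketch (Initialize() is part of the full enkiTS API but not shown in the minimal interface above):

TaskScheduler g_TS;

void Example()
{
    g_TS.Initialize(); // creates worker threads and the per-thread pipes

    ArraySquareTask task; // m_SetSize is set to 1024 in its constructor
    g_TS.AddTaskSetToPipe( &task ); // schedules, or runs sub-tasks if the pipe is full
    // ... other work can be done here whilst the task set runs ...
    g_TS.WaitforTask( &task ); // helps run tasks until this one completes
}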
A task scheduler requires a way to safely push and pull tasks from a container. Whilst a multi-reader multi-writer pipe (or queue) could be used, a neat trick when the number of threads is known is to use one single-writer multi-reader pipe per thread: the owning thread pushes and pops tasks at the front of its own pipe, whilst other threads can 'steal' tasks from the back. The advantage of this approach is the increased potential for recently added tasks to have their data in L1 cache - this data was just written out, so should still be there.
The lockless implementation of such a pipe is complex, but probably one of the more important parts of the code-base so worthy of detailing. Since it makes a distinct subject on its own I'll cover it in a later post. One point worthy of note here is the implementation performs no allocations in reading and writing, and so is bounded. I will thus need to handle a potentially full pipe.
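Whilst the implementation will wait for that post, the interface the scheduler relies on looks roughly like this (treat the exact signatures as a sketch):

// Sketch of the per-thread pipe interface. Only the owning thread calls
// the Writer* functions; any other thread may call ReaderTryReadBack.
template<typename T, uint32_t SIZE>
class LockLessMultiReadPipe
{
public:
    bool WriterTryWriteFront( const T& in_ ); // owner pushes; fails if full
    bool WriterTryReadFront( T* pOut_ );      // owner pops the most recently added item
    bool ReaderTryReadBack( T* pOut_ );       // other threads steal the oldest item
private:
    T m_Buffer[ SIZE ]; // fixed size, so no allocations during scheduling
};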
AddTaskSetToPipe is a fairly complex function, so I'll break it down.
Since the task may have data parallelism, I need to split it into several smaller tasks which encompass the full range of the task set size. I hold this data in a simple SubTaskSet structure so it can be added to our lockless task pipe:
struct SubTaskSet
{
    ITaskSet*        pTask;
    TaskSetPartition partition;
};
The lockless task pipe holds an array of SubTaskSet whose size is set at compile time (I may add a feature to set this at initialization time), so no allocations are required during scheduling. This pipe is one of the more complex pieces of code in the scheduler, so I'll leave its description for another blog post in the series.
First I initialize a SubTaskSet structure with the task pointer and a valid range encompassing the complete set size, and initialize the completion count.
void TaskScheduler::AddTaskSetToPipe( ITaskSet* pTaskSet )
{
    SubTaskSet subTask;
    subTask.pTask           = pTaskSet;
    subTask.partition.start = 0;
    subTask.partition.end   = pTaskSet->m_SetSize;

    // set running count to -1 to guarantee it won't be found complete
    // until all subtasks have been added.
    pTaskSet->m_RunningCount = -1;
Note that I set the running count to -1 to ensure the task cannot be found complete until all sub-tasks have been added. I could instead calculate the number of sub-tasks up front and set that value here, but that adds extra maths in the case of only one sub-task, which this approach avoids.
Next, I calculate the range for the sub-tasks I want to run. Choosing the right 'grain size' or range of values a sub-task should encompass is difficult, but I've found a decent ad-hoc approach is to use a constant number of partitions equal to the number of threads multiplied by the number of threads minus one. If an equal range takes equal time to calculate, this will result in all sub-tasks finishing at about the same time when all threads are available or one thread is busy (for example as in many games where the main thread does significant rendering work due to API restrictions).
    uint32_t rangeToRun = subTask.pTask->m_SetSize / m_NumPartitions;
    if( rangeToRun == 0 ) { rangeToRun = 1; }

    uint32_t rangeLeft = subTask.partition.end - subTask.partition.start;
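For reference, here's a sketch of how that partition count could be set at initialization (m_NumThreads is an assumed member name; only m_NumPartitions appears in the code above):

// Sketch: fixed partition count chosen at initialization time.
// With 4 threads this gives 4 * (4 - 1) = 12 partitions, which divides
// evenly between 4 threads (3 sub-tasks each) or 3 threads (4 each).
m_NumPartitions = m_NumThreads * ( m_NumThreads - 1 );
if( 0 == m_NumPartitions ) // single threaded case
{
    m_NumPartitions = 1;
}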
Now I loop, hiving off a sub-task and adding it to our pipe, or running it:
    int32_t numAdded = 0;
    while( rangeLeft )
    {
        if( rangeToRun > rangeLeft ) { rangeToRun = rangeLeft; }
        subTask.partition.start = pTaskSet->m_SetSize - rangeLeft;
        subTask.partition.end   = subTask.partition.start + rangeToRun;
        rangeLeft -= rangeToRun;

        // add the partition to the pipe
        ++numAdded;
        if( !m_pPipesPerThread[ gtl_threadNum ].WriterTryWriteFront( subTask ) )
        {
            // pipe full - run the sub-task ourselves instead
            subTask.pTask->Execute( subTask.partition, gtl_threadNum );
            --numAdded;
        }
    }
Here you can see a task range being hived off, a count of the number added to the pipe being incremented (non-atomically as it's a local variable), and then I try to add the sub-task to the pipe. If this fails (due to it being full) I run the task and decrease the count.
    // increment running count by number added plus one to account
    // for the start value of -1.
    pTaskSet->m_RunningCount.fetch_add( numAdded + 1, std::memory_order_release );

    if( m_NumThreadsActive < m_NumThreadsRunning )
    {
        EventSignal( m_NewTaskEvent );
    }
}
Finally I increment the running count by the number added to the pipe plus one, and then if I have threads waiting I signal an event to wake them up. This addition may occur after some sub-tasks have already completed and decremented the running count, but since the count started at -1 and the addition is atomic, m_RunningCount is guaranteed never to reach zero until all sub-tasks are done.
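To make the counting concrete, here's an illustrative timeline for a set split into four sub-tasks, one of which is stolen and completed whilst the others are still being added:

// Illustrative m_RunningCount timeline, 4 sub-tasks added to the pipe:
//   AddTaskSetToPipe sets the count:                         -1
//   a worker steals and completes a sub-task early:          -2
//   fetch_add( numAdded + 1 ) adds 4 + 1 = 5:                 3
//   the remaining 3 sub-tasks complete, each subtracting 1:   0 => complete
// Before the fetch_add the count is at most -1, so the task cannot be
// observed as complete until all sub-tasks are both added and finished.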
Anyone familiar with lockless threaded programming will see a few potential issues with the above code.
The first is that I didn't put a memory barrier after initializing the running count, so that write could be reordered to after the sub-tasks are added, leading to potential errors when checking completion. I'm fairly certain this is not required here, as there is a memory barrier in the pipe's WriterTryWriteFront function.
Secondly, the event signalling a new task is not issued atomically with the check of the number of active threads. A rare situation can thus occur where a sleeping thread is not woken - this is a performance hit rather than a correctness problem, and the next task to be issued would wake the threads up anyhow. I'm still exploring potential solutions which work well cross-platform (on Windows, condition variables are a good combination here, but they are not available prior to Vista). For now, if you use enkiTS to issue a small number of data-parallel tasks, there is an extremely small chance you may not get all threads active.
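For illustration, one cross-platform option along those lines would be a C++11 std::condition_variable, where the mutex closes the check-then-signal gap (this is a sketch of the idea, not what enkiTS currently does; the member names are invented):

// sketch members added to TaskScheduler:
std::mutex              m_NewTaskMutex;
std::condition_variable m_NewTaskCV;
int32_t                 m_NumThreadsWaiting = 0; // protected by m_NewTaskMutex

void TaskScheduler::WakeThreads()
{
    std::lock_guard<std::mutex> lock( m_NewTaskMutex );
    if( m_NumThreadsWaiting > 0 )
    {
        m_NewTaskCV.notify_all();
    }
}

void TaskScheduler::WaitForNewTasks()
{
    std::unique_lock<std::mutex> lock( m_NewTaskMutex );
    ++m_NumThreadsWaiting;
    // a real implementation must re-check the pipes here, under the lock,
    // to avoid sleeping through a task added just before the lock was taken;
    // wait() can also wake spuriously, so the worker loop re-checks anyway
    m_NewTaskCV.wait( lock );
    --m_NumThreadsWaiting;
}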
WaitforTask is a relatively simple function:
void TaskScheduler::WaitforTask( const ITaskSet* pTaskSet )
{
    uint32_t hintPipeToCheck_io = gtl_threadNum + 1; // does not need to be clamped.
    if( pTaskSet )
    {
        while( !pTaskSet->GetIsComplete() )
        {
            TryRunTask( gtl_threadNum, hintPipeToCheck_io );
            // should add a spin then wait for task completion event.
        }
    }
    else
    {
        TryRunTask( gtl_threadNum, hintPipeToCheck_io );
    }
}
The variable hintPipeToCheck_io is used to hint at which thread's pipe I should check next when trying to run a task. The variable gtl_threadNum is a global thread local store of the current thread number.
The function TryRunTask takes the current thread number by value along with the pipe to check hint by reference. If a task was run it returns the thread pipe which the task came from in the hint variable, thus ensuring I go back to a potentially hot pipe next time around.
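Based on that description, a sketch of TryRunTask might look like the following (the real implementation differs in its details, but the shape is the same):

// Sketch of TryRunTask: check our own pipe first (its front entries are
// the most recently written, so likely still hot in cache), then try to
// steal from the back of the other threads' pipes, starting at the hint.
bool TaskScheduler::TryRunTask( uint32_t threadNum, uint32_t& hintPipeToCheck_io_ )
{
    SubTaskSet subTask;
    bool haveTask = m_pPipesPerThread[ threadNum ].WriterTryReadFront( &subTask );
    uint32_t pipeToCheck = threadNum;
    if( !haveTask )
    {
        pipeToCheck = hintPipeToCheck_io_ % m_NumThreads; // clamp the hint
        for( uint32_t i = 0; !haveTask && i < m_NumThreads; ++i )
        {
            if( pipeToCheck != threadNum )
            {
                haveTask = m_pPipesPerThread[ pipeToCheck ].ReaderTryReadBack( &subTask );
            }
            if( !haveTask )
            {
                pipeToCheck = ( pipeToCheck + 1 ) % m_NumThreads;
            }
        }
    }
    if( haveTask )
    {
        hintPipeToCheck_io_ = pipeToCheck; // pass back the potentially hot pipe
        subTask.pTask->Execute( subTask.partition, threadNum );
        subTask.pTask->m_RunningCount.fetch_sub( 1, std::memory_order_release );
    }
    return haveTask;
}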
One of the paths in the function handles the case of NULL == pTaskSet by simply running any available task, which can be useful - but it's not great API design, so I'll likely move this behaviour to a new function (a breaking API change, so one requiring care).
The comment about spin and wait refers to a nice-to-have feature, but one which requires a more complex event system to ensure I don't deadlock. The trade-off in additional code and performance overhead versus the potential power and scheduling benefits isn't as clear here as it is for having the tasking threads wait when there are no tasks available.
So that might all seem super simple, but as always the devil is in the details I've left out. If you'd like to know more, check out the following post:
Feedback on which of these to tackle first would be great, since I'm unlikely to cover them all soon. Additionally, I'd love any corrections or other feedback - lockless multithreading is hard and subtle errors are easy to make.
[Edit 23 Aug 2015 - For those interested there's some discussion of this article on Hacker News and Reddit /r/cpp.]
[Edit 24 Aug 2015 - Fixed some inconsistencies in naming, thanks @gpakosz.]
[Edit 27 Aug 2015 - Fixed inconsistency in naming, thanks @ngaloppo. Added links to code on GitHub.]
[Edit 28 Jun 2019 - Updated to reflect the v1.0 API better.]