Independent game developers - making Avoyd

Implementing a lightweight task scheduler

Doug Binks - 22 Aug 2015 - edited 27 Aug 2015


This is the first in a series of articles detailing the inner workings and evolution of the permissively open source multithreading task scheduler enkiTS for C and C++ (including C++ 11).

If you're writing a compute-intensive program on consumer hardware, and you want to use as much of the system's resources as possible, then you'll need to consider multithreading on the CPU. There are a number of ways to approach this, but the current gold standard for developers who want both simplicity and control is a task scheduler which can handle data-parallelism (in the games industry a task is often referred to as a job). Task parallelism allows you to run different types of computation at the same time, whilst data-parallelism enables you to run the same computation over a set of data across different threads at the same time. Note that I won't consider Single Instruction Multiple Data (SIMD) parallelism here, but if you're doing heavy computation you probably should. Additionally, I won't cover multithreading methods for hiding large latencies, such as waiting on hard disk or socket transactions.

enkiTS in our game, Avoyd, being profiled using the microprofile and ImGui integration available in enkiTSExamples.

Introducing enkiTS

In February, I extracted the tasking system I implemented for Avoyd from its dependencies on my own code-base, and made it open source under a permissive zlib license - calling it enkiTS (enkisoftware's Task Scheduler). I'd written my own task scheduler because I wanted something lightweight with a simple API. If neither of these is a concern for you, then do check out Intel's Threaded Building Blocks as an excellent solution.

Why simple & lightweight?

  • Simple API - easy to use, doesn't require reading through reams of documentation, examples or header files to figure out how you're supposed to achieve something with the library.

  • Lightweight - easy to integrate, understand, modify and port.

Any piece of software which has seen real-world use eventually accumulates extra features, system workarounds and legacy API handling, sometimes referred to as technical debt but really just technical grease. For example, the C++ 11 version of enkiTS is lighter weight since it doesn't need an OS layer for atomics and threading (though it's currently slower), and the [UserThread version](https://github.com/dougbinks/enkiTS/tree/UserThread) reduces the API's simplicity by adding an interface for running on another task/threading system. The goal is to keep these in check as much as possible.

The other goals for enkiTS are:

  • Fast, then scalable - any approach to multithreading sacrifices some single threaded efficiency to gain scalability on a parallel architecture. enkiTS is designed for consumer systems so concentrates on relatively low numbers of threads whilst remaining as scalable as possible.

  • Braided parallelism - the task scheduler can issue tasks from another task as well as from the thread which created the Task System.

  • Zero allocations during scheduling - the task scheduler is up-front allocation friendly, with all task and scheduler allocations able to be made during initialization.

The task interface

The fundamental minimum requirements for a task are a function pointer and the address of a running flag which can be used to find out whether the task has finished - the task scheduler sets the flag to 1 when the task is started, and 0 when it's finished. Wrapping these up in a simple structure simplifies the interface, and using C++ single inheritance makes development easier. A virtual function interface adds only a very slight overhead over a raw function pointer (one extra indirection, and hence a potential cache miss, but this is very small against the scheduler's other overheads).

This task interface would look like:

struct ITask
{
    virtual void Execute() = 0;
    volatile int32_t m_RunningFlag;
};

Here the volatile lets the compiler know it should not optimize reads outside loops, as other threads may change the value. Handily, aligned 32-bit reads and writes are atomic on all target systems. (I use a struct rather than a class as the default access is public.) The C++ 11 branch uses the std::atomic template:

struct ITask
{
    virtual void Execute() = 0;
    std::atomic<int32_t> m_RunningFlag;
};

For data-parallel tasks I need to specify the size of the data set. Whilst this could be multi-dimensional, a single size value is enough to enable the task scheduler to divide up the data set. Our task function now needs to know where it should start processing, and where it should end, and the running flag is now a count. Thus I have:

struct TaskSetPartition
{
    uint32_t start;
    uint32_t end;
};

struct ITaskSet
{
    virtual void Execute( TaskSetPartition range ) = 0;
    uint32_t m_SetSize;
    volatile int32_t m_RunningCount;
};

For example if I wanted to square the values in an input array and output into an output array I could write:

struct ArraySquareTask : ITaskSet
{
    float m_In[1024];
    float m_Out[1024];
    ArraySquareTask()
    {
        m_SetSize = 1024; // can be set via ctor in enkiTS ITaskSet
    }

    virtual void Execute( TaskSetPartition range )
    {
        for( uint32_t i = range.start; i < range.end; ++i )
        {
            m_Out[i] = m_In[i] * m_In[i];
        }
    }
};
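What the scheduler will do with such a task can be mimicked by hand: split the set size into partitions and call Execute on each. This self-contained sketch (re-declaring the structures from above so it compiles on its own; the RunPartitioned helper is my illustration, not part of enkiTS) is a handy way to sanity-check a task before handing it to the scheduler:

```cpp
#include <cstdint>

struct TaskSetPartition { uint32_t start; uint32_t end; };

struct ITaskSet
{
    virtual void Execute( TaskSetPartition range ) = 0;
    uint32_t m_SetSize;
    volatile int32_t m_RunningCount;
};

struct ArraySquareTask : ITaskSet
{
    float m_In[1024];
    float m_Out[1024];
    ArraySquareTask() { m_SetSize = 1024; m_RunningCount = 0; }

    virtual void Execute( TaskSetPartition range )
    {
        for( uint32_t i = range.start; i < range.end; ++i )
        {
            m_Out[i] = m_In[i] * m_In[i];
        }
    }
};

// Mimic what the scheduler does with a task set: divide
// [0, m_SetSize) into contiguous ranges and call Execute on each.
void RunPartitioned( ITaskSet& task, uint32_t numPartitions )
{
    uint32_t rangeToRun = task.m_SetSize / numPartitions;
    if( rangeToRun == 0 ) { rangeToRun = 1; }
    uint32_t start = 0;
    while( start < task.m_SetSize )
    {
        uint32_t end = start + rangeToRun;
        if( end > task.m_SetSize ) { end = task.m_SetSize; }
        TaskSetPartition range = { start, end };
        task.Execute( range );
        start = end;
    }
}
```

Running the same task with different partition counts should always produce identical output, since the partitions tile the full range exactly.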

With this interface I can still write non-data parallel tasks by ignoring the set size and range:

struct DoSomethingComplexTask : ITaskSet
{
    virtual void Execute( TaskSetPartition range )
    {
        DoSomethingComplex();
    }
};

More complex code may require synchronization methods to output data, and in some cases these can be avoided if I know which thread I am running on and create an array of output structures the size of the number of threads the scheduler is using. So our final interface for the taskset becomes (with the constructors I left out earlier):

struct ITaskSet
{
    ITaskSet()
        : m_SetSize(1)
        , m_RunningCount(0)
    {}

    ITaskSet( uint32_t setSize_ )
        : m_SetSize( setSize_ )
        , m_RunningCount(0)
    {}

    virtual void Execute( TaskSetPartition range, uint32_t threadnum ) = 0;

    bool GetIsComplete()
    {
        return 0 == m_RunningCount;
    }

    uint32_t m_SetSize;

private:
    friend class TaskScheduler; // the scheduler updates the running count
    volatile int32_t m_RunningCount;
};

View this code on GitHub.

This interface is called ITaskSet as it's the interface to a task which may have a set of values to iterate over. I'll use the term task to refer to it.

The basic scheduler interface

Tasks are fairly useless interfaces without something to run them. The task scheduler does this, and its basic interface is:

class TaskScheduler
{
public:
    void AddTaskSetToPipe( ITaskSet* pTaskSet );
    void WaitForTaskSet( ITaskSet* pTaskSet );
};

View this code on GitHub.

Here AddTaskSetToPipe will attempt to add the task set to a task queue, but if this is full it will run the task. The WaitForTaskSet function checks the completion of the task and if it's not complete runs a task if available, doing so in a loop until the task is complete.
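To make the calling pattern concrete, here is a deliberately single-threaded stand-in for that interface - no pipes, no threads, AddTaskSetToPipe just runs the whole set immediately. The stand-in is my illustration of how calling code is structured, not the real scheduler:

```cpp
#include <cstdint>

struct TaskSetPartition { uint32_t start; uint32_t end; };

struct ITaskSet
{
    ITaskSet() : m_SetSize(1), m_RunningCount(0) {}
    virtual void Execute( TaskSetPartition range ) = 0;
    uint32_t m_SetSize;
    volatile int32_t m_RunningCount;
};

// Single-threaded stand-in: the real AddTaskSetToPipe splits the set
// into sub-tasks and distributes them across threads; this one runs
// the whole range inline so the calling pattern can be seen.
class TaskScheduler
{
public:
    void AddTaskSetToPipe( ITaskSet* pTaskSet )
    {
        pTaskSet->m_RunningCount = 1;
        TaskSetPartition range = { 0, pTaskSet->m_SetSize };
        pTaskSet->Execute( range );
        pTaskSet->m_RunningCount = 0;
    }
    void WaitForTaskSet( ITaskSet* pTaskSet )
    {
        // The real version runs other tasks whilst waiting; here the
        // task already completed inside AddTaskSetToPipe.
        while( pTaskSet->m_RunningCount ) {}
    }
};

// Typical calling code: define a task, add it, wait on it.
struct SumTask : ITaskSet
{
    SumTask() : total(0) { m_SetSize = 100; }
    virtual void Execute( TaskSetPartition range )
    {
        for( uint32_t i = range.start; i < range.end; ++i ) { total += i; }
    }
    int total;
};
```

The add-then-wait shape is the important part: calling code can do other work between the two calls, which is where the parallelism comes from.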

Task Pipes

A task scheduler requires a way to safely push and pull tasks from a container. Whilst a single multi-reader multi-writer pipe (or queue) could be used, a neat trick when the number of threads is known is to use one multi-reader single-writer pipe per thread: the owning thread pushes and pulls tasks at the front of its own pipe, whilst other threads 'steal' tasks from the other end. The advantage of this approach is the increased chance that recently added tasks still have their data in the L1 cache - it was just written out, so should still be there.

The lockless implementation of such a pipe is complex, but probably one of the more important parts of the code-base so worthy of detailing. Since it makes a distinct subject on its own I'll cover it in a later post. One point worthy of note here is the implementation performs no allocations in reading and writing, and so is bounded. I will thus need to handle a potentially full pipe.
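The lockless pipe deserves its own post, but its semantics can be sketched with a plain mutex-guarded deque. The method names below follow the WriterTryWriteFront pattern used in enkiTS, but this simplified bounded container is my own sketch, not the real implementation:

```cpp
#include <cstddef>
#include <deque>
#include <mutex>

// Mutex-based sketch of the per-thread pipe semantics: the owning
// thread pushes and pops at the front, other threads steal the
// oldest items from the back. Bounded, so writes can fail when
// full - just like the real lockless pipe.
template< typename T, size_t Capacity >
class SimplePipe
{
public:
    // Owning thread only: add to the front; false if full.
    bool WriterTryWriteFront( const T& item )
    {
        std::lock_guard<std::mutex> lock( m_Mutex );
        if( m_Deque.size() >= Capacity ) { return false; }
        m_Deque.push_front( item );
        return true;
    }
    // Owning thread only: take the most recently written item,
    // which is the one most likely to still be hot in L1 cache.
    bool ReaderTryReadFront( T* pOut )
    {
        std::lock_guard<std::mutex> lock( m_Mutex );
        if( m_Deque.empty() ) { return false; }
        *pOut = m_Deque.front();
        m_Deque.pop_front();
        return true;
    }
    // Any thread: steal the oldest item from the back.
    bool ReaderTryReadBack( T* pOut )
    {
        std::lock_guard<std::mutex> lock( m_Mutex );
        if( m_Deque.empty() ) { return false; }
        *pOut = m_Deque.back();
        m_Deque.pop_back();
        return true;
    }
private:
    std::mutex m_Mutex;
    std::deque<T> m_Deque;
};
```

Note the asymmetry: the owner works LIFO (cache-friendly), thieves work FIFO (grabbing the oldest, coldest work), so the two ends rarely contend.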

AddTaskSetToPipe

This is a fairly complex function, so I'll break it down.

Since the task may have data parallelism, I need to split it into several smaller tasks which encompass the full range of the task set size. I hold this data in a simple SubTaskSet structure so it can be added to our lockless task pipe:

struct SubTaskSet
{
    ITaskSet* pTask;
    TaskSetPartition partition;
};

View this code on GitHub.

The lockless task pipe holds an array of SubTaskSet whose size is set at compile time (I may add a feature to set this at initialization time), so no allocations are required during scheduling. This pipe is one of the more complex pieces of code in the scheduler, so I'll leave its description for another blog post in the series.

First I initialize a SubTaskSet structure with the task pointer and a valid range encompassing the complete set size, and initialize the completion count.

void TaskScheduler::AddTaskSetToPipe( ITaskSet* pTaskSet )
{
    SubTaskSet subTask;
    subTask.pTask = pTaskSet;
    subTask.partition.start = 0;
    subTask.partition.end = pTaskSet->m_SetSize;

    // set running count to -1 to guarantee it won't be found complete
    // until all subtasks added.
    pTaskSet->m_RunningCount = -1;

View this code on GitHub.

Note that I set the running count to -1 to ensure the task cannot appear complete until all sub-tasks have been added. I could instead calculate the number of sub-tasks up front and set that value here, but that adds extra arithmetic in the common case of a single sub-task, which this approach avoids.

Next, I calculate the range for the sub-tasks I want to run. Choosing the right 'grain size' - the range of values a sub-task should encompass - is difficult, but I've found a decent ad-hoc approach is to use a constant number of partitions equal to the number of threads multiplied by the number of threads minus one (numThreads * (numThreads - 1)). If an equal range takes equal time to compute, this results in all sub-tasks finishing at about the same time either when all threads are available or when one thread is busy (as in many games, where the main thread does significant rendering work due to API restrictions).

    uint32_t rangeToRun = subTask.pTask->m_SetSize / m_NumPartitions;
    if( rangeToRun == 0 ) { rangeToRun = 1; }
    uint32_t rangeLeft = subTask.partition.end - subTask.partition.start;

View this code on GitHub.
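To put numbers on this: on a 4-thread system the partition count is 4 * 3 = 12, so a set size of 1024 gives rangeToRun = 1024 / 12 = 85, producing twelve sub-tasks of 85 elements plus a final thirteenth of 4. The helper below (my illustration, mirroring the loop that follows) counts the sub-tasks produced:

```cpp
#include <cstdint>

// Count the sub-tasks AddTaskSetToPipe would produce for a given
// set size and partition count, mirroring the hiving-off loop.
uint32_t CountSubTasks( uint32_t setSize, uint32_t numPartitions )
{
    uint32_t rangeToRun = setSize / numPartitions;
    if( rangeToRun == 0 ) { rangeToRun = 1; }
    uint32_t rangeLeft = setSize;
    uint32_t numSubTasks = 0;
    while( rangeLeft )
    {
        uint32_t run = rangeToRun > rangeLeft ? rangeLeft : rangeToRun;
        rangeLeft -= run;
        ++numSubTasks;
    }
    return numSubTasks;
}
```

Note the clamp to 1: a set smaller than the partition count degenerates to one element per sub-task rather than a division by a zero-sized range.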

Now I loop, hiving off a sub-task and adding it to our pipe, or running it:

    int32_t numAdded = 0;
    while( rangeLeft )
    {
        if( rangeToRun > rangeLeft )
        {
            rangeToRun = rangeLeft;
        }
        subTask.partition.start = pTaskSet->m_SetSize - rangeLeft;
        subTask.partition.end = subTask.partition.start + rangeToRun;
        rangeLeft -= rangeToRun;

        // add the partition to the pipe
        ++numAdded;
        if( !m_pPipesPerThread[ gtl_threadNum ].WriterTryWriteFront( subTask ) )
        {
            subTask.pTask->Execute( subTask.partition, gtl_threadNum );
            --numAdded;
        }
    }

View this code on GitHub.

Here you can see a task range being hived off, a count of the number added to the pipe being incremented (non-atomically as it's a local variable), and then I try to add the sub-task to the pipe. If this fails (due to it being full) I run the task and decrease the count.

    // increment running count by number added plus one to account
    // for start value.
    AtomicAdd( &pTaskSet->m_RunningCount, numAdded + 1 );

    if( m_NumThreadsActive < m_NumThreadsRunning )
    {
        EventSignal( m_NewTaskEvent );
    }
}

View this code on GitHub.

Finally I increment the running count by the number added to the pipe plus one, and then if I have threads waiting I signal an event to wake them up. This increment may occur after some sub-tasks have already completed and decremented the count, but since the count started at -1 and the addition is atomic, m_RunningCount is guaranteed never to read zero until all sub-tasks are done.
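The arithmetic behind that guarantee is worth spelling out: the count starts at -1, each completing sub-task subtracts 1, and the AtomicAdd of numAdded + 1 lands somewhere amongst those decrements. The sum only reaches zero once the add and all numAdded decrements have happened. This single-threaded walk through one possible interleaving (my illustration of the invariant, not scheduler code) checks that:

```cpp
#include <cstdint>

// Walk one interleaving of the running-count updates for a task
// split into numAdded sub-tasks, with completedBeforeAdd of them
// finishing before AddTaskSetToPipe performs its AtomicAdd.
// Returns true if the count never reads zero before the very end.
bool CountNeverZeroEarly( int32_t numAdded, int32_t completedBeforeAdd )
{
    int32_t count = -1;                  // set before any sub-task is added
    for( int32_t i = 0; i < completedBeforeAdd; ++i )
    {
        --count;                         // a sub-task completes early
        if( count == 0 ) { return false; }
    }
    count += numAdded + 1;               // the AtomicAdd in AddTaskSetToPipe
    if( count == 0 && completedBeforeAdd != numAdded ) { return false; }
    for( int32_t i = completedBeforeAdd; i < numAdded; ++i )
    {
        --count;                         // remaining sub-tasks complete
        if( count == 0 && i + 1 != numAdded ) { return false; }
    }
    return count == 0;                   // zero exactly at the end
}
```

Had the count started at 0 instead of -1, a sub-task completing before the final add would briefly drive the count through zero and a waiter could wrongly see the task as complete.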

Anyone familiar with lockless threaded programming will see a few potential issues with the above code.

  • The first is that I didn't put a memory barrier after initializing the running count, so the write could be reordered to after the tasks are added, leading to potential errors when checking completion. I'm fairly certain this is not required here, as there is a memory barrier in the pipe's WriterTryWriteFront function.

  • Secondly, the event to signal a new task is not issued atomically with the check of the number of active threads. A rare situation can thus occur where a sleeping thread is not woken - this costs some performance, but the next task to be issued will wake the threads anyway. I'm still exploring solutions which work well cross-platform (on Windows, condition variables are a good fit, but they are not available prior to Vista). For now, if you use enkiTS to issue a small number of data-parallel tasks, there is an extremely small chance that not all threads become active.

WaitForTaskSet

This is a relatively simple function:

void TaskScheduler::WaitForTaskSet( ITaskSet* pTaskSet )
{
    uint32_t hintPipeToCheck_io = gtl_threadNum + 1; // does not need to be clamped
    if( pTaskSet )
    {
        while( pTaskSet->m_RunningCount )
        {
            TryRunTask( gtl_threadNum, hintPipeToCheck_io );
            // should add a spin then wait for task completion event.
        }
    }
    else
    {
        TryRunTask( gtl_threadNum, hintPipeToCheck_io );
    }
}

View this code on GitHub.

The variable hintPipeToCheck_io hints at which thread's pipe I should check first when trying to run a task. The variable gtl_threadNum is a thread-local global storing the current thread's number.

The function TryRunTask takes the current thread number by value along with the pipe to check hint by reference. If a task was run it returns the thread pipe which the task came from in the hint variable, thus ensuring I go back to a potentially hot pipe next time around.
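A sketch of how that hint might drive the search - this is my illustration of the pattern, not the actual TryRunTask body, which I'll cover in the internals post - scans the pipes round-robin starting at the hint and reports back which pipe had work:

```cpp
#include <cstdint>

// Hint-driven pipe selection: scan numPipes pipes starting from
// *pHint_io, wrapping round, and return the index of the first
// non-empty pipe via the hint (false if all are empty).
// pipeHasTask[i] stands in for a successful read from pipe i.
bool FindPipeWithTask( const bool* pipeHasTask, uint32_t numPipes,
                       uint32_t* pHint_io )
{
    for( uint32_t i = 0; i < numPipes; ++i )
    {
        uint32_t pipe = ( *pHint_io + i ) % numPipes;
        if( pipeHasTask[ pipe ] )
        {
            *pHint_io = pipe;   // remember the hot pipe for next time
            return true;
        }
    }
    return false;
}
```

The modulo wrap is also why the hint "does not need to be clamped": an out-of-range starting value simply wraps into the valid pipe indices.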

One path in the function handles the case of NULL == pTaskSet by simply running any available task. This can be useful, but it's not great API design, so I'll likely move this overload to a new function (a breaking API change, so one requiring care).

The comment about spin and wait is for a nice to have feature, but one which requires a more complex event system to ensure I don't deadlock. The trade off in additional code and performance overhead versus potential power and scheduling benefits isn't as clear here as it is for having the tasking threads wait when there are no tasks available.

Is that it?

So that might all seem super simple, but as always the devil is in the details I've left out. I'd like to cover the following in future posts:

  1. Scheduler internals: TryRunTask, the task threading function, and the new task event.

  2. The lockless multi read single writer pipe.

  3. The C++ 11 implementation.

  4. The C API - and how it can be improved.

  5. A task system to bind them all - using enkiTS with other tasking / threading systems (the UserThread branch).

  6. Using enkiTS - deep dive into examples, plus a few new ones.

  7. Performance measurements - turn off some cores, fix the frequencies and fire up some benchmarking!

  8. Future improvements:

    • do I need to be able to control the grain size?

    • would a task dependency list be of benefit versus the current wait approach?

Feedback on which of these to tackle first would be great, since I'm unlikely to cover them all soon. Additionally, I'd love any corrections or other feedback - lockless multithreading is hard and subtle errors are easy to make. Hit me up on Twitter @dougbinks or in the comments below!

[Edit 23 Aug 2015 - For those interested there's some discussion of this article on Hacker News and Reddit /r/cpp.]
[Edit 24 Aug 2015 - Fixed some inconsistencies in naming, thanks @gpakosz.]
[Edit 27 Aug 2015 - Fixed inconsistency in naming, thanks @ngaloppo. Added links code on GitHub.]

