Independent game developers - making Avoyd

Internals of a lightweight task scheduler

Doug Binks - 05 Sep 2015


This is the second in a series of articles detailing the inner workings and evolution of the permissively open source multithreading task scheduler enkiTS for C and C++ (including C++ 11). In the first article of this series I covered the external interfaces and their implementation. This post will cover the task threading function, running tasks, and waiting for tasks.

Figure 1: Screenshot of Avoyd being profiled with the microprofile and ImGui integration available in enkiTSExamples. Solid bars above named tasks show when threads are active; the wait functionality allows the core to idle or other threads to run.

The task thread function

The task scheduler creates several threads, by default the number of cores minus one (to account for the main thread), each of which runs a static member function TaskScheduler::TaskingThreadFunction.
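The "cores minus one" default can be derived from the hardware concurrency. A minimal sketch, assuming a helper name of my own (this is not the enkiTS API):

```cpp
#include <cstdint>
#include <thread>

// Hypothetical helper: one task thread per core, minus one to account for
// the main thread, but never fewer than one task thread in total.
// Note std::thread::hardware_concurrency() may return 0 if unknown.
inline uint32_t GetNumTaskThreads()
{
    uint32_t hw = std::thread::hardware_concurrency();
    return ( hw > 1 ) ? hw - 1 : 1;
}
```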

THREADFUNC_DECL TaskScheduler::TaskingThreadFunction( void* pArgs )
{
    ThreadArgs args = *(ThreadArgs*)pArgs;
    uint32_t threadNum = args.threadNum;
    TaskScheduler* pTS = args.pTaskScheduler;
    gtl_threadNum = threadNum;
    AtomicAdd( &pTS->m_NumThreadsRunning, 1 );

    SafeCallback( pTS->m_ProfilerCallbacks.threadStart, threadNum );

View this code on GitHub.

The task function starts by unpacking its arguments: the thread number and a pointer to the task scheduler. The thread number is stored in gtl_threadNum, a thread-local global variable. Storing the thread number in a thread-local variable allows user functions to call the task scheduler without needing to pass it in, which cuts down on sources of error and simplifies the interface.
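The thread-local thread number can be sketched with C++11 `thread_local` storage (enkiTS uses compiler-specific storage specifiers on pre-C++11 compilers; the accessor name here is illustrative):

```cpp
#include <cstdint>

// Thread-local global: each task thread writes its own index here once at
// startup, so user code running inside a task can query it without a
// TaskScheduler pointer being threaded through every call.
static thread_local uint32_t gtl_threadNum = 0;

// Illustrative accessor, as user code might see it.
inline uint32_t GetCurrentThreadNum()
{
    return gtl_threadNum;
}
```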

I then atomically increment the running threads count and call any user-provided profiler callback to let user code know a thread has been created.

    uint32_t spinCount = 0;
    uint32_t hintPipeToCheck_io = threadNum + 1;
    // does not need to be clamped.
    while( pTS->m_bRunning )
    {
        if( !pTS->TryRunTask( threadNum, hintPipeToCheck_io ) )
        {
            // no tasks, will spin then wait
            ++spinCount;
            if( spinCount > SPIN_COUNT )
            {
                pTS->WaitForTasks( threadNum );
            }
        }
        else
        {
            spinCount = 0;
        }
    }

View this code on GitHub.

The main action of the task thread function is to call TryRunTask in a loop, and if none are available call WaitForTasks. Both of these functions are described below.

I first initialize a spin count, and a hint index which will be described later. Then I loop until the task scheduler sets a running boolean variable to false - this is declared as volatile so the compiler knows not to optimize the read from memory out of the loop (an std::atomic type is used in the C++11 branch for the same purpose). Note that the exact order of the memory read with respect to others isn't critical here, as I just want to be able to exit the loop at some point.
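The loop-exit mechanism in the C++11 branch can be sketched with std::atomic<bool>; this is a simplified illustration with names of my own, not the enkiTS code:

```cpp
#include <atomic>
#include <thread>

// Relaxed ordering is sufficient here: we only need the store to become
// visible eventually so the worker loop terminates; no particular ordering
// relative to other memory operations is required.
std::atomic<bool> g_bRunning{ true };

void WorkerLoopSketch()
{
    while( g_bRunning.load( std::memory_order_relaxed ) )
    {
        // ... try to run a task, spin, or wait ...
        std::this_thread::yield();
    }
}

void StopWorkers()
{
    g_bRunning.store( false, std::memory_order_relaxed );
}
```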

Next I call TryRunTask. This function needs the current thread number and a hint for which pipe should be checked (as an in-out non const reference). TryRunTask will return true if it was able to find and run a task, false if not. If no task is found I increment a spin counter, otherwise I set the spin counter to zero. If I've passed the SPIN_COUNT limit without any task having run, I call WaitForTasks to prevent unneeded spinning in this loop.

    AtomicAdd( &pTS->m_NumThreadsRunning, -1 );
    SafeCallback( pTS->m_ProfilerCallbacks.threadStop, threadNum );

    return 0;
}

View this code on GitHub.

Finally I close the function by atomically decrementing the count of threads running, call the profiler callback if present, and return from the function which exits the thread.

Running Tasks with TryRunTask

The function TaskScheduler::TryRunTask contains the algorithm for finding a task and running it.

bool TaskScheduler::TryRunTask( uint32_t threadNum, uint32_t& hintPipeToCheck_io_ )
{
    // check for tasks
    SubTaskSet subTask;
    bool bHaveTask = m_pPipesPerThread[ threadNum ].WriterTryReadFront( &subTask );

View this code on GitHub.

I start by checking for subtasks at the front of the current thread's task pipe. Subtasks (described in detail in AddTaskSetToPipe) are the unit of execution of the task scheduler; they consist of a task pointer and a range used for data parallel processing. The front of the pipe is checked by WriterTryReadFront(), which returns whether a task has been found and initializes the subTask variable. If a subtask is found, it is removed from the pipe.

Since subtasks are added to the front of a thread's task pipe, this means I first try to run the most recently added task. This approach is good for cache locality, as recently added work is more likely to still be warm in cache.
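The owner-takes-newest, thief-takes-oldest access pattern can be illustrated with a trivially locked deque. The real pipe is lock-free; this sketch (with names of my own) only shows the policy, with a mutex standing in for the lock-free machinery:

```cpp
#include <cstdint>
#include <deque>
#include <mutex>

// Simplified stand-in for the per-thread pipe: the owning thread writes and
// reads at the front (LIFO, cache-warm work); other threads steal from the
// back (FIFO, oldest work).
struct SimplePipe
{
    void OwnerWriteFront( uint32_t v )
    {
        std::lock_guard<std::mutex> lock( m );
        q.push_front( v );
    }
    bool OwnerTryReadFront( uint32_t* out )
    {
        std::lock_guard<std::mutex> lock( m );
        if( q.empty() ) { return false; }
        *out = q.front();
        q.pop_front();
        return true;
    }
    bool ThiefTryReadBack( uint32_t* out )
    {
        std::lock_guard<std::mutex> lock( m );
        if( q.empty() ) { return false; }
        *out = q.back();
        q.pop_back();
        return true;
    }
    std::deque<uint32_t> q;
    std::mutex m;
};
```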

    uint32_t threadToCheck = hintPipeToCheck_io_;
    uint32_t checkCount = 0;
    while( !bHaveTask && checkCount < m_NumThreads )
    {
        threadToCheck = ( hintPipeToCheck_io_ + checkCount ) % m_NumThreads;
        if( threadToCheck != threadNum )
        {
            bHaveTask = m_pPipesPerThread[ threadToCheck ].ReaderTryReadBack( &subTask );
        }
        ++checkCount;
    }

View this code on GitHub.

Next I check the other threads' task pipes, looping through from the last known 'hot' pipe using the hint parameter. This hint significantly boosts performance in the typical situation where one subtask generates a number of further subtasks. I use a loop rather than a random choice, as this is simpler.

By reading from the back of other threads' pipes I leave control of the front of a pipe to the pipe's owning thread, improving performance for writing.

    if( bHaveTask )
    {
        // update hint, will preserve value unless actually got task from another thread.
        hintPipeToCheck_io_ = threadToCheck;

        // the task has already been divided up by AddTaskSetToPipe, so just run it
        subTask.pTask->ExecuteRange( subTask.partition, threadNum );
        AtomicAdd( &subTask.pTask->m_RunningCount, -1 );
    }

    return bHaveTask;

}

View this code on GitHub.

If I have a subtask, I update the pipe hint, execute the task over the subtask's range, and then atomically decrement the running count. The atomic decrement functions used generate a full memory barrier, ensuring the count is not decremented before the subtask has finished executing. The function then exits, returning whether a task was run.
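In C++11 terms, the decrement needs at least release semantics so that the subtask's writes are visible to any thread that observes the lowered count (a full-barrier AtomicAdd is stronger still). A sketch with illustrative names, not the enkiTS API:

```cpp
#include <atomic>
#include <cstdint>

std::atomic<int32_t> s_RunningCount{ 0 };

// Called after ExecuteRange completes. seq_cst here mimics the full memory
// barrier of the platform AtomicAdd: all writes made by the subtask
// happen-before another thread observing the decremented count.
inline void OnSubTaskComplete()
{
    s_RunningCount.fetch_sub( 1, std::memory_order_seq_cst );
}

// A waiter pairs the decrement with an acquire load when polling for
// completion of the whole task set.
inline bool IsTaskSetComplete()
{
    return s_RunningCount.load( std::memory_order_acquire ) == 0;
}
```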

Waiting for tasks

The task waiting function uses OS synchronization functions to put the thread to sleep until a new task event is generated. The task scheduler would function without waiting, with potentially slightly higher performance in some cases, but at a high cost to both CPU power wasted and the performance of other processes and threads. Waiting can also improve performance, as the OS can potentially schedule other work when a tasking thread is idle rather than interrupting randomly.

Figure 2: This microprofile of Avoyd shows the same time period as Figure 1 but with only the thread status being displayed (context switch tracking on). By using OS synchronization methods to wait for a new task event the scheduler can free up cores and permit other threads to run or the core to idle.

void TaskScheduler::WaitForTasks( uint32_t threadNum )
{
    bool bHaveTasks = false;
    for( uint32_t thread = 0; thread < m_NumThreads; ++thread )
    {
        if( !m_pPipesPerThread[ thread ].IsPipeEmpty() )
        {
            bHaveTasks = true;
            break;
        }
    }
    if( !bHaveTasks )
    {
        SafeCallback( m_ProfilerCallbacks.waitStart, threadNum );
        AtomicAdd( &m_NumThreadsActive, -1 );
        EventWait( m_NewTaskEvent, EVENTWAIT_INFINITE );
        AtomicAdd( &m_NumThreadsActive, +1 );
        SafeCallback( m_ProfilerCallbacks.waitStop, threadNum );
    }
}

View this code on GitHub.

This function is simple enough to list in one go. First all the thread task pipes are checked to see if they are empty. If any pipe is non-empty I exit; otherwise I decrement the active thread count and wait on the new task event. On wake-up the active thread count is incremented. User profiler callbacks surround the wait (this is how I mark the wait functions in a profiler such as microprofile).
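The new task event maps to a per-platform OS primitive. A minimal sketch of such an event built from C++11 primitives, assuming a signal-then-wait flag so a signal arriving before the wait isn't lost (illustrative, not the actual enkiTS implementation):

```cpp
#include <condition_variable>
#include <mutex>

// A simple auto-reset-style event: Signal wakes current waiters, and the
// flag (set under the lock) guards against a signal arriving before the
// wait begins.
struct NewTaskEvent
{
    void Wait()
    {
        std::unique_lock<std::mutex> lock( m );
        cv.wait( lock, [this]{ return signalled; } );
        signalled = false; // auto-reset
    }
    void Signal()
    {
        {
            std::lock_guard<std::mutex> lock( m );
            signalled = true;
        }
        cv.notify_all();
    }
    std::mutex m;
    std::condition_variable cv;
    bool signalled = false;
};
```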

As noted in the previous post I'm not using a lock with my synchronization, which means the pipe check and the active thread count aren't set atomically along with the event. This could result in a thread waiting when it should be awake, if a new task event fires after the pipe check but before the wait. Since the code still functions with one task thread inactive (the main thread doesn't wait, so it can still perform work), this trades improved usual-case performance for a very rare (so far not observed) potential decrease in worst-case performance.
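For contrast, one way to close that window would be to fold the pipe check into a condition-variable predicate, so the check and the sleep are atomic with respect to the signal. This is a sketch of the alternative design (names are mine), not what enkiTS does, since it puts a lock on the hot add-task path:

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>

// Alternative design sketch: the waiter re-checks "any pipe non-empty?"
// under the same mutex the signaller takes, so a task added between the
// check and the sleep cannot be missed.
struct LockedTaskWait
{
    void WaitUnless( const std::function<bool()>& havePendingTasks )
    {
        std::unique_lock<std::mutex> lock( m );
        cv.wait( lock, [&]{ return wake || havePendingTasks(); } );
        wake = false;
    }
    void NotifyTaskAdded()
    {
        {
            std::lock_guard<std::mutex> lock( m );
            wake = true;
        }
        cv.notify_all();
    }
    std::mutex m;
    std::condition_variable cv;
    bool wake = false;
};
```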

Up next

The next most important part of the task scheduler internals to discuss is the multi-reader, single writer pipe. Since this is fairly complex, both in terms of lines of code and the approach, I'll leave it for the next post. Following on from that I hope to take a look at performance and comparisons with other tasking systems, as I'll likely leave explaining the task scheduler initialization code as homework for the reader.

