Skip to content

SO 5.8 ByExample Work Stealing

Yauheni Akhotnikau edited this page Jul 10, 2023 · 1 revision

Note. This example is one of the biggest and complex examples in SO-5.8 distribution. For understanding of it a good knowledge of SO-5.8 Basics, SO-5.5 InDepth - Message Delivery Filters, SO-5.8 InDepth - Priorities of Agents, SO-5.8 InDepth - Dispatchers is required.

Introduction

Support of agent's priorities has been added in v.5.5.8. Three new priority-respected dispatches were introduced. They implement various priority handling policies. This example demostrates usage of two of these dispatchers -- prio_one_thread::strictly_ordered and prio_dedicated_threads::one_per_prio.

This example shows ability of prio_one_thread::strictly_ordered dispatcher to process more prioritized events before less prioritiezed ones. An ability of prio_dedicated_threads::one_pre_prio to handle events of different priorities on different threads is also shown. Those abilities are used in imitation of work for generation of image with various sizes. Image generation time is depends to size of image. To provide an appropriate throughtput generation requests are distributed between agents with different priorities. Small images are processed by agents with higher priorities, big images are processed by agents with lower priority. Because all agents work on different threads they do not disturb each another.

There is also implemented simple work stealing scheme: if there is no requests to process an agent with higher priority tries to steal work from agents with lower priority. It means that agent with priority p7 could take some work from agent with priority p6 and so on.

Because this example is long its full source code is not represented here. The full source can be found in the repository.

What Example Does

There is a request generator agent. It randomly sends request for creating images. Every request contains ID, dimension of the image to be generated and some metadata. It is supposed that time for image creation depends from image dimension -- big dimension leads to big amount of time for image creation.

Requests are processed by several agents.

First of all there are two managers. One of them (request_acceptor) receives new requests, calculates priorities for them and stores requests in queues (one queue for each priority).

Second manager (request_scheduler) schedules requests from queues to actual processor agents.

Request_acceptor and request_scheduler works on the same prio_one_thread::strictly_ordered dispatcher. Request_acceptor agent has lower priority than request_scheduler agent. It means that messages for scheduling requests to processor agents are processed before messages about new requests.

There are several processor agents: one for each priority. All processor agents are bound to prio_dedicated_threads::one_per_prio dispatcher. It means that every processor works on separate thread.

Request_scheduler implements work stealing mechanism. When a processor with priority P tells that it is free and ready for processing new image then request_scheduler tries to find next request of the same priority P. If there is no such requests then request_scheduler triest to find a request for lower priority (P-1) and so on.

Request Metadata

Scheme of work described above means that requests are not processed immediately, they can spend some time in requests queues. Also there is a possibility of handling a request by an agent with different priority. To show that details every requests has medatadata object inside. This metadata object collects information about every stage of processing: when request was accepted, when processing started, when processing finished, which priority was set to request, at which priority request was processed.

A metadata object is sent with initial request and then is returned back with positive response.

Overload Protection

There could be a situation when request_acceptor will wait its chance for taking processing time for too long. It is because request_acceptor has lower priority than request_scheduler. So it is possible that there will be too many waiting requests in request_acceptor queue.

To protect request_acceptor from overloading a message limit is used. Processing of all extra messages is rejected and negative response is returned to requests generator immediately. To do that an limit_then_transform is used:

request_acceptor(
    context_t ctx,
    so_5::mbox_t interaction_mbox,
    request_scheduling_data & data )
    :   so_5::agent_t( ctx
            // This agent has minimal priority.
            + so_5::prio::p0
            // If there are to many pending requests then
            // new requests must be rejected.
            + limit_then_transform( 10,
                [this]( const generation_request & req ) {
                    return make_transformed< generation_rejected >(
                            m_interaction_mbox,
                            req.m_id );
                    } ) )
    ,   m_interaction_mbox( std::move( interaction_mbox ) )
    ,   m_data( data )
    {}

There is another place where a message limit is used. A request processor agent can have only one pending requests in its even queue. Another instance of generation_request inside event queue is an error. To detect such kind of error a limit_then_abort is used:

class processor_t final : public so_5::agent_t
    {
    public :
        processor_t(
            context_t ctx,
            so_5::priority_t priority,
            const so_5::mbox_t & interaction_mbox )
            :   so_5::agent_t{ ctx + priority
                    + limit_then_abort< generation_request >( 1 ) }

Enumeration of Priorities

There are several places where some kind of enumeration of available priorities values are used.

The first is the calculation of priority for new request. A constant so_5::prio::total_priorities_count and a special value so_5::priority_t::p_max are used:

// Detecting priority for that request.
auto step = double{ max_dimension + 1 } / total_priorities_count;
// Requests with lowest dimensions must have highest priority.
auto pos = so_5::to_size_t( so_5::priority_t::p_max ) -
        static_cast< std::size_t >( evt->m_dimension / step );

The second is creation of request processor agents. A helper function so_5::prio::for_each_priority() is used:

so_5::introduce_child_coop(
    *this,
    so_5::disp::prio_dedicated_threads::one_per_prio::make_dispatcher(
            so_environment() ).binder(),
    [this]( so_5::coop_t & coop )
    {
        so_5::prio::for_each_priority( [&]( so_5::priority_t p ) {
                create_processor_agent( coop, p );
            } );
    } );

The third is an attempt to steal work from lower priority agent. Helper functions so_5::prio::has_prev() and so_5::prio::prev() are used (there also are so_5::prio::has_next() and so_5::prio::next() functions):

// There is no more work. Try to stole it from
// lower priority.
if( so_5::prio::has_prev( priority ) )
    {
        priority = so_5::prio::prev( priority );
        ++deep;
    }
else
    // There is no more priorities to look.
    break;

Example Results

There is a sample output from running example. It is possible to see that work stealing works. Some requests were received lower priorities but were processed by more prioritized agents.

generated {1}, dimension: 8975
generated {2}, dimension: 308
result {2}: in route: 0ms, waiting(p7): 0ms, processing(p7): 30ms
generated {3}, dimension: 2498
generated {4}, dimension: 297
result {4}: in route: 0ms, waiting(p7): 0ms, processing(p7): 29ms
generated {5}, dimension: 2678
generated {6}, dimension: 1035
generated {7}, dimension: 1430
generated {8}, dimension: 4138
result {3}: in route: 0ms, waiting(p6): 0ms, processing(p6): 249ms
generated {9}, dimension: 1981
result {6}: in route: 0ms, waiting(p7): 0ms, processing(p7): 104ms
generated {10}, dimension: 4662
generated {11}, dimension: 5228
result {5}: in route: 0ms, waiting(p5): 0ms, processing(p5): 267ms
generated {12}, dimension: 9860
result {7}: in route: 0ms, waiting(p6): 56ms, processing(p6): 143ms
generated {13}, dimension: 2118
result {9}: in route: 0ms, waiting(p6): 33ms, processing(p7): 198ms
generated {14}, dimension: 1516
generated {15}, dimension: 2863
result {8}: in route: 0ms, waiting(p4): 0ms, processing(p4): 413ms
generated {16}, dimension: 1613
result {13}: in route: 0ms, waiting(p6): 0ms, processing(p6): 211ms
generated {17}, dimension: 7612
generated {18}, dimension: 746
result {1}: in route: 0ms, waiting(p0): 0ms, processing(p0): 897ms
result {18}: in route: 0ms, waiting(p7): 0ms, processing(p7): 74ms
result {14}: in route: 0ms, waiting(p6): 130ms, processing(p6): 151ms
result {10}: in route: 0ms, waiting(p4): 35ms, processing(p5): 466ms
Clone this wiki locally