Async Task Graph

Asynchronous tasks are often needed to run SQL or other data queries, including HTTP queries that fetch external data. udho does not include a database library. However, any asynchronous database library built on boost asio may benefit from an async activity framework. udho::activities provides a way to describe a task graph that gathers data into a heterogeneous collector. A task may be associated with its dependencies on other tasks and with task-related data.

Collector

Different activities with different success and failure data are involved in the subtask graph. A subtask, or even an activity, may need the data fetched by its dependencies in order to execute its action. Hence a heterogeneous collector collects the data submitted by the success or failure methods, which can then be read through an accessor. In the following example data is a collector that collects the success or failure data of three activities A1, A2, and A3. Usually all activities involved in the subtask graph should be passed as template parameters of collect.

auto data = udho::activities::collect<A1, A2, A3>(ctx, "Any string name for the collector");

However, in many circumstances the entire collector does not need to be accessed; only a subset of what has been collected is needed. So an accessor<...> is created from that collector.

udho::activities::accessor<A1, A2> access(data);
if(!access.failed<A1>()){
    A1SData pre = access.success<A1>();
}

The collector has to be passed to the constructor of all activities.
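The idea of a heterogeneous collector with a narrower accessor on top can be illustrated without udho at all. The following is a minimal standalone sketch, not udho's implementation: mini_collector, mini_accessor, set_success and the nested success_type are hypothetical names, failure data is left out, and the activities are reduced to empty tags.

```cpp
#include <cassert>
#include <optional>
#include <tuple>
#include <type_traits>

// Hypothetical activities: each one only names its success data type here.
struct A1 { struct success_type { int value; }; };
struct A2 { struct success_type { double ratio; }; };
struct A3 { struct success_type { char tag; }; };

// One optional slot per activity, stored side by side in a tuple.
template <typename... Activities>
class mini_collector {
    std::tuple<std::optional<typename Activities::success_type>...> _slots;

    template <typename A>
    using slot = std::optional<typename A::success_type>;
public:
    template <typename A>
    void set_success(const typename A::success_type& data) {
        std::get<slot<A>>(_slots) = data;
    }
    template <typename A>
    bool exists() const { return std::get<slot<A>>(_slots).has_value(); }
    template <typename A>
    const typename A::success_type& success() const {
        return *std::get<slot<A>>(_slots);
    }
};

// A read-only view restricted to a subset of the collected activities.
template <typename CollectorT, typename... Subset>
class mini_accessor {
    const CollectorT& _collector;

    template <typename A>
    static constexpr bool allowed = (std::is_same_v<A, Subset> || ...);
public:
    explicit mini_accessor(const CollectorT& c) : _collector(c) {}

    template <typename A>
    bool exists() const {
        static_assert(allowed<A>, "activity is not part of this accessor");
        return _collector.template exists<A>();
    }
    template <typename A>
    const typename A::success_type& success() const {
        static_assert(allowed<A>, "activity is not part of this accessor");
        return _collector.template success<A>();
    }
};

// Stores data for A1 only, then reads it back through an <A1, A2> view.
int collector_demo() {
    mini_collector<A1, A2, A3> data;
    data.set_success<A1>({42});

    mini_accessor<mini_collector<A1, A2, A3>, A1, A2> access(data);
    assert(access.exists<A1>());
    assert(!access.exists<A2>());   // nothing collected for A2 yet
    return access.success<A1>().value;
}
```

Asking the view for A3 would be rejected at compile time by the static_assert, which is the point of narrowing the collector to the subset a piece of code actually needs.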

Activity

An activity is the definition of the job that has to be performed. A subtask is an instantiation of an activity. The definition of an activity consists of two types, SuccessDataT and FailureDataT, and a no-argument function call operator overload, operator()(). A subtask, on the other hand, instantiates an activity by specifying its data and execution dependencies, thus establishing an order of invocation. The philosophy is that the activity is reusable, while the subtask reuses it to model different use cases. The activity is invoked through its operator()() overload after all its prior activities have completed (dependencies are specified by the subtask). The operator()() must call either success() or failure(), synchronously or asynchronously, with SuccessDataT or FailureDataT respectively, in order to signal its completion. An activity A subclasses udho::activity<A, SuccessA, FailureA>, which defines the success() and failure() methods.

In the following example the success and failure data for activity A1 are A1SData and A1FData respectively. A1 starts a timer for 5 seconds. Once the timer finishes the value 42 is set as the success value and the activity finishes.

struct A1SData{
    int value;
};

struct A1FData{
    int reason;
};

struct A1: udho::activities::activity<A1, A1SData, A1FData>{
    typedef udho::activities::activity<A1, A1SData, A1FData> base;

    boost::asio::deadline_timer _timer;

    template <typename CollectorT>
    A1(CollectorT c, boost::asio::io_context& io): base(c), _timer(io){}

    void operator()(){
        _timer.expires_from_now(boost::posix_time::seconds(5));
        _timer.async_wait(boost::bind(&A1::finished, self(), boost::asio::placeholders::error));
    }

    void finished(const boost::system::error_code& e){
        A1SData data;
        data.value = 42; // Fetched 42 from somewhere
        success(data); // successful
    }
};
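The contract described above (a CRTP base that owns success() and failure(), and a derived operator()() that must call exactly one of them) can be modelled in a few lines of standard C++. This is a simplified synchronous sketch under stated assumptions, not the udho base class: mini_activity, on_done, run and the Fetch activity are hypothetical, and there is no collector or io_context involved.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <utility>

// Hypothetical CRTP base: stores the outcome and notifies a completion callback once.
template <typename DerivedT, typename SuccessT, typename FailureT>
class mini_activity {
    std::optional<SuccessT> _success;
    std::optional<FailureT> _failure;
    std::function<void()>   _done;
public:
    void on_done(std::function<void()> cb) { _done = std::move(cb); }
    bool completed() const { return _success.has_value() || _failure.has_value(); }
    bool failed() const { return _failure.has_value(); }
    const SuccessT& success_data() const { return *_success; }
    const FailureT& failure_data() const { return *_failure; }
    // Invokes the derived operator()(), which starts the actual work.
    void run() { static_cast<DerivedT&>(*this)(); }
protected:
    void success(const SuccessT& s) {
        if (completed()) return;          // completion is signalled only once
        _success = s;
        if (_done) _done();
    }
    void failure(const FailureT& f) {
        if (completed()) return;
        _failure = f;
        if (_done) _done();
    }
};

struct FetchSData { int value; };
struct FetchFData { int reason; };

// Hypothetical activity that completes synchronously.
struct Fetch : mini_activity<Fetch, FetchSData, FetchFData> {
    bool should_fail;
    explicit Fetch(bool f) : should_fail(f) {}
    void operator()() {
        if (should_fail) failure(FetchFData{7});
        else             success(FetchSData{42});
    }
};

// Runs one succeeding and one failing activity and counts the notifications.
int activity_demo() {
    int notified = 0;

    Fetch ok(false);
    ok.on_done([&notified] { ++notified; });
    ok.run();
    assert(ok.completed() && !ok.failed());

    Fetch bad(true);
    bad.on_done([&notified] { ++notified; });
    bad.run();
    assert(bad.failed() && bad.failure_data().reason == 7);

    return ok.success_data().value + notified;   // 42 + 2 notifications
}
```

In an asynchronous setting the call to success() or failure() would simply move into the completion handler, as A1::finished does above.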

Subtask

Activities serve the purpose of defining an action. However, an action is supposed to fit into some use case depending on the application's needs. Those needs may require a specific order of invocation of these activities, so that sufficient data is collected before some activity is invoked.

digraph {
  A1 -> A2;
  A1 -> A3;
  A2 -> A4;
  A3 -> A4;
}

In the task graph above, both A2 and A3 depend on A1. Hence A2 and A3 may start as soon as A1 completes. A4 depends on both A2 and A3 and cannot start until both have completed. In this situation A2 and A3 can access the data fetched by A1, because A1 has already completed (succeeded or failed) before A2 or A3 is invoked. Similarly, A4 can access the data collected by A1, A2, and A3, all of which have completed before A4 is invoked.
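The ordering rule for this graph (a node starts only when every node it depends on has completed) can be sketched with a simple counter of unfinished dependencies. The code below is a synchronous standalone model of that rule, not how udho schedules subtasks; run_graph and diamond_demo are hypothetical names.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Runs a dependency graph by counting unfinished prerequisites.
// `edges` maps a node to the nodes that depend on it.
std::vector<std::string> run_graph(
    const std::map<std::string, std::vector<std::string>>& edges,
    const std::string& start)
{
    // Number of dependencies each node is still waiting for.
    std::map<std::string, int> pending;
    for (const auto& e : edges)
        for (const auto& child : e.second)
            ++pending[child];

    std::vector<std::string> order;          // completion order
    std::vector<std::string> ready{start};   // nodes whose dependencies are done
    while (!ready.empty()) {
        std::string node = ready.front();
        ready.erase(ready.begin());
        order.push_back(node);               // "execute" the node

        auto it = edges.find(node);
        if (it == edges.end()) continue;     // no dependents
        for (const auto& child : it->second)
            if (--pending[child] == 0)       // last dependency just completed
                ready.push_back(child);
    }
    return order;
}

// The diamond graph from the text: A1 -> {A2, A3} -> A4.
std::vector<std::string> diamond_demo() {
    std::map<std::string, std::vector<std::string>> edges{
        {"A1", {"A2", "A3"}},
        {"A2", {"A4"}},
        {"A3", {"A4"}},
    };
    std::vector<std::string> order = run_graph(edges, "A1");
    assert(order.size() == 4);
    return order;
}
```

A4 becomes ready only after its counter has been decremented twice, once by A2 and once by A3, which mirrors the constraint that it cannot start until both have completed.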

udho::activities::perform describes the above subtask graph as shown in the following code block.

auto data = udho::activities::collect<A1, A2, A3, A4>(ctx, "A");

auto t1 = udho::activities::perform<A1>::with(data, io);
auto t2 = udho::activities::perform<A2>::require<A1>::with(data, io).after(t1);
auto t3 = udho::activities::perform<A3>::require<A1>::with(data, io).after(t1);
auto t4 = udho::activities::perform<A4>::require<A2, A3>::with(data, io).after(t2).after(t3);

The above statements using perform are equivalent to the following.

auto data = udho::activities::collect<A1, A2, A3, A4>(ctx, "A");

auto t1 = udho::activities::subtask<A1>::with(data, io);
auto t2 = udho::activities::subtask<A2, A1>::with(data, io).after(t1);
auto t3 = udho::activities::subtask<A3, A1>::with(data, io).after(t1);
auto t4 = udho::activities::subtask<A4, A2, A3>::with(data, io).after(t2).after(t3);

With this arrangement the success or failure data of A1 is available to A2 and A3 before they start executing. To access that data, the constructors of A2 and A3 may create an accessor as shown below.

struct A2: udho::activities::activity<A2, A2SData, A2FData>{
    typedef udho::activities::activity<A2, A2SData, A2FData> base;

    boost::asio::deadline_timer _timer;
    udho::activities::accessor<A1> _accessor;       // An accessor to access success/failure data of A1

    template <typename CollectorT>
    A2(CollectorT c, boost::asio::io_context& io): base(c), _timer(io), _accessor(c){}

    void operator()(){
        _timer.expires_from_now(boost::posix_time::seconds(10));
        _timer.async_wait(boost::bind(&A2::finished, self(), boost::asio::placeholders::error));
    }

    void finished(const boost::system::error_code& err){
        if(!err && !_accessor.failed<A1>()){        // check whether A1 has failed or not
            A1SData pre = _accessor.success<A1>();  // access the success data collected from A1
            A2SData data;
            data.value = pre.value + 2;             // creates its own success data
            success(data);                          // signal completion with success
        }
    }
};

However, in the implementation above A2 always depends on A1. A use case in which A2 depends on some other activity, or is executed independently, is not feasible, because A2 still holds an accessor to A1 and tries to extract the success data of A1 irrespective of the graph. On the other hand, A2 may still need some data to be set before it can perform its activity. That data may be derived from A1, from some other activity, or from a constructor argument.

struct A2i: udho::activities::activity<A2i, A2SData, A2FData>{
    typedef udho::activities::activity<A2i, A2SData, A2FData> base;

    int prevalue;
    boost::asio::deadline_timer _timer;

    template <typename CollectorT>
    A2i(CollectorT c, boost::asio::io_context& io, int p): base(c), prevalue(p), _timer(io){}

    template <typename CollectorT>
    A2i(CollectorT c, boost::asio::io_context& io): base(c), prevalue(0), _timer(io){} // prevalue is set later, e.g. through prepare()

    void operator()(){
        _timer.expires_from_now(boost::posix_time::seconds(10));
        _timer.async_wait(boost::bind(&A2i::finished, self(), boost::asio::placeholders::error));
    }

    void finished(const boost::system::error_code& err){
        if(!err){
            A2SData data;
            data.value = prevalue + 2;
            success(data);
        }
    }
};

In the above example prevalue is the piece of information that A2i requires in order to perform its activity. A2i does not care who provides that information, whether it is A1 or some other activity. A2i has a constructor that takes prevalue as an argument, which can be used when prevalue is known at the time of construction or when A2i is the starting activity in the graph.

auto t2 = udho::activities::perform<A2i>::require<A1>::with(data, io).after(t1).prepare([data](A2i& a2i){
    udho::activities::accessor<A1> a1_access(data);
    A1SData pre = a1_access.success<A1>();
    a2i.prevalue = pre.value;
});

In the above code block the activity A2i is prepared by the callback passed to prepare, which receives a reference to the activity right after all its dependencies (only A1 in this case) have completed. The collector is captured by the lambda, which uses it to access the success data of A1.
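What makes the prepare step useful is its timing: the callback runs just before the activity is invoked, so it can inject values that did not exist when the subtask was created. The following standalone sketch models only that timing; mini_subtask and AddTwo are hypothetical stand-ins, and a plain integer plays the role of the success data of A1.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical activity that needs `prevalue` before it can run.
struct AddTwo {
    int prevalue = 0;
    int result   = 0;
    void operator()() { result = prevalue + 2; }
};

// Hypothetical subtask wrapper: owns the activity and an optional preparator.
template <typename ActivityT>
class mini_subtask {
    std::shared_ptr<ActivityT>      _activity;
    std::function<void(ActivityT&)> _prepare;

    explicit mini_subtask(std::shared_ptr<ActivityT> a) : _activity(std::move(a)) {}
public:
    // Factory in the spirit of with(): forwards arguments to the activity constructor.
    template <typename... Args>
    static mini_subtask with(Args&&... args) {
        return mini_subtask(std::make_shared<ActivityT>(std::forward<Args>(args)...));
    }
    mini_subtask& prepare(std::function<void(ActivityT&)> prep) {
        _prepare = std::move(prep);
        return *this;
    }
    // Runs the preparator first, then the activity itself.
    void operator()() {
        if (_prepare) _prepare(*_activity);
        (*_activity)();
    }
    std::shared_ptr<ActivityT> operator->() const { return _activity; }
};

int prepare_demo() {
    int upstream = 0;   // stands in for data produced by a dependency
    auto t2 = mini_subtask<AddTwo>::with();
    t2.prepare([&upstream](AddTwo& a) { a.prevalue = upstream; });

    upstream = 40;      // the "dependency" completes after the wiring is done
    t2();               // preparator copies 40 in, then the activity adds 2
    assert(t2->prevalue == 40);
    return t2->result;
}
```

Because the lambda reads upstream only when the subtask is invoked, the activity itself stays independent of where the value comes from.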

Final

Finally, we would like to do something once the entire subtask graph has completed, in a similar fashion. In the following example we do not perform A4. Instead we execute the following piece of code once both A2i and A3i have completed.

udho::activities::require<A2i, A3i>::with(data).exec([ctx, name](const udho::activities::accessor<A1, A2i, A3i>& d) mutable{
    std::cout << "Final begin" << std::endl;

    int sum = 0;

    if(!d.failed<A2i>()){
        A2SData pre = d.success<A2i>();
        sum += pre.value;
        std::cout << "A2i " << pre.value << std::endl;
    }

    if(!d.failed<A3i>()){
        A3SData pre = d.success<A3i>();
        sum += pre.value;
        std::cout << "A3i " << pre.value << std::endl;
    }

    ctx.respond(boost::lexical_cast<std::string>(sum), "text/plain");

    std::cout << "Final end" << std::endl;
}).after(t2).after(t3);

Once everything is set up, we start the initial task t1.

t1();
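A final step that waits for several subtasks behaves like a join: it counts completions and fires its callback exactly once, when the last dependency arrives. The sketch below is a single-threaded standalone model of that behaviour under stated assumptions; mini_join and arrive are hypothetical names and do not correspond to udho types.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>

// Hypothetical join: fires `final_cb` once all dependencies have arrived.
class mini_join {
    std::size_t           _remaining;
    std::function<void()> _final;
public:
    mini_join(std::size_t dependencies, std::function<void()> final_cb)
        : _remaining(dependencies), _final(std::move(final_cb)) {}

    // Called by each dependency when it completes.
    void arrive() {
        if (_remaining == 0) return;        // already fired, ignore late arrivals
        if (--_remaining == 0) _final();
    }
};

int join_demo() {
    int a2 = 0, a3 = 0;     // stand-ins for the collected success values
    int sum = 0, fired = 0;

    mini_join final_step(2, [&] { sum = a2 + a3; ++fired; });

    a2 = 44;
    final_step.arrive();    // first dependency done, callback not fired yet
    assert(fired == 0);

    a3 = 10;
    final_step.arrive();    // second dependency done, callback fires
    assert(fired == 1);

    final_step.arrive();    // extra arrival has no effect
    assert(fired == 1);
    return sum;
}
```

The callback sees both values because it runs only after the second arrival, which is the same guarantee the final callback above relies on when it reads the data of A2i and A3i.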

Note

By default if one subtask fails then all subtasks that depend on it will be cancelled and the final callback will be called immediately. However the sibling subtasks of the failing subtask will continue executing. To change this behavior use required(false) on the subtask.

auto t1 = udho::activities::perform<A1>::with(data, io).required(false);

In the above example all other subtasks will execute even if t1 fails.
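The effect of the required flag can be illustrated on a simple linear chain. This is a deliberately reduced standalone model, assuming one subtask per step and no siblings; step, run_chain and chain_demo are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical step in a linear chain of subtasks.
struct step {
    std::string           name;
    std::function<bool()> run;        // returns true on success
    bool                  required = true;
};

// Runs the steps in order. If a required step fails, the remaining steps are
// skipped and the final callback (logged as "final") runs immediately.
std::vector<std::string> run_chain(const std::vector<step>& steps) {
    std::vector<std::string> log;
    for (const auto& s : steps) {
        bool ok = s.run();
        log.push_back(s.name + (ok ? ":ok" : ":failed"));
        if (!ok && s.required) break;   // dependents are cancelled
    }
    log.push_back("final");
    return log;
}

// t1 always fails; t2 depends on it. The flag on t1 decides whether t2 runs.
std::vector<std::string> chain_demo(bool t1_required) {
    std::vector<step> steps;
    steps.push_back(step{"t1", [] { return false; }, t1_required});
    steps.push_back(step{"t2", [] { return true; }, true});

    std::vector<std::string> log = run_chain(steps);
    assert(log.back() == "final");      // the final callback always runs
    return log;
}
```

With t1 required the log contains only the failure and the final callback; with t1 optional, t2 still runs before the final callback.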

Note

The following shorthands may be used for convenience: udho::collect, udho::accessor, udho::activity, udho::perform, udho::require.

Example

API

Data

template<typename ContextT, typename DatasetT>
struct collector
template<typename ...T>
struct udho::activities::accessor : public udho::activities::fixed_key_accessor<udho::cache::shadow<std::string, T::result_type...>>

Access a subset of data from the collector

Public Functions

template<typename V>
bool exists() const

Whether there exists any data for activity V

Template Parameters
  • V: Activity Type

template<typename V>
const V::result_type &get() const

get data associated with activity V

Template Parameters
  • V: activity type

template<typename V>
bool completed() const

Check whether activity V has completed.

Template Parameters
  • V: activity type

template<typename V>
bool canceled() const

Check whether activity V has been canceled.

Template Parameters
  • V: activity type

template<typename V>
bool failed() const

Check whether activity V has failed (only the failure data of V is valid).

Template Parameters
  • V: activity type

template<typename V>
bool okay() const

Check whether activity V is okay.

Template Parameters
  • V: activity type

template<typename V>
V::result_type::success_type success() const

get success data for activity V

Template Parameters
  • V: activity type

template<typename V>
V::result_type::failure_type failure() const

get failure data for activity V

Template Parameters
  • V: activity type

template<typename V, typename F>
void apply(F f) const

Apply a callback on result of V

Template Parameters
  • V: activity type

Parameters
  • f: callback

Activity

template<typename DerivedT, typename SuccessDataT = void, typename FailureDataT = void>
struct udho::activities::activity : public std::enable_shared_from_this<DerivedT>, public udho::activities::result<SuccessDataT, FailureDataT>

An activity A must subclass activity<A, SuccessA, FailureA>, where SuccessA and FailureA are the types that contain the relevant information about its success or failure. The activity A must overload a no-argument operator()() which initiates the activity. After the activity is initiated, either success() or failure() must be called in order to signal its completion. The activity A must take the collector as the first argument of its constructor, which is passed on to the base class activity<A, SuccessA, FailureA>. Hence it is preferred to make the type of the first constructor parameter a template parameter.

Template Parameters
  • DerivedT: Activity Class

  • SuccessDataT: data associated to the activity if the activity succeeds

  • FailureDataT: data associated to the activity if the activity fails

Public Functions

derived_ptr_type self()

shared_ptr to this

template<typename SuccessT, typename FailureT>
struct udho::activities::result : public udho::activities::result_data<SuccessT, FailureT>

Completion handler for an activity.

Template Parameters
  • SuccessT: success data associated with the activity

  • FailureT: failure data associated with the activity

Public Functions

template<typename StoreT>
result(StoreT &store)

Parameters
  • store: collector

template<typename CombinatorT>
void done(CombinatorT cmb)

attach another subtask as done callback which will be executed once this subtask finishes

Parameters
  • cmb: next subtask

void required(bool flag)

mark the activity as required or optional

Parameters
  • flag: true to mark the activity as required, false to mark it as optional

void cancel_if(cancel_if_ftor f)

Force cancelation of the activity even after it is successful to stop propagating to the next activities

Parameters
  • f: callback which should return true to signal cancelation

template<typename SuccessT, typename FailureT>
struct udho::activities::result_data

Contains copyable success or failure data for an activity.

Template Parameters
  • SuccessT: success data type

  • FailureT: failure data type

Subclassed by udho::activities::result< SuccessT, FailureT >

Public Functions

bool completed() const

either success or failure data set

bool failed() const

whether the activity has failed

bool canceled() const

check whether the activity has been canceled

const success_type &success_data() const

Success data

const failure_type &failure_data() const

Failure data

template<typename CallableT>
void apply(CallableT callback)

Apply a callable to the result data which will be invoked exactly once with appropriate arguments. The invocation of the callback depends on the state as shown below.

  • Incomplete: callback()

  • Canceled: callback(const SuccessT&, const FailureT&)

  • Failed: callback(const FailureT&)

  • Successful: callback(const SuccessT&)

The callback may not have all the overloads.

Parameters
  • callback: the callable to apply
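One way to support a callback that provides only some of the listed overloads is to test each call signature at compile time and skip the ones the callable does not accept. The following standalone sketch uses std::is_invocable_v with if constexpr for this; it is an illustration of the dispatch idea under stated assumptions, not necessarily how udho implements apply, and mini_result, state, ok_data, err_data and describe are hypothetical names.

```cpp
#include <cassert>
#include <string>
#include <type_traits>

enum class state { incomplete, canceled, failed, successful };

// Hypothetical result holder with the four states listed above.
template <typename SuccessT, typename FailureT>
struct mini_result {
    state    st = state::incomplete;
    SuccessT s{};
    FailureT f{};

    // Calls the overload matching the current state, if the callable has one.
    template <typename CallableT>
    void apply(CallableT callback) const {
        switch (st) {
        case state::incomplete:
            if constexpr (std::is_invocable_v<CallableT&>)
                callback();
            break;
        case state::canceled:
            if constexpr (std::is_invocable_v<CallableT&, const SuccessT&, const FailureT&>)
                callback(s, f);
            break;
        case state::failed:
            if constexpr (std::is_invocable_v<CallableT&, const FailureT&>)
                callback(f);
            break;
        case state::successful:
            if constexpr (std::is_invocable_v<CallableT&, const SuccessT&>)
                callback(s);
            break;
        }
    }
};

struct ok_data  { int value; };
struct err_data { int reason; };

// A visitor that only handles the successful and failed states.
struct describe {
    std::string* out;
    void operator()(const ok_data& d)  { *out = "ok " + std::to_string(d.value); }
    void operator()(const err_data& d) { *out = "error " + std::to_string(d.reason); }
};

std::string apply_demo(state st) {
    mini_result<ok_data, err_data> r;
    r.st = st;
    r.s.value  = 42;
    r.f.reason = 7;

    std::string out = "skipped";   // stays unchanged if no overload matches
    r.apply(describe{&out});
    assert(!out.empty());
    return out;
}
```

The visitor has no overloads for the incomplete or canceled states, so those calls are discarded at compile time instead of producing an error.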

Subtask

template<typename ActivityT, typename ...DependenciesT>
struct udho::activities::subtask

A subtask is an instantiation of an activity. The subtask reuses an activity to model different use cases by attaching dependencies. A subtask contains two shared pointers, one to the activity and another one to the combinator. The subtask cannot be instantiated directly by calling the subtask constructor. Instead call the static with method to instantiate.

Template Parameters
  • ActivityT: The activity

  • DependenciesT: The activities that have to be performed before performing ActivityT

Public Functions

std::shared_ptr<activity_type> activity()

shared pointer to the activity

template<typename V, typename ...DependenciesV>
self_type &done(subtask<V, DependenciesV...> &next)

execute task next after the current one

Parameters
  • next: the next subtask

template<typename V, typename ...DependenciesV>
self_type &after(subtask<V, DependenciesV...> &previous)

t2.after(t1) is equivalent to t1.done(t2)

Parameters
  • previous: the previous subtask

template<typename PreparatorT>
self_type &prepare(PreparatorT prep)

attach a callback which will be called with a reference to the activity after it has been instantiated and all its dependencies have completed.

self_type &required(bool flag)

Set the required flag on or off. If a required subtask fails, then all intermediate subtasks that depend on it fail and the final callback is called immediately. By default all subtasks are required.

self_type &cancel_if(typename activity_type::cancel_if_ftor cancelor)

Force cancelation of the activity even after it is successful to stop propagating to the next activities

Parameters
  • cancelor: callback which should return true to signal cancelation

std::shared_ptr<activity_type> operator->()

returns the shared pointer to the activity

template<typename FunctionT>
self_type &if_errored(FunctionT ftor)

Abort if canceled and ftor returns false. ftor will be called with the success data if the subtask has been canceled due to an error.

template<typename FunctionT>
self_type &if_failed(FunctionT ftor)

Abort if canceled and ftor returns false. ftor will be called with the failure data if the subtask has been canceled due to a failure.

Public Static Functions

template<typename CollectorT, typename ...U>
self_type with(CollectorT collector, U&&... u)

Arguments for the constructor of the Activity

template<typename ActivityT>
struct udho::activities::perform

create a subtask to perform activity ActivityT

Template Parameters
  • ActivityT: the activity to perform

Public Static Functions

template<typename ...U>
subtask<ActivityT> with(U&&... u)

arguments for the activity constructor

template<typename ...DependenciesT>
struct require

mention the activities that have to be performed before executing this subtask.

Template Parameters
  • DependenciesT: dependencies

Public Static Functions

template<typename ...U>
subtask<ActivityT, DependenciesT...> with(U&&... u)

arguments for the activity constructor