Building task workflows

We can build arbitrarily complex workflows with only three operators:

  • CHAIN “>” operator. A chain of tasks. Tasks are executed one after another.
  • ACHAIN “>>” operator. A chain of tasks with argument passing. Tasks are executed one after another, and each task receives the result of the previous task as its first argument.
  • GROUP “|” operator. Tasks are executed in parallel.
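To make the semantics of the three operators concrete, here is a minimal toy model in plain Python. It is only a sketch and not gromozeka's implementation: the classes Node, Call, Chain and Group and the helpers say and fetch are hypothetical names invented for this illustration, and everything runs synchronously in one process, with a thread pool standing in for parallel workers.

```python
from concurrent.futures import ThreadPoolExecutor


class Node:
    """Hypothetical base class that gives every node the three operators."""

    def __gt__(self, other):      # CHAIN: a > b
        return Chain(self, other, pass_result=False)

    def __rshift__(self, other):  # ACHAIN: a >> b
        return Chain(self, other, pass_result=True)

    def __or__(self, other):      # GROUP: a | b
        return Group(self, other)

    def run(self, *extra):
        raise NotImplementedError


class Call(Node):
    """A function together with the arguments it was bound with."""

    def __init__(self, func, *args):
        self.func, self.args = func, args

    def run(self, *extra):
        # A value handed over by ">>" goes first, before the bound arguments.
        return self.func(*extra, *self.args)


class Chain(Node):
    """Runs `first`, then `second`, optionally passing the result along."""

    def __init__(self, first, second, pass_result):
        self.first, self.second, self.pass_result = first, second, pass_result

    def run(self, *extra):
        result = self.first.run(*extra)
        return self.second.run(result) if self.pass_result else self.second.run()


class Group(Node):
    """Runs all members concurrently and returns their results as a list."""

    def __init__(self, *members):
        self.members = members

    def __or__(self, other):
        # Flatten, so that a | b | c is one group of three members.
        return Group(*self.members, other)

    def run(self, *extra):
        with ThreadPoolExecutor() as pool:
            futures = [pool.submit(m.run, *extra) for m in self.members]
            return [f.result() for f in futures]  # results in member order


log = []


def say(word):
    log.append(word)


def fetch(name):
    return name


group = Call(fetch, 'boo') | Call(fetch, 'gaa') | Call(fetch, 'goo')
flow = (Call(say, 'hello') > (group >> Call(say))) > Call(say, 'good by')
flow.run()
print(log)
```

In this toy model the group result reaches the following step only because the two are joined with “>>”; joined with “>”, the second step would be called without the extra argument.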

Define functions:

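from functools import reduce  # reduce is not a builtin on Python 3

from gromozeka import task  # assumed import path for the task decorator


@task(bind=True)
def workflow(self):
    """Build the example workflow and submit it for execution.

    Args:
        self(gromozeka.primitives.Task):

    """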
    (((print_something('hello') >
       ((reduce(lambda x, y: x | y, [get_something_from_api(name) for name in ('boo', 'gaa', 'goo')])) >>
        print_something()))) >
     print_something('good by')).apply_async()


@task(bind=True)
def print_something(self, word):
    """

    Args:
        self(gromozeka.primitives.Task):
        word(str): Word to print
    """
    self.logger.info(word)


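@task(bind=True)
def get_something_from_api(self, api_name):
    """

    Args:
        self(gromozeka.primitives.Task):
        api_name(str): Name to return; stands in for a real API call
    """
    return api_name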

First, print_something('hello') is executed and logs the word hello.

Next, the workflow function contains this expression:

reduce(lambda x, y: x | y, [get_something_from_api(name) for name in ('boo', 'gaa', 'goo')])

This expression creates three tasks, get_something_from_api('boo'), get_something_from_api('gaa') and get_something_from_api('goo'), and combines them into one group with the “|” operator. The tasks are executed in parallel, and their results are collected into the list ['boo', 'gaa', 'goo'].
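The reduce call is just a compact way of writing the “|” operator between every pair of neighbouring list items. The short sketch below shows the folding itself; Python sets stand in for task objects here (an assumption made purely for illustration, since sets also support “|”), and the variable names are hypothetical.

```python
from functools import reduce
import operator

# Sets stand in for task objects; both support the "|" operator.
parts = [{'boo'}, {'gaa'}, {'goo'}]

# reduce applies the lambda left to right: ((boo | gaa) | goo)
folded = reduce(lambda x, y: x | y, parts)

# The same expression written out by hand.
written_out = (parts[0] | parts[1]) | parts[2]

# operator.or_ is the function form of "|", so the lambda is optional.
with_operator = reduce(operator.or_, parts)
```

Using reduce keeps the expression the same length no matter how many names are in the list, which is convenient when the list is built at run time.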

Then print_something() is executed with the result of the preceding group as its first argument, so its output is: ['boo', 'gaa', 'goo']

The last operation is print_something('good by').
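A note on the parentheses in the workflow function: Python treats a > b > c as a chained comparison, that is, as (a > b) and (b > c), so it does not mean the same thing as (a > b) > c. The sketch below demonstrates this with a hypothetical stand-in class, Step, that only records how “>” was applied. It says nothing about how gromozeka itself handles the unparenthesized form.

```python
class Step:
    """Hypothetical stand-in for a task; records how ">" combined the steps."""

    def __init__(self, name):
        self.name = name

    def __gt__(self, other):
        return Step(f"({self.name} > {other.name})")


a, b, c = Step('a'), Step('b'), Step('c')

# Explicit parentheses: two ">" applications, one nested in the other.
grouped = (a > b) > c

# No parentheses: Python evaluates (a > b) and (b > c). The first result is
# a truthy object, so the whole expression yields only the value of (b > c).
chained = a > b > c

print(grouped.name)
print(chained.name)
```

With this stand-in, the unparenthesized form silently loses the first step, which is why grouping each “>” explicitly, as the workflow function does, is the safer habit.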