Building task workflows
Workflows of arbitrary complexity can be built with only three operators:
- CHAIN “>” operator. Chain of tasks. Tasks are executed one by one.
- ACHAIN “>>” operator. Argumented chain of tasks. Tasks are executed one by one, and each task receives the result of the previous task as its first argument.
- GROUP “|” operator. Tasks are executed in parallel.
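The three operators are ordinary Python operators, so their grouping follows Python's precedence rules. The sketch below is a toy model only: the `Step` class is hypothetical and is not gromozeka's `Task` implementation. It overloads `>`, `>>` and `|` to record how an expression is grouped.

```python
class Step:
    """Hypothetical stand-in for a task; NOT gromozeka's Task class."""

    def __init__(self, description):
        self.description = description

    def __gt__(self, other):
        # CHAIN: run self, then other.
        return Step(f"chain({self.description}, {other.description})")

    def __rshift__(self, other):
        # ACHAIN: run self, then pass its result to other.
        return Step(f"achain({self.description}, {other.description})")

    def __or__(self, other):
        # GROUP: run self and other in parallel.
        return Step(f"group({self.description}, {other.description})")


a, b, c = Step("a"), Step("b"), Step("c")

# Explicit parentheses make the intended grouping unambiguous.
grouped_then_chained = (a | b) > c
print(grouped_then_chained.description)

# Without parentheses, `>>` binds tighter than `|` in Python.
shift_first = a | b >> c
print(shift_first.description)
```

In this toy model the second expression is grouped as `a | (b >> c)`, which is one reason to parenthesize workflow expressions explicitly.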
Define functions:
from functools import reduce

from gromozeka import task


@task(bind=True)
def workflow(self):
    """
    Args:
        self(gromozeka.primitives.Task):
    Returns:
    """
    # hello, then the parallel group feeding print_something(), then good by
    (((print_something('hello') >
       ((reduce(lambda x, y: x | y, [get_something_from_api(name) for name in ('boo', 'gaa', 'goo')])) >>
        print_something()))) >
     print_something('good by')).apply_async()


@task(bind=True)
def print_something(self, word):
    """
    Args:
        self(gromozeka.primitives.Task):
        word(str): Word to print
    """
    self.logger.info(word)


@task(bind=True)
def get_something_from_api(self, api_name):
    return api_name
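The workflow expression above wraps every “>” in parentheses. A likely reason is that Python treats `a > b > c` as a chained comparison, `(a > b) and (b > c)`, rather than as `(a > b) > c`. The sketch below illustrates this with a hypothetical `Node` class that is not part of gromozeka.

```python
class Node:
    """Hypothetical object that overloads `>`; NOT a gromozeka class."""

    def __init__(self, description):
        self.description = description

    def __gt__(self, other):
        return Node(f"chain({self.description}, {other.description})")


a, b, c = Node("a"), Node("b"), Node("c")

# Chained comparison: evaluated as (a > b) and (b > c).
# (a > b) is a truthy Node, so the expression yields (b > c) and drops `a`.
unparenthesized = a > b > c

# Explicit parentheses keep all three steps in order.
parenthesized = (a > b) > c

print(unparenthesized.description)
print(parenthesized.description)
```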
First, print_something('hello') is executed and logs the word hello.
Next, the workflow function contains:
reduce(lambda x, y: x | y, [get_something_from_api(name) for name in ('boo', 'gaa', 'goo')])
This operation creates 3 tasks: get_something_from_api('boo'), get_something_from_api('gaa'), get_something_from_api('goo').
They are executed in parallel and their results are collected in a group: ['boo', 'gaa', 'goo']
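The `reduce` call simply folds the list with the `|` operator, so it is equivalent to writing `x | y | z` by hand. A minimal sketch of that equivalence, using plain Python sets as a stand-in for tasks (sets also support `|`; this does not use gromozeka):

```python
from functools import reduce

names = ('boo', 'gaa', 'goo')

# One single-element set per name, standing in for one task per name.
singletons = [{name} for name in names]

# Fold the list pairwise with `|`, exactly as the workflow does.
folded = reduce(lambda x, y: x | y, singletons)

# The same thing written out by hand.
explicit = singletons[0] | singletons[1] | singletons[2]

print(folded == explicit)
```

Using `reduce` keeps the expression the same length no matter how many names are in the tuple.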
Then print_something() is executed with the group result as its first argument, so its output is: ['boo', 'gaa', 'goo']
The last operation is print_something('good by').
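The execution order described above can be imitated in plain Python. This is only an analogy under stated assumptions: it uses `concurrent.futures` threads instead of gromozeka workers, the functions are ordinary functions rather than tasks, and a list named `log` (hypothetical) replaces the task logger.

```python
from concurrent.futures import ThreadPoolExecutor

log = []  # stands in for self.logger.info


def print_something(word):
    log.append(word)


def get_something_from_api(api_name):
    return api_name


def run_workflow():
    # CHAIN: first step.
    print_something('hello')
    # GROUP: run the three calls in parallel; map() keeps input order.
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(get_something_from_api, ('boo', 'gaa', 'goo')))
    # ACHAIN: the group result becomes the first argument of the next step.
    print_something(results)
    # CHAIN: last step.
    print_something('good by')


run_workflow()
print(log)
```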