Making and running tasksΒΆ
Define tasks from functions, using decorator @task:
import time
from gromozeka import BrokerPoint, task
# first example function
from gromozeka import Gromozeka, ThreadWorker, ProcessWorker
@task(bind=True)
def test_func_one(self, sleep_time, word):
"""
Args:
self(gromozeka.primitives.Task):
sleep_time(int): Time to sleep
word(str): Word to print
"""
self.logger.info('start working')
time.sleep(sleep_time)
self.logger.info('Job is done. Word is: %s' % word)
# second example function
@task(bind=True)
def test_func_second(self, sleep_time, word):
"""
Args:
self(gromozeka.primitives.Task):
sleep_time(int): Time to sleep
word(str): Word to print
"""
self.logger.info('start working')
time.sleep(sleep_time)
Define application:
app = Gromozeka().config_from_env()
Define RabbitMQ queues:
broker_point_first = BrokerPoint(exchange='first_exchange', exchange_type='direct', queue='first_queue',
routing_key='first')
broker_point_second = BrokerPoint(exchange='second_exchange', exchange_type='direct', queue='second_queue',
routing_key='second')
Register tasks in application:
# You can mix worker types (ThreadWorker,ProcessWorker)
# `ThreadWorker` is good on IO operations
# `ProcessWorker` when you have long CPU operations
app.register(task=test_func_one(), broker_point=broker_point_first, worker_class=ThreadWorker, max_workers=10)
app.register(task=test_func_second(), broker_point=broker_point_second, worker_class=ProcessWorker,
max_workers=10)
Or task can register self:
test_func_one().register(broker_point=broker_point_first, max_workers=10, worker_class=ThreadWorker)
test_func_second().register(broker_point=broker_point_second, max_workers=10, worker_class=ProcessWorker)
You can start application now:
app.start()
Add 10 tasks for each queue:
# Run tasks
for i in range(10):
test_func_one().apply_async(sleep_time=5, word="{}".format(ThreadWorker.__name__))
test_func_second().apply_async(sleep_time=5, word="{}".format(ProcessWorker.__name__))