Running tasks from outer application

We can run tasks from outer application or even other language, using task protocol. For now it’s simply json. Later other serializers will be available.

Define function:

# first example function
@task(bind=True)
def test_func_one(self, word):
    """

    Args:
        self(gromozeka.primitives.Task):
        word(str): Word to print
     """
    self.logger.info('Job is done. Word is: %s' % word)

Start application:

app = Gromozeka().config_from_env()

broker_point = BrokerPoint(exchange='first_exchange', exchange_type='direct', queue='first_queue',
                           routing_key='first')

app.register(task=test_func_one(), broker_point=broker_point)

# Start application
app.start()

Then make protobuf message:

# You can apply task from other application or other programming language, using RabbitMQ. You must copy task.proto
# from gromozeka.promitives directory
message = ProtoTask(task_id='__main__.test_func_one', kwargs=json.dumps({'word': 'hello'}))
# You can apply task with custom uuid
# message = Task(uuid='custom task_uuid', task_id='__main__.test_func_one',
#                kwargs=json.dumps({'word': 'hello'}),
#                reply_to=ProtoReplyToBrokerPoint(exchange='first_exchange', routing_key='first'))
# If you do not want to copy task.proto for some reasons you can use custom proto file and use deserializator.
# For example we have examples/custom_task.proto for example (You can find it in examples directory):

connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672/%2F'))
channel = connection.channel()
channel.basic_publish(exchange='first_exchange', routing_key='first', body=message.SerializeToString())
connection.close()

You can use custom proto files with custom deserealization:

class CustomDeserializator(TaskDeserializator):
    def deserialize(self, raw_task):
        t = Task()
        t.ParseFromString(raw_task)
        # You must return task_uuid, task_id, graph_uuid, args, kwargs, retries, delay, reply_to_exchange,reply_to_routing_key
        # Required fields are: task_id.
        return None, t.task_id, None, [t.word.word], None, None, None, None, None  # args - list, kwargs - dict


message = Task(task_id='__main__.test_func_one', word=Word(word='hello'))
app = Gromozeka().config_from_env()
broker_point = BrokerPoint(exchange='first_exchange', exchange_type='direct', queue='first_queue',
                           routing_key='first')

app.register(task=test_func_one(), broker_point=broker_point, deserializator=CustomDeserializator())

# Start application
app.start()

connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672/%2F'))
channel = connection.channel()
channel.basic_publish(exchange='first_exchange', routing_key='first', body=message.SerializeToString())
connection.close()

See primitives.task.proto for more information