Tasks

A task defines the behavior or a workflow.

A task can be considered as a simple transaction that changes state of a workflow. There are two types of tasks, human and machine tasks.

Human

Human tasks are represented by a Django View.

A user can change the workflows state via a Django form or a JSON API. Anything you can do in a view you can do in a human task. They only difference to machine tasks is that they require some kind of interaction.

You can use view mixins like the PermissionRequiredMixin or LoginRequiredMixin to create your own tasks that are only available to certain users.

Generic human tasks

Set of reusable human tasks.

class joeflow.tasks.human.StartView(**kwargs)[source]

Start a new workflow by a human with a view.

Starting a workflow with a view allows users to provide initial data.

Similar to Django’s CreateView but does not only create the workflow but also completes a tasks.

class joeflow.tasks.human.UpdateView(**kwargs)[source]

Modify the workflow state and complete a human task.

Similar to Django’s UpdateView but does not only update the workflow but also completes a tasks.

Machine

Machine tasks are represented by simple methods on the Workflow class.

They can change the state and perform any action you can think of. They can decide which task to execute next (exclusive gateway) but also start or wait for multiple other tasks (split/join gateways).

Furthermore tasks can implement things like sending emails or fetching data from an 3rd party API. All tasks are executed asynchronously to avoid blocking IO and locked to prevent raise conditions.

Return values

Machine tasks have three different allowed return values all of which will cause the workflow to behave differently:

None:
If a task returns None or anything at all the workflow will just proceed as planed and follow all outgoing edges and execute the next tasks.
Iterable:

A task can return also an explicit list of tasks that should be executed next. This can be used to create exclusive gateways:

from django.utils import timezone
from joeflow.workflows import Workflow
from joeflow import tasks


class ExclusiveWorkflow(Workflow):
    start = tasks.Start()

    def is_workday(self):
        if timezone.localtime().weekday() < 5:
            return [self.work]
        else:
            return [self.chill]

    def work(self):
        # pass time at the water cooler
        pass

    def chill(self):
        # enjoy life
        pass

    edges = (
        (start, is_workday),
        (is_workday, work),
        (is_workday, chill),
    )

A task can also return am empty list. This will cause the workflow branch to come to a halt and no further stats will be started.

Warning

A task can not be a generator (yield results).

False:

A task can also return a boolean. Should a task return False the workflow will wait until the condition changes to True (or anything but False):

from joeflow import tasks
form joeflow.workflows import Workflow
from django.utils import timezone


class WaitingWorkflow(Workflow):
    start = tasks.Start()

    def wait_for_weekend(self):
        return timezone.now().weekday() >= 5

    def go_home(self):
        # enjoy life
        pass

    edges = (
        (start, wait_for_weekend),
        (wait_for_weekend, go_home),
    )

Exceptions

Should a task raise an exception the tasks will change it status to failed. The exception that caused the task to fail will be recorded on the task itself and further propagated. You can find and rerun failed tasks form Django’s admin interface.

Generic machine tasks

Set of reusable machine tasks.

class joeflow.tasks.machine.Start[source]

Start a new function via a callable.

Creates a new workflow instance and executes a start task. The start task does not do anything beyond creating the workflow.

Sample:

from django.db import models
from joeflow.models import Workflow
from joeflow import tasks


class StartWorkflow(Workflow):
    a_text_field = models.TextField()

    start = tasks.Start()

    def end(self):
        pass

    edges = (
        (start, end),
    )

workflow = StartWorkflow.start(a_text_field="initial data")
class joeflow.tasks.machine.Join(*parents)[source]

Wait for all parent tasks to complete before continuing the workflow.

Parameters:*parents (str) – List of parent task names to wait for.

Sample:

from django.db import models
from joeflow.models import Workflow
from joeflow import tasks


class SplitJoinWorkflow(Workflow):
    parallel_task_value = models.PositiveIntegerField(default=0)

    start = tasks.Start()

    def split(self):
        return [self.batman, self.robin]

    def batman(self):
        self.parallel_task_value += 1
        self.save(update_fields=['parallel_task_value'])

    def robin(self):
        self.parallel_task_value += 1
        self.save(update_fields=['parallel_task_value'])

    join = tasks.Join('batman', 'robin')

    edges = (
        (start, split),
        (split, batman),
        (split, robin),
        (batman, join),
        (robin, join),
    )
class joeflow.tasks.machine.Wait(duration: datetime.timedelta)[source]

Wait for a certain amount of time and then continue with the next tasks.

Parameters:duration (datetime.timedelta) – Time to wait in time delta from creation of task.

Sample:

import datetime

from django.db import models
from joeflow.models import Workflow
from joeflow import tasks


class WaitWorkflow(Workflow):
    parallel_task_value = models.PositiveIntegerField(default=0)

    start = tasks.Start()

    wait = tasks.Wait(datetime.timedelta(hours=3))

    def end(self):
        pass

    edges = (
        (start, wait),
        (wait, end),
    )