Tasks
A task defines the behavior of a workflow.
A task can be considered a simple transaction that changes the state of a workflow. There are two types of tasks: human tasks and machine tasks.
Human
Human tasks are represented by a Django View.
A user can change the workflow’s state via a Django form or a JSON API. Anything you can do in a view you can do in a human task. The only difference from machine tasks is that they require some kind of interaction.
You can use view mixins like the PermissionRequiredMixin or LoginRequiredMixin to create your own tasks that are only available to certain users.
Generic human tasks
Set of reusable human tasks.
class joeflow.tasks.human.StartView(**kwargs)
    Start a new workflow by a human with a view.
    Starting a workflow with a view allows users to provide initial data.
    Similar to Django’s CreateView, but it not only creates the workflow, it also completes a task.
class joeflow.tasks.human.UpdateView(**kwargs)
    Modify the workflow state and complete a human task.
    Similar to Django’s UpdateView, but it not only updates the workflow, it also completes a task.
Machine
Machine tasks are represented by simple methods on the Workflow class.
They can change the state and perform any action you can think of. They can decide which task to execute next (exclusive gateway) but also start or wait for multiple other tasks (split/join gateways).
Furthermore, tasks can implement things like sending emails or fetching data from a third-party API. All tasks are executed asynchronously to avoid blocking IO and are locked to prevent race conditions.
Return values
Machine tasks have three different allowed return values, each of which causes the workflow to behave differently:
None: If a task returns None or nothing at all, the workflow will proceed as planned, follow all outgoing edges, and execute the next tasks.
Iterable: A task can also return an explicit list of tasks that should be executed next. This can be used to create exclusive gateways:

    from django.utils import timezone

    from joeflow import tasks
    from joeflow.models import Workflow


    class ExclusiveWorkflow(Workflow):
        start = tasks.Start()

        def is_workday(self):
            # Monday is 0, so values below 5 are weekdays.
            if timezone.localtime().weekday() < 5:
                return [self.work]
            else:
                return [self.chill]

        def work(self):
            # pass time at the water cooler
            pass

        def chill(self):
            # enjoy life
            pass

        edges = (
            (start, is_workday),
            (is_workday, work),
            (is_workday, chill),
        )

A task can also return an empty list. This will cause the workflow branch to come to a halt, and no further tasks will be started.
Warning: A task cannot be a generator, that is, it must not yield its results.
False: A task can also return a boolean. Should a task return False, the workflow will wait until the condition changes to True (or anything but False):

    from django.utils import timezone

    from joeflow import tasks
    from joeflow.models import Workflow


    class WaitingWorkflow(Workflow):
        start = tasks.Start()

        def wait_for_weekend(self):
            # Saturday is 5 and Sunday is 6.
            return timezone.now().weekday() >= 5

        def go_home(self):
            # enjoy life
            pass

        edges = (
            (start, wait_for_weekend),
            (wait_for_weekend, go_home),
        )

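The three return values can be summarized as one small dispatch rule. The following is a minimal, framework-free sketch and not joeflow's actual implementation: `resolve_next` is a hypothetical helper, tasks are represented by plain name strings rather than bound methods, and a `None` result from the helper is an assumed convention meaning "not ready yet, check again later".

```python
from collections.abc import Iterable


def resolve_next(result, outgoing):
    """Map a task's return value to the task names to start next.

    Hypothetical helper for illustration only. Returns None when the
    task reported False and has to be evaluated again later.
    """
    if result is False:
        # Condition not met yet: start nothing and retry the task later.
        return None
    if isinstance(result, Iterable) and not isinstance(result, (str, bytes)):
        # Explicit selection of next tasks; an empty iterable halts the branch.
        return list(result)
    # None, True, or any other value: follow every outgoing edge.
    return list(outgoing)


outgoing = ["work", "chill"]
default_path = resolve_next(None, outgoing)
exclusive_path = resolve_next(["work"], outgoing)
halted_path = resolve_next([], outgoing)
waiting = resolve_next(False, outgoing)
```

Here `default_path` contains both outgoing tasks, `exclusive_path` only the selected one, `halted_path` is empty, and `waiting` is `None`.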
Exceptions
Should a task raise an exception, the task will change its status to failed. The exception that caused the task to fail will be recorded on the task itself and propagated further. You can find and rerun failed tasks from Django’s admin interface.
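The record-then-propagate behavior described above can be illustrated with plain Python. This is only a sketch under assumptions: `TaskRecord`, its field names, and `run_task` are hypothetical stand-ins and do not reflect joeflow's real task model or runner.

```python
import traceback
from dataclasses import dataclass


@dataclass
class TaskRecord:
    """Hypothetical stand-in for a persisted task."""

    name: str
    status: str = "scheduled"
    exception: str = ""
    stacktrace: str = ""


def run_task(record, func):
    """Run func, record any failure on the task, then re-raise it."""
    try:
        func()
    except Exception as exc:
        record.status = "failed"
        record.exception = repr(exc)
        record.stacktrace = traceback.format_exc()
        raise  # propagate so the caller still sees the error
    record.status = "succeeded"


def fetch_data():
    raise ValueError("no data")


record = TaskRecord("fetch_data")
try:
    run_task(record, fetch_data)
except ValueError:
    pass  # the exception reached the caller after being recorded
```

After this runs, `record.status` is `"failed"` and the stored exception text and stack trace are available for inspection, which is the kind of information an admin view could show before a rerun.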
Generic machine tasks
Set of reusable machine tasks.
class joeflow.tasks.machine.Start
    Start a new workflow via a callable.
    Creates a new workflow instance and executes a start task. The start task does not do anything beyond creating the workflow.
    Sample:

        from django.db import models

        from joeflow import tasks
        from joeflow.models import Workflow


        class StartWorkflow(Workflow):
            a_text_field = models.TextField()

            start = tasks.Start()

            def end(self):
                pass

            edges = (
                (start, end),
            )


        # Keyword arguments provide the initial workflow data.
        workflow = StartWorkflow.start(a_text_field="initial data")

class joeflow.tasks.machine.Join(*parents)
    Wait for all parent tasks to complete before continuing the workflow.
    Parameters: *parents (str) – List of parent task names to wait for.
    Sample:

        from django.db import models

        from joeflow import tasks
        from joeflow.models import Workflow


        class SplitJoinWorkflow(Workflow):
            parallel_task_value = models.PositiveIntegerField(default=0)

            start = tasks.Start()

            def split(self):
                # Start both branches in parallel.
                return [self.batman, self.robin]

            def batman(self):
                self.parallel_task_value += 1
                self.save(update_fields=['parallel_task_value'])

            def robin(self):
                self.parallel_task_value += 1
                self.save(update_fields=['parallel_task_value'])

            # Continue only after both branches have completed.
            join = tasks.Join('batman', 'robin')

            edges = (
                (start, split),
                (split, batman),
                (split, robin),
                (batman, join),
                (robin, join),
            )

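The join condition itself amounts to a subset check: the workflow continues only once every named parent has completed. The sketch below shows that idea in isolation; `join_ready` and the set of completed task names are hypothetical and say nothing about how joeflow stores or queries task state.

```python
def join_ready(parents, completed):
    """Return True once every parent task name appears in `completed`."""
    return set(parents) <= set(completed)


parents = ("batman", "robin")

# Only one branch has finished: the join keeps waiting.
after_batman = join_ready(parents, {"start", "split", "batman"})

# Both branches have finished: the workflow may continue.
after_both = join_ready(parents, {"start", "split", "batman", "robin"})
```

In this example `after_batman` is `False` and `after_both` is `True`.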
class joeflow.tasks.machine.Wait(duration: datetime.timedelta)
    Wait for a certain amount of time and then continue with the next tasks.
    Parameters: duration (datetime.timedelta) – Time to wait, as a time delta measured from the creation of the task.
    Sample:

        import datetime

        from django.db import models

        from joeflow import tasks
        from joeflow.models import Workflow


        class WaitWorkflow(Workflow):
            parallel_task_value = models.PositiveIntegerField(default=0)

            start = tasks.Start()

            # Continue three hours after the wait task was created.
            wait = tasks.Wait(datetime.timedelta(hours=3))

            def end(self):
                pass

            edges = (
                (start, wait),
                (wait, end),
            )

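Because the duration is measured from the creation of the task, the check for whether a wait has elapsed is a simple comparison of timestamps. The following sketch illustrates that arithmetic with the standard library only; `wait_is_over` and its parameters are hypothetical names, not part of joeflow.

```python
import datetime


def wait_is_over(created, duration, now=None):
    """Return True once `duration` has elapsed since `created`."""
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    return now >= created + duration


created = datetime.datetime(2024, 1, 1, 9, 0, tzinfo=datetime.timezone.utc)
duration = datetime.timedelta(hours=3)

# One minute before the three hours are up.
too_early = wait_is_over(
    created, duration, now=created + datetime.timedelta(hours=2, minutes=59)
)

# Exactly three hours after creation.
on_time = wait_is_over(created, duration, now=created + duration)
```

Passing `now` explicitly keeps the check deterministic and easy to test; in this example `too_early` is `False` and `on_time` is `True`.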