Search

Concurrent Execution in Process Instances with Flowable

Updated: Nov 17, 2020

Here's a very common question that we get from Activiti, Camunda and Flowable users:


"Can we run multiple activities concurrently within a single process instance?"


The answer, unfortunately, has always been a qualified no, and it's caused some consternation among those users at times.


In their 6.6.0 release, Flowable has changed that answer to a yes, providing the ability to execute instance logic concurrently with their new FutureJavaDelegates functionality. Since this is potentially a very exciting new innovation in the open-source BPM space, let's dig into it a bit in this blog post.


A Little Background


First, let's provide a little bit of background into how process instances are generally executed by these engines. Consider the following process model:



By default, won't the steps execute concurrently? I mean, we're using a Parallel Gateway, right? The answer is no; although the steps in the lower fork don't rely on the execution of the steps in the upper fork, the process engine will execute each Service Task one at a time*. The engines are built this way to avoid any competition while updating the underlying relational database tables; if we executed the steps concurrently, we could have conflicting writes. More on that shortly!


What if we mark all of the Service Tasks as asynchronous by setting their flowable:async attributes to true? Since the Async Executor will then be executing the task (activity) instances, will they execute in parallel? By default, the answer there is also no; the Async Executor will ensure that it's executing only one activity from each process instance at a time for the same reason cited earlier.


What if we also set the flowable:exclusive attribute to false? In this case, the Async Executor will no longer enforce exclusivity for job execution in a single process instance, making it possible to execute multiple steps concurrently. Great, right? There's a catch... In this scenario, each job would potentially be attempting to update the same process instance simultaneously. Since these jobs would be attempting to write to the same database records, the non-exclusive jobs would result in rather frequent OptimisticLockingException instances. Although these are expected by the engine as part of normal execution and are generally resolved seamlessly, executing instance logic concurrently in this manner won't typically result in better overall performance. Yes, the activities would be executed concurrently, but they wouldn't necessarily be executed efficiently; they might fill up your logs with exceptions, innocuous though they may be from the Flowable engine's viewpoint.


That's the end of the story with Activiti, Camunda and all versions of Flowable prior to 6.6.0; in other words, you're left to either live with the lack of concurrency within individual process instances or use concurrency with asynchronous job execution and deal with the trade-offs.


Enter Flowable's FutureJavaDelegates


With Flowable 6.6.0, however, you can write implementations of FutureJavaDelegate or its subclasses, which use Java 8's CompletableFuture features to allow for smart multi-threading. How does this work, and does it really help with concurrency in process instances?


First, in the models we've discussed thus far, we're using Delegate Expressions in each of the Service Tasks, and we're using field injection to allow logging that indicates which tasks are executing. For example, in Service Task 1-1, we pass one to the branch variable and one to the activity variable. You can see in the JavaDelegate implementation below how those injected fields are being used for logging:



@Component
public class ExclusivityDelegate implements JavaDelegate {

    private final Logger log = 
        LoggerFactory.getLogger(ExclusivityDelegate.class);

    //For field injection.
    private Expression branch;
    private Expression activity;

    @Override
    public void execute(DelegateExecution execution) {
        //Retrieve the injected values. This is executed 
        //  exclusively in the process instance.
        String branchStr = (String)branch.getValue(execution);
        String activityStr = 
            (String)activity.getValue(execution);

        log.info("Executing the \\"execute\\" method for branch " 
            + branchStr + " and activity " + activityStr + ".");

        execution.setVariable("exclusivityVar_" + branchStr + "_" 
            + activityStr, branchStr + "_" + activityStr);
    }

}


To leverage the FutureJavaDelegates functionality, we'll use a very similar process model as shown below:



In that model, we're using a different delegate implementation, which in this case is an implementation of FutureJavaDelegate, as shown below:



@Component
public class ConcurrencyFutureDelegate implements FutureJavaDelegate<Map<String,Object>> {

    private final Logger log = 
        LoggerFactory.getLogger(ConcurrencyFutureDelegate.class);

    //For field injection.
    private Expression branch;
    private Expression activity;

    @Override
    public CompletableFuture<Map<String, Object>> 
        execute(DelegateExecution execution,
            AsyncTaskInvoker taskInvoker) {
        //Retrieve the injected values. This is executed 
        //  exclusively in the process instance.
        String branchStr = (String)branch.getValue(execution);
        String activityStr = 
            (String)activity.getValue(execution);

        log.info("Executing the \\"execute\\" method for branch " 
            + branchStr + " and activity " + activityStr + ".");

        return taskInvoker.submit(() -> {
            //This occurs in a separate thread (asynchronously).
            log.info("Executing the asynchronous code for branch"  
                + branchStr + " and activity " + activityStr + 
                ".");

            Map<String,Object> variables =  
                new HashMap<String,Object>();
            variables.put("exclusivityVar_" + branchStr + "_" +     
                activityStr, branchStr + "_" + activityStr);
            return variables;
        });
    }

    @Override
    public void afterExecution(DelegateExecution execution, 
        Map<String, Object> executionData) {
        //This is executed exclusively in the process instance 
        //  *after* completion of the code executed via the
        //  "submit" method call (using the AsyncTaskInvoker) in 
        //  the "execute" method above.
        String branchStr = (String)branch.getValue(execution);
        String activityStr = 
            (String)activity.getValue(execution);
        log.info("Executing the \\"afterExecution\\" method for" 
            + " branch " + branchStr + " and activity " + 
            activityStr + ".");

        for(String key:executionData.keySet()) {
            execution.setVariable(key, executionData.get(key));
        }
    }
}


There are a few very important characteristics of this delegate, and let's hit them one by one:


  1. The execute method remains, and it is executed as soon as the engine encounters the delegate within the instance.

  2. The logic that is referenced via our lambda expression is the same logic you could place in a Callable implementation's call() method; it will execute via a separate thread - concurrently - while the engine continues with other steps (i.e. in the separate fork).

  3. Behind the scenes, Flowable will maintain a "plan" of future operations, and it will execute the afterExecution method only once the CompletableFuture has been completed (meaning also that the executionData is available).


This gives us a lot of control over how logic is executed in our process instances, and - in our opinion - it makes concurrency easily accessible in our Flowable process models & instances.

Based on our internal performance testing, while there is some apparent and expected overhead associated with the management of multi-threading and the need to wait for our concurrent threads to complete, the performance is very good even when concurrency really isn't needed. So when concurrency is needed, this feature will truly shine.


Closing Thoughts


While the Activiti/Camunda/Flowable family of process engines has always performed very well, we haven't had a good answer for concurrency within individual process instances... until now. As such, this is a very welcome new feature in the open source BPM world, one that could change the game for many clients who have heavyweight, concurrent processing needs.


If you're using Flowable and believe you could benefit from concurrent activity execution in your process instances, we would encourage you to download Flowable 6.6.0 and put this new functionality through its paces in your environment.


If you have any questions on this blog post or how you can use FutureJavaDelegates in your own Flowable applications, please contact us at info@summit58.co.


* - While the steps in each branch execute serially in Camunda BPM (Service Task 1-1, Service Task 1-2, Service Task 1-3, Service Task 2-1, Service Task 2-2, Service Task 2-3), the steps are interleaved in Flowable (Service Task 1-1, Service Task 2-1, Service Task 1-2, Service Task 2-2, Service Task 1-3, Service Task 2-3).

120 views0 comments

Recent Posts

See All