Skip to main content

Task Execution API

Task Execution API


Each business process you're automating contains one or more tasks. Task - is a special module, it's usually atomic business-operation such as "Read mails from mailbox" or "Save file to Data Base". Logic of your specific business-process should be described inside the special module which is called AP (Automation Process). AP is manipulating with Tasks and their input/outputs, to route the process to the correct flow.


AP (Automation Process)



AP module - is the special class which extends ApModule. The entry point of your AP class is method "run", which contains the high level logic of your business process, manipulates the tasks and their input/output and describes the process execution flow. 

The AP class must be also annotated by @ApModuleEntry to provide the name of your AP, which will be displayed on Control Server.

Below there is an example of what AP class looks like:

MyDemoAp.java
import eu.ibagroup.easyrpa.engine.annotation.ApModuleEntry;
import eu.ibagroup.easyrpa.engine.apflow.ApModule;
import eu.ibagroup.easyrpa.engine.apflow.TaskOutput;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@ApModuleEntry(name = "My First Demo AP")
public class MyDemoAp extends ApModule {
	
	public TaskOutput run(){
		//automation process logic here
	}
	
}


Task



Task module - is the special class which extends ApTask. The entry point of your Task class is method "execute". Inside this method you have the access to task input and task output objects. Usually task contains some atomic business operation logic, like "Read mails from mailbox" or "Save file to Data Base". Also inside the task you can instantiate the driver object to work with some application which requires UI manipulations.

The Task classs must be also annotated by @ApTaskEntry to provide the name of your task which will be displayed on Control Server.

Example of Task class:

MyFirstTask.java
import eu.ibagroup.easyrpa.engine.annotation.ApTaskEntry;
import eu.ibagroup.easyrpa.engine.apflow.ApTask;
import lombok.extern.slf4j.Slf4j;

@ApTaskEntry(name = "My First Task")
@Slf4j
public class MyFirstTask extends ApTask {

	@Override
	public void execute() {
		//task logic here
	}
	
}

Task execution


Here are brief steps of the task execution:

Check the annotations article for the detailed task steps.

In the following examples we will use four tasks to demonstrates different ways of tasks execution. They're pretty the same, the only difference is that the first task sleeps longer that 2nd, 2nd sleeps longer than 3rd and 3rd sleeps longer than 4th:

Task1.java
import eu.ibagroup.easyrpa.engine.annotation.ApTaskEntry;
import eu.ibagroup.easyrpa.engine.apflow.ApTask;
import lombok.extern.slf4j.Slf4j;


@ApTaskEntry(name = "Task 1")
@Slf4j
public class Task1 extends ApTask {

	private static final int SLEEP_MILLIS = 4000;

	@Override
	public void execute() throws InterruptedException {
		Thread.sleep(SLEEP_MILLIS);
		log.info("Task 1 finished. Provided input: {}. It was sleeping {} millis", getInput().get("test_var"), SLEEP_MILLIS);
		getOutput().put("test_var", "Output from Task1");
	}
}
Task2.java
import eu.ibagroup.easyrpa.engine.annotation.ApTaskEntry;
import eu.ibagroup.easyrpa.engine.apflow.ApTask;
import lombok.extern.slf4j.Slf4j;


@ApTaskEntry(name = "Task 2")
@Slf4j
public class Task2 extends ApTask {

	private static final int SLEEP_MILLIS = 3000;

	@Override
	public void execute() throws InterruptedException {
		Thread.sleep(SLEEP_MILLIS);
		log.info("Task 2 finished. Provided input: {}. It was sleeping {} millis", getInput().get("test_var"), SLEEP_MILLIS);
		getOutput().put("test_var", "Output from Task2");
	}
}
Task3.java
import eu.ibagroup.easyrpa.engine.annotation.ApTaskEntry;
import eu.ibagroup.easyrpa.engine.apflow.ApTask;
import lombok.extern.slf4j.Slf4j;


@ApTaskEntry(name = "Task 3")
@Slf4j
public class Task3 extends ApTask {

	private static final int SLEEP_MILLIS = 2000;

	@Override
	public void execute() throws InterruptedException {
		Thread.sleep(SLEEP_MILLIS);
		log.info("Task 3 finished. Provided input: {}. It was sleeping {} millis", getInput().get("test_var"), SLEEP_MILLIS);
		getOutput().put("test_var", "Output from Task3");
	}
}
Task4.java
import eu.ibagroup.easyrpa.engine.annotation.ApTaskEntry;
import eu.ibagroup.easyrpa.engine.apflow.ApTask;
import lombok.extern.slf4j.Slf4j;


@ApTaskEntry(name = "Task 4")
@Slf4j
public class Task4 extends ApTask {

	private static final int SLEEP_MILLIS = 1000;

	@Override
	public void execute() throws InterruptedException {
		Thread.sleep(SLEEP_MILLIS);
		log.info("Task 4 finished. Provided input: {}. It was sleeping {} millis", getInput().get("test_var"), SLEEP_MILLIS);
		getOutput().put("test_var", "Output from Task4");
	}
}

All ApModule execute and split methods will start execution of your tasks asynchronously, so task execution will be started, but the main thread of your AP will not wait for result and will continue processing the next line of code. It's useful when you don't need the output results immediately from that task and your AP can continue performing the following instructions.

Executing single task

This is a simple method to execute your single task with some provided input. 

execute
CompletableFuture<TaskOutput> execute(TaskInput input, Class<? extends ApTask> task)

Where:

  • input - your input object which came from local runner in case of local execution or input object which is provided on Control Server in case of executionmergeStrategy on the server.
  • task- class of the task your want to execute, e.g. "MyFirstTask.class".

To get the output from that object, you need to call method get() или join() of CompletableFuture class - result will be either returned immediately if task completed or thread will be blocked waiting for the task to be completed if not. 

Also instead of TaskInput object this method may accept TaskOutput object which allows you to use the output from previously executed task as an input for the next task:

CompletableFuture execute(TaskOutput prevTaskOutput, Class<? extends ApTask> task)


Executing multiple tasks sequentially (pipeline)

Below you'll find the complete example of AP which executes four tasks where all the outputs are used as the inputs for the following tasks:

ExecuteDemoAp.java
package eu.ibagroup.easyrpa.taskexecution;

import eu.ibagroup.easyrpa.engine.annotation.ApModuleEntry;
import eu.ibagroup.easyrpa.engine.apflow.ApModule;
import eu.ibagroup.easyrpa.engine.apflow.TaskOutput;
import eu.ibagroup.easyrpa.engine.boot.ApModuleRunner;
import eu.ibagroup.easyrpa.taskexecution.task.Task1;
import eu.ibagroup.easyrpa.taskexecution.task.Task2;
import eu.ibagroup.easyrpa.taskexecution.task.Task3;
import eu.ibagroup.easyrpa.taskexecution.task.Task4;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@ApModuleEntry(name = "Task Execution Demo")
public class ExecuteDemoAp extends ApModule {

	public TaskOutput run() throws Exception {
		log.info("execute method test started");
		TaskOutput previousOutput = execute(getInput(), Task1.class).get();
		previousOutput = execute(previousOutput, Task2.class).get();
		previousOutput = execute(previousOutput, Task3.class).get();
		previousOutput = execute(previousOutput, Task4.class).get();

		log.info("execute method test finished. Final output: {}", previousOutput.get("test_var"));
		return previousOutput;
	}
}

After execution is finished we will see the following logs:

2020-08-26 19:15:28,740 INFO	e.i.e.taskexecution.ExecuteDemoAp [main] - execute method test started
2020-08-26 19:15:32,805 INFO	e.i.easyrpa.taskexecution.task.Task1 [main] - Task 1 finished. Provided input: null. It was sleeping 4000 millis
2020-08-26 19:15:35,822 INFO	e.i.easyrpa.taskexecution.task.Task2 [main] - Task 2 finished. Provided input: Output from Task1. It was sleeping 3000 millis
2020-08-26 19:15:37,838 INFO	e.i.easyrpa.taskexecution.task.Task3 [main] - Task 3 finished. Provided input: Output from Task2. It was sleeping 2000 millis
2020-08-26 19:15:38,860 INFO	e.i.easyrpa.taskexecution.task.Task4 [main] - Task 4 finished. Provided input: Output from Task3. It was sleeping 1000 millis
2020-08-26 19:15:38,866 INFO	e.i.e.taskexecution.ExecuteDemoAp [main] - execute method test finished. Final output: Output from Task4


Same results can be achieved with alternative way - CompletableFuture API provides possibility to combine executions in chains, where output from previous task will be used as an input for the next task automatically.  For that you can call the method "thenCompose" from CompletableFuture object combined with the "execute" method, which returns the java.util.function.Function object. For that you should use the following methods signatures:

Function<TaskOutput, CompletableFuture<TaskOutput>> execute(Class<? extends ApTask> class)

As you may see, the only different here is that those methods don't accept the input as argument, because it will be provided automatically.

So the example of using it is the following:

ExecuteAsyncChainDemoAp.java
@Slf4j
@ApModuleEntry(name = "Execute Async Chain Demo")
public class ExecuteAsyncChainDemoAp extends ApModule {

	public static void main(String[] args) {
		ApModuleRunner.localLaunch(ExecuteAsyncChainDemoAp.class);
	}

	public TaskOutput run() throws Exception {
		log.error("executeAsync method chain test started");

		CompletableFuture<TaskOutput> finalCompletableFuture = execute(getInput(), Task1.class)
		.thenCompose(execute(Task2.class))
		.thenCompose(execute(Task3.class))
		.thenCompose(execute(Task4.class));

		TaskOutput finalOutput = finalCompletableFuture.get();

		log.error("executeAsync method chain test finished. Final output: {}; ", finalOutput.get("test_var"));

		return finalOutput;
	}
}

After the execution is finished will see the following logs:

2020-08-26 19:51:07,212 INFO e.i.e.t.ExecuteAsyncChainDemoAp [main] - executeAsync method chain test started
2020-08-26 19:51:11,268 INFO	e.i.easyrpa.taskexecution.task.Task1 [pool-1-thread-1] - Task 1 finished. Provided input: null. It was sleeping 4000 millis
2020-08-26 19:51:14,284 INFO	e.i.easyrpa.taskexecution.task.Task2 [pool-1-thread-2] - Task 2 finished. Provided input: Output from Task1. It was sleeping 3000 millis
2020-08-26 19:51:16,298 INFO	e.i.easyrpa.taskexecution.task.Task3 [pool-1-thread-1] - Task 3 finished. Provided input: Output from Task2. It was sleeping 2000 millis
2020-08-26 19:51:17,312 INFO	e.i.easyrpa.taskexecution.task.Task4 [pool-1-thread-2] - Task 4 finished. Provided input: Output from Task3. It was sleeping 1000 millis
2020-08-26 19:51:17,317 INFO e.i.e.t.ExecuteAsyncChainDemoAp [main] - executeAsync method chain test finished. Final output: Output from Task4; 


Executing multiple tasks parallelly

If there is no need in pipeline - tasks can be executed parallelly thus making total AP run time significantly smaller.

Below you'll find the complete example of AP which executes four tasks parallelly and prints the output from all of them separately:

ExecuteAsyncDemoAp.java
package eu.ibagroup.easyrpa.taskexecution;

import eu.ibagroup.easyrpa.engine.annotation.ApModuleEntry;
import eu.ibagroup.easyrpa.engine.apflow.ApModule;
import eu.ibagroup.easyrpa.engine.apflow.TaskOutput;
import eu.ibagroup.easyrpa.engine.boot.ApModuleRunner;
import eu.ibagroup.easyrpa.taskexecution.task.Task1;
import eu.ibagroup.easyrpa.taskexecution.task.Task2;
import eu.ibagroup.easyrpa.taskexecution.task.Task3;
import eu.ibagroup.easyrpa.taskexecution.task.Task4;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;

@Slf4j
@ApModuleEntry(name = "Execute Async Demo")
public class ExecuteAsyncDemoAp extends ApModule {

	public static void main(String[] args) {
		ApModuleRunner.localLaunch(ExecuteAsyncDemoAp.class);
	}

	public TaskOutput run() throws Exception {
		log.info("executeAsync method test started");

		CompletableFuture<TaskOutput> completableFuture1 = execute(getInput(), Task1.class);
		CompletableFuture<TaskOutput> completableFuture2 = execute(getInput(), Task2.class);
		CompletableFuture<TaskOutput> completableFuture3 = execute(getInput(), Task3.class);
		CompletableFuture<TaskOutput> completableFuture4 = execute(getInput(), Task4.class);

		log.info("executeAsync method test finished. Final output Task1: {}; Task2: {}; Task3: {}; Task4: {}", completableFuture1.get().get("test_var"),
		completableFuture2.get().get("test_var"), completableFuture3.get().get("test_var"), completableFuture4.get().get("test_var"));

		return completableFuture4.get();
	}
}

After the execution is finished we will see the following logs:

2020-08-26 19:37:55,503 INFO e.i.e.t.ExecuteAsyncDemoAp [main] - executeAsync method test started
2020-08-26 19:37:56,574 INFO	e.i.easyrpa.taskexecution.task.Task4 [pool-1-thread-4] - Task 4 finished. Provided input: null. It was sleeping 1000 millis
2020-08-26 19:37:57,574 INFO	e.i.easyrpa.taskexecution.task.Task3 [pool-1-thread-3] - Task 3 finished. Provided input: Output from Task4. It was sleeping 2000 millis
2020-08-26 19:37:58,574 INFO	e.i.easyrpa.taskexecution.task.Task2 [pool-1-thread-2] - Task 2 finished. Provided input: Output from Task3. It was sleeping 3000 millis
2020-08-26 19:37:59,573 INFO	e.i.easyrpa.taskexecution.task.Task1 [pool-1-thread-1] - Task 1 finished. Provided input: Output from Task2. It was sleeping 4000 millis
2020-08-26 19:37:59,578 INFO e.i.e.t.ExecuteAsyncDemoAp [main] - executeAsync method test finished. Final output Task1: Output from Task1; Task2: Output from Task2; Task3: Output from Task3; Task4: Output from Task4


Executing multiple tasks parallelly with merge strategy

If required parallel task results can finally be merged using some merge strategy. It can be done with the split + merge methods combo like in the example below:

ExecuteAllAsyncDemoAp.java
import eu.ibagroup.easyrpa.engine.annotation.ApModuleEntry;
import eu.ibagroup.easyrpa.engine.apflow.ApModuleBase;
import eu.ibagroup.easyrpa.engine.apflow.TaskOutput;
import eu.ibagroup.easyrpa.taskexecution.task.Task1;
import eu.ibagroup.easyrpa.taskexecution.task.Task2;
import eu.ibagroup.easyrpa.taskexecution.task.Task3;
import eu.ibagroup.easyrpa.taskexecution.task.Task4;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;

@Slf4j
@ApModuleEntry(name = "Execute All Async Demo")
public class ExecuteAllAsyncDemoAp extends ApModuleBase {

	public TaskOutput run() throws Exception {
		log.info("executeSubtasks method test started");
		CompletableFuture<TaskOutput> completableFutureCombined = split(getInput(), Task1.class, Task2.class, Task3.class, Task4.class).merge();
		TaskOutput output = completableFutureCombined.get();
		log.info("executeSubtasks method test finished. Final output: {};", output.get("RESULT",List.class).stream().map(o->((TaskOutput)o).get("test_var")).collect(Collectors.joining(";")));
		return output;
	}

}

The final output will be the result of merging of all outputs into single RESULT property.

After the execution is finished we will see the following logs:

2020-08-26 20:07:43,537 INFO	e.i.e.t.ExecuteAllAsyncDemoAp [main] - executeSubtasks method test started
2020-08-26 20:07:44,644 INFO	e.i.easyrpa.taskexecution.task.Task4 [pool-1-thread-5] - Task 4 finished. Provided input: null. It was sleeping 1000 millis
2020-08-26 20:07:45,634 INFO	e.i.easyrpa.taskexecution.task.Task3 [pool-1-thread-4] - Task 3 finished. Provided input: null. It was sleeping 2000 millis
2020-08-26 20:07:46,638 INFO	e.i.easyrpa.taskexecution.task.Task2 [pool-1-thread-3] - Task 2 finished. Provided input: null. It was sleeping 3000 millis
2020-08-26 20:07:47,627 INFO	e.i.easyrpa.taskexecution.task.Task1 [pool-1-thread-2] - Task 1 finished. Provided input: null. It was sleeping 4000 millis
2020-08-26 20:07:47,636 INFO	e.i.e.t.ExecuteAllAsyncDemoAp [main] - executeSubtasks method test finished. Final output:	Output from Task1;Output from Task2;Output from Task3;Output from Task4;


It's possible to define your own output merge strategy. For that you should use the following method sugnature:

CompletableFuture<TaskOutput> merge(Class<? extends ParallelGateway> mergeStrategy)

Where:

  • mergeStrategy - is a ParrallelGateway sibling where you can define your own merge strategy.

Returns: CompletableFuture object on which you can call "get()" method to get TaskOutput object.

Let's see the following example where we define our own merge strategy. In case of conflicts (output variable with the same name) - it combines the value using the pipe ("|") symbol:

ExecuteAllAsyncWithMergeStrategyDemoAp.java
package eu.ibagroup.easyrpa.taskexecution;

import eu.ibagroup.easyrpa.engine.annotation.ApModuleEntry;
import eu.ibagroup.easyrpa.engine.apflow.ApModule;
import eu.ibagroup.easyrpa.engine.apflow.TaskOutput;
import eu.ibagroup.easyrpa.engine.boot.ApModuleRunner;
import eu.ibagroup.easyrpa.engine.service.ExecutionService;
import eu.ibagroup.easyrpa.taskexecution.task.Task1;
import eu.ibagroup.easyrpa.taskexecution.task.Task2;
import eu.ibagroup.easyrpa.taskexecution.task.Task3;
import eu.ibagroup.easyrpa.taskexecution.task.Task4;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;

@Slf4j
@ApModuleEntry(name = "Execute All Async with merge strategy Demo")
public class ExecuteAllAsyncWithMergeStrategyDemoAp extends ApModule {	 	

	@ApTaskEntry(name = "CombineWithPipeMergeStrategy", description = "CombineWithPipeMergeStrategy")
	private static final class CombineWithPipeMergeStrategy extends ParallelGateway {
		@Override
		protected TaskOutput merge(TaskOutput finalTaskOutput, List<TaskOutput> finishedTaskOutputList) {
			String pipedResult = finishedTaskOutputList.stream().map(o -> o.get("test_var")).collect(Collectors.joining("|"));
			finalTaskOutput.set("test_var", pipedResult);
			return finalTaskOutput;
		}
	}

	public TaskOutput run() throws Exception {
		log.info("executeAllAsync with merge strategy method test started");

		CompletableFuture<TaskOutput> completableFutureCombined = split(getInput(), Task1.class, Task2.class, Task3.class, Task4.class).merge(CombineWithPipeMergeStrategy.class);
		TaskOutput output = completableFutureCombined.get();
		log.info("executeAllAsync with merge strategy method test finished. Final output: {};", output.get("test_var"));
		return output;	 }
}

After the execution we will see the following logs:

2020-08-26 20:17:53,658 INFO	e.i.e.t.ExecuteAllAsyncWithMergeStrategyDemoAp [main] - executeSubtasks with merge strategy method test started
2020-08-26 20:17:54,747 INFO	e.i.easyrpa.taskexecution.task.Task4 [pool-1-thread-5] - Task 4 finished. Provided input: null. It was sleeping 1000 millis
2020-08-26 20:17:55,753 INFO	e.i.easyrpa.taskexecution.task.Task3 [pool-1-thread-4] - Task 3 finished. Provided input: null. It was sleeping 2000 millis
2020-08-26 20:17:56,758 INFO	e.i.easyrpa.taskexecution.task.Task2 [pool-1-thread-3] - Task 2 finished. Provided input: null. It was sleeping 3000 millis
2020-08-26 20:17:57,743 INFO	e.i.easyrpa.taskexecution.task.Task1 [pool-1-thread-2] - Task 1 finished. Provided input: null. It was sleeping 4000 millis
2020-08-26 20:17:57,754 INFO	e.i.e.t.ExecuteAllAsyncWithMergeStrategyDemoAp [main] - executeSubtasks with merge strategy method test finished. Final output: Output from Task1|Output from Task2|Output from Task3|Output from Task4;

Executing multiple tasks parallelly against list of data

Finally there is one more split method signature:

execute
Merge split(TaskInput input, List<T> data, BiFunction<TaskInput, T, CompletableFuture<TaskOutput>> flow)

Where:

  • input - task input
  • data - list of objects to be translated into multiple CompletableFuture<TaskOutput>
  • flow - BiFunction that translates single data object into a CompletableFuture<TaskOutput>.

Lets consider following scenario - there is a string list with some ids that should be added to each task input as a "test_var" property before task is executed. Sample below implements the solution with the help of the split method accepting custom BiFunction 

ExecuteAllAsyncWithMergeStrategyDemoAp.java
@Slf4j
@ApModuleEntry(name = "Execute All Async with data list Demo")
public class ExecuteAllAsyncWithDataListDemoAp extends ApModule {


	 public TaskOutput run() throws Exception { 		
		log.info("executeAllAsync with list of data method test started");

		List<String> ids = asList("id1", "id2");

		CompletableFuture<TaskOutput> completableFutureCombined = split(getInput(), ids, (ti, id) -> {
			ti.set("test_var", id);
			return execute(ti, Task1.class).thenCompose(execute(Task2.class));
		}).merge();

		TaskOutput output = completableFutureCombined.get();
		log.info("executeAllAsync with list of data method test finished. Final output: {};",
				output.get("test_var"));
		return output;
	} 
}

After the execution we will see the following logs:

2022-08-23 15:23:14,812 INFO	e.i.t.a.ExecuteAllAsyncWithMergeStrategyDemo2 [main] [] [] - executeAllAsync with list of data method test started
2022-08-23 15:23:18,978 INFO	eu.ibagroup.training1.tasks.Task1 [pool-1-thread-3] - Task 1 finished. Provided input: id2. It was sleeping 4000 millis
2022-08-23 15:23:18,978 INFO	eu.ibagroup.training1.tasks.Task1 [pool-1-thread-4] - Task 1 finished. Provided input: id1. It was sleeping 4000 millis
2022-08-23 15:23:22,121 INFO	eu.ibagroup.training1.tasks.Task2 [pool-1-thread-3] - Task 2 finished. Provided input: Output from Task1. It was sleeping 3000 millis
2022-08-23 15:23:22,121 INFO	eu.ibagroup.training1.tasks.Task2 [pool-1-thread-6] - Task 2 finished. Provided input: Output from Task1. It was sleeping 3000 millis
2022-08-23 15:23:22,201 INFO	e.i.t.a.ExecuteAllAsyncWithMergeStrategyDemo2 [main] [] [] - executeAllAsync with list of data method test finished. Final output: Output from Task2;