Spring Boot with Apache Ignite: fail-fast distributed map-reduce closures

Here we cover a common need in Apache Ignite: running distributed compute jobs, such as data computations or external service calls, as Ignite distributed closures with a map-reduce nature, and failing fast as soon as one of the computations fails or returns an unexpected result. How can that be done? The rest of this post explains it.

[Figure: mapReduce]

  1. The main (calling) node submits a collection of Ignite callables together with the custom fail-fast reducer, which is explained in detail below.
  2. The jobs are distributed among the server nodes of the current cluster topology, within the same cluster group, for the actual execution. This uses the distributed, parallel map-reduce execution of the Ignite compute grid, either synchronously or asynchronously (non-blocking).
  3. Each job returns its response, flagged as success or failure, to the fail-fast reducer. Every time the reducer receives a single job's response, it decides whether to keep collecting the other responses before reducing the final aggregated result, or to fail fast immediately because one of the jobs failed or returned an unexpected result.
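The three steps above can be tried out without a cluster. Below is a minimal single-JVM sketch of the same fail-fast map-reduce idea using only the JDK; it is an analogy, not Ignite's API. A thread pool stands in for the server nodes, `ExecutorCompletionService` hands results back in completion order (the way the reducer receives them), collection stops at the first failed result, and jobs that are still running get cancelled. `FailFastLocalDemo` and `JobResult` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical single-JVM analogy of the fail-fast map-reduce flow; it does not use the Ignite API.
public class FailFastLocalDemo {

    // minimal stand-in for ServiceResponse: the job origin plus a success flag
    public static final class JobResult {
        final String origin;
        final boolean success;

        public JobResult(String origin, boolean success) {
            this.origin = origin;
            this.success = success;
        }
    }

    // "map": submit every job in parallel; "reduce": collect results in completion order, stop at the first failure
    public static Map<String, Boolean> run(List<Callable<JobResult>> jobs) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, jobs.size()));
        CompletionService<JobResult> completion = new ExecutorCompletionService<>(pool);
        List<Future<JobResult>> futures = new ArrayList<>();
        Map<String, Boolean> collected = new LinkedHashMap<>();
        try {
            for (Callable<JobResult> job : jobs) {
                futures.add(completion.submit(job));
            }
            for (int i = 0; i < jobs.size(); i++) {
                JobResult result;
                try {
                    result = completion.take().get();
                } catch (ExecutionException e) {
                    // a thrown exception counts as a failure (the job origin is unknown here)
                    result = new JobResult("exception", false);
                }
                collected.put(result.origin, result.success);
                if (!result.success) {
                    break; // fail fast: do not wait for the remaining jobs
                }
            }
        } finally {
            futures.forEach(f -> f.cancel(true)); // interrupt jobs that are still running
            pool.shutdownNow();
        }
        return collected;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<JobResult>> jobs = Arrays.asList(
                () -> new JobResult("job1", true),
                () -> new JobResult("job2", false),
                () -> {
                    Thread.sleep(2_000); // a slow job that the fail-fast collection should not wait for
                    return new JobResult("job3", true);
                });
        System.out.println(run(jobs));
    }
}
```

Because job 1 and job 2 race, the printed map may or may not contain `job1`; it always contains the failed `job2` and, given the artificial delay, normally not `job3`.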

So how is it implemented?

  • The fail-fast Ignite compute grid reducer:


import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.ignite.lang.IgniteReducer;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

/**
 * A fail-fast map reducer: it decides whether to keep waiting for the other jobs before the final reduce,
 * or to stop immediately and reduce the responses collected so far because one of them failed.
 * The reducer is stateful, so a new instance must be used for every map-reduce call (hence the prototype scope).
 */
@Component
@Scope("prototype")
public class FailFastReducer implements IgniteReducer<ServiceResponse, MapReduceResponse> {

    // job responses keyed by their service origin; concurrent because responses can arrive from several threads
    private final Map<String, ServiceResponse> responseMap = new ConcurrentHashMap<>();
    // set when a null or failed response is collected, so reduce() never reports success after a fail-fast stop
    private final AtomicBoolean failed = new AtomicBoolean(false);

    /**
     * @param serviceCallResponse the job response
     * @return true to keep collecting the remaining job responses, false to stop and reduce immediately
     */
    @Override
    public boolean collect(ServiceResponse serviceCallResponse) {
        if (serviceCallResponse == null) {
            // an unexpected empty response: fail fast
            failed.set(true);
            return false;
        }
        responseMap.put(serviceCallResponse.getServiceOrigin(), serviceCallResponse);
        if (!serviceCallResponse.isSuccess()) {
            failed.set(true);
            return false; // fail fast: stop collecting and reduce now
        }
        return true; // keep waiting for the other jobs
    }

    /**
     * @return the final generic reduced response containing the collected job responses and the global status
     */
    @Override
    public MapReduceResponse reduce() {
        return MapReduceResponse.builder().success(checkStatus()).reducedResponses(responseMap).build();
    }

    /**
     * @return true only if no failure was collected and every collected job response is successful
     */
    public boolean checkStatus() {
        return !failed.get() && responseMap.values().stream().allMatch(ServiceResponse::isSuccess);
    }
}
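The contract the reducer relies on can be exercised without a cluster. The sketch below mirrors the shape of `IgniteReducer` with a hypothetical local interface `LocalReducer` (so it runs with only the JDK), drives it the way the caller conceptually does (feed responses until `collect` returns `false`, then call `reduce`), and shows why a fresh reducer per call matters: a reused instance still holds the previous call's responses. `ReducerContractDemo`, `FailFast`, `job` and `drive` are illustrative names, not part of Ignite.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical JDK-only illustration of the reducer contract; LocalReducer only mirrors the shape of IgniteReducer.
public class ReducerContractDemo {

    // local mirror of IgniteReducer's two methods, so this sketch needs no Ignite dependency
    public interface LocalReducer<R, E> {
        boolean collect(R result); // true = keep collecting, false = reduce now

        E reduce();
    }

    // simplified FailFastReducer: a job response is modeled as (origin, success flag)
    public static final class FailFast implements LocalReducer<Map.Entry<String, Boolean>, Boolean> {
        final Map<String, Boolean> responses = new ConcurrentHashMap<>();

        @Override
        public boolean collect(Map.Entry<String, Boolean> result) {
            responses.put(result.getKey(), result.getValue());
            return result.getValue(); // keep collecting only while the responses are successful
        }

        @Override
        public Boolean reduce() {
            return responses.values().stream().allMatch(Boolean::booleanValue);
        }
    }

    public static Map.Entry<String, Boolean> job(String origin, boolean success) {
        return new AbstractMap.SimpleImmutableEntry<>(origin, success);
    }

    // feeds responses in list order until collect() returns false, then reduces
    // (Ignite feeds them in job completion order instead)
    public static <R, E> E drive(List<R> results, LocalReducer<R, E> reducer) {
        for (R result : results) {
            if (!reducer.collect(result)) {
                break;
            }
        }
        return reducer.reduce();
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Boolean>> failing = Arrays.asList(job("job1", true), job("job2", false), job("job3", true));
        List<Map.Entry<String, Boolean>> passing = Arrays.asList(job("job4", true), job("job5", true));

        // a fresh reducer per call: each result reflects only its own jobs
        System.out.println(drive(failing, new FailFast())); // prints false
        System.out.println(drive(passing, new FailFast())); // prints true

        // one shared reducer: job2's failure from the first call leaks into the second result
        FailFast shared = new FailFast();
        drive(failing, shared);
        System.out.println(drive(passing, shared)); // prints false
    }
}
```

The shared-instance case is what happens when a prototype-scoped reducer is injected once into a singleton service and reused across calls.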

  • A generic Ignite compute utility to trigger the map-reduce jobs synchronously or asynchronously (non-blocking):


import java.util.Collection;
import java.util.function.Consumer;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteReducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Generic utility class for map-reduce calls on the Ignite compute grid.
 */
@Component
public class DataGridCompute {

    @Autowired
    private Ignite ignite;

    /**
     * A generic asynchronous map-reduce call on the Ignite compute grid.
     *
     * @param jobs          the jobs to be distributed from the calling node to the data grid nodes
     * @param igniteReducer the Ignite reducer that holds the collection and reduction logic
     * @param callback      the callback invoked with the final reduced response
     * @param <R>           the job response type
     * @param <E>           the map-reduced response type
     * @throws IgniteException if the execution fails
     */
    public <R, E> void executeMapReduceFailFast(Collection<IgniteCallable<R>> jobs, IgniteReducer<R, E> igniteReducer,
                                                Consumer<E> callback) throws IgniteException {
        // run on server nodes only; adapt this to your own cluster group if you define one
        IgniteCompute igniteCompute = ignite.compute(ignite.cluster().forPredicate(clusterNode -> !clusterNode.isClient()));
        // execute the jobs in map-reduce fashion with the custom reducer, without blocking the caller
        IgniteFuture<E> future = igniteCompute.callAsync(jobs, igniteReducer);
        // invoke the callback once the reduced result is ready; get() rethrows if the call itself failed
        future.listen(result -> callback.accept(result.get()));
    }

    /**
     * A generic synchronous map-reduce call on the Ignite compute grid.
     *
     * @param jobs          the jobs to be distributed from the calling node to the data grid nodes
     * @param igniteReducer the Ignite reducer that holds the collection and reduction logic
     * @param <R>           the job response type
     * @param <E>           the map-reduced response type
     * @return the map-reduced response
     * @throws IgniteException if the execution fails
     */
    public <R, E> E executeMapReduceFailFastSync(Collection<IgniteCallable<R>> jobs, IgniteReducer<R, E> igniteReducer)
            throws IgniteException {
        // run on server nodes only; adapt this to your own cluster group if you define one
        IgniteCompute igniteCompute = ignite.compute(ignite.cluster().forPredicate(clusterNode -> !clusterNode.isClient()));
        // block until the reducer reduces (all jobs collected, or fail fast on the first failure)
        return igniteCompute.call(jobs, igniteReducer);
    }
}
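The asynchronous variant follows a familiar submit-then-callback pattern. As a JDK-only analogy (not Ignite's API), the sketch below uses `CompletableFuture.supplyAsync` where the utility uses `callAsync`, and `handle` where it uses `listen`. It also routes a failed computation to a separate error handler; in the Ignite version, `result.get()` inside the listener rethrows such a failure, so catching it there may be worth considering. `AsyncCallbackDemo`, `runAsync` and `onError` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical JDK-only analogy of callAsync(...) + listen(...); it does not use the Ignite API.
public class AsyncCallbackDemo {

    // runs the computation without blocking the caller, then passes on either the result or the failure
    public static <E> CompletableFuture<Void> runAsync(Supplier<E> computation,
                                                       Consumer<E> callback,
                                                       Consumer<Throwable> onError) {
        return CompletableFuture.supplyAsync(computation)
                .handle((result, error) -> {
                    if (error != null) {
                        // failures of supplyAsync arrive wrapped in a CompletionException
                        Throwable cause = error instanceof CompletionException && error.getCause() != null
                                ? error.getCause() : error;
                        onError.accept(cause);
                    } else {
                        callback.accept(result);
                    }
                    return null;
                });
    }

    public static void main(String[] args) {
        // success path: the callback receives the reduced value
        AsyncCallbackDemo.<String>runAsync(() -> "all jobs succeeded",
                result -> System.out.println("callback: " + result),
                error -> System.out.println("error: " + error.getMessage())).join();

        // failure path: the error handler is called instead of the callback
        AsyncCallbackDemo.<String>runAsync(() -> { throw new IllegalStateException("job2 failed"); },
                result -> System.out.println("callback: " + result),
                error -> System.out.println("error: " + error.getMessage())).join();
    }
}
```

The `join()` calls exist only so the demo waits before exiting; a real caller would return immediately and let the callback fire later.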

  • The custom aggregated reducer response class:


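import java.io.Serializable;
import java.util.Map;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/**
 * The generic reduced response that contains all collected job responses and the global status.
 */
@Builder
@Getter
@ToString
@EqualsAndHashCode
public class MapReduceResponse implements Serializable {
    // the collected job responses, keyed by service origin
    private Map<String, ServiceResponse> reducedResponses;
    // true only if every collected job succeeded
    private boolean success;
}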

  • The single job response class:


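import java.io.Serializable;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
 * The response of a single job.
 *
 * @param <T> the service call response type
 */
@Getter
@Setter
@ToString
@EqualsAndHashCode
@Builder
public class ServiceResponse<T> implements Serializable {
    // the service call payload
    private T response;
    // whether the job succeeded; the fail-fast reducer stops collecting on the first false
    private boolean success;
    // the job or service name, used as the key in the reduced response map
    private String serviceOrigin;
}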

  • An example service that calls the Ignite compute grid with the distributed closures (the synchronous variant is used to test the execution):


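import java.util.List;

import org.apache.ignite.lang.IgniteCallable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Sample service showing how to run map-reduce jobs in parallel with the fail-fast reducer.
 */
@Service
public class ComputeService {

    private static final Logger logger = LoggerFactory.getLogger(ComputeService.class);

    private final DataGridCompute dataGridCompute;
    // FailFastReducer is a stateful prototype bean: request a new instance for every call instead of
    // injecting one instance into this singleton service and reusing it
    private final ObjectProvider<FailFastReducer> failFastReducerProvider;

    @Autowired
    public ComputeService(DataGridCompute dataGridCompute, ObjectProvider<FailFastReducer> failFastReducerProvider) {
        this.dataGridCompute = dataGridCompute;
        this.failFastReducerProvider = failFastReducerProvider;
    }

    /**
     * Calls the Ignite compute grid with a list of jobs in parallel, asynchronously.
     */
    public void validateWithAllServicesInParallelAsync(List<IgniteCallable<ServiceResponse>> jobs) {
        // run the jobs in parallel with a fresh fail-fast reducer, then just log the final aggregated response
        dataGridCompute.executeMapReduceFailFast(jobs, failFastReducerProvider.getObject(),
                mapReduceResponse -> logger.debug(mapReduceResponse.toString()));
    }

    /**
     * Calls the Ignite compute grid with a list of jobs in parallel, synchronously.
     */
    public MapReduceResponse validateWithAllServicesInParallelSync(List<IgniteCallable<ServiceResponse>> jobs) {
        // run the jobs in parallel with a fresh fail-fast reducer and return the final aggregated response
        return dataGridCompute.executeMapReduceFailFastSync(jobs, failFastReducerProvider.getObject());
    }
}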

  • Spring Boot integration tests for the fail-fast case and the all-successful case:


// these test methods assume a Spring Boot integration test class with an @Autowired ComputeService computeService field
@Test
public void testMapReducedJobsWithFailFastSync() {
    // job 1 succeeds, job 2 fails, job 3 succeeds: the reducer stops collecting as soon as it receives the failed
    // response; since the jobs run in parallel, job 3 may or may not have been collected by then
    IgniteCallable<ServiceResponse> validationServiceJob1 = () -> ServiceResponse.<String>builder()
            .response("Job 1 is valid").serviceOrigin("job1").success(true).build();
    IgniteCallable<ServiceResponse> validationServiceJob2 = () -> ServiceResponse.<String>builder()
            .response("Job 2 failed").serviceOrigin("job2").success(false).build();
    IgniteCallable<ServiceResponse> validationServiceJob3 = () -> ServiceResponse.<String>builder()
            .response("Job 3 is valid").serviceOrigin("job3").success(true).build();
    final MapReduceResponse mapReduceResponse = computeService.validateWithAllServicesInParallelSync(
            Arrays.asList(validationServiceJob1, validationServiceJob2, validationServiceJob3));
    boolean status = true;
    for (ServiceResponse serviceResponse : mapReduceResponse.getReducedResponses().values()) {
        status = status && serviceResponse.isSuccess();
    }
    // make sure both the collected responses and the aggregated status report the failure
    assertFalse(status);
    assertFalse(mapReduceResponse.isSuccess());
}

@Test
public void testMapReducedJobsWithFailFastSyncFirstAllSuccess() {
    // all jobs succeed, so the reducer collects every response and reduces to a successful result
    IgniteCallable<ServiceResponse> validationServiceJob1 = () -> ServiceResponse.<String>builder()
            .serviceOrigin("job1").response("Job 1 is valid").success(true).build();
    IgniteCallable<ServiceResponse> validationServiceJob2 = () -> ServiceResponse.<String>builder()
            .serviceOrigin("job2").response("Job 2 is valid").success(true).build();
    IgniteCallable<ServiceResponse> validationServiceJob3 = () -> ServiceResponse.<String>builder()
            .serviceOrigin("job3").response("Job 3 is valid").success(true).build();
    final MapReduceResponse mapReduceResponse = computeService.validateWithAllServicesInParallelSync(
            Arrays.asList(validationServiceJob1, validationServiceJob2, validationServiceJob3));
    boolean status = true;
    for (ServiceResponse serviceResponse : mapReduceResponse.getReducedResponses().values()) {
        status = status && serviceResponse.isSuccess();
    }
    // make sure both the collected responses and the aggregated status report success
    assertTrue(status);
    assertTrue(mapReduceResponse.isSuccess());
}


References:

  1. Ignite compute grid: https://apacheignite.readme.io/docs/compute-grid
  2. The code is on GitHub: https://github.com/Romeh/spring-boot-ignite

 
