Here we are going a cover a case need in Apache ignite , what if you want to do distributed compute jobs that do data computations or external service calls using Apache Ignite distributed closures that has map reduce nature and fail fast once of the computations fail or it has the unexpected results , how to do that ? below we are going to explain that .
- The main node will submit a collection of Ignite callable plus the custom fail fast reducer that we will explain into details later
- The list of jobs will be distributed between the server nodes in the current cluster topology with the same cluster group for actual execution and to use the distributed parallel map reduce nature execution of Ignite compute grid in synchronous or asynchronous non blocking way
- each single Job will return the result or error to the fail fast reducer which upon receiving the results of each single compute task , it will determine if it can keep collection other results before reducing the final aggregated result or fail fast immediately once one of the jobs failed or has the unexpected results
So how it is implemented ?
- The fail fast Ignite compute grid reducer :
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* a fail fast map reducer to decide if it should keep waiting for other jobs to final reduce or it should terminate | |
* and fail fast with the current responses if any failed | |
*/ | |
@Component | |
@Scope("prototype") | |
public class FailFastReducer implements IgniteReducer<ServiceResponse, MapReduceResponse> { | |
private final Map<String, ServiceResponse> responseMap = new ConcurrentHashMap<>(); | |
/** | |
* @param serviceCallResponse the job response | |
* @return return a boolean to decide it is time to reduce or not | |
*/ | |
@Override | |
public boolean collect(ServiceResponse serviceCallResponse) { | |
if (serviceCallResponse != null) { | |
if (serviceCallResponse.isSuccess()) { | |
responseMap.put(serviceCallResponse.getServiceOrigin(), serviceCallResponse); | |
return true; | |
} else { | |
responseMap.put(serviceCallResponse.getServiceOrigin(), serviceCallResponse); | |
return false; | |
} | |
} | |
return false; | |
} | |
/** | |
* @return the final generic reduced response containing the list of jobs responses and global status | |
*/ | |
@Override | |
public MapReduceResponse reduce() { | |
return MapReduceResponse.builder().success(checkStatus()).reducedResponses(responseMap).build(); | |
} | |
/** | |
* @return the generic reduced response status based into the single status of each single collected jobs response | |
*/ | |
public boolean checkStatus() { | |
boolean status = true; | |
for (Map.Entry<String, ServiceResponse> key : responseMap.entrySet()) { | |
status = status && responseMap.get(key.getKey()).isSuccess(); | |
} | |
return status; | |
} | |
} |
- Generic Ignite compute utility to trigger the map reduce tasks in synchronous or asynchronous non blocking :
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* generic utility class for map reduce call | |
*/ | |
@Component | |
public class DataGridCompute { | |
@Autowired | |
private Ignite ignite; | |
/** | |
* @param jobs the list of jobs to be distributed into the data grid nodes from the master node | |
* @param igniteReducer the ignite reducer which will be used to determine the reduction and collection logic | |
* @param callback the callback to be invoked upon receiving the reduced final response | |
* @param <R> generic response type from the jobs | |
* @param <E> generic map reduced response type | |
* @throws IgniteException | |
* | |
* a generic async map reduced call inside ignite compute grid | |
*/ | |
public <R, E> void executeMapReduceFailFast(Collection<IgniteCallable<R>> jobs, IgniteReducer<R, E> igniteReducer, Consumer<E> callback) throws IgniteException { | |
// you need to define your cluster group and if any defined in your data grid | |
IgniteCompute igniteCompute = ignite.compute(ignite.cluster().forPredicate(clusterNode -> !clusterNode.isClient())); | |
//execute the list of jobs in map reduce fashion and pass the custom reducer as well | |
IgniteFuture<E> future=igniteCompute.callAsync(jobs, igniteReducer); | |
// then async listen for the result to invoke your post call back | |
future.listen(result -> callback.accept(result.get())); | |
} | |
/** | |
* @param jobs the list of jobs to be distributed into the data grid nodes from the master node | |
* @param igniteReducer the ignite reducer which will be used to determine the reduction and collection logic | |
* @param <R> generic response type from the jobs | |
* @param <E> generic map reduced response type | |
* @throws IgniteException | |
* @return <E> generic map reduced response type | |
* a generic sync map reduced call inside ignite compute grid | |
*/ | |
public <R, E> E executeMapReduceFailFastSync(Collection<IgniteCallable<R>> jobs, IgniteReducer<R, E> igniteReducer) throws IgniteException { | |
// you need to define your cluster group and if any defined in your data grid | |
IgniteCompute igniteCompute = ignite.compute(ignite.cluster().forPredicate(clusterNode -> !clusterNode.isClient())); | |
//execute the list of jobs in map reduce fashion and pass the custom reducer as well | |
return igniteCompute.call(jobs, igniteReducer); | |
} | |
} |
- The custom aggregated reducer response class:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* the generic reduce response that contain all single collected jobs responses | |
*/ | |
@Builder | |
@Getter | |
@ToString | |
@EqualsAndHashCode | |
public class MapReduceResponse implements Serializable { | |
private Map<String, ServiceResponse> reducedResponses; | |
boolean success; | |
} |
- The single task response class:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* @param <T> the service call response type | |
*/ | |
@Getter | |
@Setter | |
@ToString | |
@EqualsAndHashCode | |
@Builder | |
public class ServiceResponse<T> implements Serializable { | |
private T response; | |
private boolean success ; | |
private String serviceOrigin; | |
} |
- Example service for calling the Ignite compute grid with the distributed closures and we will use the synchronous way for testing the execution :
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* sample service for how to call map reduce jobs in parallel asynchronous with fail fast reducer | |
*/ | |
@Service | |
public class ComputeService { | |
private static final Logger logger = LoggerFactory.getLogger(AlertsService.class); | |
private final DataGridCompute dataGridCompute; | |
@Autowired | |
private FailFastReducer failFastReducer; | |
@Autowired | |
public ComputeService(DataGridCompute dataGridCompute) { | |
this.dataGridCompute = dataGridCompute; | |
} | |
/** | |
* call to ignite compute grid with list if jobs in parallel asynchronous | |
*/ | |
public void validateWithAllServicesInParallelAsync(List<IgniteCallable<ServiceResponse>> jobs){ | |
// execute the jobs with the fail fast reducer in parallel and async the just log the final aggregated response | |
dataGridCompute.executeMapReduceFailFast(jobs,failFastReducer, | |
mapReduceResponse -> logger.debug(mapReduceResponse.toString())); | |
} | |
/** | |
* call to ignite compute grid with list if jobs in parallel synchronous | |
*/ | |
public MapReduceResponse validateWithAllServicesInParallelSync(List<IgniteCallable<ServiceResponse>> jobs){ | |
// execute the jobs with the fail fast reducer in parallel and sync the just log the final aggregated response | |
return dataGridCompute.executeMapReduceFailFastSync(jobs,failFastReducer); | |
} | |
} |
- Unit test for fail fast and successful cases using spring boot integration test:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void testMapReducedJobsWithFailFastSync(){ | |
// example of ignite jobs, first one succeeded , second fail, third succeeded , but the reducer will fail fast once he collect the failed job | |
IgniteCallable validationServiceJob1=() -> ServiceResponse.<String>builder().response("Job 1 is valid").serviceOrigin("job1") | |
.success(true).build(); | |
IgniteCallable validationServiceJob2=() -> ServiceResponse.<String>builder().response("Job 2 is failed").serviceOrigin("job2") | |
.success(false).build(); | |
IgniteCallable validationServiceJob3=() -> ServiceResponse.<String>builder().response("Job 3 is valid").serviceOrigin("job3") | |
.success(true).build(); | |
final MapReduceResponse mapReduceResponse = computeService.validateWithAllServicesInParallelSync( | |
Arrays.asList(validationServiceJob1,validationServiceJob2,validationServiceJob3) | |
); | |
boolean status=true; | |
for(ServiceResponse serviceResponse: mapReduceResponse.getReducedResponses().values()){ | |
status=status && serviceResponse.isSuccess(); | |
} | |
// make sure the aggregated status is failed | |
assertEquals(status,false); | |
assertEquals(mapReduceResponse.isSuccess(),false); | |
} | |
@Test | |
public void testMapReducedJobsWithFailFastSyncFirstAllSuccess(){ | |
// example of ignite jobs, all succeeded , so the reducer collect all and return successfully | |
IgniteCallable validationServiceJob1=() -> ServiceResponse.<String>builder().serviceOrigin("job1") | |
.response("Job 1 is valid").success(true).build(); | |
IgniteCallable validationServiceJob2=() -> ServiceResponse.<String>builder().serviceOrigin("job2") | |
.response("Job 2 is valid").success(true).build(); | |
IgniteCallable validationServiceJob3=() -> ServiceResponse.<String>builder().serviceOrigin("job3") | |
.response("Job 3 is valid").success(true).build(); | |
final MapReduceResponse mapReduceResponse = computeService.validateWithAllServicesInParallelSync( | |
Arrays.asList(validationServiceJob1,validationServiceJob2,validationServiceJob3) | |
); | |
boolean status=true; | |
for(ServiceResponse serviceResponse: mapReduceResponse.getReducedResponses().values()){ | |
status=status && serviceResponse.isSuccess(); | |
} | |
// make sure the aggregated status is success | |
assertEquals(status,true); | |
assertEquals(mapReduceResponse.isSuccess(),true); | |
} |
References :
- Ignite compute grid : https://apacheignite.readme.io/docs/compute-grid
- The code is on GitHub : https://github.com/Romeh/spring-boot-ignite