In this post we share a starter project that uses the Apache Ignite data grid as an event and snapshot store, combining the benefits of event sourcing with those of a data grid.
The implementation is based on the journal plugin TCK specs provided by Akka Persistence.
It uses Apache Ignite with Akka Persistence to provide a journal and a snapshot store backed by partitioned caches, so it benefits from the distributed, highly available data grid features. Ignite's query and data computation features can also be used to build normalized views from the event store and to run analytical jobs over them, although it is advisable to keep write nodes separate from read nodes for better scalability.
Akka and Ignite versions used:
Akka version: 2.5.7+, Ignite version: 2.3.0+
Journal plugin
- All operations required by the Akka Persistence journal plugin API are fully supported.
- It uses an Apache Ignite partitioned cache with the default number of backups set to 1, which can be changed in the reference.conf file.
Snapshot store plugin
- Implements the Akka Persistence snapshot store plugin API.
How to use
Enable the plugins in your Akka cluster configuration:
akka.persistence.journal.plugin = "akka.persistence.journal.ignite"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot.ignite"
Configure the Ignite data grid properties; the defaults are configured for localhost.
ignite {
  // whether to start a client node or a server node when connecting to the Ignite data cluster
  isClientNode = false
  // localhost is used ONLY for testing
  // used for grid cluster connectivity
  tcpDiscoveryAddresses = "localhost"
  metricsLogFrequency = 0
  // thread pools used by Ignite; size them based on the target machine specs
  queryThreadPoolSize = 4
  dataStreamerThreadPoolSize = 1
  managementThreadPoolSize = 2
  publicThreadPoolSize = 4
  systemThreadPoolSize = 2
  rebalanceThreadPoolSize = 1
  asyncCallbackPoolSize = 4
  peerClassLoadingEnabled = false
  // enables or disables durable memory persistence
  enableFilePersistence = true
  // used for grid cluster connectivity; change it to suit your configuration
  igniteConnectorPort = 11211
  // used for grid cluster connectivity; change it to suit your configuration
  igniteServerPortRange = "47500..47509"
  // file system path for durable memory persistence storage; change it to suit your configuration
  ignitePersistenceFilePath = "./data"
}
With that, Ignite is enabled as your journal and snapshot plugin. Depending on the configuration above, the node starts as either an Ignite server node or a client node.
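The igniteServerPortRange value above uses a "start..end" form. As a minimal sketch of how such a value could be interpreted, the following helper splits it into a first port and a last port. PortRange is a hypothetical class written for illustration only; it is not part of the plugin, and the assumption that both ends are inclusive is mine.

```java
import java.util.Objects;

/** Hypothetical helper (not part of the plugin): parses a "start..end" port range. */
final class PortRange {
    final int start;
    final int end;

    private PortRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    /** Parses values such as "47500..47509"; a single port such as "47500" is also accepted. */
    static PortRange parse(String value) {
        Objects.requireNonNull(value, "value");
        String[] parts = value.trim().split("\\.\\.");
        if (parts.length < 1 || parts.length > 2) {
            throw new IllegalArgumentException("Expected 'start..end' but got: " + value);
        }
        int start = Integer.parseInt(parts[0].trim());
        int end = parts.length == 2 ? Integer.parseInt(parts[1].trim()) : start;
        if (start < 1 || end > 65535 || end < start) {
            throw new IllegalArgumentException("Invalid port range: " + value);
        }
        return new PortRange(start, end);
    }

    /** Number of ports in the range, assuming both ends are inclusive. */
    int size() {
        return end - start + 1;
    }
}
```

Under these assumptions, PortRange.parse("47500..47509") yields a range of ten ports starting at 47500.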
Technical details:
The main journal implementation is IgniteWriteJournal:
/**
 * The main Ignite journal plugin implementation, based on AsyncWriteJournal.
 */
@Slf4j
public class IgniteWriteJournal extends AsyncWriteJournal {

    private final Serializer serializer;
    private final Store<JournalItem> storage;
    private final IgniteCache<Long, JournalItem> cache;
    private final IgniteCache<String, Long> sequenceNumberTrack;
    private final BiFunction<Config, ActorSystem, JournalCaches> journalCacheProvider = new JournalCacheProvider();

    /**
     * @param config akka configuration
     * @throws NotSerializableException
     */
    public IgniteWriteJournal(Config config) throws NotSerializableException {
        ActorSystem actorSystem = context().system();
        serializer = SerializationExtension.get(actorSystem).serializerFor(PersistentRepr.class);
        storage = new Store<>(actorSystem);
        JournalCaches journalCaches = journalCacheProvider.apply(config, actorSystem);
        sequenceNumberTrack = journalCaches.getSequenceCache();
        cache = journalCaches.getJournalCache();
    }

    // Flattens the rows of a fields query into a stream of Long values.
    private static Stream<Long> listsToStreamLong(List<List<?>> list) {
        return list.stream().flatMap(Collection::stream).filter(o -> o instanceof Long).map(o -> (Long) o);
    }

    @Override
    public Future<Void> doAsyncReplayMessages(String persistenceId, long fromSequenceNr, long toSequenceNr, long max, Consumer<PersistentRepr> replayCallback) {
        return storage.execute(persistenceId, cache, (entityIdParam, cacheParam) -> {
            if (log.isDebugEnabled()) {
                log.debug("doAsyncReplayMessages with params persistenceId: '{}' :fromSequenceNr {} :toSequenceNr {} :max {}"
                        , persistenceId, fromSequenceNr, toSequenceNr, max);
            }
            try (QueryCursor<Cache.Entry<Long, JournalItem>> query = cache
                    .query(new SqlQuery<Long, JournalItem>(JournalItem.class, "sequenceNr >= ? AND sequenceNr <= ? AND persistenceId=?")
                            .setArgs(fromSequenceNr, toSequenceNr, persistenceId))) {
                final List<Cache.Entry<Long, JournalItem>> all = query.getAll();
                if (log.isDebugEnabled()) {
                    log.debug("replyMessage results {} {} {}", query.toString(), all.toString(), all.size());
                }
                if (null != all && !all.isEmpty()) {
                    if (all.size() < max) {
                        for (Cache.Entry<Long, JournalItem> entry : all) {
                            if (log.isDebugEnabled()) {
                                log.debug("replay message persistenceId '{}' getKey {}", persistenceId, entry.getKey());
                            }
                            replayCallback.accept(convert(entry.getValue()));
                        }
                    } else {
                        all.subList(0, (int) max).forEach(longJournalItemEntry -> {
                            if (log.isDebugEnabled()) {
                                log.debug("replay message persistenceId'{}' getKey {}", persistenceId, longJournalItemEntry.getKey());
                            }
                            replayCallback.accept(convert(longJournalItemEntry.getValue()));
                        });
                    }
                }
            }
            return null;
        });
    }

    @Override
    public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
        return Futures.future(() -> {
            if (log.isDebugEnabled()) {
                log.debug("doAsyncReadHighestSequenceNr '{}' - {}", persistenceId, fromSequenceNr);
            }
            if (sequenceNumberTrack.containsKey(persistenceId)) {
                long highestSequenceNr = sequenceNumberTrack.get(persistenceId);
                if (highestSequenceNr != 0) {
                    if (log.isDebugEnabled()) {
                        log.debug("doAsyncReadHighestSequenceNr '{}' {} -> {}", persistenceId, fromSequenceNr, highestSequenceNr);
                    }
                    return highestSequenceNr;
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("doAsyncReadHighestSequenceNr '{}' {} -> {}", persistenceId, fromSequenceNr, fromSequenceNr);
                    }
                    return fromSequenceNr;
                }
            }
            return fromSequenceNr;
        }, storage.getDispatcher());
    }

    @Override
    public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(Iterable<AtomicWrite> messages) {
        return Futures.sequence(
                StreamSupport.stream(messages.spliterator(), false)
                        .map(this::writeBatch)
                        .collect(Collectors.toList()), storage.getDispatcher()
        );
    }

    // Writes one AtomicWrite as a single putAll; a failure is returned as a value rather than thrown.
    private Future<Optional<Exception>> writeBatch(AtomicWrite atomicWrite) {
        return storage.execute(atomicWrite.persistenceId(), cache, (entityIdParam, cacheParam) -> {
            try {
                Map<Long, JournalItem> batch = JavaConverters
                        .seqAsJavaListConverter(atomicWrite.payload())
                        .asJava().stream()
                        .map(this::convert)
                        .collect(Collectors.toMap(JournalItem::getSequenceNr, item -> item));
                cache.putAll(batch);
                if (log.isDebugEnabled()) {
                    log.debug("doAsyncWriteMessages persistenceId'{}': batch {}", atomicWrite.persistenceId(), batch);
                }
                return Optional.empty();
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                return Optional.of(e);
            }
        });
    }

    @Override
    public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
        return storage.execute(persistenceId, cache, (entityIdParam, cacheParam) -> {
            if (log.isDebugEnabled()) {
                log.debug("doAsyncDeleteMessagesTo persistenceId'{}' toSequenceNr : {}", persistenceId, toSequenceNr);
            }
            List<List<?>> seq = cache
                    .query(new SqlFieldsQuery("select sequenceNr from JournalItem where sequenceNr <= ? and persistenceId=?")
                            .setArgs(toSequenceNr, persistenceId))
                    .getAll();
            Set<Long> keys = listsToStreamLong(seq).collect(Collectors.toSet());
            if (log.isDebugEnabled()) {
                log.debug("remove keys {}", keys);
            }
            cache.removeAll(keys);
            return null;
        });
    }

    private JournalItem convert(PersistentRepr p) {
        return new JournalItem(p.sequenceNr(), p.persistenceId(), serializer.toBinary(p));
    }

    private PersistentRepr convert(JournalItem item) {
        return (PersistentRepr) serializer.fromBinary(item.getPayload());
    }
}
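To make the replay contract easier to follow, here is a minimal in-memory sketch of what doAsyncReplayMessages is expected to deliver: the events of one persistenceId within the inclusive range [fromSequenceNr, toSequenceNr], capped at max entries. ReplaySketch and its Item class are hypothetical stand-ins for illustration only; they use a plain list in place of the Ignite cache. The explicit ascending sort is an assumption added here, reflecting that Akka Persistence replays events in sequence order.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical in-memory illustration of the replay semantics; not the plugin's code. */
final class ReplaySketch {

    /** Simplified stand-in for the JournalItem entity (payload omitted). */
    static final class Item {
        final long sequenceNr;
        final String persistenceId;

        Item(long sequenceNr, String persistenceId) {
            this.sequenceNr = sequenceNr;
            this.persistenceId = persistenceId;
        }
    }

    /** Returns the sequence numbers that would be replayed, in ascending order. */
    static List<Long> replay(List<Item> journal, String persistenceId,
                             long fromSequenceNr, long toSequenceNr, long max) {
        return journal.stream()
                // same predicate as the SQL WHERE clause in the journal class
                .filter(i -> i.persistenceId.equals(persistenceId))
                .filter(i -> i.sequenceNr >= fromSequenceNr && i.sequenceNr <= toSequenceNr)
                // assumption: order explicitly so the cap keeps the lowest sequence numbers
                .sorted(Comparator.comparingLong((Item i) -> i.sequenceNr))
                .limit(max)
                .map(i -> i.sequenceNr)
                .collect(Collectors.toList());
    }
}
```

Because the cap is applied after sorting, a replay with max = 2 over events 1 to 4 yields events 1 and 2 regardless of the order in which the store returns them.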
The main snapshot implementation class is IgniteSnapshotStore:
/**
 * The main Ignite snapshot store implementation, based on SnapshotStore.
 */
@Slf4j
public class IgniteSnapshotStore extends SnapshotStore {

    private final Serializer serializer;
    private final Store<SnapshotItem> storage;
    private final IgniteCache<Long, SnapshotItem> cache;
    private final BiFunction<Config, ActorSystem, IgniteCache<Long, SnapshotItem>> snapshotCacheProvider =
            new SnapshotCacheProvider();

    public IgniteSnapshotStore(Config config) throws NotSerializableException {
        ActorSystem actorSystem = context().system();
        storage = new Store<>(actorSystem);
        serializer = SerializationExtension.get(actorSystem).serializerFor(Snapshot.class);
        cache = snapshotCacheProvider.apply(config, actorSystem);
    }

    // Flattens the rows of a fields query into a set of Long keys.
    private static Set<Long> listsToSetLong(List<List<?>> list) {
        return list.stream().flatMap(Collection::stream).filter(o -> o instanceof Long).map(o -> (Long) o).collect(Collectors.toSet());
    }

    @Override
    public Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
        return storage.execute(persistenceId, cache, (entityIdParam, cacheParam) -> {
            if (log.isDebugEnabled()) {
                log.debug("doLoadAsync '{}' {} {}", persistenceId, criteria.minSequenceNr(), criteria.toString());
            }
            try (QueryCursor<Cache.Entry<Long, SnapshotItem>> query = cache
                    .query(new SqlQuery<Long, SnapshotItem>(SnapshotItem.class, "sequenceNr >= ? AND sequenceNr <= ? AND timestamp >= ? AND timestamp <= ? and persistenceId=?")
                            .setArgs(criteria.minSequenceNr(), criteria.maxSequenceNr(), criteria.minTimestamp(), criteria.maxTimestamp(), persistenceId))) {
                List<Cache.Entry<Long, SnapshotItem>> iterator = query.getAll();
                // Pick the latest snapshot: highest sequenceNr first, timestamp as the tie-breaker.
                final Optional<Cache.Entry<Long, SnapshotItem>> max = iterator.stream().max((o1, o2) -> {
                    int bySequenceNr = Long.compare(o1.getValue().getSequenceNr(), o2.getValue().getSequenceNr());
                    if (bySequenceNr != 0) {
                        return bySequenceNr;
                    }
                    return Long.compare(o1.getValue().getTimestamp(), o2.getValue().getTimestamp());
                });
                return max.map(entry -> convert(persistenceId, entry.getValue()));
            }
        });
    }

    @Override
    public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
        return storage.execute(metadata.persistenceId(), cache, (entityIdParam, cacheParam) -> {
            if (log.isDebugEnabled()) {
                log.debug("doSaveAsync '{}' ({})", metadata.persistenceId(), metadata.sequenceNr());
            }
            SnapshotItem item = convert(metadata, snapshot);
            cache.put(item.getSequenceNr(), item);
            return null;
        });
    }

    @Override
    public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
        return storage.execute(metadata.persistenceId(), cache, (entityIdParam, cacheParam) -> {
            if (log.isDebugEnabled()) {
                log.debug("doDeleteAsync '{}' ({})", metadata.persistenceId(), metadata.sequenceNr());
            }
            cache.remove(metadata.sequenceNr());
            return null;
        });
    }

    @Override
    public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
        return storage.execute(persistenceId, cache, (entityIdParam, cacheParam) -> {
            if (log.isDebugEnabled()) {
                log.debug("doDeleteAsync '{}' ({}; {})", persistenceId, criteria.minSequenceNr(), criteria.maxSequenceNr());
            }
            List<List<?>> seq = cache
                    .query(new SqlFieldsQuery("select sequenceNr from SnapshotItem where sequenceNr >= ? AND sequenceNr <= ? AND timestamp >= ? AND timestamp <= ? and persistenceId=?")
                            .setArgs(criteria.minSequenceNr(), criteria.maxSequenceNr(), criteria.minTimestamp(), criteria.maxTimestamp(), persistenceId))
                    .getAll();
            Set<Long> keys = listsToSetLong(seq);
            if (log.isDebugEnabled()) {
                log.debug("remove keys {}", keys);
            }
            cache.removeAll(keys);
            return null;
        });
    }

    private SnapshotItem convert(SnapshotMetadata metadata, Object snapshot) {
        return new SnapshotItem(metadata.sequenceNr(), metadata.persistenceId(), metadata.timestamp(), serializer.toBinary(new Snapshot(snapshot)));
    }

    private SelectedSnapshot convert(String persistenceId, SnapshotItem item) {
        SnapshotMetadata metadata = new SnapshotMetadata(persistenceId, item.getSequenceNr(), item.getTimestamp());
        Snapshot snapshot = (Snapshot) serializer.fromBinary(item.getPayload());
        return SelectedSnapshot.create(metadata, snapshot.data());
    }
}
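doLoadAsync filters snapshots by the selection criteria and then picks the maximum entry. As a minimal sketch of that selection step, the example below expresses "latest" as a total order: highest sequenceNr first, with timestamp as the tie-breaker. SnapshotSelectionSketch and Snap are hypothetical names used for illustration only, and the tie-breaking rule is an assumption about the intended behavior.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical in-memory illustration of snapshot selection; not the plugin's code. */
final class SnapshotSelectionSketch {

    /** Simplified stand-in for the SnapshotItem entity (payload omitted). */
    static final class Snap {
        final long sequenceNr;
        final long timestamp;

        Snap(long sequenceNr, long timestamp) {
            this.sequenceNr = sequenceNr;
            this.timestamp = timestamp;
        }
    }

    /** Returns the latest snapshot whose sequenceNr and timestamp fall inside the inclusive bounds. */
    static Optional<Snap> latest(List<Snap> snapshots,
                                 long minSequenceNr, long maxSequenceNr,
                                 long minTimestamp, long maxTimestamp) {
        // assumption: order by sequenceNr, then by timestamp, so max() is well defined
        Comparator<Snap> bySequenceThenTime = Comparator
                .comparingLong((Snap s) -> s.sequenceNr)
                .thenComparingLong(s -> s.timestamp);
        return snapshots.stream()
                .filter(s -> s.sequenceNr >= minSequenceNr && s.sequenceNr <= maxSequenceNr)
                .filter(s -> s.timestamp >= minTimestamp && s.timestamp <= maxTimestamp)
                .max(bySequenceThenTime);
    }
}
```

A comparator built this way is consistent in both directions, so the result does not depend on the order in which the query returns its rows.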
For more details, feel free to dive into the code; it is a small code base for now.
Summary:
- This is a work in progress and any contribution would be really helpful. Please check out the project on GitHub and lend a hand!
- Project on GitHub: https://github.com/Romeh/akka-persistance-ignite
Thanks, Romeh, for the project. It looks like the project works fine on a single node but not with cluster integration, and we are facing the issues below.
1. Single node:
We are able to persist the YANG tree data to ODL, which is good.
On restart of the server (Ignite) OSGi Karaf container, the data is not available to read in ODL.
Are we missing something? Why is the data not loaded in memory, although the binary and ignite have those journal and snapshot files?
2. Cluster node:
Data replication in Ignite works fine after the Ignite cluster is formed. But it looks like YANG tree data persisted on Node1 is not available to read on Node2. The RESTCONF YANG page shows a "No data to read" issue.
Hey!
Do you have sample code I can check, plus the Ignite configuration for the cluster?
Thanks a lot,
Romeh