Akka Persistence with Apache Ignite

In this post we share a starter project that uses the Apache Ignite data grid as an event and snapshot store, combining the benefits of event sourcing with those of a data grid.

The implementation is based on the journal plugin TCK specs provided by Akka Persistence.

The plugin uses Apache Ignite with Akka Persistence to provide a journal and a snapshot store backed by partitioned caches, so you benefit from a distributed, highly available data grid. On top of that, Ignite's SQL query and compute features can be used to build normalized views from the event store and run analytical jobs over them, although it is advisable to keep write nodes separate from read nodes for better scalability.
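
That last point deserves a small illustration. The sketch below joins the grid as a client node and runs an Ignite SQL query that counts stored events per persistence id. The cache name "akka-journal" is an assumption (the real name comes from the plugin configuration), and JournalItem is the journal entry class shown later in this post; treat this as a sketch, not part of the plugin.

import java.util.List;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.IgniteConfiguration;

public class EventsPerEntityReport {

    public static void main(String[] args) {
        // join the running data grid as a client node; discovery settings must match the grid,
        // the defaults are only good enough for a single-host test setup
        IgniteConfiguration cfg = new IgniteConfiguration().setClientMode(true);
        try (Ignite ignite = Ignition.start(cfg)) {
            // "akka-journal" is an assumed cache name, check the plugin configuration for the real one
            IgniteCache<Long, ?> journal = ignite.cache("akka-journal");
            // aggregate events per persistent entity directly on the grid
            List<List<?>> rows = journal.query(
                    new SqlFieldsQuery("select persistenceId, count(*) from JournalItem group by persistenceId"))
                    .getAll();
            rows.forEach(row -> System.out.println(row.get(0) + " -> " + row.get(1)));
        }
    }
}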


Akka and Ignite versions used:

Akka 2.5.7+, Ignite 2.3.0+

Journal plugin

  • All operations required by the Akka Persistence journal plugin API are fully supported.
  • It uses an Apache Ignite partitioned cache with the number of backups defaulting to 1, which can be changed in the reference.conf file (see the sketch below).
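
For reference, this is roughly how a partitioned cache with one backup is declared with Ignite's API. It is only a sketch: the plugin creates its caches internally through its own cache provider, and the cache name "akka-journal" is an assumption.

import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;

// sketch only: a partitioned cache with one backup copy per partition;
// "akka-journal" and the use of JournalItem as value type are assumptions,
// the plugin builds its caches through its own cache provider
CacheConfiguration<Long, JournalItem> journalCacheCfg =
        new CacheConfiguration<Long, JournalItem>("akka-journal")
                .setCacheMode(CacheMode.PARTITIONED)             // data is partitioned across the grid
                .setBackups(1)                                   // one backup, as in reference.conf
                .setIndexedTypes(Long.class, JournalItem.class); // enables the SQL queries used by the journal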

Snapshot store plugin

  • All operations required by the Akka Persistence snapshot store plugin API are fully supported.

How to use

Enable the plugins in your Akka configuration:

akka.persistence.journal.plugin = "akka.persistence.journal.ignite"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot.ignite"

Configure the Ignite data grid properties; by default everything points to localhost:

ignite {
  // whether this node joins the Ignite data cluster as a client node or a server node
  isClientNode = false
  // used for grid cluster connectivity; localhost is for testing ONLY
  tcpDiscoveryAddresses = "localhost"
  metricsLogFrequency = 0
  // thread pools used by Ignite; size them according to the target machine's specs
  queryThreadPoolSize = 4
  dataStreamerThreadPoolSize = 1
  managementThreadPoolSize = 2
  publicThreadPoolSize = 4
  systemThreadPoolSize = 2
  rebalanceThreadPoolSize = 1
  asyncCallbackPoolSize = 4
  peerClassLoadingEnabled = false
  // enable or disable durable memory persistence
  enableFilePersistence = true
  // used for grid cluster connectivity, change it to suit your configuration
  igniteConnectorPort = 11211
  // used for grid cluster connectivity, change it to suit your configuration
  igniteServerPortRange = "47500..47509"
  // durable memory persistence storage file system path, change it to suit your configuration
  ignitePersistenceFilePath = "./data"
}
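
For orientation, here is a rough sketch of how some of these keys presumably map onto Ignite's programmatic configuration. The actual mapping is done inside the plugin, so treat this as illustration only; the thread pool and connector port keys map to the corresponding IgniteConfiguration setters in the same way.

import java.util.Collections;

import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;

public class IgniteSettingsSketch {

    // illustration only: the plugin performs this translation internally from the HOCON keys above
    public static IgniteConfiguration fromSettings() {
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setClientMode(false);                 // isClientNode
        cfg.setMetricsLogFrequency(0);            // metricsLogFrequency
        cfg.setPeerClassLoadingEnabled(false);    // peerClassLoadingEnabled

        // tcpDiscoveryAddresses + igniteServerPortRange
        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
        ipFinder.setAddresses(Collections.singletonList("localhost:47500..47509"));
        TcpDiscoverySpi discovery = new TcpDiscoverySpi();
        discovery.setIpFinder(ipFinder);
        cfg.setDiscoverySpi(discovery);

        // enableFilePersistence + ignitePersistenceFilePath (Ignite durable memory)
        DataStorageConfiguration storage = new DataStorageConfiguration();
        storage.getDefaultDataRegionConfiguration().setPersistenceEnabled(true);
        storage.setStoragePath("./data");
        cfg.setDataStorageConfiguration(storage);

        return cfg;
    }
}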

 

With this in place, Ignite is enabled as your journal and snapshot store plugin. The node starts as an Ignite server or client node depending on the isClientNode flag in the configuration above.
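
To try it out end to end, any persistent actor will do, because the Ignite journal and snapshot store are selected purely through the configuration above. A minimal, hypothetical counter actor (the names here are illustrative, not part of the plugin) could look like this:

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.persistence.AbstractPersistentActor;

// nothing here is Ignite specific; the journal and snapshot store are picked
// from the akka.persistence.* settings shown above (application.conf)
public class CounterActor extends AbstractPersistentActor {

    private long counter = 0;

    @Override
    public String persistenceId() {
        return "counter-1";
    }

    @Override
    public Receive createReceiveRecover() {
        // replayed events come from the Ignite journal on actor start
        return receiveBuilder()
                .match(Long.class, delta -> counter += delta)
                .build();
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Long.class, delta -> persist(delta, evt -> counter += evt))
                .matchEquals("print", msg -> System.out.println("counter = " + counter))
                .build();
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("ignite-persistence-sample");
        ActorRef counter = system.actorOf(Props.create(CounterActor.class), "counter");
        counter.tell(5L, ActorRef.noSender());
        counter.tell("print", ActorRef.noSender());
    }
}

Run it once, stop the JVM, and run it again: the recovered counter value comes back from the Ignite journal, provided the akka.persistence and ignite settings above are on the classpath in application.conf.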

Technical details

The main journal implementation is IgniteWriteJournal:


/**
 * The main Ignite journal plugin implementation, based on Akka's AsyncWriteJournal (Java API).
 */
@Slf4j
public class IgniteWriteJournal extends AsyncWriteJournal {

    private final Serializer serializer;
    private final Store<JournalItem> storage;
    private final IgniteCache<Long, JournalItem> cache;
    private final IgniteCache<String, Long> sequenceNumberTrack;
    private final BiFunction<Config, ActorSystem, JournalCaches> journalCacheProvider = new JournalCacheProvider();

    /**
     * @param config akka configuration
     * @throws NotSerializableException
     */
    public IgniteWriteJournal(Config config) throws NotSerializableException {
        ActorSystem actorSystem = context().system();
        serializer = SerializationExtension.get(actorSystem).serializerFor(PersistentRepr.class);
        storage = new Store<>(actorSystem);
        JournalCaches journalCaches = journalCacheProvider.apply(config, actorSystem);
        sequenceNumberTrack = journalCaches.getSequenceCache();
        cache = journalCaches.getJournalCache();
    }

    private static Stream<Long> listsToStreamLong(List<List<?>> list) {
        return list.stream().flatMap(Collection::stream).filter(o -> o instanceof Long).map(o -> (Long) o);
    }

    @Override
    public Future<Void> doAsyncReplayMessages(String persistenceId, long fromSequenceNr, long toSequenceNr, long max, Consumer<PersistentRepr> replayCallback) {
        return storage.execute(persistenceId, cache, (entityIdParam, cacheParam) -> {
            if (log.isDebugEnabled()) {
                log.debug("doAsyncReplayMessages with params persistenceId: '{}' :fromSequenceNr {} :toSequenceNr {} :max {}",
                        persistenceId, fromSequenceNr, toSequenceNr, max);
            }
            try (QueryCursor<Cache.Entry<Long, JournalItem>> query = cache
                    .query(new SqlQuery<Long, JournalItem>(JournalItem.class, "sequenceNr >= ? AND sequenceNr <= ? AND persistenceId=?")
                            .setArgs(fromSequenceNr, toSequenceNr, persistenceId))) {
                final List<Cache.Entry<Long, JournalItem>> all = query.getAll();
                if (log.isDebugEnabled()) {
                    log.debug("replayMessages results {} {} {}", query.toString(), all.toString(), all.size());
                }
                if (null != all && !all.isEmpty()) {
                    if (all.size() < max) {
                        for (Cache.Entry<Long, JournalItem> entry : all) {
                            if (log.isDebugEnabled()) {
                                log.debug("replay message persistenceId '{}' getKey {}", persistenceId, entry.getKey());
                            }
                            replayCallback.accept(convert(entry.getValue()));
                        }
                    } else {
                        all.subList(0, (int) max).forEach(longJournalItemEntry -> {
                            if (log.isDebugEnabled()) {
                                log.debug("replay message persistenceId '{}' getKey {}", persistenceId, longJournalItemEntry.getKey());
                            }
                            replayCallback.accept(convert(longJournalItemEntry.getValue()));
                        });
                    }
                }
            }
            return null;
        });
    }

    @Override
    public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
        return Futures.future(() -> {
            if (log.isDebugEnabled()) {
                log.debug("doAsyncReadHighestSequenceNr '{}' - {}", persistenceId, fromSequenceNr);
            }
            if (sequenceNumberTrack.containsKey(persistenceId)) {
                long highestSequenceNr = sequenceNumberTrack.get(persistenceId);
                if (highestSequenceNr != 0) {
                    if (log.isDebugEnabled()) {
                        log.debug("doAsyncReadHighestSequenceNr '{}' {} -> {}", persistenceId, fromSequenceNr, highestSequenceNr);
                    }
                    return highestSequenceNr;
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("doAsyncReadHighestSequenceNr '{}' {} -> {}", persistenceId, fromSequenceNr, fromSequenceNr);
                    }
                    return fromSequenceNr;
                }
            }
            return fromSequenceNr;
        }, storage.getDispatcher());
    }

    @Override
    public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(Iterable<AtomicWrite> messages) {
        return Futures.sequence(
                StreamSupport.stream(messages.spliterator(), false)
                        .map(this::writeBatch)
                        .collect(Collectors.toList()), storage.getDispatcher()
        );
    }

    private Future<Optional<Exception>> writeBatch(AtomicWrite atomicWrite) {
        return storage.execute(atomicWrite.persistenceId(), cache, (entityIdParam, cacheParam) -> {
            try {
                Map<Long, JournalItem> batch = JavaConverters
                        .seqAsJavaListConverter(atomicWrite.payload())
                        .asJava().stream()
                        .map(this::convert)
                        .collect(Collectors.toMap(JournalItem::getSequenceNr, item -> item));
                cache.putAll(batch);
                if (log.isDebugEnabled()) {
                    log.debug("doAsyncWriteMessages persistenceId '{}': batch {}", atomicWrite.persistenceId(), batch);
                }
                return Optional.empty();
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                return Optional.of(e);
            }
        });
    }

    @Override
    public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
        return storage.execute(persistenceId, cache, (entityIdParam, cacheParam) -> {
            if (log.isDebugEnabled()) {
                log.debug("doAsyncDeleteMessagesTo persistenceId '{}' toSequenceNr : {}", persistenceId, toSequenceNr);
            }
            List<List<?>> seq = cache
                    .query(new SqlFieldsQuery("select sequenceNr from JournalItem where sequenceNr <= ? and persistenceId=?")
                            .setArgs(toSequenceNr, persistenceId))
                    .getAll();
            Set<Long> keys = listsToStreamLong(seq).collect(Collectors.toSet());
            if (log.isDebugEnabled()) {
                log.debug("remove keys {}", keys);
            }
            cache.removeAll(keys);
            return null;
        });
    }

    private JournalItem convert(PersistentRepr p) {
        return new JournalItem(p.sequenceNr(), p.persistenceId(), serializer.toBinary(p));
    }

    private PersistentRepr convert(JournalItem item) {
        return (PersistentRepr) serializer.fromBinary(item.getPayload());
    }
}
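
Note that the SqlQuery and SqlFieldsQuery calls above rely on sequenceNr and persistenceId being query-enabled fields of the cache value type. The real JournalItem class lives in the code base; a plausible sketch of its shape, with Ignite's @QuerySqlField annotations enabling those queries, would be:

import java.io.Serializable;

import org.apache.ignite.cache.query.annotations.QuerySqlField;

// a plausible shape for JournalItem (the real class is in the code base);
// the query-enabled fields match the SQL used in IgniteWriteJournal
public class JournalItem implements Serializable {

    @QuerySqlField(index = true)
    private final long sequenceNr;

    @QuerySqlField(index = true)
    private final String persistenceId;

    // serialized PersistentRepr, never queried so no annotation needed
    private final byte[] payload;

    public JournalItem(long sequenceNr, String persistenceId, byte[] payload) {
        this.sequenceNr = sequenceNr;
        this.persistenceId = persistenceId;
        this.payload = payload;
    }

    public long getSequenceNr() {
        return sequenceNr;
    }

    public String getPersistenceId() {
        return persistenceId;
    }

    public byte[] getPayload() {
        return payload;
    }
}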

The main snapshot store implementation is IgniteSnapshotStore:


/**
 * The main Ignite snapshot store implementation, based on Akka's SnapshotStore (Java API).
 */
@Slf4j
public class IgniteSnapshotStore extends SnapshotStore {

    private final Serializer serializer;
    private final Store<SnapshotItem> storage;
    private final IgniteCache<Long, SnapshotItem> cache;
    private final BiFunction<Config, ActorSystem, IgniteCache<Long, SnapshotItem>> snapshotCacheProvider =
            new SnapshotCacheProvider();

    public IgniteSnapshotStore(Config config) throws NotSerializableException {
        ActorSystem actorSystem = context().system();
        storage = new Store<>(actorSystem);
        serializer = SerializationExtension.get(actorSystem).serializerFor(Snapshot.class);
        cache = snapshotCacheProvider.apply(config, actorSystem);
    }

    private static Set<Long> listsToSetLong(List<List<?>> list) {
        return list.stream().flatMap(Collection::stream).filter(o -> o instanceof Long).map(o -> (Long) o).collect(Collectors.toSet());
    }

    @Override
    public Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
        return storage.execute(persistenceId, cache, (entityIdParam, cacheParam) -> {
            if (log.isDebugEnabled()) {
                log.debug("doLoadAsync '{}' {} {}", persistenceId, criteria.minSequenceNr(), criteria.toString());
            }
            try (QueryCursor<Cache.Entry<Long, SnapshotItem>> query = cache
                    .query(new SqlQuery<Long, SnapshotItem>(SnapshotItem.class, "sequenceNr >= ? AND sequenceNr <= ? AND timestamp >= ? AND timestamp <= ? and persistenceId=?")
                            .setArgs(criteria.minSequenceNr(), criteria.maxSequenceNr(), criteria.minTimestamp(), criteria.maxTimestamp(), persistenceId))) {
                List<Cache.Entry<Long, SnapshotItem>> iterator = query.getAll();
                final Optional<Cache.Entry<Long, SnapshotItem>> max = iterator.stream().max((o1, o2) -> {
                    if (o1.getValue().getSequenceNr() > o2.getValue().getSequenceNr()) {
                        return 1;
                    } else if (o1.getValue().getTimestamp() > o2.getValue().getTimestamp()) {
                        return 1;
                    } else {
                        return -1;
                    }
                });
                return Optional.ofNullable(max.isPresent() ? convert(persistenceId, max.get().getValue()) : null);
            }
        });
    }

    @Override
    public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
        return storage.execute(metadata.persistenceId(), cache, (entityIdParam, cacheParam) -> {
            if (log.isDebugEnabled()) {
                log.debug("doSaveAsync '{}' ({})", metadata.persistenceId(), metadata.sequenceNr());
            }
            SnapshotItem item = convert(metadata, snapshot);
            cache.put(item.getSequenceNr(), item);
            return null;
        });
    }

    @Override
    public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
        return storage.execute(metadata.persistenceId(), cache, (entityIdParam, cacheParam) -> {
            if (log.isDebugEnabled()) {
                log.debug("doDeleteAsync '{}' ({})", metadata.persistenceId(), metadata.sequenceNr());
            }
            cache.remove(metadata.sequenceNr());
            return null;
        });
    }

    @Override
    public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
        return storage.execute(persistenceId, cache, (entityIdParam, cacheParam) -> {
            if (log.isDebugEnabled()) {
                log.debug("doDeleteAsync '{}' ({}; {})", persistenceId, criteria.minSequenceNr(), criteria.maxSequenceNr());
            }
            List<List<?>> seq = cache
                    .query(new SqlFieldsQuery("select sequenceNr from SnapshotItem where sequenceNr >= ? AND sequenceNr <= ? AND timestamp >= ? AND timestamp <= ? and persistenceId=?")
                            .setArgs(criteria.minSequenceNr(), criteria.maxSequenceNr(), criteria.minTimestamp(), criteria.maxTimestamp(), persistenceId))
                    .getAll();
            Set<Long> keys = listsToSetLong(seq);
            if (log.isDebugEnabled()) {
                log.debug("remove keys {}", keys);
            }
            cache.removeAll(keys);
            return null;
        });
    }

    private SnapshotItem convert(SnapshotMetadata metadata, Object snapshot) {
        return new SnapshotItem(metadata.sequenceNr(), metadata.persistenceId(), metadata.timestamp(), serializer.toBinary(new Snapshot(snapshot)));
    }

    private SelectedSnapshot convert(String persistenceId, SnapshotItem item) {
        SnapshotMetadata metadata = new SnapshotMetadata(persistenceId, item.getSequenceNr(), item.getTimestamp());
        Snapshot snapshot = (Snapshot) serializer.fromBinary(item.getPayload());
        return SelectedSnapshot.create(metadata, snapshot.data());
    }
}
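
On the actor side, nothing Ignite specific is needed for snapshots either. A hedged fragment extending the hypothetical counter actor from the How to use section might look like this, saving a snapshot every 100 events and restoring it on recovery:

import akka.persistence.SnapshotOffer;

// fragment of the CounterActor sketch shown earlier; not part of the plugin itself

@Override
public Receive createReceive() {
    return receiveBuilder()
            .match(Long.class, delta -> persist(delta, evt -> {
                counter += evt;
                if (lastSequenceNr() % 100 == 0) {
                    saveSnapshot(counter);   // stored through IgniteSnapshotStore
                }
            }))
            .build();
}

@Override
public Receive createReceiveRecover() {
    return receiveBuilder()
            .match(SnapshotOffer.class, offer -> counter = (Long) offer.snapshot())
            .match(Long.class, delta -> counter += delta)
            .build();
}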

For more details, feel free to dive into the code base; it is a small one for now!

Summary

Apache Ignite can act as both the journal and the snapshot store for Akka Persistence: events and snapshots live in partitioned caches with backups, optionally backed by Ignite's durable memory, while Ignite's SQL and compute features remain available for building read views and analytical jobs over the event store.

2 comments

  1. Thanks Romeh for the project. It looks like this project works fine for a single node but not for cluster integration; we are facing the issues below.

    1. Single node:
    We are able to persist the YANG tree data to ODL. Good.
    On restart of the (Ignite) OSGi Karaf container, the data is not available in ODL to read.
    Are we missing something? Why is the data not loaded into memory? The binaries and Ignite do have the journal and snapshot files.

    2. Cluster node:
    Data replication at the Ignite level is working fine after forming the Ignite cluster. But it looks like the YANG tree data persisted on Node1 is not available to read on Node2. The RESTCONF YANG page shows a "No data to read" issue.

