Spring Boot Akka Event Sourcing Starter – Part 4 – Final

In this final part, I will share some possible designs for using the Spring Boot event sourcing toolkit starter, plus some remarks and action points.

Some possible designs using the toolkit for event sourcing and CQRS services:

Using the toolkit with Apache Ignite and Kafka for event streaming:

[Diagram: Spring event sourcing overview]

 

Here we do the following:

  1. We use the event sourcing toolkit starter to define the domain write service that acts as the command side; we can also benefit from Spring Cloud if we need to support a microservices architecture.
  2. The read-side application can have a different data model tailored to its query needs.
  3. We use the Apache Ignite data grid as the event store, which can easily be scaled by adding more server nodes, and we benefit from the data grid's rich features for computations and its rich SQL query support; we also use Apache Ignite continuous queries to push newly added events to Kafka (a minimal sketch follows this list).
  4. We integrate Apache Ignite with Kafka via Kafka Connect to read the newly added events from the events cache and stream them to the read-side application and any other interested applications, such as fraud detection or reporting.
  5. Infrastructure: an Akka cluster, an Ignite cluster, and a Kafka cluster, plus service orchestration such as Kubernetes.
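As a minimal sketch of the continuous-query idea in point 3 (the cache name events-journal, the topic name order-events, and the use of a plain KafkaProducer are illustrative assumptions, not the toolkit's actual wiring):

import java.util.Properties;
import javax.cache.event.CacheEntryEvent;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EventsToKafkaBridge {

    public static void main(String[] args) {
        Ignite ignite = Ignition.start();
        // hypothetical journal cache holding serialized events keyed by sequence number
        IgniteCache<Long, byte[]> journal = ignite.getOrCreateCache("events-journal");

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<Long, byte[]> producer = new KafkaProducer<>(props);

        ContinuousQuery<Long, byte[]> qry = new ContinuousQuery<>();
        // called for every newly added or updated journal entry; forward it to Kafka
        qry.setLocalListener(events -> {
            for (CacheEntryEvent<? extends Long, ? extends byte[]> e : events)
                producer.send(new ProducerRecord<>("order-events", e.getKey(), e.getValue()));
        });

        // keep the returned cursor open so the listener stays registered
        journal.query(qry);
    }
}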

Using the toolkit with Apache Cassandra:

[Diagram: design with Cassandra as the event store]

Here we do the following:

  1. We use the event sourcing toolkit starter to define the domain write service that acts as the command side; again, we can benefit from Spring Cloud for a microservices architecture.
  2. We use Cassandra as the event store (a minimal configuration sketch follows this list).
  3. We can still use Kafka Connect to stream events to other systems for read queries and other analysis and reporting needs.
  4. Infrastructure: an Akka cluster, a Cassandra cluster, and a Kafka cluster, plus service orchestration such as Kubernetes.
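A minimal sketch of what enabling this could look like in the Akka configuration, assuming the community akka-persistence-cassandra plugin (the keys follow that plugin's documentation from the Akka 2.5 era, so verify them against the plugin version you use):

akka.persistence.journal.plugin = "cassandra-journal"
akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"

// illustrative contact point; change it to suit your Cassandra cluster
cassandra-journal.contact-points = ["127.0.0.1"]
cassandra-snapshot-store.contact-points = ["127.0.0.1"]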

Using the toolkit with Apache Ignite only:

If your application does not need all those complexities and is just a small-sized service, you can use Ignite alone with the toolkit to implement both the write and read sides of your CQRS and event sourcing application.

[Diagram: design with Apache Ignite only]

  1. We use the event sourcing toolkit starter to define the domain write service that acts as the command side; Spring Cloud can again support a microservices architecture.
  2. We use the Ignite data grid as the event store and for the read-side query projection, using continuous queries or cache interceptors to push newly added events into another cache holding the target read model (an interceptor sketch follows this list).
  3. You can separate the read and write caches into two different cluster groups.
  4. You can still use Kafka Connect to stream events to other systems if you like.
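A minimal sketch of the cache-interceptor option from point 2; the cache names are assumptions, and the "read model" here is just the raw event payload to keep the example self-contained:

import javax.cache.Cache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheInterceptorAdapter;
import org.apache.ignite.configuration.CacheConfiguration;

// registered on the journal cache; projects every persisted event into a
// separate read-model cache right after it is written
public class ReadModelProjectionInterceptor extends CacheInterceptorAdapter<Long, String> {

    @Override
    public void onAfterPut(Cache.Entry<Long, String> entry) {
        Ignition.ignite()
                .<Long, String>getOrCreateCache("orders-read-view")
                .put(entry.getKey(), entry.getValue());
    }

    // wiring it up (illustrative): attach the interceptor to the journal cache config
    public static CacheConfiguration<Long, String> journalCacheConfig() {
        return new CacheConfiguration<Long, String>("events-journal")
                .setInterceptor(new ReadModelProjectionInterceptor());
    }
}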

Using the toolkit with Apache Ignite and Kafka Streams:

[Diagram: design with Kafka Streams on the read side]

  1. We use the event sourcing toolkit starter to define the domain write service that acts as the command side, with Spring Cloud available for a microservices architecture.
  2. We use Apache Ignite as the event store, with Kafka Connect to stream the events.
  3. We use Kafka Streams to implement the read side (a minimal sketch follows this list).
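A minimal Kafka Streams sketch of such a read side, assuming the events arrive on an order-events topic keyed by order id (topic, store, and application names are illustrative):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;

public class OrderReadSide {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // count events per order id as a deliberately simple read-side projection;
        // the state store backing the KTable can then be queried interactively
        builder.<String, String>stream("order-events")
               .groupByKey()
               .count(Materialized.as("orders-by-id"));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-read-side");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}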

Of course there are many other possible designs; I have just shared some here. Now let us summarize some remarks and action points to take into consideration.

Summary notes:

  1. Event sourcing and CQRS are not a silver bullet for every need; use them properly, when they are really needed and fit the actual reasons behind them.
  2. You need distributed tracing and monitoring across your different clusters for better traceability and error handling.
  3. With Akka persistence, you need to cover the following when using it for your domain entities:
    1. Use a split-brain resolver when using Akka clustering, to avoid split-brain scenarios and to get predictable cluster partitioning behavior.
    2. Make sure not to use Java serialization, as it badly hurts the performance and throughput of your application with Akka persistence.
    3. Think through the active-active model for cross-data-center support, given the cluster sharding limitations there; this is covered in the points below.
  4. When it comes to an active-active model for your application, you have multiple options for active-active data center support, all of which come with a latency and performance impact; nothing is free:
    1. The Akka persistence active-active model support extension, which is a commercial add-on: Akka-Persistance-Active-Active
    2. If you use Apache Ignite as your event store, you have two options:
      1. You can use a backing store for your data grid that supports cross-data-center replication, for example Cassandra.
      2. You can use the cross-data-center replication feature of GridGain, the commercial version of Apache Ignite.
    3. You can use Kafka cross-data-center replication to replicate your event data across multiple data centers.
    4. If you use Cassandra as the event store, you can use Cassandra's cross-data-center replication feature.
    5. In the end, you need to think through how you will handle the active-active model for your event-sourced entities and all its side effects on state replication and reconstruction, especially if you use Akka persistence, where it will most likely not be supported without the commercial add-on unless you implement your own solution.

I hope I have shared some useful insights; they are open for discussion and validation anytime.


Spring Boot Akka Event Sourcing Starter – Part 3 – The Working Example

Now I will share a working service example of how to use the event sourcing toolkit starter in practice. In the example I will show the following:

  1. How to configure and use the event sourcing starter with a Spring Boot web application
  2. How to implement your aggregate entity using the API of the toolkit
  3. How to define your entity flow using the execution flow API
  4. How to configure your entity
  5. How to configure your Akka system with Spring Boot
  6. How to call your aggregates from your service and connect that to your DDD service REST API
  7. How to use Google Protobuf to serialize your events instead of Java serialization
  8. The usage of Apache Ignite as your persistence event store with Akka persistence
  9. In Part 4 we will cover the summary and possible designs, plus some special remarks

How to configure and use the event sourcing starter with a Spring Boot web application:

In your Spring Boot app, add the event sourcing toolkit Maven dependency:
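The shape of the dependency is sketched below; the coordinates are illustrative placeholders, so take the exact groupId, artifactId, and version from the toolkit's GitHub README:

<!-- illustrative coordinates only; check the project README for the real ones -->
<dependency>
    <groupId>com.romeh</groupId>
    <artifactId>spring-boot-akka-event-sourcing-starter</artifactId>
    <version>1.0.0</version>
</dependency>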

How to implement your aggregate entity (OrderManager) using the toolkit API and define its flow using the toolkit DSL abstraction:

Your order aggregate (OrderManager) flow implementation will be as follows:

[Code screenshot: OrderManager flow definition]

The order manager aggregate class extends the toolkit's persistent entity class and defines the flow logic for command and event handlers inside your custom entity, using the flow execution DSL exposed by the PersistentEntity class. The flow is as follows:

[Diagram: OrderManager command and event flow]

The code of the order manager class, with documentation of the flow DSL, is on GitHub: OrderManager Java Code

How to configure your persistent aggregate entity via the toolkit API:

As explained before, you just need to implement the PersistentEntityProperties interface and the toolkit will auto-discover it for your entity's cluster sharding and persistence configuration. The code reference for the config in the working sample is: The entity configuration

How to configure your Akka system with Spring Boot

You just need to add a reference to your Akka system config file in your Spring Boot app config file (application.yml), with the proper property names, and the toolkit will pick it up:
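For example, mirroring the configuration shown in Part 2:

# reference to your Akka configuration file and the actor system name
spring.akka.config: eventSourcing.conf
spring.akka.system-name: orderManagerSystem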

How to call your aggregates from your service and connect that to your DDD service REST API

  • First, implement your order broker service, which has a reference to the PersistentEntityBroker provided by the toolkit to abstract the cluster sharding lookup for your entity; the order broker is here: Order Broker
  • Then you use the non-blocking, asynchronous PatternsCS ask to call the target entity via the PersistentEntityBroker; a code snippet to show this:
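A sketch of how that call looks; the broker lookup method name below is an assumption, so treat the Order Broker class on GitHub as the authoritative reference:

import java.util.concurrent.CompletionStage;
import akka.actor.ActorRef;
import akka.pattern.PatternsCS;

// inside the order broker service: resolve the shard region for the aggregate
// type via the toolkit's PersistentEntityBroker, then ask it without blocking
public CompletionStage<Object> askOrderEntity(Object command) {
    ActorRef orderShardRegion = persistentEntityBroker.findPersistentEntity(OrderManager.class);
    // 5-second timeout (illustrative); the reply completes the stage asynchronously
    return PatternsCS.ask(orderShardRegion, command, 5000);
}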

  • Then, from your REST API resource, you can call your broker to invoke the target command or query in an async, non-blocking way as well; the REST API class reference is here (OrderRestController). A small code snippet to show how it is done:
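A small sketch of the non-blocking controller style; the path, the CreateOrder command class, and the broker method are illustrative:

import java.util.concurrent.CompletionStage;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderRestController {

    private final OrderBroker orderBroker;

    public OrderRestController(OrderBroker orderBroker) {
        this.orderBroker = orderBroker;
    }

    // returning CompletionStage lets Spring MVC complete the request
    // asynchronously once the actor replies, without blocking a servlet thread
    @PostMapping("/orders")
    public CompletionStage<Object> createOrder(@RequestBody CreateOrder command) {
        return orderBroker.askOrderEntity(command);
    }
}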

  • When you run the app, there is runtime Swagger documentation and testing for the application's REST APIs at http://localhost:9595/swagger-ui.html, where you can test order create, validate, sign, and state query.

How to use Protobuf to serialize your events instead of Java serialization

As we know, Java serialization is not optimal for performance, so here I also share how you can use the Protobuf protocol to serialize the events, as it is more optimized. The implementation points to check:

  • You need to implement SerializerWithStringManifest from Akka, which in our application is OrderManagerSerializer; it uses the Protobuf builder classes generated from the file below (a skeleton follows this list).
  • The Protobuf definition for the event classes is in the proto folder of the project: EventsAndCommands.proto
  • Add the needed Maven build plugin to generate the code from the schema definition above; the plugin configuration will be as follows:
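One common way to wire the code generation is the xolstice protobuf-maven-plugin together with the os-maven-plugin extension; this is a typical configuration rather than the project's exact one:

<build>
    <extensions>
        <!-- detects the OS classifier so the matching protoc binary is downloaded -->
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.5.0</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.5.1</version>
            <configuration>
                <protocArtifact>com.google.protobuf:protoc:3.5.1:exe:${os.detected.classifier}</protocArtifact>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

And a skeleton of the serializer from point 1; the manifest and the delegation comments are illustrative, and the real OrderManagerSerializer on GitHub is the authoritative version:

import akka.serialization.SerializerWithStringManifest;

// skeleton only: the real implementation delegates to the classes
// generated from EventsAndCommands.proto
public class OrderManagerSerializer extends SerializerWithStringManifest {

    private static final String ORDER_CREATED_MANIFEST = "OrderCreated";

    @Override
    public int identifier() {
        // must be unique among all serializers registered in the actor system
        return 584287920;
    }

    @Override
    public String manifest(Object obj) {
        return ORDER_CREATED_MANIFEST;
    }

    @Override
    public byte[] toBinary(Object obj) {
        // convert the domain event to its Protobuf message and serialize it,
        // e.g. Events.OrderCreated.newBuilder()...build().toByteArray()
        throw new UnsupportedOperationException("illustrative skeleton");
    }

    @Override
    public Object fromBinary(byte[] bytes, String manifest) {
        // parse the Protobuf message for the given manifest back to the domain event,
        // e.g. Events.OrderCreated.parseFrom(bytes)
        throw new UnsupportedOperationException("illustrative skeleton");
    }
}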

The usage of Apache Ignite as your persistence event store with Akka persistence:

Here I am going to use a custom Akka persistence plugin for Apache Ignite that I created before (https://github.com/Romeh/akka-persistance-ignite); you can check it out for more technical details.

So I just added the needed Maven dependency plus the needed Apache Ignite grid configuration for Akka persistence, and that is it: when you build and run the application, it will also start an Apache Ignite server node, which will be used to store the events and snapshots in its own journals.
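Enabling the plugins is the same two-line configuration shown in the plugin's own post below:

akka.persistence.journal.plugin = "akka.persistence.journal.ignite"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot.ignite"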

Now, in Part 4, we go through some remarks and possible architectures using this toolkit for event sourcing and CQRS services.

References:

  1. Part 4: https://mromeh.com/2018/04/27/spring-boot-akka-event-sourcing-starter-part-4-final/
  2. GitHub toolkit project URL: https://github.com/Romeh/spring-boot-akka-event-sourcing-starter
  3. Akka persistence: https://doc.akka.io/docs/akka/2.5/persistence.html
  4. Spring Boot: https://projects.spring.io/spring-boot/

 

Spring Boot Akka Event Sourcing Starter – Part 2

Now we will continue in Part 2 and explain the starter itself in more detail:

[Diagram: FlowContext]

The Toolkit Starter Is An Abstraction For The Following:

  1. How to define persistent entity actors for your aggregates, plus the integration with Spring
  2. An aggregate flow DSL abstraction for handling commands and events, plus asynchronous command actions (not highly recommended, as your aggregate should be your transactional boundary context and should own its contextual data)
  3. A cluster sharding abstraction for your persistent entities, made configurable via Spring
  4. Actor system configuration and integration with Spring Boot

Let us go over the points mentioned above quickly; in Part 3 I will share a complete working example that acts as a reference for how to use the event sourcing toolkit starter.

The Generic Persistent Entity Class In The Starter:

The main abstracted entity class is the PersistentEntity class: The root entity class

It extends AbstractPersistentActor from the Akka persistence toolkit and contains all the common, abstracted technical functionality for defining persistent actors, letting you focus on the business flow of your aggregate logic.
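For context, the sketch below shows roughly what you write by hand with a raw AbstractPersistentActor and no toolkit; the trivial counter domain is illustrative, and the point is that PersistentEntity absorbs this boilerplate for you:

import akka.persistence.AbstractPersistentActor;

public class RawCounterActor extends AbstractPersistentActor {

    private int state = 0;

    @Override
    public String persistenceId() {
        return "raw-counter-1";
    }

    // replay: rebuild the state from the stored events
    @Override
    public Receive createReceiveRecover() {
        return receiveBuilder()
                .match(Integer.class, delta -> state += delta)
                .build();
    }

    // live: persist the command as an event, then update state and reply
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Integer.class, delta -> persist(delta, evt -> {
                    state += evt;
                    getSender().tell(state, getSelf());
                }))
                .build();
    }
}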

The Aggregate Flow DSL Overview:

As mentioned in Part 1, the flow abstracts how you define the handling of commands and events, plus the responses to be sent back to the sender, if any. Please check Part 1 for a deeper explanation of the flow; here I will just cover the technical overview quickly, and we will see a concrete example of how to define the flow in Part 3.

When a command is received, the flow context is initialized with the current state before the command is handled; then, based on the flow defined in your aggregate, the command or event is handled accordingly.

The flow context overview:

[Class diagram: flow context]

Based on the class diagram above, the actions generated by the flow command handlers or event handlers are:

  1. Persist one generated event, then run the post action, if any
  2. Persist a list of generated events, then run the post action, if any
  3. No generated events to persist; just run the post action, if any (the read-only commands case)

[Class diagram: flow actions]

The cluster sharding abstraction in the toolkit starter:

The two main classes that abstract cluster sharding and handle the Spring integration are PersistentEntitySharding and PersistentEntityBroker; you can check the code for more details.

  • PersistentEntitySharding abstracts the cluster sharding creation per entity, based on its configuration via the starter: code reference
  • PersistentEntityBroker is the broker that gets the persistent entity shard in an abstracted way for your calling service, by just passing the aggregate class type

[Class diagram: PersistentEntitySharding and PersistentEntityBroker]

The actor system integration with Spring and how it can be configured via the Spring configuration:

There is a specific interface you need to implement when you configure your entity bean, PersistentEntityProperties, where you can configure the properties shown in the class diagram below; in Part 3 we will see a concrete example of its usage:

[Class diagram: PersistentEntityProperties]

  1. snapshotStateAfter: snapshot the state after a specific configured period
  2. entityPassivateAfter: passivate the entity actor after the configured idle time
  3. tags: the event tags used to tag events before storing them in the event store
  4. numberOfShards: the number of cluster shards for this entity
  5. persistentIdPostfix and persistenceIdPrefix: used to build the unique aggregate id within the shard
  6. asyncPersistentEntityDispatcherName: the name of the configured Akka dispatcher to be used for the async actions within the aggregate actor
  7. pipeDispatcherName: the configured Akka dispatcher name to be used for pipe communication within the aggregate actor
  8. scheduledAsyncEntityActionTimeout: the configured timeout for waiting on an async action response, so the actor is not blocked forever
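A sketch of what implementing this interface could look like; the method names mirror the property list above, but the return types and exact signatures are assumptions, so use the entity configuration class from the Part 3 sample as the authoritative reference:

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.FiniteDuration;

// illustrative only: signatures are guessed from the property list above
public class OrderManagerProperties implements PersistentEntityProperties {

    @Override
    public FiniteDuration snapshotStateAfter() {
        return FiniteDuration.apply(60, TimeUnit.SECONDS); // snapshot every minute
    }

    @Override
    public FiniteDuration entityPassivateAfter() {
        return FiniteDuration.apply(10, TimeUnit.MINUTES); // passivate idle entities
    }

    @Override
    public Set<String> tags() {
        return Collections.singleton("order-event"); // tags applied before storing events
    }

    @Override
    public int numberOfShards() {
        return 20; // cluster shards for this entity type
    }

    @Override
    public String persistenceIdPrefix() {
        return "order";
    }

    @Override
    public String persistentIdPostfix() {
        return "entity";
    }

    @Override
    public String asyncPersistentEntityDispatcherName() {
        return "order-async-dispatcher";
    }

    @Override
    public String pipeDispatcherName() {
        return "order-pipe-dispatcher";
    }

    @Override
    public FiniteDuration scheduledAsyncEntityActionTimeout() {
        return FiniteDuration.apply(5, TimeUnit.SECONDS); // don't block the actor forever
    }
}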

The last point is the Spring configuration that references your Akka actor system configuration, used during the toolkit's Spring Boot configuration initialization; it can be done like the following:

The configuration needs to be prefixed with spring.akka, and you need to provide the location of your actor system configuration file and the name of your actor system. In Part 3 we will go through a detailed order manager application sample based on the toolkit starter explained here.

# we will see a detailed reference for this in Part 3 with the working example
spring.akka.config: eventSourcing.conf
spring.akka.system-name: orderManagerSystem

References:

  1. Part 3: https://mromeh.com/2018/04/27/spring-boot-akka-event-sourcing-starter-part-3-the-working-example/
  2. GitHub toolkit and Order Manager sample code project URL: https://github.com/Romeh/spring-boot-akka-event-sourcing-starter
  3. Akka persistence: https://doc.akka.io/docs/akka/2.5/persistence.html
  4. Spring Boot: https://projects.spring.io/spring-boot/

 

Spring Boot Akka Event Sourcing Starter – Part 1

Here I am going to share a custom toolkit, wrapped as a Spring Boot starter around Akka persistence, that acts as a ready-made toolkit for an event-driven, asynchronous, non-blocking flow API, event sourcing, and CQRS implementation within Spring Boot services, which can be part of a Spring Cloud microservices infrastructure. We will cover the following:

  1. An overview of the toolkit for DDD, event sourcing, and CQRS implementation
  2. The integration between Akka persistence and Spring Boot via a starter implementation, with abstractions for the aggregate entity, cluster sharding, integration testing, and flow definition
  3. A working application example that showcases how it can be used
  4. A summary of possible designs
  5. What is next, and special remarks

The Overview:

Before going through the toolkit implementation, you should first go through the principles of domain-driven design, event sourcing, and CQRS, to get a good overview of the pros and cons of this design and to understand when you need it and when you do not.

Instead of implementing those patterns from scratch, I decided to use Akka persistence to apply the core principles of event sourcing, plus my own layer above it to abstract how you define your aggregate with its command and event handling flow.

Within the toolkit, the aggregate command and flow handling is as follows:

[Diagram: aggregate command and event handling flow]

The flow definition API is as follows:

  • There are state-changing command handler flow definitions, which match a command class type to a specific command handler
  • There are event handlers, which match an event class type to an event handler that executes the logic related to that event
  • There are read-only command handlers, which do not change the state of the aggregate entity; they can be used for query actions or other actions that do not mutate the entity state by appending new events

So the different semantic branches of the flow API are (a simplified sketch of the dispatch idea follows the list):

  1. If a command message is received:
    • If the command is transactional:
      1. Get the related command handler for that command type, based on the flow API definition for that aggregate and the current flow context with the current aggregate state
      2. Execute the command handler logic, which triggers one of the following two cases:
        • A single event to be persisted, then any configurable post action to be executed after persisting the event to the event store, such as post-processing and sending a response back to the sender
        • A list of events to be persisted, then any configurable post action to be executed after persisting the events to the event store, such as post-processing and sending a response back to the sender
    • If the command is read-only:
      • Just execute its configurable command handler, based on the flow API definition for that aggregate and the current flow context with the current aggregate state, then execute any configurable post-processing actions
  2. If an event message is received:
    • Get the related event handler based on the defined flow for the aggregate, then execute it against the current flow context and aggregate state
  3. If a stop message is received:
    • It triggers a safe stop flow for the aggregate entity actor
  4. If a receive-timeout message is received:
    • It is received when an async flow is executed for a command and the aggregate entity actor's wait-for-response mode times out, to avoid blocking the actor for a long time, which could cause starvation and performance issues
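To make the dispatch idea concrete, here is the general pattern behind such a flow API (not the toolkit's actual classes): a handler is registered per command class and looked up when a message arrives.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class SimpleFlow<State> {

    // maps a command class to a handler that takes the command and the
    // current state and returns the events to persist
    private final Map<Class<?>, BiFunction<Object, State, List<Object>>> commandHandlers = new HashMap<>();

    public <C> SimpleFlow<State> onCommand(Class<C> type, BiFunction<C, State, List<Object>> handler) {
        commandHandlers.put(type, (cmd, state) -> handler.apply(type.cast(cmd), state));
        return this;
    }

    public List<Object> handle(Object command, State state) {
        BiFunction<Object, State, List<Object>> handler = commandHandlers.get(command.getClass());
        if (handler == null)
            throw new IllegalArgumentException("No handler registered for " + command.getClass());
        return handler.apply(command, state);
    }
}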

Now, in Part 2, we cover the details of the Spring Boot Akka event sourcing starter, which provides the following for you:

  1. Smooth integration between Akka persistence and Spring Boot
  2. A generic DSL for the aggregate flow definition for commands and events
  3. An abstract aggregate persistent entity actor, with all the common logic in place, which can be used with concrete Spring-managed bean implementations of different aggregate entities
  4. Abstract cluster sharding runtime configuration and access via custom Spring Boot configuration, and a generic entity broker that abstracts the cluster sharding implementation for you


Akka Persistence with Apache Ignite

In this post we share a starter project that uses the Apache Ignite data grid as an event and snapshot store, to mix the benefits of event sourcing and the data grid.

The implementation is based on the journal plugin TCK specs provided by Akka persistence.

It mainly uses Apache Ignite with Akka persistence to provide a journal and a snapshot store backed by partitioned caches, benefiting from the distributed, highly available data grid features, plus Ignite's rich query and data computation features, which can be used to build normalized views from the event store and run analytical jobs over them; that said, it is advised to keep write nodes separate from read nodes for better scalability.


 

Akka and Ignite versions used:

Akka version: 2.5.7+, Ignite version: 2.3.0+

Journal plugin

  • All operations required by the Akka Persistence journal plugin API are fully supported.
  • It uses an Apache Ignite partitioned cache with the default number of backups set to 1; this can be changed in the reference.conf file.

Snapshot store plugin

  • The snapshot store is likewise backed by an Ignite partitioned cache; its implementation class is IgniteSnapshotStore (see the technical details below).

How to use

Enable the plugins in your Akka cluster configuration:

akka.persistence.journal.plugin = "akka.persistence.journal.ignite"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot.ignite"

Configure the Ignite data grid properties; by default it is configured for localhost:

ignite {
  // start a client node or a server node to connect to the Ignite data cluster
  isClientNode = false
  // for testing ONLY we use localhost
  // used for grid cluster connectivity
  tcpDiscoveryAddresses = "localhost"
  metricsLogFrequency = 0
  // thread pools used by Ignite; should be sized based on the target machine specs
  queryThreadPoolSize = 4
  dataStreamerThreadPoolSize = 1
  managementThreadPoolSize = 2
  publicThreadPoolSize = 4
  systemThreadPoolSize = 2
  rebalanceThreadPoolSize = 1
  asyncCallbackPoolSize = 4
  peerClassLoadingEnabled = false
  // enable or disable durable memory persistence
  enableFilePersistence = true
  // used for grid cluster connectivity; change it to suit your configuration
  igniteConnectorPort = 11211
  // used for grid cluster connectivity; change it to suit your configuration
  igniteServerPortRange = "47500..47509"
  // durable memory persistence storage file system path; change it to suit your configuration
  ignitePersistenceFilePath = "./data"
}

 

And with that you have Ignite enabled as your journal and snapshot plugins; you can run it as a server node or a client node based on the isClientNode flag in the configuration above.

Technical details:

The main journal implementation is IgniteWriteJournal.

The main snapshot implementation class is IgniteSnapshotStore.

For more details, feel free to dive into the code base; it is a small code base for now!

Summary: