8. Akka Projection - case study

2021-09-27 linkshortener scala akka

Motivation

In one of the previous posts I described how to use Akka Persistence, using a linkshortener service as the example. That post explains how a persistence actor works and how Event, Command and State relate to each other. However, it covers only the write stack of the Akka actor system and doesn't describe how to read data. The Events Journal keeps Events as binary data, so you can't easily read them. The goal of this post is to explain how to build the read stack with Akka Projection. To do this I will create view database tables and fill them with data.

All attached snippets are part of the linkshortener project on my GitHub. The project sources can change over time, so I tagged the current version as v1.0.1. You can always switch to it using the git checkout v1.0.1 command.

Write and Read

Akka Projection is a project that allows you to read Events from the Events Journal and pass them to an Event Handler. I use the Event Handler to save data in view database tables. I created two view tables: short_links and short_link_clicks. The following diagram shows the relations between Akka Persistence (write stack) and Akka Projection (read stack).

[Figure: Akka diagram of the write stack (Akka Persistence) and the read stack (Akka Projection)]

The Persistence Actor receives Commands. Commands are processed sequentially, one at a time. The Actor Command Handler persists Events in the Events Journal based on the Command data and the actor's current State. The Actor State is calculated by applying all Events one by one (by the Actor Event Handler). The Projection reads Events from the Events Journal and passes them to the Projection Events Handler. The Projection Events Handler saves data to the view tables. Data from the view tables feeds the application views.
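
To make this flow more concrete, here is a minimal sketch in plain Scala, without Akka. All names in it (WriteReadSketch, Create, Click, View and so on) are made up for illustration and do not come from the linkshortener project. It models a single entity, keeps the journal in memory, and uses the position in the journal as a stand-in for the ordering column.

```scala
// Toy model of the write/read split. Hypothetical names, not the linkshortener code.
object WriteReadSketch {
  sealed trait Command
  final case class Create(id: String, url: String) extends Command
  final case class Click(id: String) extends Command

  sealed trait Event
  final case class Created(id: String, url: String) extends Event
  final case class Clicked(id: String) extends Event

  // State of a single entity, rebuilt by applying its events one by one.
  final case class State(url: Option[String], clicks: Int)
  val empty: State = State(None, 0)

  // Command handler: decides which events to persist, given the current state.
  def handleCommand(state: State, command: Command): List[Event] = command match {
    case Create(id, url) if state.url.isEmpty => List(Created(id, url))
    case Click(id) if state.url.isDefined     => List(Clicked(id))
    case _                                    => Nil // rejected command, nothing is persisted
  }

  // Event handler: applies one event to the state.
  def applyEvent(state: State, event: Event): State = event match {
    case Created(_, url) => state.copy(url = Some(url))
    case Clicked(_)      => state.copy(clicks = state.clicks + 1)
  }

  // Write stack: commands are processed one at a time and events are appended to the journal.
  def run(commands: List[Command]): (State, Vector[Event]) =
    commands.foldLeft((empty, Vector.empty[Event])) { case ((state, journal), command) =>
      val events = handleCommand(state, command)
      (events.foldLeft(state)(applyEvent), journal ++ events)
    }

  // Read stack: a view plus the offset of the last event the projection has handled.
  final case class View(offset: Long, clicksById: Map[String, Int])

  // Projection: skips events up to the stored offset, handles the rest, moves the offset forward.
  def project(view: View, journal: Vector[Event]): View =
    journal.zipWithIndex.foldLeft(view) { case (v, (event, index)) =>
      val ordering = index + 1L
      if (ordering <= v.offset) v
      else event match {
        case Clicked(id) =>
          View(ordering, v.clicksById.updated(id, v.clicksById.getOrElse(id, 0) + 1))
        case _ => v.copy(offset = ordering)
      }
    }
}
```

Calling WriteReadSketch.run with a list of Commands returns the final State together with the journal, and WriteReadSketch.project can then be called repeatedly on the same journal: thanks to the stored offset, Events that were already handled are skipped.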

Implementation

Implementation of ShortLinkProjection can be divided into three main components:

  • SourceProvider - its purpose is to pull Events from the Events Journal,
  • JdbcSessionFactory - its purpose is to manage database connections (and transactions),
  • EventHandler - its purpose is to process Projection events using a database connection from JdbcSessionFactory.

The following snippet shows the implementation of the ShortLinkProjection class.

class ShortLinkProjection @Inject()(actorSystem: ActorSystem, jdbcSessionFactory: JdbcSessionFactory){

  //omitted lines

  private val projection: ExactlyOnceProjection[Offset, EventEnvelope[ShortLink.Event]] =
    JdbcProjection.exactlyOnce(
        projectionId = ProjectionId(ShortLink.entityType, "postgres"),
        sourceProvider,
        () => jdbcSessionFactory.create(),
        handler = () => new EventHandler())(actorSystem.toTyped)

  actorSystem.spawn(ProjectionBehavior(projection), projection.projectionId.id)
}

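I use ExactlyOnceProjection created with the JdbcProjection#exactlyOnce method. This projection type handles each Event in a new transaction and stores the Offset in that same transaction. Other variants are available: JdbcProjection#groupedWithin handles a whole group of Events in a single transaction, while JdbcProjection#atLeastOnce stores the Offset only after a number of Events or a time window, so some Events may be processed again after a restart. For more details check the documentation.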

JdbcProjection means I keep the Offset in a relational database and use JDBC to process Events in the Events Handler. The Offset is just the id (the value of the ordering column from the event_journal table) of the last Event processed by a given Projection. Depending on the purpose of the Events Handler you may need a different projection type, for example CassandraProjection.
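
Why it matters whether the Offset is stored together with the view data can be shown with a small simulation. This is only a sketch in plain Scala with hypothetical names (OffsetSketch, Db, run), not how Akka Projection is implemented. It also simplifies the non-atomic case by storing the offset after every Event. A crash is simulated at the moment when the view row is already written but the offset is not yet stored.

```scala
// Simulation of offset handling. Hypothetical names, not the Akka Projection implementation.
object OffsetSketch {
  type Journal = Vector[(Long, String)] // (ordering, payload)

  // Durable state: the view rows plus the stored offset of the projection.
  final case class Db(rows: Vector[String], offset: Long)

  // Handles events with ordering greater than db.offset.
  // If crashAt is set, the run dies while handling that ordering:
  // after the row is written, but before the offset is stored.
  def run(db: Db, journal: Journal, atomic: Boolean, crashAt: Option[Long]): Db =
    journal.filter(_._1 > db.offset).foldLeft((db, false)) {
      case ((current, true), _) => (current, true) // already crashed, the rest is not handled
      case ((current, false), (ordering, payload)) =>
        val withRow = current.copy(rows = current.rows :+ payload)
        if (crashAt.contains(ordering))
          // atomic: row and offset share one transaction, so the row is rolled back as well
          (if (atomic) current else withRow, true)
        else
          (withRow.copy(offset = ordering), false)
    }._1
}
```

With atomic = true a restarted run continues from the stored offset and every row is written once. With atomic = false the row written just before the crash is written again after the restart, so the handler has to tolerate duplicates.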

A Projection has an id which consists of a name and a key. I use the entity type as the name and the string postgres as the key. This way I can define different Projections for the same Source Provider. For example, if I wanted to independently save data to view tables and produce Events to a Kafka topic, I could use two Projections with the same name (entity type) and different keys (postgres and kafka, respectively).

In the last line of the snippet above, I create an instance of the defined projection. To do this I use the ProjectionBehavior class from Akka Projection. The Projection actor should be created on system start, so I configured ShortLinkProjection as an eager singleton by adding a new Play Framework module, ProjectionModule.

class ProjectionModule extends AbstractModule with Logging {

  override def configure(): Unit = {
    logger.info("ProjectionModule starts.")
    bind(classOf[ShortLinkProjection]).asEagerSingleton()
  }
}

A small change in configuration is also required.

play.modules.enabled += "projection.ProjectionModule"

SourceProvider

private val sourceProvider: SourceProvider[Offset, EventEnvelope[ShortLink.Event]] =
  EventSourcedProvider
    .eventsByTag[ShortLink.Event](actorSystem.toTyped, readJournalPluginId = JdbcReadJournal.Identifier, tag = ShortLink.entityType)

First we need to define the SourceProvider. The Source Provider defines which Events are needed and which Journal plugin is used. I simply use the Akka Projection class EventSourcedProvider and its eventsByTag method. Note that eventsByTag returns only Events that were tagged with the given tag (here the entity type) on the write side. The source of Events is the Events Journal. Be aware that the Events source doesn't have to be Akka Persistence: it could also be Kafka, or you can define your own Source Provider.

JdbcSessionFactory

The factory method of akka.projection.jdbc.JdbcSession is one of the JdbcProjection#exactlyOnce arguments. The purpose of JdbcSession is to manage a connection within the scope of a single transaction. First I prepare the HikariConnectionPool class to define the connection pool. The pool size is equal to the Akka Projection property akka.projection.jdbc.blocking-jdbc-dispatcher.thread-pool-executor.fixed-pool-size. This is the recommendation from the documentation:

JDBC APIs are blocking by design, therefore Akka Projections JDBC will use a dedicated dispatcher to run all JDBC calls. It’s important to configure the dispatcher to have the same size as the connection pool.

Each time the projection handler is called one thread and one database connection will be used. If your connection pool is smaller than the number of threads, the thread can potentially block while waiting for the connection pool to provide a connection.

import java.sql.Connection
import javax.inject.{Inject, Singleton}

import com.typesafe.config.Config
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}

@Singleton
class HikariConnectionPool @Inject()(config: Config){

  private val hConfig = new HikariConfig
  // The pool size mirrors the size of the blocking JDBC dispatcher used by Akka Projection
  private val poolSize = config.getInt("akka.projection.jdbc.blocking-jdbc-dispatcher.thread-pool-executor.fixed-pool-size")

  hConfig.setDriverClassName(config.getString("linkshortener.db.driver"))
  hConfig.setJdbcUrl(config.getString("linkshortener.db.url"))
  hConfig.setUsername(config.getString("linkshortener.db.username"))
  hConfig.setPassword(config.getString("linkshortener.db.password"))
  // Auto-commit is disabled because JdbcSession commits or rolls back explicitly
  hConfig.setAutoCommit(false)
  hConfig.setMinimumIdle(poolSize)
  hConfig.setMaximumPoolSize(poolSize)

  private val hDataSource = new HikariDataSource(hConfig)

  def getConnection: Connection = hDataSource.getConnection
}

JdbcSessionFactory is as simple as possible:

@Singleton
class JdbcSessionFactory @Inject()(projectionDataSource: HikariConnectionPool){
  def create() = new JdbcSession(projectionDataSource.getConnection)
}

object JdbcSessionFactory {

  class JdbcSession(connection: Connection) extends akka.projection.jdbc.JdbcSession {
    override def withConnection[Result](func: function.Function[Connection, Result]): Result = func(connection)
    override def commit(): Unit = connection.commit()
    override def rollback(): Unit = connection.rollback()
    override def close(): Unit = connection.close()
  }
}
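
The four methods of the session make more sense once you picture who calls them. The following sketch in plain Scala shows one way a caller could drive such a session through a single transaction: commit on success, rollback on failure, close in every case. RecordingSession and inTransaction are hypothetical names used only here. The real calls are made by Akka Projection, and I am not claiming this is its exact implementation.

```scala
import scala.util.control.NonFatal

// Sketch of a session lifecycle. Hypothetical names, not the Akka Projection internals.
object SessionLifecycleSketch {

  // Stand-in for a JDBC-backed session: it only records the calls it receives.
  final class RecordingSession {
    var calls: Vector[String] = Vector.empty
    def withConnection[A](f: String => A): A = { calls :+= "withConnection"; f("connection") }
    def commit(): Unit = calls :+= "commit"
    def rollback(): Unit = calls :+= "rollback"
    def close(): Unit = calls :+= "close"
  }

  // Runs `work` in one transaction: commit on success, rollback on failure, always close.
  def inTransaction[A](session: RecordingSession)(work: RecordingSession => A): A =
    try {
      val result = work(session)
      session.commit()
      result
    } catch {
      case NonFatal(e) =>
        session.rollback()
        throw e
    } finally session.close()
}
```

This is also why the pool is created with auto-commit disabled: nothing becomes visible until commit() is called. With a pooled connection, close() does not drop the physical connection but returns it to the pool.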

Event Handler

The purpose of the Event Handler is to process Projection Events. In my case it is a set of simple database inserts. I additionally use Anorm to simplify the syntax, but regular JDBC calls would work as well.

class EventHandler() extends JdbcHandler[EventEnvelope[ShortLink.Event], JdbcSessionFactory.JdbcSession] with Logging {

    override def process(session: JdbcSessionFactory.JdbcSession, envelope: EventEnvelope[ShortLink.Event]): Unit = {
      logger.info(s"ShortLinkProjection receives ${envelope.event}.")
      envelope.event match {
        case e: ShortLink.Events.Created =>
          session.withConnection(implicit conn => {
            SQL"""
              insert into short_links(short_link_id, short_link_domain, short_link_url, original_link_url, tags, created_timestamp)
              values (${e.shortLinkId}, ${e.shortLinkDomain}, ${e.shortLinkUrl}, ${e.originalLinkUrl}, ${e.tags.mkString(",")}, ${e.timestamp})
            """.executeInsert()
          })
        case e: ShortLink.Events.Clicked =>
          session.withConnection(implicit conn => {
            SQL"""
              insert into short_link_clicks(short_link_id, user_agent_header, x_forwarded_for_header, created_timestamp)
              values (${e.shortLinkId}, ${e.userAgentHeader}, ${e.xForwardedForHeader}, ${e.timestamp})
            """.executeInsert()
          })
        case other =>
          logger.info(s"ShortLinkProjection receives $other.")
      }
    }
  }

Summary

I described how to use Akka Projection, using a linkshortener service as the example. Akka Projection is used to process ShortLink entity Events and save data to the view tables short_links and short_link_clicks. These tables can be used to feed the application view with data. If you want to learn more about Akka Projection, or learn how to use it with different solutions like Cassandra or Kafka, check the documentation.