6. Akka persistence - case study

2021-09-04 linkshortener scala akka

Introduction

In the previous post I described the linkshortener service concept. This time I describe how to implement it using akka-persistence. The service is very simple, so there is only one persistence actor: ShortLink. All attached snippets are part of the linkshortener project on my GitHub. The project sources can change over time, so I tagged the current version as v1.0.0. You can always switch to it with the git checkout v1.0.0 command.

Persistence actor

A persistence actor in Akka is defined by three sets: Commands, Events and States. The actor receives Commands and processes them sequentially, one at a time. Processing a Command can result in an Event being persisted. Events are stored in the event journal. The actor's State is calculated by applying all Events one by one; this is also how the State is rebuilt when the actor is started again.
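The description above can be modelled without Akka as two pure functions and a fold. The sketch below is a simplified illustration, not Akka's API: the names EventSourcedModel, EventSourced, handle and recover are hypothetical, and persisting is reduced to appending events to an in-memory list.

```scala
// Simplified, Akka-free model of an event-sourced (persistence) actor.
// Hypothetical names; the "journal" is just an in-memory List of events.
object EventSourcedModel {
  // S = state, C = command, E = event
  final case class EventSourced[S, C, E](
      initial: S,
      commandHandler: (S, C) => List[E], // decides which events to persist (possibly none)
      eventHandler: (S, E) => S          // computes the next state from a single event
  ) {
    // Handles one command: persists the resulting events, then applies them to the state.
    def handle(state: S, journal: List[E], cmd: C): (S, List[E]) = {
      val events = commandHandler(state, cmd)
      (events.foldLeft(state)(eventHandler), journal ++ events)
    }

    // Recovery: the state is rebuilt by applying all journaled events one by one.
    def recover(journal: List[E]): S = journal.foldLeft(initial)(eventHandler)
  }
}
```

For example, a counter whose commands are numbers and which persists only positive ones ends up in the same state whether it is driven command by command or recovered from its journal.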

Persistence actor - Diagram

Let’s define a sample actor A with:

  • Commands: C1, C2,
  • Events: E1, E2,
  • States: S1, S2.

A good way to illustrate the state transitions of actor A is a Mealy machine, assuming Commands are inputs and Events are outputs:

Persistence actor - Mealy machine

At the beginning actor A is in state S1. When it receives command C1, it persists event E1 and changes its state from S1 to S2. If it receives C2 in state S1, nothing happens: the actor stays in S1 and no event is persisted. In state S2 the actor persists event E2 on command C2 without changing its state. In case of command C1 nothing happens.

We need to define the Commands, Events and States of the ShortLink actor. For reference, the linkshortener service consists of two endpoints:

  • a private endpoint that creates a short link from an original link url,
  • a public endpoint that redirects from a short link url to the original link url.

Commands

  • Create - contains the original link url to which the ShortLink should redirect. It is sent by the private endpoint when a new short link url is being created.
  • Click - asks the ShortLink for the original link url. It is sent by the public endpoint when a short link url is clicked.

The following listing shows the Commands implementation in Scala. Create and Click each have a companion object which defines their Result trait. For example, the ShortLink actor has two possible responses to the Create Command: it replies with the Created result if it hasn’t persisted the Created Event yet, or with the AlreadyExists result if it already has. There is also one extra command, Stop, which I describe later in the section Make sure actor will be stopped.

import akka.actor.typed.ActorRef

// CborSerializable: marker trait defined in the project, conventionally bound to Akka's Jackson CBOR serializer
sealed trait Command extends CborSerializable
object Commands {
  case class Create(originalLinkUrl: String, replyTo: ActorRef[Create.Result]) extends Command
  // plain companion object holding the replies the actor can send for Create
  object Create {
    sealed trait Result extends CborSerializable
    object Results {
      case class Created(shortLinkId: String, shortLinkUrl: String) extends Result
      case object AlreadyExists extends Result
    }
  }
  case class Click(userAgentHeader: Option[String], xForwardedForHeader: Option[String], replyTo: ActorRef[Click.Result]) extends Command
  object Click {
    sealed trait Result extends CborSerializable
    object Results {
      case class RedirectTo(originalLinkUrl: String) extends Result
      case object NotFound extends Result
    }
  }
  // internal command without a reply (see Make sure actor will be stopped)
  case object Stop extends Command
}
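Because each Command's replies form a sealed Result trait, the code that asks the actor can pattern match on every possible answer, and the compiler warns if one is missed. The plain-Scala sketch below copies the two Result hierarchies (without ActorRef and CborSerializable) and maps them to HTTP responses; the status codes and the names ResultHandling, createStatus and clickResponse are my assumptions for illustration, not the linkshortener controllers.

```scala
// Plain-Scala copy of the per-command Result ADTs (ActorRef and CborSerializable omitted).
object ResultHandling {
  object Create {
    sealed trait Result
    object Results {
      final case class Created(shortLinkId: String, shortLinkUrl: String) extends Result
      case object AlreadyExists extends Result
    }
  }
  object Click {
    sealed trait Result
    object Results {
      final case class RedirectTo(originalLinkUrl: String) extends Result
      case object NotFound extends Result
    }
  }

  // Hypothetical mapping for the private endpoint; status codes are an assumption.
  // Result is sealed, so removing a case below produces a non-exhaustive match warning.
  def createStatus(r: Create.Result): Int = r match {
    case _: Create.Results.Created    => 201
    case Create.Results.AlreadyExists => 409
  }

  // Hypothetical mapping for the public endpoint: status code and optional Location.
  def clickResponse(r: Click.Result): (Int, Option[String]) = r match {
    case Click.Results.RedirectTo(url) => (302, Some(url))
    case Click.Results.NotFound        => (404, None)
  }
}
```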

Events

  • Created - persisted when the actor handles a Create command for the first time. It keeps information about the original link url.
  • Clicked - persisted when the actor handles a Click command after the link has been created. It keeps the client's X-Forwarded-For and User-Agent header values.

The following listing shows the Events definition in Scala.

import java.time.Instant

sealed trait Event extends CborSerializable
object Events {
  case class Created(shortLinkId: String, shortLinkDomain: String, shortLinkUrl: String, originalLinkUrl: String, timestamp: Instant = Instant.now()) extends Event
  case class Clicked(shortLinkId: String, userAgentHeader: Option[String], xForwardedForHeader: Option[String], timestamp: Instant = Instant.now()) extends Event
}

States

The ShortLink actor can be in two states:

  • Empty (EmptyState) - in Akka there is no situation in which you don’t get an actor reference because the actor doesn’t exist. It is more accurate to think that the actor has always existed, but at the beginning it has no events: it is in the Empty state. In this state the ShortLink actor is like an empty box. It has only an id (PersistenceId) and no information about the original link yet.
  • Active (ActiveState) - the state the actor reaches after handling the Create command and persisting the Created event. It has information about the original link.

In this section I focus on the main business cases, which are related to handling the Create and Click Commands. Handling of the Stop Command is described in the section Make sure actor will be stopped.

Empty state

The ShortLink actor in the Empty state waits for the Create command. When it receives one, it creates a new short link url based on its id, persists the Created Event containing all needed original link data, and replies to the Command sender with the Created Result. Applying the Created Event changes the actor state to the second state: Active. If the actor in the Empty state receives a Click command, it replies to the Command sender with the NotFound result and stops itself without persisting any Event.

case class EmptyState(id: String, shortLinkDomain: String) extends State {

  override def snapshot: Snapshot = throw new IllegalStateException(s"EmptyShortLink[$id] has not approved state yet.")

  override def applyCommand(cmd: Command)(implicit context: ActorContext[Command]): ReplyEffect[Event, State] = cmd match {
    case c: Create =>
      val url = s"$shortLinkDomain${controllers.routes.ShortLinkController.getShortLink(id).url}"
      Effect.persist(Events.Created(id, shortLinkDomain, url, c.originalLinkUrl))
        .thenReply(c.replyTo)(_ => Create.Results.Created(id, url))
    case c: Click =>
      Effect.stop()
        .thenReply(c.replyTo)(_ => Click.Results.NotFound)
    case Stop =>
      Effect.stop()
        .thenNoReply()
    case c =>
      context.log.warn("{}[id={}, state=Empty] received unknown command[{}].", entityType, id, c)
      Effect.stop()
        .thenNoReply()
  }

  override def applyEvent(state: State, event: Event)(implicit context: ActorContext[Command]): State = event match {
    case e: Created =>
      ActiveState(e.shortLinkId, Snapshot(id, e.shortLinkDomain, e.shortLinkUrl, e.originalLinkUrl), shortLinkDomain)
    case e =>
      context.log.warn("{}[id={}, state=Empty] received unexpected event[{}]", entityType, id, e)
      state
  }
}

Active state

The ShortLink actor in the Active state handles the Click Command. When this Command is received, the actor persists the Clicked Event containing important data about the client, such as the User-Agent and X-Forwarded-For header values, and replies to the Command sender with the RedirectTo result. Applying the Clicked Event doesn’t change the actor state: the actor stays in the Active state. On the Create Command the actor only replies to the Command sender with the AlreadyExists result.

case class ActiveState(id: String, snapshot: Snapshot, shortLinkDomain: String) extends State {

  override def applyCommand(cmd: Command)(implicit context: ActorContext[Command]): ReplyEffect[Event, State] = cmd match {
    case c: Create =>
      Effect.reply(c.replyTo)(Create.Results.AlreadyExists)
    case c: Click =>
      Effect.persist(Events.Clicked(id, c.userAgentHeader, c.xForwardedForHeader))
        .thenReply(c.replyTo)(_ => Click.Results.RedirectTo(snapshot.originalLinkUrl))
    case Stop =>
      Effect.stop()
        .thenNoReply()
    case c =>
      context.log.warn("{}[id={}] unknown command[{}].", entityType, id, c)
      Effect.noReply
  }

  override def applyEvent(state: State, event: Event)(implicit context: ActorContext[Command]): State = event match {
    case _: Clicked =>
      //do nothing
      state
    case e =>
      context.log.warn("{}[id={}] received unexpected event[{}]", entityType, id, e)
      state
  }
}

All the above considerations can be illustrated with a Mealy machine:

ShortLink - Mealy machine
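The same Mealy machine can be written down as a pure transition function. The following is a simplified, Akka-free sketch, not the project's code: it drops ActorRefs, ids, domains and timestamps, and the names ShortLinkMachine, Output, CreatedReply, NoReply and run are hypothetical. Each command produces an optional persisted event, a reply and a stop flag.

```scala
// Simplified, Akka-free model of the ShortLink Mealy machine (hypothetical names).
// Input: a command. Output: optional persisted event, reply, and whether the actor stops.
object ShortLinkMachine {
  sealed trait Cmd
  final case class Create(originalLinkUrl: String) extends Cmd
  final case class Click(userAgent: Option[String]) extends Cmd
  case object Stop extends Cmd

  sealed trait Evt
  final case class Created(originalLinkUrl: String) extends Evt
  final case class Clicked(userAgent: Option[String]) extends Evt

  sealed trait Reply
  case object CreatedReply extends Reply
  case object AlreadyExists extends Reply
  final case class RedirectTo(originalLinkUrl: String) extends Reply
  case object NotFound extends Reply
  case object NoReply extends Reply

  sealed trait State
  case object Empty extends State
  final case class Active(originalLinkUrl: String) extends State

  final case class Output(event: Option[Evt], reply: Reply, stop: Boolean)

  // Event handler: only Created changes the state; Clicked leaves it unchanged.
  def applyEvent(state: State, event: Evt): State = (state, event) match {
    case (Empty, Created(url)) => Active(url)
    case (s, _)                => s
  }

  // Command handler: one transition of the Mealy machine.
  def applyCommand(state: State, cmd: Cmd): Output = (state, cmd) match {
    case (Empty, Create(url))     => Output(Some(Created(url)), CreatedReply, stop = false)
    case (Empty, Click(_))        => Output(None, NotFound, stop = true)
    case (Active(_), Create(_))   => Output(None, AlreadyExists, stop = false)
    case (Active(url), Click(ua)) => Output(Some(Clicked(ua)), RedirectTo(url), stop = false)
    case (_, Stop)                => Output(None, NoReply, stop = true)
  }

  // Runs commands in order. A stop only ends the current incarnation: the next command
  // is handled by a new one recovered from the journal, so the state carries over.
  def run(cmds: List[Cmd]): (State, List[Output]) =
    cmds.foldLeft((Empty: State, List.empty[Output])) { case ((s, outs), c) =>
      val out = applyCommand(s, c)
      (out.event.fold(s)(applyEvent(s, _)), outs :+ out)
    }
}
```

Replaying the collected events with applyEvent gives back the final state, which is what happens when a stopped ShortLink is spawned again.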

Take care of your actors

Unlike a standard Java (or Scala) object, an actor is not automatically removed from memory by the garbage collector. An actor stays part of the actor system until you stop it. That is why you should be aware of your actors' lifecycle and manage it. I extracted this responsibility to the ShortLinkFactory object. ShortLinkFactory:

  • spawns new child actors,
  • maintains the map of active child actors.

ShortLinkFactory defines two Commands:

  • GetRef - sent by client code to get the reference of the actor with a given id. If the actor is already started, its reference is returned. Otherwise, a new actor is spawned and its reference is added to the map of active actors.
  • RefTerminated - internal command received when any child stops. It contains the stopped child's id, which is used to remove the stopped child from the map of active actors. ShortLinkFactory subscribes to this message in the code line context.watchWith(ref, RefTerminated(c.id)).

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import scala.collection.mutable

object ShortLinkFactory {

  // ids of currently running ShortLink children mapped to their references
  val childs: mutable.Map[String, ActorRef[ShortLink.Command]] = mutable.Map()

  sealed trait Command
  object Commands {
    case class GetRef(id: String, behavior: Behavior[ShortLink.Command], replyTo: ActorRef[GetRef.Result]) extends Command
    case class RefTerminated(id: String) extends Command

    object GetRef {
      sealed trait Result
      object Results {
        case class Ref(ref: ActorRef[ShortLink.Command]) extends Result
      }
    }
  }
  import Commands._

  def apply(): Behavior[Command] = Behaviors.setup { implicit context =>
    Behaviors.receiveMessage {
      case c: GetRef =>
        val ref = getOrSpawn(c.id, c.behavior)
        childs.put(c.id, ref)
        // the factory receives RefTerminated(id) when this child stops
        context.watchWith(ref, RefTerminated(c.id))
        c.replyTo ! GetRef.Results.Ref(ref)
        Behaviors.same
      case c: RefTerminated =>
        context.log.debug("ShortLinkFactory removes Child[id={}].", c.id)
        childs.remove(c.id)
        Behaviors.same
      case c =>
        context.log.warn("ShortLinkFactory received unknown command[{}].", c)
        Behaviors.same
    }
  }

  private def getOrSpawn(id: String, behavior: Behavior[ShortLink.Command])(implicit context: ActorContext[Command]): ActorRef[ShortLink.Command] = childs.get(id) match {
    case Some(v) =>
      context.log.debug("Child[id={}] exists. Factory returns the one.", id)
      v
    case None =>
      context.log.debug("Child[id={}] not found. Factory spawns new one.", id)
      context.spawn(behavior, id)
  }
}
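ShortLinkFactory keeps its children in a mutable.Map held by the object. In Akka Typed it is also common to keep such state immutable and pass it on by returning a new behavior built from the updated map. The bookkeeping itself (get or spawn on GetRef, forget on RefTerminated) can be sketched without Akka; ChildRegistry, Registry and the spawn callback are hypothetical names, with Ref standing in for ActorRef[ShortLink.Command].

```scala
// Akka-free sketch of the factory bookkeeping with an immutable map (hypothetical names).
// Ref stands for ActorRef[ShortLink.Command]; the spawn callback stands for context.spawn.
object ChildRegistry {
  final case class Registry[Ref](children: Map[String, Ref]) {
    // GetRef: return the running child, or spawn a new one and remember it.
    def getOrSpawn(id: String)(spawn: String => Ref): (Registry[Ref], Ref) =
      children.get(id) match {
        case Some(ref) => (this, ref)
        case None =>
          val ref = spawn(id)
          (Registry(children + (id -> ref)), ref)
      }

    // RefTerminated: forget the stopped child so the next GetRef spawns a fresh one.
    def terminated(id: String): Registry[Ref] = Registry(children - id)
  }

  def empty[Ref]: Registry[Ref] = Registry(Map.empty[String, Ref])
}
```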

I also added the WithShortLink trait for convenience. It ensures that ShortLinkFactory is instantiated as a singleton and provides the shortLink helper method, which retrieves a ShortLink actor ref without the need to send the GetRef command directly.

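import akka.actor.ActorSystem
import akka.actor.typed.{ActorRef, Scheduler}
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.adapter._ // adds spawn to the classic ActorSystem
import akka.util.Timeout
import com.typesafe.config.Config
import scala.concurrent.{ExecutionContext, Future}
import WithShortLink.ShortLinkFactory
import WithShortLink.ShortLinkFactory.Commands.GetRef

trait WithShortLink {
  self: Logging =>

  def actorSystem: ActorSystem

  // lazy: actorSystem may not be initialized yet while this trait's body runs
  protected lazy val shortLinkFactory: ActorRef[ShortLinkFactory.Command] = WithShortLink.getShortLinkFactory(actorSystem)

  def shortLink(id: String, config: Config)(implicit timeout: Timeout, scheduler: Scheduler, ec: ExecutionContext): Future[ActorRef[ShortLink.Command]] = {
    shortLinkFactory.ask(replyTo => GetRef(id, ShortLink(id, config), replyTo)).map {
      case v: GetRef.Results.Ref =>
        v.ref
      case e =>
        throw new RuntimeException(s"WithShortLink received unknown result[$e].")
    }
  }
}

object WithShortLink {
  private var shortLinkFactoryOpt: Option[ActorRef[ShortLinkFactory.Command]] = None

  // synchronized: concurrent callers must not spawn two top-level actors named
  // "ShortLinkFactory" (the second spawn would fail with InvalidActorNameException)
  private def getShortLinkFactory(actorSystem: ActorSystem): ActorRef[ShortLinkFactory.Command] = synchronized {
    shortLinkFactoryOpt.getOrElse {
      val factoryRef = actorSystem.spawn(ShortLinkFactory(), "ShortLinkFactory")
      shortLinkFactoryOpt = Some(factoryRef)
      factoryRef
    }
  }

  object ShortLinkFactory {
    //ShortLinkFactory definition - see previous listing
  }
}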

Make sure actor will be stopped

The ShortLinkFactory singleton actor manages the list of active actors. A new actor is spawned when ShortLinkFactory receives GetRef and an actor with the given id doesn’t exist yet. But when is an actor stopped? One way to stop an actor is to return Effect.stop from a command handler. ShortLink uses the stop Effect in the Empty state on the Click command.

case c: Click =>
  Effect.stop()
    .thenReply(c.replyTo)(_ => Click.Results.NotFound)

Most command handlers don’t use Effect.stop. For example, if the Commands Create and Click are sent in this order, Effect.stop is never called. Why not use Effect.stop for every command? If a link is clicked many times, it isn’t worth starting and stopping the actor (and replaying its events) for each request. It is more reasonable to keep the actor in memory for some time and stop it when it is not used. The Akka API provides setReceiveTimeout with two arguments: timeout describes how long the actor should wait for the next command, and msg is the Command sent to the actor when the timeout is reached. The receive timeout is configured on the actor context, for example in the Behavior factory method:

def apply(id: String, config: Config): Behavior[Command] = Behaviors.setup { implicit context =>
  context.log.debug2("Starting entity actor {}[id={}]", entityType, id)
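  // receiveTimeout (a FiniteDuration) is defined elsewhere in the ShortLink object;
  // Stop is delivered to this actor after that long without any command
  context.setReceiveTimeout(receiveTimeout, Stop)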
  EventSourcedBehavior.withEnforcedReplies[Command, Event, State](
    persistenceId(id),
    EmptyState(id, config.getString("linkshortener.shortLink.domain")),
    (state, cmd) => {
      context.log.debug("{}[id={}] receives command {}", entityType, id, cmd)
      state.applyCommand(cmd)
    },
    (state, event) => {
      context.log.debug("{}[id={}] persists event {}", entityType, id, event)
      state.applyEvent(state, event)
    }
  ).withTagger(_ => Set("linkshortener", entityType, "v1"))
}

ShortLink handles Stop command in the same manner in both states:

case Stop =>
  Effect.stop()
    .thenNoReply()
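The trade-off between stopping after every command and stopping after a period of inactivity can be illustrated with a small calculation. This is a simplified model, not Akka code: the names are hypothetical, it ignores the time needed to stop and recover, and it only counts how many incarnations (spawn plus journal replay) are needed for a list of command arrival times. Stopping after every command corresponds to a timeout of 0 in this model.

```scala
// Simplified model (hypothetical names): how many times must a ShortLink actor be
// started (spawned and recovered from the journal) to serve commands arriving at
// the given times, if it stops itself after `timeoutMs` without any command?
object ReceiveTimeoutModel {
  def incarnations(arrivalsMs: List[Long], timeoutMs: Long): Int =
    arrivalsMs.sorted match {
      case Nil => 0
      case first :: rest =>
        // accumulator: (number of incarnations so far, time of the previous command)
        val (count, _) = rest.foldLeft((1, first)) { case ((n, previous), t) =>
          // a gap of at least timeoutMs means Stop was delivered and the actor stopped,
          // so this command needs a new incarnation
          if (t - previous >= timeoutMs) (n + 1, t) else (n, t)
        }
        count
    }
}
```

For five clicks at 0, 100, 200, 5000 and 5100 ms, a 1000 ms timeout needs two incarnations, while stopping after every command needs five.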

You should be aware of one downside of this design. There is a small chance that the ShortLink actor receives the Stop command and, while it is processing that command, another command (Click, for example) arrives, because ShortLinkFactory keeps returning the actor's reference until RefTerminated reaches it. As a result the actor stops and the Click message is never processed: the client code receives a timeout failure and the application returns HTTP status 500. This can be fixed with Akka Cluster Sharding, or you can design your own solution. Fixing this problem is out of the scope of this post.