9. Akka Cluster Sharding - case study

2021-10-17 linkshortener scala akka

Introduction

In the previous posts I described the concept of the linkshortener service, how to implement it with Akka Persistence, and how to prepare view tables with Akka Projection. This time I will describe how to run the linkshortener service on several nodes with Akka Cluster Sharding.

All attached snippets are part of the linkshortener project from my GitHub. Project sources may change over time, so I tagged the current version with the v1.0.5 tag. You can always switch to it with the git checkout v1.0.5 command.

A view from above

A Cluster in Akka is understood as a group of Nodes. Every Node is an application instance. Akka introduces two useful extensions: Cluster Sharding and Cluster Singleton. Cluster Sharding gives you a simple API to start actors, but in contrast to the regular API it distributes the instances among all Nodes. Cluster Singleton starts an actor as a singleton in the scope of the whole Cluster. If the Node on which the Cluster Singleton lives goes down, it will be recreated on another Node.

The Cluster Sharding extension introduces the concept of an Entity. Entities are grouped into Shards, and Shards are managed by ShardRegions. There is exactly one ShardRegion on every Node. If a Node goes down, its Shards will be moved to other Nodes. Statuses of Nodes are monitored by the Shard Coordinator (which is a Cluster Singleton).

In the linkshortener service the Entity is the ShortLink actor and the Cluster Singleton is ShortLinkProjection.
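
To make this mapping concrete: every Entity type is identified by an EntityTypeKey, referenced later in the post as ShortLink.TypeKey. The full ShortLink definition lives in the project sources; the sketch below only shows the typical shape of such a declaration, with the key name and command trait simplified for illustration.

import akka.cluster.sharding.typed.scaladsl.EntityTypeKey

object ShortLink {
  // protocol accepted by a single ShortLink Entity (simplified here)
  sealed trait Command

  // key under which ShortLink Entities are registered in Cluster Sharding
  val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("ShortLink")
}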

Akka Diagram

Implementation

ShortLinkSharding

ShortLinkSharding is a wrapper of the ShardRegion. The reference to the ShardRegion is obtained with the ClusterSharding#init method, and ClusterSharding itself is a simple ActorSystem wrapper. To get a reference to a ShortLink Entity you simply need to call the entityRefFor method. Everything can be done in a few lines of code.

@Singleton
class ShortLinkSharding @Inject()(actorSystem: ActorSystem, config: Config) {

  private val sharding: ClusterSharding = ClusterSharding(actorSystem.toTyped)
  private val entityTypeKey: EntityTypeKey[ShortLink.Command] = ShortLink.TypeKey

  // register the ShortLink entity type; this starts the ShardRegion for it on the local node
  sharding.init(Entity(entityTypeKey) { entityContext =>
    ShortLink(entityContext.entityId, entityContext.shard, config)
  })

  // the returned EntityRef routes messages to the node currently hosting the Entity,
  // starting the Entity on demand if it is not running yet
  def entityRefFor(id: String): EntityRef[ShortLink.Command] = {
    sharding.entityRefFor(entityTypeKey, id)
  }
}
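
With the wrapper in place, client code only needs the short link id to reach the right node. The snippet below is a usage sketch rather than project code: the RedirectController class, the Click command and the ClickReply type are assumptions standing in for the real ShortLink protocol.

import javax.inject.Inject
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.util.Timeout

// hypothetical controller; Click and ClickReply stand in for the real ShortLink protocol
class RedirectController @Inject()(shortLinkSharding: ShortLinkSharding) {

  private implicit val timeout: Timeout = 3.seconds

  def click(id: String): Future[ShortLink.ClickReply] =
    // the EntityRef routes the command through the local ShardRegion to the node
    // that currently hosts the Entity, starting it on demand
    shortLinkSharding.entityRefFor(id).ask[ShortLink.ClickReply](replyTo => ShortLink.Click(replyTo))
}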

ShortLinkProjection

ShortLinkProjection, described in the previous post, is run as a Cluster Singleton. This is because the projection Event Handler is not idempotent (all nodes share the database), so we need to be sure that every Event will be processed exactly once, on a single node. ClusterSingleton, similarly to ClusterSharding, is a simple ActorSystem extension and can be used with a few lines of code.

@Singleton
class ShortLinkProjection @Inject()(actorSystem: ActorSystem, jdbcSessionFactory: JdbcSessionFactory){

  private val sourceProvider: SourceProvider[Offset, EventEnvelope[ShortLink.Event]] =
    EventSourcedProvider
      .eventsByTag[ShortLink.Event](actorSystem.toTyped, readJournalPluginId = JdbcReadJournal.Identifier, tag = ShortLink.TypeKey.name)

  private val projection: ExactlyOnceProjection[Offset, EventEnvelope[ShortLink.Event]] =
    JdbcProjection.exactlyOnce(
        projectionId = ProjectionId(ShortLink.TypeKey.name, "postgres"),
        sourceProvider,
        () => jdbcSessionFactory.create(),
        handler = () => new EventHandler()
    )(actorSystem.toTyped)

  // run the projection as a single instance in the whole cluster, restarted with backoff on failure
  ClusterSingleton(actorSystem.toTyped).init(SingletonActor(
    Behaviors.supervise(ProjectionBehavior(projection)).onFailure(SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.2)),
    projection.projectionId.id
  ))
}

If your system persists large amounts of Events and performance becomes an issue, you may be interested in Sharded Daemon Process.
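
Instead of a single projection instance, Sharded Daemon Process spreads a fixed number of instances over the cluster, one per tag. A rough sketch (not part of the linkshortener code) could look as follows, assuming the write side tagged every Event with one of several tags and a projectionFor(tag) factory builds a projection per tag.

import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.projection.ProjectionBehavior

// hypothetical tags; every persisted Event would have to carry one of them
val tags = Vector("ShortLink-0", "ShortLink-1", "ShortLink-2", "ShortLink-3")

ShardedDaemonProcess(actorSystem.toTyped).init[ProjectionBehavior.Command](
  "ShortLinkProjection",
  tags.size,
  index => ProjectionBehavior(projectionFor(tags(index))), // one projection instance per tag
  ShardedDaemonProcessSettings(actorSystem.toTyped),
  Some(ProjectionBehavior.Stop)
)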

A better way to stop actors

In the post about Akka Persistence I described the role of the ShortLinkFactory actor, which is also the manager of ShortLink actors. It keeps references to all active actors and watches them. When an actor stops, ShortLinkFactory removes its reference from the active actors list. I also mentioned a downside of this solution. There is a small chance that a ShortLink actor receives the Stop command and, while processing it, receives another command (Click, for example). As a result the actor will stop, the Click message won't be processed (the client code receives a timeout failure) and the application will return a 500 HTTP code.

ClusterSharding introduces a solution which resolves this problem. The actor just needs to send ClusterSharding.Passivate(context.self) to the shard reference. After receiving this message the Shard will stop the actor. Messages which come later will be buffered and delivered to the new actor incarnation. Be aware that only messages arriving through the Shard (EntityRef) will be buffered; messages sent directly to the actor through an ActorRef can still be lost.

case Passivate =>
  // ask the Shard (not the actor itself) to stop this Entity; commands arriving
  // through the EntityRef in the meantime are buffered and redelivered afterwards
  shard ! ClusterSharding.Passivate(context.self)
  Effect.noReply

The linkshortener service's use cases are very simple. Every request can be processed without calling external systems' APIs, and it is acceptable if a request ends with a 500 HTTP code. But imagine an operation which takes time and can't simply be interrupted (for example a payment transfer). If this operation is triggered through an API, then the system probably only accepts the request (for example returning a 202 HTTP code) and processes it asynchronously. If the asynchronous processing crashes, it needs to be resumed when the system is back. To achieve this, Remembering Entities can be used. With Remembering Entities, all active (non-passivated) actors will be restarted on system start. Without Remembering Entities, an actor will be restarted only when the next message arrives. Details of Remembering Entities are out of the scope of this post; if you are looking for more information, check the documentation.
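
Remembering Entities is enabled per entity type. A minimal sketch, assuming the typed ClusterShardingSettings API (the same effect can be achieved with the akka.cluster.sharding.remember-entities configuration property), could look like this in ShortLinkSharding:

import akka.cluster.sharding.typed.ClusterShardingSettings

// active (non-passivated) ShortLink Entities will be restarted automatically
// after a rebalance or a node restart
sharding.init(
  Entity(entityTypeKey) { entityContext =>
    ShortLink(entityContext.entityId, entityContext.shard, config)
  }.withSettings(ClusterShardingSettings(actorSystem.toTyped).withRememberEntities(true))
)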

Configuration

I split the configuration into two files. The first file is the basic configuration which needs to be included on every node. Properties from the second file are node-specific and must be set individually on every node.

In the first file pay attention to the rebalance-absolute-limit property, which enables the new (and recommended) rebalance algorithm introduced in Akka 2.6.10.

akka {
  actor {
    provider = "cluster"
  }

  cluster {
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"

    sharding {
        number-of-shards = 50
        least-shard-allocation-strategy.rebalance-absolute-limit = 10
    }
  }
}

The second file configures the node address and the addresses of the seed nodes.

akka {
  remote.artery {
    canonical {
      hostname = "linkshortener-1.mynet"
      port = 2551
    }
  }

  cluster {
    seed-nodes = [
      "akka://application@linkshortener-1.mynet:2551",
      "akka://application@linkshortener-2.mynet:2551",
      "akka://application@linkshortener-3.mynet:2551"
    ]
  }
}

Be aware that in order to form the cluster, the first node from seed-nodes must be running. This prevents nodes from starting independent clusters when communication between them is not possible (for example in case of network issues). Once the cluster is formed, the first node can go down and the rest of the cluster will still work.

Summary

I described how to use the Akka Cluster Sharding extension on the example of the linkshortener service. Cluster Sharding and Cluster Singleton are great tools for running an application on several nodes, scaling out the system and increasing failure tolerance. If you want to learn more about Cluster Sharding, check the documentation.