5. Short tour around akka Effect and EventSourcedBehaviorTestKit

2021-08-22 scala akka test

Motivation

On the last time I gave a chance to EventSourcedBehaviorTestKit to test my application. Unexpectedly one of my test failed with timeout error, although application seemed to work fine.

java.lang.AssertionError: Timeout (3 seconds) during receiveMessage while waiting for message

Problem

For purpose of this post I have prepared simple project which illustrates the problem. SampleActor bases on EventSourcedBehavior with the following commandHandler:

override def applyCommand(cmd: Command)(implicit context: ActorContext[Command]): ReplyEffect[Event, State] = cmd match {
  case TestMe(TestCase.TEST_CASE_1, replyTo) =>
    Effect.reply(replyTo)(TestSuccess)
  case TestMe(TestCase.TEST_CASE_2, replyTo) =>
    Effect.stop()
      .thenReply(replyTo)(_ => TestSuccess)
  case c =>
    context.log.warn("{}[id={}, state=Empty] received unknown command[{}].", entityType, id, c)
    Effect.stop()
      .thenNoReply()
}

Pay attention to two first case blocks. In the second case Effect.stop() is called before thenReply(replyTo)(_ => TestSuccess). My test class looks like follows:

import SampleActor.TestCase
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit
import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.must.Matchers.convertToAnyMustWrapper
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.{BeforeAndAfterEach, GivenWhenThen}

class SampleActorSpec extends ScalaTestWithActorTestKit(ConfigFactory.parseString("akka.actor.allow-java-serialization = true").withFallback(EventSourcedBehaviorTestKit.config)) with AnyWordSpecLike with BeforeAndAfterEach with GivenWhenThen {

  private val eventSourcedTestKit = EventSourcedBehaviorTestKit[SampleActor.Command, SampleActor.Event, SampleActor.State](
    system, SampleActor("testId")
  )

  override protected def beforeEach(): Unit = {
    super.beforeEach()
    eventSourcedTestKit.clear()
  }

  "SampleActor#TestMe" should {
    "return TestSuccess for test case TEST_CASE_1" in {
      Given("empty SampleActor")
      // do nothing - declared above

      When("TestMe message with testCase=TEST_CASE_1 is send")
      val result = eventSourcedTestKit.runCommand(ref => SampleActor.Commands.TestMe(TestCase.TEST_CASE_1, ref))

      Then("actor replies with TestSuccess")
      result.reply mustBe theSameInstanceAs(SampleActor.Commands.TestMe.Results.TestSuccess)
    }
  
    "return TestSuccess for test case TEST_CASE_2 (this test fails)" in {
      Given("empty SampleActor")
      // do nothing - declared above

      When("TestMe message with testCase=TEST_CASE_2 is send")
      val result = eventSourcedTestKit.runCommand(ref => SampleActor.Commands.TestMe(TestCase.TEST_CASE_2, ref))

      Then("actor replies with TestSuccess")
      result.reply mustBe theSameInstanceAs(SampleActor.Commands.TestMe.Results.TestSuccess)
    }
  }
}

The second test fails with error:

- should return TestSuccess for test case TEST_CASE_2 (this test fails) ***
[info]   java.lang.AssertionError: Timeout (3 seconds) during receiveMessage while waiting for message.
[info]   at akka.actor.testkit.typed.internal.TestProbeImpl.assertFail(TestProbeImpl.scala:399)
[info]   at akka.actor.testkit.typed.internal.TestProbeImpl.$anonfun$receiveMessage_internal$1(TestProbeImpl.scala:182)
[info]   at scala.Option.getOrElse(Option.scala:201)
[info]   at akka.actor.testkit.typed.internal.TestProbeImpl.receiveMessage_internal(TestProbeImpl.scala:182)
[info]   at akka.actor.testkit.typed.internal.TestProbeImpl.receiveMessage(TestProbeImpl.scala:175)
[info]   at akka.persistence.testkit.internal.EventSourcedBehaviorTestKitImpl.getState(EventSourcedBehaviorTestKitImpl.scala:178)
[info]   at akka.persistence.testkit.internal.EventSourcedBehaviorTestKitImpl.runCommand(EventSourcedBehaviorTestKitImpl.scala:149)
[info]   at SampleActorSpec.$anonfun$new$4(SampleActorSpec.scala:41)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)

Effects and Behaviors

At the beginning I suspected that in TEST_CASE_2 SampleActor is stopped before reply message is send. So far I was sure that code:

  Effect.stop().thenReply(replyTo)(_ => TestSuccess)

sends TestSuccess message first and then actor stopped, not vice versa (stopped actor can’t send message, so it might explain the problem). I didn’t find confirmation of it in akka documentation, so I took look into the code.

All effects are keep in CompositeEffect case class:

private[akka] final case class CompositeEffect[Event, State](
    persistingEffect: scaladsl.EffectBuilder[Event, State],
    _sideEffects: immutable.Seq[SideEffect[State]])
    extends EffectImpl[Event, State] {

  override val events: immutable.Seq[Event] = persistingEffect.events

  override def toString: String =
    s"CompositeEffect($persistingEffect, sideEffects: ${_sideEffects.size})"
}

CompositeEffect case class consists of two fields: persistingEffect which has sequence of events to persist and sideEffects which is a sequence of other effects like sending message or stopping actor. Let’s check how it works in our case of Effect.stop().thenReply(replyTo)(_ => TestSuccess). We don’t have events to persist, persistingEffect.events is Nil, persistingEffect field is type of PersistNothing and _sideEffects field is type of Stop:

def stop[Event, State](): EffectBuilder[Event, State] =
  none.thenStop()
def none[Event, State]: EffectBuilder[Event, State] =
  PersistNothing.asInstanceOf[EffectBuilder[Event, State]]
private[akka] case object PersistNothing extends EffectImpl[Nothing, Nothing]
private[akka] abstract class EffectImpl[+Event, State]
  extends javadsl.EffectBuilder[Event, State]
    with javadsl.ReplyEffect[Event, State]
    with scaladsl.ReplyEffect[Event, State]
    with scaladsl.EffectBuilder[Event, State] {
  /* All events that will be persisted in this effect */
  override def events: immutable.Seq[Event] = Nil              //PersistNothing will inherit this value

  //omitted code
}
override def thenStop(): EffectImpl[Event, State] =
    CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]])

The question is how these effects are handled. Let’s come back to not-persistent akka actor for a moment. Regular actor is defined by Behavior class. In simplification Behavior is simple commandHandler which returns new Behavior (if you don’t need to change current Behavior you can use Behaviors.same construct). CommandHandler of persistent actor which is build on EventSourcedBehavior returns Effect not Behavior. But underneath effects are translated to Behavior.

effect match {
  //omitted code
  
  case _: PersistNothing.type =>
    (applySideEffects(sideEffects, state), true)
  
  //omitted code
}
def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: RunningState[S]): Behavior[InternalProtocol] = {
var behavior: Behavior[InternalProtocol] = new HandlingCommands(state)
val it = effects.iterator

    while (it.hasNext) {
      val effect = it.next()
      behavior = applySideEffect(effect, state, behavior)
    }

    if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress)
      Behaviors.stopped
    else
      behavior
}

All effects are applied one by one to retrieve result Behavior. On the following listing you will find that Stop effect is translated to Behaviors.stopped and ReplyEffect which is type of Callback is just executed:

def applySideEffect(
  effect: SideEffect[S],
  state: RunningState[S],
  behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
    effect match {
      case _: Stop.type @unchecked =>
        Behaviors.stopped
  
      case _: UnstashAll.type @unchecked =>
        unstashAll()
        behavior
  
      case callback: Callback[_] =>
        callback.sideEffect(state.state)
        behavior
  
      case _ =>
        throw new IllegalArgumentException(s"Unsupported side effect detected [${effect.getClass.getName}]")
    }
}

Above sideEffect call is just regular function call.

private[akka] class Callback[State](val sideEffect: State => Unit) extends SideEffect[State] {
  override def toString: String = "Callback"
}

The following code shows that thenReply method returns ReplyEffectImpl which is a Callback type:

override def thenReply[ReplyMessage](replyTo: ActorRef[ReplyMessage])(
      replyWithMessage: State => ReplyMessage): EffectImpl[Event, State] =
    CompositeEffect(this, new ReplyEffectImpl[ReplyMessage, State](replyTo, replyWithMessage))
final private[akka] class ReplyEffectImpl[ReplyMessage, State](
    replyTo: ActorRef[ReplyMessage],
    replyWithMessage: State => ReplyMessage)
    extends Callback[State](state => replyTo ! replyWithMessage(state)) {
  override def toString: String = "Reply"
}

To sum up Effect.stop().thenReply(replyTo)(_ => TestSuccess) executes side effect (sends the message) first, and then it returns Behavior.stopped which will eventually stop the actor.

Described analysis is very educational but not explain why my test fails.

Read the logs!

If you look closer at the listing at the beginning of this post, you find these two lines:

[info]   at akka.persistence.testkit.internal.EventSourcedBehaviorTestKitImpl.getState(EventSourcedBehaviorTestKitImpl.scala:178)
[info]   at akka.persistence.testkit.internal.EventSourcedBehaviorTestKitImpl.runCommand(EventSourcedBehaviorTestKitImpl.scala:149)

and in test execution logs there is one more:

11:24:43.748 [SampleActorSpec-akka.actor.default-dispatcher-7] INFO akka.actor.LocalActorRef - Message [akka.persistence.typed.internal.EventSourcedBehaviorImpl$GetState] to Actor[akka://SampleActorSpec/system/test/$c#-569774325] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://SampleActorSpec/system/test/$c#-569774325] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

The logs point that problem is related to EventSourcedBehaviorTestKitImpl#runCommand. After small investigation it found out that EventSourcedBehaviorTestKitImpl#runCommand always sends two messages to tested actor. It first sends argument command and then EventSourcedBehaviorTestKit.GetState message. In the following listing getState method uses internalActor reference which is just upcast reference to tested actor.

override def runCommand(command: Command): CommandResult[Command, Event, State] = {
  preCommandCheck(command)
  val seqNrBefore = getHighestSeqNr()
  
  actor ! command                               //sends TestMe command
  
  val newState = getState()                     //sends GetState command to already stopped actor
  val newEvents = getEvents(seqNrBefore + 1)
  
  postCommandCheck(newEvents, newState, reply = None)
  
  CommandResultImpl[Command, Event, State, Nothing](command, newEvents, newState, None)
}
override def getState(): State = {
  internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref)
  stateProbe.receiveMessage()
}
private var actor: ActorRef[Command] = actorTestKit.spawn(behavior)
private def internalActor = actor.unsafeUpcast[Any]

To sum up SampleActor behaves correctly, but EventSourcedBehaviorTestKitImpl#runCommand tries to send GetState message to the just stopped actor.

Fix tests

To fix tests you can use lower level classes like ActorTestKit instead of EventSourcedBehaviorTestKitImpl. Sample implementation can look as follows:

"return TestSuccess for test case TEST_CASE_2" in {
  Given("empty SampleActor")
  val actorTestKit = ActorTestKit(system)
  val ref = actorTestKit.spawn(SampleActor("testId2"))

  When("TestMe message with testCase=TEST_CASE_2 is send")
  val probeRef = actorTestKit.createTestProbe[SampleActor.Commands.TestMe.Result]()
  ref ! SampleActor.Commands.TestMe(TestCase.TEST_CASE_2, probeRef.ref)

  Then("actor replies with TestSuccess")
  probeRef.expectMessage(SampleActor.Commands.TestMe.Results.TestSuccess)
}