Motivation
Recently I gave EventSourcedBehaviorTestKit a try for testing my application. Unexpectedly, one of my tests failed with a timeout error, although the application seemed to work fine:
java.lang.AssertionError: Timeout (3 seconds) during receiveMessage while waiting for message
Problem
For the purpose of this post I have prepared a simple project that illustrates the problem. SampleActor is based on EventSourcedBehavior and has the following command handler:
override def applyCommand(cmd: Command)(implicit context: ActorContext[Command]): ReplyEffect[Event, State] = cmd match {
  case TestMe(TestCase.TEST_CASE_1, replyTo) =>
    Effect.reply(replyTo)(TestSuccess)
  case TestMe(TestCase.TEST_CASE_2, replyTo) =>
    Effect.stop()
      .thenReply(replyTo)(_ => TestSuccess)
  case c =>
    context.log.warn("{}[id={}, state=Empty] received unknown command[{}].", entityType, id, c)
    Effect.stop()
      .thenNoReply()
}
Pay attention to the first two case blocks. In the second one, Effect.stop() is called before thenReply(replyTo)(_ => TestSuccess).
My test class looks as follows:
import SampleActor.TestCase
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit
import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.must.Matchers.convertToAnyMustWrapper
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.{BeforeAndAfterEach, GivenWhenThen}
class SampleActorSpec
    extends ScalaTestWithActorTestKit(
      ConfigFactory.parseString("akka.actor.allow-java-serialization = true").withFallback(EventSourcedBehaviorTestKit.config))
    with AnyWordSpecLike
    with BeforeAndAfterEach
    with GivenWhenThen {

  private val eventSourcedTestKit = EventSourcedBehaviorTestKit[SampleActor.Command, SampleActor.Event, SampleActor.State](
    system, SampleActor("testId")
  )

  override protected def beforeEach(): Unit = {
    super.beforeEach()
    eventSourcedTestKit.clear()
  }

  "SampleActor#TestMe" should {
    "return TestSuccess for test case TEST_CASE_1" in {
      Given("empty SampleActor")
      // do nothing - declared above
      When("TestMe message with testCase=TEST_CASE_1 is send")
      val result = eventSourcedTestKit.runCommand(ref => SampleActor.Commands.TestMe(TestCase.TEST_CASE_1, ref))
      Then("actor replies with TestSuccess")
      result.reply mustBe theSameInstanceAs(SampleActor.Commands.TestMe.Results.TestSuccess)
    }
    "return TestSuccess for test case TEST_CASE_2 (this test fails)" in {
      Given("empty SampleActor")
      // do nothing - declared above
      When("TestMe message with testCase=TEST_CASE_2 is send")
      val result = eventSourcedTestKit.runCommand(ref => SampleActor.Commands.TestMe(TestCase.TEST_CASE_2, ref))
      Then("actor replies with TestSuccess")
      result.reply mustBe theSameInstanceAs(SampleActor.Commands.TestMe.Results.TestSuccess)
    }
  }
}
The second test fails with the following error:
- should return TestSuccess for test case TEST_CASE_2 (this test fails) ***
[info] java.lang.AssertionError: Timeout (3 seconds) during receiveMessage while waiting for message.
[info] at akka.actor.testkit.typed.internal.TestProbeImpl.assertFail(TestProbeImpl.scala:399)
[info] at akka.actor.testkit.typed.internal.TestProbeImpl.$anonfun$receiveMessage_internal$1(TestProbeImpl.scala:182)
[info] at scala.Option.getOrElse(Option.scala:201)
[info] at akka.actor.testkit.typed.internal.TestProbeImpl.receiveMessage_internal(TestProbeImpl.scala:182)
[info] at akka.actor.testkit.typed.internal.TestProbeImpl.receiveMessage(TestProbeImpl.scala:175)
[info] at akka.persistence.testkit.internal.EventSourcedBehaviorTestKitImpl.getState(EventSourcedBehaviorTestKitImpl.scala:178)
[info] at akka.persistence.testkit.internal.EventSourcedBehaviorTestKitImpl.runCommand(EventSourcedBehaviorTestKitImpl.scala:149)
[info] at SampleActorSpec.$anonfun$new$4(SampleActorSpec.scala:41)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
Effects and Behaviors
At first I suspected that in TEST_CASE_2 the SampleActor is stopped before the reply message is sent. Until then I had been sure that the code
Effect.stop().thenReply(replyTo)(_ => TestSuccess)
sends the TestSuccess message first and only then stops the actor, not vice versa (a stopped actor can't send a message, so that might explain the problem). I didn't find confirmation of this in the Akka documentation, so I took a look at the code.
All effects are kept in the CompositeEffect case class:
private[akka] final case class CompositeEffect[Event, State](
    persistingEffect: scaladsl.EffectBuilder[Event, State],
    _sideEffects: immutable.Seq[SideEffect[State]])
    extends EffectImpl[Event, State] {
  override val events: immutable.Seq[Event] = persistingEffect.events
  override def toString: String =
    s"CompositeEffect($persistingEffect, sideEffects: ${_sideEffects.size})"
}
The CompositeEffect case class consists of two fields: persistingEffect, which holds the sequence of events to persist, and _sideEffects, which is a sequence of other effects such as sending a message or stopping the actor. Let's check how it works in our case of Effect.stop().thenReply(replyTo)(_ => TestSuccess). We don't have any events to persist, so persistingEffect.events is Nil. For the Effect.stop() part, the persistingEffect field is PersistNothing and the _sideEffects field contains Stop:
def stop[Event, State](): EffectBuilder[Event, State] =
  none.thenStop()

def none[Event, State]: EffectBuilder[Event, State] =
  PersistNothing.asInstanceOf[EffectBuilder[Event, State]]

private[akka] case object PersistNothing extends EffectImpl[Nothing, Nothing]

private[akka] abstract class EffectImpl[+Event, State]
    extends javadsl.EffectBuilder[Event, State]
    with javadsl.ReplyEffect[Event, State]
    with scaladsl.ReplyEffect[Event, State]
    with scaladsl.EffectBuilder[Event, State] {
  /* All events that will be persisted in this effect */
  override def events: immutable.Seq[Event] = Nil // PersistNothing will inherit this value
  // omitted code
}

override def thenStop(): EffectImpl[Event, State] =
  CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]])
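The way the builder calls nest can be illustrated with a small toy model. This is only a sketch and not Akka's code: the names Effect, Composite, Reply and flatten are hypothetical stand-ins, and the flattening step is my assumption about how nested composites end up as one ordered list of side effects.

```scala
// Toy model, NOT Akka's implementation: every name here is a simplified stand-in.
object EffectNestingSketch {
  sealed trait SideEffect
  case object Stop extends SideEffect
  final case class Reply(message: String) extends SideEffect

  sealed trait Effect {
    // Each then* call wraps the current effect in a new composite,
    // in the same spirit as thenStop() in the listing above.
    def thenStop(): Effect = Composite(this, List(Stop))
    def thenReply(message: String): Effect = Composite(this, List(Reply(message)))
  }
  case object PersistNothing extends Effect
  final case class Composite(inner: Effect, sideEffects: List[SideEffect]) extends Effect

  // Mirrors Effect.stop(): nothing to persist, one Stop side effect.
  def stop(): Effect = PersistNothing.thenStop()

  // Assumption: nested composites are unwrapped into one list, innermost first.
  def flatten(effect: Effect): List[SideEffect] = effect match {
    case PersistNothing          => Nil
    case Composite(inner, extra) => flatten(inner) ++ extra
  }

  def main(args: Array[String]): Unit = {
    val effect = stop().thenReply("TestSuccess")
    println(flatten(effect)) // prints List(Stop, Reply(TestSuccess))
  }
}
```

In this model the Stop side effect comes first in the list and the reply second, which matches the order in which the builder methods were called.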
The question is how these effects are handled. Let's go back to a non-persistent Akka actor for a moment. A regular actor is defined by a Behavior. Simplifying, a Behavior is a message handler that returns a new Behavior (if you don't need to change the current Behavior, you can use Behaviors.same). The command handler of a persistent actor built on EventSourcedBehavior returns an Effect, not a Behavior. Underneath, however, effects are translated to a Behavior:
effect match {
  // omitted code
  case _: PersistNothing.type =>
    (applySideEffects(sideEffects, state), true)
  // omitted code
}

def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: RunningState[S]): Behavior[InternalProtocol] = {
  var behavior: Behavior[InternalProtocol] = new HandlingCommands(state)
  val it = effects.iterator
  while (it.hasNext) {
    val effect = it.next()
    behavior = applySideEffect(effect, state, behavior)
  }
  if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress)
    Behaviors.stopped
  else
    behavior
}
All effects are applied one by one to obtain the resulting Behavior. In the following listing you can see that the Stop effect is translated to Behaviors.stopped, while the reply effect, which is a kind of Callback, is simply executed:
def applySideEffect(
    effect: SideEffect[S],
    state: RunningState[S],
    behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
  effect match {
    case _: Stop.type @unchecked =>
      Behaviors.stopped
    case _: UnstashAll.type @unchecked =>
      unstashAll()
      behavior
    case callback: Callback[_] =>
      callback.sideEffect(state.state)
      behavior
    case _ =>
      throw new IllegalArgumentException(s"Unsupported side effect detected [${effect.getClass.getName}]")
  }
}
The sideEffect call above is just a regular function call:
private[akka] class Callback[State](val sideEffect: State => Unit) extends SideEffect[State] {
  override def toString: String = "Callback"
}
The following code shows that the thenReply method adds a ReplyEffectImpl, which is a subtype of Callback:
override def thenReply[ReplyMessage](replyTo: ActorRef[ReplyMessage])(
    replyWithMessage: State => ReplyMessage): EffectImpl[Event, State] =
  CompositeEffect(this, new ReplyEffectImpl[ReplyMessage, State](replyTo, replyWithMessage))

final private[akka] class ReplyEffectImpl[ReplyMessage, State](
    replyTo: ActorRef[ReplyMessage],
    replyWithMessage: State => ReplyMessage)
    extends Callback[State](state => replyTo ! replyWithMessage(state)) {
  override def toString: String = "Reply"
}
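The interplay between Stop and a callback can be reproduced without Akka. The sketch below is a deliberately simplified model of the applySideEffects loop shown earlier, not the library code: Behavior, Running, Stopped and this Callback are hypothetical stand-ins, and the poison-pill and stash handling is left out.

```scala
// Toy model, NOT Akka's implementation: a simplified stand-in for applySideEffects.
object ApplySideEffectsSketch {
  sealed trait Behavior
  case object Running extends Behavior
  case object Stopped extends Behavior

  sealed trait SideEffect
  case object Stop extends SideEffect
  final class Callback(val run: () => Unit) extends SideEffect

  // Applies the side effects in order, threading the resulting behavior through.
  def applySideEffects(effects: List[SideEffect]): Behavior =
    effects.foldLeft(Running: Behavior) {
      case (_, Stop) =>
        Stopped // only selects the next behavior, nothing is stopped yet
      case (behavior, callback: Callback) =>
        callback.run() // executed immediately, as an ordinary function call
        behavior
    }

  def main(args: Array[String]): Unit = {
    val sent = scala.collection.mutable.ListBuffer.empty[String]
    val reply = new Callback(() => { sent += "TestSuccess"; () })
    val next = applySideEffects(List(Stop, reply))
    println(s"sent=${sent.toList} next=$next") // prints sent=List(TestSuccess) next=Stopped
  }
}
```

Even though Stop is processed before the callback, it merely changes the behavior that will be returned, so the reply is still sent while the command is being handled.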
To sum up, Effect.stop().thenReply(replyTo)(_ => TestSuccess) executes the side effect (sends the message) first and then returns Behaviors.stopped, which will eventually stop the actor.
This analysis is instructive, but it does not explain why my test fails.
Read the logs!
If you look closely at the stack trace shown earlier in this post, you will find these two lines:
[info] at akka.persistence.testkit.internal.EventSourcedBehaviorTestKitImpl.getState(EventSourcedBehaviorTestKitImpl.scala:178)
[info] at akka.persistence.testkit.internal.EventSourcedBehaviorTestKitImpl.runCommand(EventSourcedBehaviorTestKitImpl.scala:149)
and in the test execution logs there is one more line:
11:24:43.748 [SampleActorSpec-akka.actor.default-dispatcher-7] INFO akka.actor.LocalActorRef - Message [akka.persistence.typed.internal.EventSourcedBehaviorImpl$GetState] to Actor[akka://SampleActorSpec/system/test/$c#-569774325] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://SampleActorSpec/system/test/$c#-569774325] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
The logs indicate that the problem is related to EventSourcedBehaviorTestKitImpl#runCommand. After a short investigation it turned out that EventSourcedBehaviorTestKitImpl#runCommand always sends two messages to the tested actor. It first sends the command passed as an argument and then an EventSourcedBehaviorImpl.GetState message. In the following listing the getState method uses the internalActor reference, which is just an upcast reference to the tested actor.
override def runCommand(command: Command): CommandResult[Command, Event, State] = {
  preCommandCheck(command)
  val seqNrBefore = getHighestSeqNr()
  actor ! command // sends TestMe command
  val newState = getState() // sends GetState command to already stopped actor
  val newEvents = getEvents(seqNrBefore + 1)
  postCommandCheck(newEvents, newState, reply = None)
  CommandResultImpl[Command, Event, State, Nothing](command, newEvents, newState, None)
}

override def getState(): State = {
  internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref)
  stateProbe.receiveMessage()
}

private var actor: ActorRef[Command] = actorTestKit.spawn(behavior)
private def internalActor = actor.unsafeUpcast[Any]
To sum up, SampleActor behaves correctly, but EventSourcedBehaviorTestKitImpl#runCommand tries to send a GetState message to an actor that has just stopped.
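The failure mode can be mimicked with a tiny single-threaded model. It is a sketch under loud assumptions: ToyActor, its tell method and the deadLetters buffer are hypothetical and have nothing to do with Akka's real mailbox or dead-letter machinery. They only mirror the order of the two sends in runCommand.

```scala
// Toy model, NOT Akka: a single-threaded stand-in for an actor that stops after one command.
object StoppedActorSketch {
  sealed trait Message
  final case class TestMe(replyTo: String => Unit) extends Message
  final case class GetState(replyTo: String => Unit) extends Message

  final class ToyActor {
    private var stopped = false
    val deadLetters = scala.collection.mutable.ListBuffer.empty[Message]

    def tell(message: Message): Unit =
      if (stopped) {
        deadLetters += message // nobody is left to handle the message
        ()
      } else message match {
        case TestMe(replyTo) =>
          replyTo("TestSuccess") // reply first ...
          stopped = true         // ... then stop
        case GetState(replyTo) =>
          replyTo("Empty")
      }
  }

  // Mirrors the two sends in runCommand: the command first, then GetState.
  def runCommand(actor: ToyActor): (Option[String], Option[String]) = {
    var reply: Option[String] = None
    var state: Option[String] = None
    actor.tell(TestMe(r => reply = Some(r)))
    actor.tell(GetState(s => state = Some(s)))
    (reply, state)
  }

  def main(args: Array[String]): Unit = {
    val actor = new ToyActor
    val (reply, state) = runCommand(actor)
    // prints reply=Some(TestSuccess) state=None deadLetters=1
    println(s"reply=$reply state=$state deadLetters=${actor.deadLetters.size}")
  }
}
```

The reply arrives, but the state request is never answered. In the real test kit that missing answer is what the state probe waits for until it times out.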
Fix tests
To fix the test you can use lower-level classes such as ActorTestKit instead of EventSourcedBehaviorTestKit. A sample implementation can look as follows:
"return TestSuccess for test case TEST_CASE_2" in {
  Given("empty SampleActor")
  val actorTestKit = ActorTestKit(system)
  val ref = actorTestKit.spawn(SampleActor("testId2"))
  When("TestMe message with testCase=TEST_CASE_2 is send")
  val probeRef = actorTestKit.createTestProbe[SampleActor.Commands.TestMe.Result]()
  ref ! SampleActor.Commands.TestMe(TestCase.TEST_CASE_2, probeRef.ref)
  Then("actor replies with TestSuccess")
  probeRef.expectMessage(SampleActor.Commands.TestMe.Results.TestSuccess)
}