First steps to the Actor model with Scala and Akka

This article assumes you’ve already read “Simplifying efficiency – The actor model”.

The Scala programming language was designed with the actor model in mind, and the Akka toolkit took it to the stratosphere.

Akka works with both Java and Scala, but given its excellent integration with Scala, and the fact that I prefer Scala to Java (even though I’m not much of an expert in it), I thought it would be more enjoyable and intriguing to go with Scala.
Before we get started, note that Scala has its own actor model implementation, but starting with Scala 2.10 it has been deprecated in favor of Akka.

This article is not meant to be a guide to Akka, but a way to go through the basics of the main features and let you evaluate what it can do for you.

Hello world

Let’s start right away and create our hello world actor!

import akka.actor.Actor

class HelloWorldActor extends Actor {

  def receive = {
    case "greetings" =>
      println("hello world!")
  }
}

Let’s look into it, shall we?
After declaring the class as a descendant of Actor, we implement the “receive” method, a partial function that checks whether the message is equal to “greetings”. If it is, the actor prints “hello world!”. The “Actor” suffix in the class name is just a convention of mine and is not required.

This is how we make it work:

import akka.actor.{ActorSystem, Props}

object Main extends App {
   val actorSystem : ActorSystem = ActorSystem()
   val hwa = actorSystem.actorOf(Props[HelloWorldActor])
   hwa ! "greetings"
   Thread.sleep(1000)
   actorSystem.shutdown() // deprecated since Akka 2.4 in favor of terminate()
}
  • ActorSystem() : initialization of the actor system
  • actorSystem.actorOf(Props[HelloWorldActor]) : this is the interesting part. We ask the actor system to instantiate an actor and obtain an ActorRef. You won’t have access to the instance itself, only to a reference that lets you send messages to it. That might look a bit obnoxious at first, but it’s easy to see why it’s so necessary: if you can mess it up, you probably will. Enforcing messaging as the sole means of communication is for your own safety. Props is a configuration object describing how to create the actor; you can read about it in the documentation. This call does not only create a reference for you, it also registers the actor in the system’s hierarchy (under /user), where it can be found by its path.
  • hwa ! "greetings" : finally, we send a message to the actor with this glorious syntax (“!” is known as “tell”)
  • Thread.sleep(1000) : quick and dirty. As the message is asynchronous, there’s a real chance that the main thread would reach the shutdown before the message is handled
  • actorSystem.shutdown() : system shutdown
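The Thread.sleep above is admittedly a hack. Here is a minimal sketch of a more deterministic alternative, assuming the classic Akka API of version 2.4 or later (where terminate() replaced shutdown()). The AckingHelloWorldActor is a hypothetical variant of our actor that replies to its sender, and the main code uses the ask pattern (“?”) with a timeout to wait for that reply instead of sleeping.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical variant of HelloWorldActor that acknowledges each greeting
class AckingHelloWorldActor extends Actor {
  def receive = {
    case "greetings" =>
      println("hello world!")
      sender() ! "done" // reply to whoever sent the message
  }
}

object AskExample {
  def run(): String = {
    val actorSystem = ActorSystem()
    implicit val timeout: Timeout = Timeout(5.seconds)
    try {
      val hwa = actorSystem.actorOf(Props[AckingHelloWorldActor]())
      // "?" (ask) returns a Future completed by the actor's reply;
      // Await blocks only until that reply arrives or the timeout expires
      Await.result((hwa ? "greetings").mapTo[String], timeout.duration)
    } finally {
      // Akka 2.4+; on older versions call actorSystem.shutdown() instead
      Await.result(actorSystem.terminate(), 10.seconds)
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Blocking with Await is tolerable in a tiny main like this one; inside actors you would normally avoid blocking altogether.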

The synchronous asynchrony

As I stated in my introductory article, an actor instance is not a thread but a worker that performs one operation at a time. To demonstrate this, let’s modify our program as follows:

//main
import akka.actor.{ActorSystem, Props}

object Main extends App {

  val actorSystem : ActorSystem = ActorSystem()
  val hwa = actorSystem.actorOf(Props[HelloWorldActor])
  for (i <- 0 until 10)
    hwa ! "greetings " + i
  Thread.sleep(1000)
  actorSystem.shutdown()
}


//HelloWorldActor
import akka.actor.Actor
import scala.util.Random

class HelloWorldActor extends Actor {
  def receive = {
    case message : String =>
      if (message.startsWith("greetings")) {
        Thread.sleep(new Random().nextInt(100))
        val reply : String = message.replace("greetings", "hello world")
        println(reply)
      }
  }
}

The Thread.sleep(new Random().nextInt(100)) call introduces a random pause of 0 to 99 ms, to make the whole thing more realistic.
Even though the messages are dispatched asynchronously, there is only one worker, and it performs one operation at a time, pulling messages from a FIFO queue (the mailbox) in the order they were received. So all you have achieved so far is a worker that runs asynchronously with respect to the calling thread:

hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9
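If you want to see the “one worker, one FIFO mailbox” idea outside of Akka, here is a plain-Scala model: a single-threaded executor plays the actor, and its internal task queue plays the mailbox (Akka’s default mailbox is also an unbounded FIFO queue). This is an analogy under stated assumptions, not how Akka’s dispatcher is actually implemented; SingleWorkerModel is a hypothetical name.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ListBuffer
import scala.util.Random

// A plain-Scala model of ONE actor (not Akka's actual implementation):
// a single-threaded executor is the worker, its task queue the FIFO mailbox
object SingleWorkerModel {
  def run(n: Int): List[String] = {
    val worker = Executors.newSingleThreadExecutor()
    val handled = ListBuffer.empty[String]
    for (i <- 0 until n) {
      val message = "greetings " + i
      // execute() returns immediately: the caller never waits for the worker
      worker.execute(new Runnable {
        def run(): Unit = {
          Thread.sleep(Random.nextInt(10)) // random delay, as in the example above
          handled.synchronized { handled += message.replace("greetings", "hello world") }
        }
      })
    }
    worker.shutdown()                             // accept no new tasks, finish queued ones
    worker.awaitTermination(10, TimeUnit.SECONDS) // wait for the queue to drain
    handled.synchronized { handled.toList }
  }
}
```

Despite the random delays, the result always comes back in send order, because one worker drains one queue.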

Therefore, as you might have guessed…

object Main extends App {

  val actorSystem : ActorSystem = ActorSystem()
  val hwa1 = actorSystem.actorOf(Props[HelloWorldActor])
  val hwa2 = actorSystem.actorOf(Props[HelloWorldActor])
  for (i <- 0 until 10) {
    if (i % 2 == 0)
      hwa1 ! "greetings " + i
    else
      hwa2 ! "greetings " + i
  }

  Thread.sleep(1000)
  actorSystem.shutdown()

}

This will produce a somewhat unpredictable order, since the two workers can perform tasks at the same time:

hello world 1
hello world 0
hello world 3
hello world 2
hello world 4
hello world 5
hello world 6
hello world 8
hello world 7
hello world 9
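The shuffled output doesn’t mean ordering is gone entirely. Akka guarantees that messages sent directly from one actor to another are not received out of order, but gives no ordering across different receivers. Extending the plain-Scala model (again an analogy, not Akka’s implementation; TwoWorkersModel is a hypothetical name) to two single-threaded workers shows both effects: each worker still handles its own messages in order, while the global completion order is up for grabs.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import scala.collection.mutable.ListBuffer
import scala.util.Random

// Plain-Scala model of TWO actors of the same type: each executor is one worker
object TwoWorkersModel {
  /** Returns (messages handled by worker 0, by worker 1, global completion order). */
  def run(n: Int): (List[Int], List[Int], List[Int]) = {
    val workers: Vector[ExecutorService] = Vector.fill(2)(Executors.newSingleThreadExecutor())
    val perWorker = Vector.fill(2)(ListBuffer.empty[Int])
    val global = ListBuffer.empty[Int]
    for (i <- 0 until n) {
      val w = i % 2 // even messages go to worker 0, odd ones to worker 1
      workers(w).execute(new Runnable {
        def run(): Unit = {
          Thread.sleep(Random.nextInt(10))
          perWorker(w).synchronized { perWorker(w) += i }
          global.synchronized { global += i }
        }
      })
    }
    workers.foreach(_.shutdown())
    workers.foreach(_.awaitTermination(10, TimeUnit.SECONDS))
    (perWorker(0).synchronized(perWorker(0).toList),
     perWorker(1).synchronized(perWorker(1).toList),
     global.synchronized(global.toList))
  }
}
```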

But actors are also very good at talking to each other (it’s actually what they do all the time), and we want to see that in action. Welcome the HiThereActor:

class HiThereActor extends Actor{
  def receive = {
    case "hello world" =>
    	println ("hi there")
  }
}

Our main:

object Main extends App {

  val actorSystem : ActorSystem = ActorSystem()
  val hwa1 = actorSystem.actorOf(Props[HelloWorldActor])
  val hwa2 = actorSystem.actorOf(Props[HelloWorldActor])

  val hta = actorSystem.actorOf(Props[HiThereActor], "hithere")

  for (i <- 0 until 10) {
    if (i % 2 == 0)
      hwa1 ! "greetings " + i
    else
      hwa2 ! "greetings " + i
  }

  Thread.sleep(1000)
  actorSystem.shutdown()

}

We also instantiate the HiThereActor. Now you might be asking yourself: how do I get the HelloWorldActor to talk to the HiThereActor? Do I need to store its reference somewhere? You can, and in a simple application that would be perfectly legitimate, but let’s spend some time on an interesting alternative.
When I instantiated the HiThereActor, I also passed a string that Akka uses as its “name”. That name becomes part of the actor’s path, which lets me look the actor up:

import akka.actor.{Actor, ActorPath}
import scala.util.Random

class HelloWorldActor extends Actor {
  def receive = {
    case message : String =>
      if (message.startsWith("greetings")) {
        Thread.sleep(new Random().nextInt(100))
        val reply : String = message.replace("greetings", "hello world")
        println(reply)
        val other = context.actorSelection(ActorPath.fromString("akka://default/user/hithere"))
        other ! "hello world"
      }
  }
}

When it’s time to talk to the other actor, I provide that “path”, which selects that specific instance. Then I can send the message, and it all works beautifully.
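The “store its reference somewhere” option can be as simple as passing the ActorRef to the constructor. A sketch, assuming the classic Akka API (2.4+); InjectedHelloWorldActor and ReplyingHiThereActor are hypothetical variants of our actors, and the extra reply exists only so the result can be checked. Using forward keeps the original sender, so the HiThereActor answers whoever sent the greeting.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical HiThereActor variant that also replies, so the round trip is observable
class ReplyingHiThereActor extends Actor {
  def receive = {
    case "hello world" =>
      println("hi there")
      sender() ! "hi there"
  }
}

// Hypothetical HelloWorldActor variant: receives the other actor's reference at construction
class InjectedHelloWorldActor(hiThere: ActorRef) extends Actor {
  def receive = {
    case message: String if message.startsWith("greetings") =>
      println(message.replace("greetings", "hello world"))
      hiThere.forward("hello world") // forward keeps the original sender
  }
}

object InjectionExample {
  def run(): Any = {
    val actorSystem = ActorSystem()
    implicit val timeout: Timeout = Timeout(5.seconds)
    try {
      val hta = actorSystem.actorOf(Props[ReplyingHiThereActor](), "hithere")
      val hwa = actorSystem.actorOf(Props(new InjectedHelloWorldActor(hta)))
      Await.result(hwa ? "greetings 1", timeout.duration)
    } finally Await.result(actorSystem.terminate(), 10.seconds) // Akka 2.4+
  }
}
```

Passing references keeps the dependency explicit and type-checked, while path lookup decouples the two actors at the cost of a string that must match the actor’s name.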

hello world 1
hi there
hello world 0
hi there
hello world 2
hi there
hello world 3
hi there
hello world 4
hi there
hello world 6
hi there
hello world 8
hi there
hello world 5
hi there
hello world 7
hi there
hello world 9
hi there

Thanks to my “outstanding” charting skills, I can show you what is actually happening.

actor model example #1

Some considerations:

  • You can tell when a message is sent, but you can’t tell when it’s going to be handled
  • If an actor is busy performing a task, the message will stay in the mailbox until it’s done
  • The more actors of a certain type you deploy, the more messages can be handled in parallel, but that number is finite and every actor is clearly identifiable

Because an actor stays deployed and performs one operation at a time, it can safely hold state. So there’s nothing weird about storing the state of your actor and adding a message that hands it over to another actor:

import akka.actor.{Actor, ActorPath}
import scala.util.Random

object HelloWorldActor {

  case object Status
}

class HelloWorldActor extends Actor {

  var counter : Int = 0
  var lastGreeting : String = null

  def receive = {
    case message : String =>
      if (message.startsWith("greetings")) {
        counter += 1
        lastGreeting = message
        Thread.sleep(new Random().nextInt(100))
        val reply : String = message.replace("greetings", "hello world")
        println(reply)
        val other = context.actorSelection(ActorPath.fromString("akka://default/user/hithere"))
        other ! "hello world"
      }
    case HelloWorldActor.Status =>
      val status = Map(("counter", counter), ("lastGreeting", lastGreeting))
      sender() ! status
  }
}

In the companion “object” we declare the Status message. Declaring messages there is a handy way to identify the messages designed to trigger a specific behavior.
Right before the receive method, I declared two mutable fields, and in the greetings case I update them. Since the actor handles one message at a time by design, there’s no race condition at all.
The HelloWorldActor.Status case handles a request for a status update: it crafts a Map to be used as the reply message and sends it to sender(), that is, the actor that sent the Status “request” to this actor.
What’s left to implement is a checker actor that asks the HelloWorldActors about their status. I’ll leave that as an exercise, but you already have all the pieces to build one yourself.
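As a starting point, the checker doesn’t even have to be an actor: the ask pattern is enough to query a HelloWorldActor from the outside. A minimal sketch, assuming Akka 2.4+ and a hypothetical, trimmed-down StatusHelloWorldActor (no printing, no HiThereActor) whose Status message is a case object in its companion:

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object StatusHelloWorldActor {
  case object Status // message asking for a status report
}

// Hypothetical, trimmed-down HelloWorldActor: keeps the state, drops the side effects
class StatusHelloWorldActor extends Actor {
  var counter: Int = 0
  var lastGreeting: String = null

  def receive = {
    case message: String if message.startsWith("greetings") =>
      counter += 1
      lastGreeting = message
    case StatusHelloWorldActor.Status =>
      sender() ! Map("counter" -> counter, "lastGreeting" -> lastGreeting)
  }
}

object StatusCheck {
  def run(): Map[String, Any] = {
    val actorSystem = ActorSystem()
    implicit val timeout: Timeout = Timeout(5.seconds)
    try {
      val hwa = actorSystem.actorOf(Props[StatusHelloWorldActor]())
      hwa ! "greetings 0"
      hwa ! "greetings 1"
      // sent from the same thread right after the greetings, so it is queued behind them
      val status = (hwa ? StatusHelloWorldActor.Status).mapTo[Map[String, Any]]
      Await.result(status, timeout.duration)
    } finally Await.result(actorSystem.terminate(), 10.seconds)
  }
}
```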

Routers

Hopefully, the simple hello world got you thinking about how you could apply this approach to situations you previously handled differently.
On the other hand, this is obviously not the end of the story; otherwise it wouldn’t be such a big deal.
What we’re going to talk about now is something that probably made you cringe in the previous example: balancing messages between actors of the same type.
Of course there’s a neat solution to this problem, but not all implementations are that smooth.
Fortunately, we’re working with Akka, which provides a number of solutions, depending on how complex the scenario is.
Here’s an easy one:

import akka.actor.{ActorSystem, Props}
import akka.routing.RoundRobinRouter // deprecated since Akka 2.3 in favor of RoundRobinPool

object Main extends App {
  val actorSystem : ActorSystem = ActorSystem()
  val router = actorSystem.actorOf(Props[HelloWorldActor].withRouter(RoundRobinRouter(nrOfInstances = 2)))
  val hta = actorSystem.actorOf(Props[HiThereActor], "hithere")
  for (i <- 0 until 10)
    router ! "greetings " + i
  Thread.sleep(1000)
  actorSystem.shutdown()
}

With withRouter(RoundRobinRouter(nrOfInstances = 2)), we create a router that takes care of two HelloWorldActor instances and distributes the messages among them in round-robin order.
In the loop, we simply send our messages to the router and get the expected behavior without knowing anything about the individual HelloWorldActor instances. Yet there are still exactly two actor instances, each stateful and performing its tasks one at a time.
Needless to say, RoundRobinRouter is just one of the options. Other implementations come in the box, such as SmallestMailboxRouter, and of course you can create your own, based on your needs. After all, as you may have guessed, routers are (special) actors as well.
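Since Akka 2.3, RoundRobinRouter and the other routers configured through withRouter are deprecated in favor of pool and group routers such as RoundRobinPool and RoundRobinGroup (SmallestMailboxRouter likewise became SmallestMailboxPool). A sketch of the same idea with RoundRobinPool, assuming Akka 2.4+; NamedWorker is a hypothetical routee that replies with its own actor name so you can watch the alternation.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.routing.RoundRobinPool
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical routee that answers with its own actor name
class NamedWorker extends Actor {
  def receive = {
    case _ => sender() ! self.path.name
  }
}

object PoolExample {
  def run(): List[String] = {
    val actorSystem = ActorSystem()
    implicit val timeout: Timeout = Timeout(5.seconds)
    try {
      // pool router that creates and supervises two NamedWorker routees
      val router = actorSystem.actorOf(RoundRobinPool(2).props(Props[NamedWorker]()), "router")
      (0 until 4).toList.map { i =>
        Await.result((router ? ("greetings " + i)).mapTo[String], timeout.duration)
      }
    } finally Await.result(actorSystem.terminate(), 10.seconds)
  }
}
```

Four consecutive requests alternate between the two routees, so the first and third replies come from the same instance, as do the second and fourth.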

Failure and recovery

What happens to an actor when it fails? In the previous article, we mentioned how letting an actor fail, rather than blindly wrapping everything in try-catch, is a good thing. But of course, you can’t simply ignore failure, so how are you going to handle it, really?
Let’s go back to the “Routers” example and edit the HelloWorldActor as follows:

import akka.actor.Actor
import scala.util.Random

class HelloWorldActor extends Actor {

  def receive = {
    case message : String =>
      if (message.startsWith("greetings")) {
        val i = 1 / 0 // always throws java.lang.ArithmeticException: / by zero
        Thread.sleep(new Random().nextInt(100))
        val reply : String = message.replace("greetings", "hello world")
        println(reply)
      }
  }
}

If you’re not one of those who believe math is a New World Order plot against free thinkers, you can clearly tell this is going to blow up, but what you actually get is probably different from what you expect.
In your console, a number of “java.lang.ArithmeticException: / by zero” errors will show up. How many? It depends: neither one nor 10. In my case, I got 5.
OK, let’s add some more mystery by changing the “main” loop like this:

	 for ( i <- 0 until 10) {
	   router ! "greetings "+i
	   Thread.sleep(100)
	 }

And now you get 10. This is the Holy Grail of WTF.
Long story short:

  • Actors are supervised by their parents, which decide what to do when a child actor fails
  • Every actor is responsible for supervising its child actors, so that is where the supervision strategy is declared
  • By default, a failing actor is restarted, i.e. reinitialized from scratch; a router, however, escalates a routee’s failure to its own parent by default, so the whole router is affected
  • A plain restart keeps the actor’s mailbox and only drops the message that was being processed; in this router example, though, the messages that had already piled up in the routees’ mailboxes never get processed
  • If you send messages very fast, they stack up in the mailboxes while the actors are executing their first tasks, so when an actor fails its mailbox is not empty
  • By slowing down the message rate, each actor fails before the messages start stacking up

Now, this example might look a bit silly (because it fails every time), but it tells us a lot about why failure recovery is a key topic. Why are actors restarted by default rather than simply resumed? The reason is pretty simple: as actors are stateful, you can’t tell a priori whether the failure is caused by the state of the actor itself, so it’s safer to restart it from scratch. In a previous example, we used the actor state to store some status information, but what if the state were a persistent connection to a database?
That default is not necessarily right for you, as there are many things you might want to do in case of failure, possibly depending on the type of failure.
Back to our example: we might want the router (the parent of the HelloWorldActors) to adopt a different strategy, such as “Resume”, which means “never mind, keep going”.

So here’s how to declare this:

import akka.actor.{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.routing.RoundRobinRouter

object Main extends App {

  val actorSystem : ActorSystem = ActorSystem()
  val supervisorStrategy = OneForOneStrategy() {
    case _: ArithmeticException =>
      SupervisorStrategy.Resume
    case _: Exception =>
      SupervisorStrategy.Stop
  }
  val router = actorSystem.actorOf(Props[HelloWorldActor].withRouter(
    RoundRobinRouter(nrOfInstances = 2, supervisorStrategy = supervisorStrategy)))
  val hta = actorSystem.actorOf(Props[HiThereActor], "hithere")

  for (i <- 0 until 10)
    router ! "greetings " + i

  Thread.sleep(5000)
  actorSystem.shutdown()

}

As we’re not implementing the router itself, we’re passing the supervisor strategy as a parameter. In this case, we’re basically saying:

  • This strategy applies to each child actor individually (OneForOneStrategy). By contrast, AllForOneStrategy applies the decision to all child actors when one of them fails.
  • If the error is an ArithmeticException, don’t worry and resume operations, which means “we’re pretty sure the error is not related to the actor’s state”.
  • If the error is any other exception, stop the actor.

Since our example throws ArithmeticExceptions, we now expect the actors not to be restarted, and to keep going through all the messages in their mailboxes.
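To see what “Resume” buys you, here is a small sketch assuming Akka 2.4+, using RoundRobinPool (whose supervisorStrategy parameter plays the same role as in RoundRobinRouter) and a hypothetical CountingWorker routee. The routee counts a message and then throws an ArithmeticException; with Resume, the same instance keeps running, so its state survives the failure. A single routee keeps the outcome deterministic.

```scala
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.ask
import akka.routing.RoundRobinPool
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical routee: counts "boom" messages, then fails like the division by zero above
class CountingWorker extends Actor {
  var counter = 0
  def receive = {
    case "count" => sender() ! counter
    case "boom" =>
      counter += 1
      throw new ArithmeticException("/ by zero")
  }
}

object ResumeExample {
  val resumeOnArithmetic = OneForOneStrategy() {
    case _: ArithmeticException => SupervisorStrategy.Resume // state is trusted, keep going
    case _: Exception           => SupervisorStrategy.Stop
  }

  def run(): Int = {
    val actorSystem = ActorSystem()
    implicit val timeout: Timeout = Timeout(5.seconds)
    try {
      // one routee only, so both messages reach the same instance
      val router = actorSystem.actorOf(
        RoundRobinPool(1, supervisorStrategy = resumeOnArithmetic).props(Props[CountingWorker]()))
      router ! "boom"
      Await.result((router ? "count").mapTo[Int], timeout.duration)
    } finally Await.result(actorSystem.terminate(), 10.seconds)
  }
}
```

The reply is 1: the counter incremented before the exception is still there. With Restart instead of Resume, a fresh instance would answer 0.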

In this example, the router supervises its child actors, but when you implement an actor with child actors yourself, you can simply override the default supervisor strategy inside the actor by writing:

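// inside an Actor subclass; the snippet needs these imports:
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
    case _: ArithmeticException      => Resume
    case _: NullPointerException     => Restart
    case _: IllegalArgumentException => Stop
    case _: Exception                => Escalate
  }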

This snippet shows two more features that I believe you can study on your own. Just a hint so you know where to start:

  • The actor can be restarted up to 10 times within a minute; beyond that, it’s considered legally dead and is stopped
  • If the failure is a generic exception, escalate the supervision of the event to the parent actor, higher in the chain of responsibility (and maybe it’ll know what to do…)
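Here is a sketch of where that override lives, assuming Akka 2.4+; SupervisingParent and FragileChild are hypothetical names. The child counts a message and then throws a NullPointerException, which the parent’s strategy maps to Restart: the fresh instance starts with a zeroed counter, while the message queued behind the failing one is still processed, because a restart keeps the mailbox.

```scala
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical child: counts messages and fails on "npe" after counting it
class FragileChild extends Actor {
  var counter = 0
  def receive = {
    case "count" => sender() ! counter
    case "npe" =>
      counter += 1
      throw new NullPointerException("simulated failure")
  }
}

// Hypothetical parent: declares the strategy and forwards everything to its child
class SupervisingParent extends Actor {
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  private val child = context.actorOf(Props[FragileChild](), "child")

  def receive = {
    case msg => child.forward(msg) // forward keeps the original sender
  }
}

object RestartExample {
  def run(): Int = {
    val actorSystem = ActorSystem()
    implicit val timeout: Timeout = Timeout(5.seconds)
    try {
      val parent = actorSystem.actorOf(Props[SupervisingParent](), "parent")
      parent ! "npe" // the child fails and the parent's strategy restarts it
      // "count" waits in the child's mailbox and is handled by the new instance
      Await.result((parent ? "count").mapTo[Int], timeout.duration)
    } finally Await.result(actorSystem.terminate(), 10.seconds)
  }
}
```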

Remoting

The awesomeness parade isn’t over yet.
In Akka, actors don’t have to live in the same process, or even on the same server! So rather than simply instantiating and using your actors locally, you can turn the process into a server that awaits messages for its actors.
And guess what, it’s pretty easy to do. The first thing you need is the Akka Remote library (akka-remote), which adds the feature we need.
To accomplish this, we are going to use another interesting feature of Akka: a mighty configuration file where you can configure the platform behavior and preconfigure certain components. All the magic happens in the application.conf file, which has to be on the program’s classpath.
There are many ways to work with remoting: you can deploy actors remotely, balance load, distribute work and so on. Honestly, I’m pretty sure you can start exploring it yourself once you have a basic working example, and that’s exactly what we’re going to build:

  • Two programs, CX and SX
  • CX contains the HiThereActor
  • SX contains the HelloWorldActor
  • CX sends a message to the HelloWorldActor in SX
  • HelloWorldActor sends a message to the HiThereActor in CX

SX

//Main
import akka.actor.{ActorSystem, Props}

object Main extends App {
  val actorSystem : ActorSystem = ActorSystem()
  actorSystem.actorOf(Props[HelloWorldActor], "hello")
}

//HelloWorldActor
import akka.actor.{Actor, ActorPath}
import scala.util.Random

class HelloWorldActor extends Actor {

  def receive = {
    case message : String =>
      if (message.startsWith("greetings")) {
        Thread.sleep(new Random().nextInt(100))
        val reply : String = message.replace("greetings", "hello world")
        println(reply)
        val other = context.actorSelection(ActorPath.fromString("akka.tcp://default@127.0.0.1:2553/user/hi"))
        other ! "hello world"
      }
  }
}

//application.conf
akka {
  actor.provider = "akka.remote.RemoteActorRefProvider"
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
}

CX

//Main
import akka.actor.{ActorSystem, Props}

object Main extends App {
  val actorSystem : ActorSystem = ActorSystem()
  val hello = actorSystem.actorSelection("akka.tcp://default@127.0.0.1:2552/user/hello")
  val hithere = actorSystem.actorOf(Props[HiThereActor], name = "hi")
  hello ! "greetings 1"
}

//HiThereActor
import akka.actor.Actor

class HiThereActor extends Actor {
  def receive = {
    case "hello world" => println("hi there")
  }
}

//application.conf
akka {
 actor.provider = "akka.remote.RemoteActorRefProvider"
 remote {
  enabled-transports = ["akka.remote.netty.tcp"]
  netty.tcp {
   hostname = "127.0.0.1"
   port = 2553
  }
 }
} 

When SX starts, a HelloWorldActor is created with “hello” as its name. The configuration says the server listens on port 2552. The process does not stop once the App code has executed; instead, it stays up and joins the conversation.
When CX starts, a HiThereActor is created with “hi” as its name. Just like SX, the process stays up and listens, on port 2553. The App code also looks up the actor known as “hello”, explicitly pointing out its network location. Once it has that selection, it sends the “greetings 1” message.
SX receives the message, prints “hello world 1”, looks up the actor known as “hi” in the CX process and sends a message back.
Needless to say, the CX console eventually prints “hi there”.

This simple example shows the easiest (and least flexible) way to invoke a remote actor, which, by the way, is about 10% of what this thing is capable of. Of course, hard-coding actor names and IP addresses is not advisable, but it gives you the idea.
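One way to keep addresses out of the code is to put the path in application.conf and read it through the actor system’s own configuration. A sketch, assuming a hypothetical application key cx.hello-path (not an Akka setting) added to CX’s application.conf; RemoteLookup is likewise a hypothetical name.

```scala
import akka.actor.{ActorSelection, ActorSystem}
import com.typesafe.config.Config

// Assumed entry in CX's application.conf (hypothetical key, not an Akka setting):
//   cx.hello-path = "akka.tcp://default@127.0.0.1:2552/user/hello"
object RemoteLookup {
  def helloPath(config: Config): String = config.getString("cx.hello-path")

  // Looks the remote actor up using the path from the actor system's configuration
  def hello(system: ActorSystem): ActorSelection =
    system.actorSelection(helloPath(system.settings.config))
}
```

In CX’s Main, the lookup then becomes RemoteLookup.hello(actorSystem) ! "greetings 1", and moving SX to another host only means editing the configuration.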
Routing messages over a pool of distributed actors in a large-scale cluster, deploying actors remotely, and managing failure and recovery are just a glimpse of the other topics you’ll need to study to fully master this beast.
Yet there is really nothing difficult about unleashing the other capabilities. Want to load balance between multiple actors of the same type on different servers?

...
actor {
  provider = "akka.remote.RemoteActorRefProvider"
  deployment {
    /hwGroup {
      router = round-robin-group
      routees.paths = [
        "akka.tcp://default@serverone:2552/user/hello1",
        "akka.tcp://default@serverone:2552/user/hello2",
        "akka.tcp://default@servertwo:2552/user/hello1"
      ]
    }
  }
}
...
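With a deployment entry like that, the code only needs to create an actor named after the deployment key and let FromConfig build the router. A sketch, assuming Akka 2.3+ and that the actors listed in routees.paths are already running; GroupRouter is a hypothetical name.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.routing.FromConfig

object GroupRouter {
  // The name must match the "/hwGroup" entry under akka.actor.deployment;
  // FromConfig reads the router type and routees.paths from there
  def create(system: ActorSystem): ActorRef =
    system.actorOf(FromConfig.props(), "hwGroup")

  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem() // loads application.conf from the classpath
    val hwGroup = create(actorSystem)
    for (i <- 0 until 10)
      hwGroup ! "greetings " + i // spread round-robin over the configured paths
  }
}
```

A group router does not create or supervise its routees; it only sends to the paths it was given, which is why the “hello1”/“hello2” actors must be started on each server beforehand.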

Conclusion

As the picture of how the actor model can help you becomes clearer, you should become more aware of what to look for in a good implementation.
Whether you’re a Java guru or a Scala “activist”, Akka represents a great opportunity to write tidy code. It’s elegant, consistent and productive at the same time, three good qualities that demonstrate a sharp vision and a solid project.
Since there’s still so much to talk about and the topic is very vivid in my mind, I might come back to it and cover some specific scenarios.

In the meantime, take care.
