开发者

Using akka futures and actors for parallelizing a list

开发者 https://www.devze.com 2023-04-12 04:39 出处:网络
I want to send a list of messages to an actor, receive a reply immediately in a future and then wait for all futures to complete before returning to the calling method. From reading the akka docs, I b

I want to send a list of messages to an actor, receive a reply immediately in a future and then wait for all futures to complete before returning to the calling method. From reading the akka docs, I believe Future.sequence is the way to go but I have not been able to get the following code to work correctly. I get this error from the compiler:

  found   : akka.dispatch.ActorCompletableFuture
  required: akka.dispatch.Future[Integer]
Error occurred in an application involving default arguments.
            futures += secondary ? GetRandom
                                 ^

I'm sure I am missing something obvious but the code below seems to be "correct" per the examples and API docs.

import java.util.Random
import akka.dispatch.Future
import akka.actor._
import Commands._
import collection.mutable.ListBuffer

object Commands {
    trait Command

    case class GetRandom() extends Command
    case class GenRandomList() extends Command  
}

class Secondary() extends Actor {
    val randomGenerator = new Random()

    override def receive = {
        case GetRandom() =>
            self reply randomGenerator.nextInt(100)
    }
}

class Primary() extends Actor {
    private val secondary = Actor.actorOf[Secondary]

    override def receive = {

        case GenRandomList() 开发者_C百科=>

            val futures = new ListBuffer[Future[Integer]]

            for (i <- 0 until 10) {
                futures += secondary ? GetRandom
            }

            val futureWithList = Future.sequence(futures)

            futureWithList.map { foo =>
                println("Shouldn't foo be an integer now? " + foo)
            }.get
    }

    override def preStart() = {
        secondary.start()
    }
}

class Starter extends App {
    println("Starting")
    var master = Actor.actorOf(new Primary()).start()
    master ! GenRandomList()
}

What is the correct way to send a series of messages to an actor, receive a future and return once all the futures have completed (optionally storing the results from each future in a List and returning it).


(secondary ? GetRandom).mapTo[Int]


Akka ? returns a Future[Any] but you need a Future[Int].

Thus you can either define a list which accept all kind of futures:

val futures = new ListBuffer[Future[Any]]

or cast the result as an Int as soon as it is available:

for (i <- 0 until 10) {
  futures += (secondary ? GetRandom) map {
    _.asInstanceOf[Int]
  }
}

BTW, to make it work, you need to change GetRandom definition:

case object GetRandom extends Command

and match it with:

case GetRandom =>
0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号