
The most correct way to work with IO/blocking operations in Akka

Question:

I have a question about the correct way to work with blocking IO operations. I have read that the right model is to wrap blocking actions in a Future and work with that object, instead of calling blocking operations inside the actor.

But I see at least two solutions for this:

import scala.concurrent.{ExecutionContext, Future}

case class AccountBalance(id: Int, userId: Int, total: Int)

object AccountBalance {

  // Stand-in for a blocking read
  def getUserAccountBalance(userId: Int)(implicit ec: ExecutionContext): Future[AccountBalance] =
    Future {
      AccountBalance(1, userId, 1)
    }

  // Stand-in for a blocking write
  def updateAccountBalance(id: Int, total: Int)(implicit ec: ExecutionContext): Future[AccountBalance] =
    Future {
      AccountBalance(id, 1, total)
    }

  // Main logic: read the balance, then write it back incremented by one
  def getAndInc(userId: Int)(implicit ec: ExecutionContext): Future[AccountBalance] =
    getUserAccountBalance(userId).flatMap { balance =>
      updateAccountBalance(balance.id, balance.total + 1)
    }
}

In the first approach I call the AccountBalance.getAndInc method inside the actor:

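import akka.actor.Actor
import akka.pattern.pipe
import scala.concurrent.ExecutionContext

case class Calculate(userId: Int)

class Approach1 extends Actor {
  implicit val executionContext: ExecutionContext = context.dispatcher

  def receive = {
    case Calculate(userId) =>
      // The whole Future chain runs outside the actor; only the result is sent back
      AccountBalance.getAndInc(userId) pipeTo sender()
  }
}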

The other solution (which I find more comfortable):

class Approach2 extends Actor {
  implicit val executionContext = context.dispatcher

  var firstSender: ActorRef = null
  var userId: Int = -1

  def receive = {
    case Calculate(givenUserId) =>
      userId = givenUserId
      firstSender = sender()
      AccountBalance.getUserAccountBalance(userId) pipeTo self

    case AccountBalance(id, _, total) =>
      AccountBalance.updateAccountBalance(id, total + 1) pipeTo firstSender
  }
}

Which solution is better (or are both dangerous and unusable)?

My second question is about the ExecutionContext. In the examples I use context.dispatcher; in the real application I use the following:

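import java.util.concurrent.Executors
import akka.actor.Actor
import scala.concurrent.ExecutionContext

class A extends Actor {
  // Dedicated fixed-size pool owned by this actor
  implicit val executionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10))

  override def postStop(): Unit = {
    executionContext.shutdown()
  }

  def receive = { case _ => }
}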

If I call executionContext.shutdown() after the actor is stopped, is this the correct way to shut down the thread pool and free all resources?

Answer:

Reactive programming calls for never blocking the execution thread. Any blocking operation should therefore run on a separate execution context, so that it does not block the threads that dispatch actor commands/messages. I use the second pattern, with the actor using "become" to switch to a different behavior while it waits for the response from the blocking operation. That way the actor does not handle any new request in the meantime and does not lose the original sender ActorRef. If you still want the actor to receive and dispatch other messages while it waits, you need to link each original sender to the response it is waiting for.
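A minimal sketch of the "become" variant described above, assuming classic Akka actors. The names BalanceActor, idle, waiting and blockingEc are made up for illustration, the Future bodies are stand-ins copied from the question, and the use of Stash to hold back later requests is my assumption about one way to avoid dropping them, not something the answer prescribes.

```scala
import akka.actor.{Actor, ActorRef, Stash, Status}
import akka.pattern.pipe
import scala.concurrent.{ExecutionContext, Future}

// Message and data type mirror the question; the Future bodies stand in for real blocking IO.
final case class Calculate(userId: Int)
final case class AccountBalance(id: Int, userId: Int, total: Int)

object AccountBalance {
  def getUserAccountBalance(userId: Int)(implicit ec: ExecutionContext): Future[AccountBalance] =
    Future(AccountBalance(1, userId, 1))

  def updateAccountBalance(id: Int, total: Int)(implicit ec: ExecutionContext): Future[AccountBalance] =
    Future(AccountBalance(id, 1, total))
}

// blockingEc is assumed to be a pool reserved for blocking work (hypothetical parameter).
class BalanceActor(blockingEc: ExecutionContext) extends Actor with Stash {
  private implicit val ec: ExecutionContext = blockingEc

  def receive: Receive = idle

  // Normal behavior: accept a request, remember who asked, start the read.
  private def idle: Receive = {
    case Calculate(userId) =>
      val replyTo = sender() // capture now; sender() changes with every message
      AccountBalance.getUserAccountBalance(userId) pipeTo self
      context.become(waiting(replyTo))
  }

  // Waiting behavior: only the read result (or its failure) is handled.
  private def waiting(replyTo: ActorRef): Receive = {
    case AccountBalance(id, _, total) =>
      // The write result goes straight to the original requester.
      AccountBalance.updateAccountBalance(id, total + 1) pipeTo replyTo
      unstashAll()
      context.become(idle)

    case failure: Status.Failure =>
      // pipeTo delivers a failed Future as Status.Failure; forward it and recover.
      replyTo ! failure
      unstashAll()
      context.become(idle)

    case _ =>
      stash() // hold later requests until the current one is done
  }
}
```

Because the requester travels as a parameter of the waiting behavior, there is no mutable firstSender field that a second Calculate could overwrite. The cost is that the actor handles one request at a time.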

Yes, stopping it that way will work, but I suggest being very careful about when thread pools are created, and sharing them between blocking actors. Also take care that actor A does not get restarted by its supervisor if shutdown throws an exception, and that you shut down only actor A, not the whole Akka system.
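One way to share a pool between blocking callers, sketched with only the Scala and Java standard libraries. BlockingPool, SharedPoolDemo and loadTotal are hypothetical names, and the pool size of 10 is simply the value from the question. The idea is that the application, not an individual actor, owns the pool and shuts it down once.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object BlockingPool {
  // A single pool for all blocking calls, created once for the whole application.
  val ec: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10))

  // Stop accepting new tasks, then wait for running and queued ones.
  // Returns true if the pool terminated within the timeout.
  def shutdown(timeout: FiniteDuration): Boolean = {
    ec.shutdown()
    ec.awaitTermination(timeout.toMillis, TimeUnit.MILLISECONDS)
  }
}

object SharedPoolDemo {
  // Stand-in for a blocking call such as a database query (hypothetical).
  def loadTotal(userId: Int)(implicit ec: ExecutionContext): Future[Int] =
    Future { Thread.sleep(50); userId + 1 }

  def run(): (Int, Boolean) = {
    implicit val ec: ExecutionContext = BlockingPool.ec
    val total = Await.result(loadTotal(41), 5.seconds)
    (total, BlockingPool.shutdown(5.seconds))
  }
}
```

With this layout an actor's postStop no longer touches the pool, so a restart of one actor cannot shut down threads that other actors still depend on. Inside Akka, a dedicated dispatcher defined in configuration serves a similar purpose.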
