Scalaz (45) - concurrency: Task, the core building block for functional multithreaded programming

Source: reposted

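In the previous section we discussed the scalaz Future and called it an incomplete type: at the very least it lacks a complete exception-handling mechanism, so it is only suitable for internal use, such as building libraries. If scalaz had added exception handling to the Future class itself, users would constantly run into types like Future[Throwable \/ A] and would need a Monad Transformer to line up the types in monadic code, which makes programs needlessly complicated. The scalaz solution is to wrap Future[Throwable \/ A] inside the Task class and lift every Future into a Task. Task is a Monad, so we can use it uniformly and conveniently for functional multithreaded programming. Let's first look at the definition of Task in scalaz.concurrent/Task.scala: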

class Task[+A](val get: Future[Throwable \/ A]) {

  def flatMap[B](f: A => Task[B]): Task[B] =
    new Task(get flatMap {
      case -\/(e) => Future.now(-\/(e))
      case \/-(a) => Task.Try(f(a)) match {
        case e @ -\/(_) => Future.now(e)
        case \/-(task) => task.get
      }
    })

  def map[B](f: A => B): Task[B] =
    new Task(get map { _ flatMap {a => Task.Try(f(a))} })
...

Task implements flatMap, so it is a Monad and we can use Task in a for-comprehension.
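To make the for-comprehension point concrete, here is a minimal sketch. It assumes scalaz 7.2.x (the version whose run methods are named unsafePerformSync); the object name TaskForSketch and its members are hypothetical and exist only for illustration.

```scala
import scalaz.concurrent.Task

// Hypothetical illustration: two delayed computations combined monadically.
object TaskForSketch {
  // Nothing is evaluated while the program is being described.
  val program: Task[Int] = for {
    a <- Task.delay(3)
    b <- Task.delay(4)
  } yield a + b

  // Running the Task evaluates both steps in order and adds the results.
  def result: Int = program.unsafePerformSync
}
```

The for-comprehension desugars to the flatMap and map methods shown in the class definition, so each step only runs if the previous one produced a value rather than an exception.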

A Task is constructed in the same way as a Future:

val tnow = Task.now { println("run now ..."); 3+4 }
//> run now ...
//| tnow : scalaz.concurrent.Task[Int]
val tdelay = Task.delay { println("run delay ..."); 3+4 }
//> tdelay : scalaz.concurrent.Task[Int]
val tapply = Task { println("run apply ..."); 3+4 }
//> tapply : scalaz.concurrent.Task[Int]

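As with Future, the now function evaluates its argument immediately. It is simply a lifter that turns an ordinary value directly into a Task.

There are several ways to run a Task: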

tnow.unsafePerformSync      //> res0: Int = 7
tdelay.unsafePerformSync    //> run delay ...
                            //| res1: Int = 7
tnow.unsafePerformAsync {
  case \/-(a) => println(s"the result is: $a")
  case -\/(e) => println(e.getMessage)
}                           //> the result is: 7
tdelay.unsafePerformAsync {
  case \/-(a) => println(s"the result is: $a")
  case -\/(e) => println(e.getMessage)
}                           //> run delay ...
                            //| the result is: 7
tapply.unsafePerformAsync {
  case \/-(a) => println(s"the result is: $a")
  case -\/(e) => println(e.getMessage)
}
Thread.sleep(1000)          //> run apply ...
                            //| the result is: 7

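From the examples above we can conclude that tnow had already been evaluated when it was built, because the "run now ..." message no longer appears when it is run. Both tdelay and tapply are stored in the trampoline structure, but tapply sits one level deeper, so we have to stall (with Thread.sleep) to wait for its result. tdelay is stored in a Future.Suspend, while tapply is stored in a Future.Async, so tdelay is a delayed computation and tapply is an asynchronous one: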

def delay[A](a: => A): Task[A] = suspend(now(a))
def suspend[A](a: => Task[A]): Task[A] = new Task(Future.suspend(
  Try(a.get) match {
    case -\/(e) => Future.now(-\/(e))
    case \/-(f) => f
  }))
// Future.suspend:
def suspend[A](f: => Future[A]): Future[A] = Suspend(() => f)

def apply[A](a: => A)(implicit pool: ExecutorService = Strategy.DefaultExecutorService): Task[A] =
  new Task(Future(Try(a))(pool))
// Future.apply:
def apply[A](a: => A)(implicit pool: ExecutorService = Strategy.DefaultExecutorService): Future[A] = Async { cb =>
  pool.submit { new Callable[Unit] { def call = cb(a).run }}
}
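The difference between an eager now and a suspended delay can be observed by counting evaluations. The following is a small sketch under the assumption of scalaz 7.2.x; EvaluationSketch, nowCounts and delayCounts are hypothetical names introduced only for this illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scalaz.concurrent.Task

// Hypothetical illustration: count how often the body of a Task is evaluated.
object EvaluationSketch {
  // Returns (evaluations right after construction, evaluations after two runs).
  def nowCounts(): (Int, Int) = {
    val counter = new AtomicInteger(0)
    val t = Task.now(counter.incrementAndGet()) // strict argument: evaluated here
    val afterBuild = counter.get
    t.unsafePerformSync
    t.unsafePerformSync
    (afterBuild, counter.get)
  }

  def delayCounts(): (Int, Int) = {
    val counter = new AtomicInteger(0)
    val t = Task.delay(counter.incrementAndGet()) // by-name argument: suspended
    val afterBuild = counter.get
    t.unsafePerformSync
    t.unsafePerformSync
    (afterBuild, counter.get)
  }
}
```

Because Future.suspend stores a thunk, the delayed body is re-evaluated on every run, whereas the value passed to now is computed once and then only replayed.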

Now let's see how Task handles exceptions:

def eval(value: => Int) = Task { Thread.sleep(1000); value }
//> eval: (value: => Int)scalaz.concurrent.Task[Int]
eval( 3 * 7 ).onFinish {
  case None => Task { println("finished calculation successfully.") }
  case Some(e) => Task { println(s"caught error [${e.getMessage}]") }
}.unsafePerformSyncAttempt match {
  case -\/(e) => println(s"calculation error [${e.getMessage}]")
  case \/-(a) => println(s"the result is: $a")
}                     //> finished calculation successfully.
                      //| the result is: 21
// exception handling
eval( 3 * 7 / 0 ).onFinish {
  case None => Task { println("finished calculation successfully.") }
  case Some(e) => Task { println(s"caught error [${e.getMessage}]") }
}.unsafePerformAsync {
  case -\/(e) => println(s"calculation error [${e.getMessage}]")
  case \/-(a) => println(s"the result is: $a")
}
Thread.sleep(2000)    //> caught error [/ by zero]
                      //| calculation error [/ by zero]
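The examples above rely on Task capturing exceptions instead of letting them escape. As a compact sketch of the same idea (assuming scalaz 7.2.x; TaskAttemptSketch and its members are hypothetical names), an exception thrown inside map surfaces as a left value and can be replaced with a fallback through handle:

```scala
import scalaz.{-\/, \/-}
import scalaz.concurrent.Task

// Hypothetical illustration: an exception thrown inside map is captured by Task.
object TaskAttemptSketch {
  // The division is only attempted when the task is run.
  val failing: Task[Int] = Task.delay(21).map(_ / 0)

  // unsafePerformSyncAttempt returns the captured Throwable as a left value.
  def describe(t: Task[Int]): String = t.unsafePerformSyncAttempt match {
    case -\/(e) => s"failed: ${e.getMessage}"
    case \/-(a) => s"ok: $a"
  }

  // handle substitutes a plain value for the exceptions matched by the partial function.
  val recovered: Task[Int] = failing.handle { case _: ArithmeticException => 0 }
}
```

Here describe(failing) reports the failure while describe(recovered) reports the fallback value, without any try/catch around the call site.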

An example of targeted exception handling:

import java.util.concurrent._
import scala.concurrent.duration._   // provides 1.second
val timedTask = Task {Thread.sleep(2000); 3+4}
//> timedTask : scalaz.concurrent.Task[Int]
timedTask.timed(1.second).handleWith {
  case e: TimeoutException => Task { println(s"calculation exceeding time limit: ${e.getMessage}") }
}.unsafePerformSync   //> calculation exceeding time limit: Timed out after 1000 milliseconds
                      //| res2: AnyVal{def getClass(): Class[_ >: Int with Unit <: AnyVal]} = ()

Now some multithreaded programming examples:

val tasks = (1 |-> 5).map(n => Task{ Thread.sleep(100); n })
//> tasks : List[scalaz.concurrent.Task[Int]]
// run the list of tasks in parallel
Task.gatherUnordered(tasks).unsafePerformSync   //> res3: List[Int] = List(1, 2, 3, 4, 5)
val sb = new StringBuffer                       //> sb : StringBuffer =
val t1 = Task.fork { Thread.sleep(100); sb.append("a"); Task.now("a")}
//> t1 : scalaz.concurrent.Task[String]
val t2 = Task.fork { Thread.sleep(800); sb.append("b"); Task.now("b")}
//> t2 : scalaz.concurrent.Task[String]
val t3 = Task.fork { Thread.sleep(200); sb.append("c"); Task.now("c")}
//> t3 : scalaz.concurrent.Task[String]
val t4 = Task.fork { Thread.sleep(100); sb.append("d"); Task.now("d")}
//> t4 : scalaz.concurrent.Task[String]
val t5 = Task.fork { Thread.sleep(400); sb.append("e"); Task.now("e")}
//> t5 : scalaz.concurrent.Task[String]
val t6 = Task.fork { Thread.sleep(100); sb.append("f"); Task.now("f")}
//> t6 : scalaz.concurrent.Task[String]
val r = Nondeterminism[Task].nmap6(t1,t2,t3,t4,t5,t6)(List(_,_,_,_,_,_))
//> r : scalaz.concurrent.Task[List[String]]
r.unsafePerformSync                             //> res4: List[String] = List(a, b, c, d, e, f)

Let's look at running a time-consuming algorithm in parallel:

def seqFib(n: Int): Task[Int] = n match {
  case 0 | 1 => Task now 1
  case n => {
    for {
      x <- seqFib(n-1)
      y <- seqFib(n-2)
    } yield x + y
  }
}                     //> seqFib: (n: Int)scalaz.concurrent.Task[Int]
// parallel algorithm
def parFib(n: Int): Task[Int] = n match {
  case 0 | 1 => Task now 1
  case n => {
    val ND = Nondeterminism[Task]
    for {
      pair <- ND.both(parFib(n-1), parFib(n-2))
      (x,y) = pair
    } yield x + y
  }
}                     //> parFib: (n: Int)scalaz.concurrent.Task[Int]
def msFib(n: Int, fib: Int => Task[Int]) = for {
  b <- Task now { System.currentTimeMillis() }
  a <- fib(n)
  e <- Task now { System.currentTimeMillis() }
} yield (a, (e-b))    //> msFib: (n: Int, fib: Int => scalaz.concurrent.Task[Int])scalaz.concurrent.Task[(Int, Long)]

msFib(20, parFib).unsafePerformSync   //> res3: (Int, Long) = (10946,373)
msFib(20, seqFib).unsafePerformSync   //> res4: (Int, Long) = (10946,17)

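Oops! That is strange: why is the parallel version so much slower? Let's set this question aside for now and study it later; of course, I would be very grateful if a reader could offer an explanation. How does Task allocate its thread pool? Look at Task.apply and Task.fork: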

/** Create a `Task` that will evaluate `a` using the given `ExecutorService`. */
def apply[A](a: => A)(implicit pool: ExecutorService = Strategy.DefaultExecutorService): Task[A] =
  new Task(Future(Try(a))(pool))

def fork[A](a: => Task[A])(implicit pool: ExecutorService = Strategy.DefaultExecutorService): Task[A] =
  apply(a).join

// Future.apply:
/** Create a `Future` that will evaluate `a` using the given `ExecutorService`. */
def apply[A](a: => A)(implicit pool: ExecutorService = Strategy.DefaultExecutorService): Future[A] = Async { cb =>
  pool.submit { new Callable[Unit] { def call = cb(a).run }}
}

Both functions take an implicit parameter, implicit pool: ExecutorService, whose default value is Strategy.DefaultExecutorService. We can specify the thread pool like this:

Task {longProcess}(myExecutorService)
Task.fork { Task {longProcess} }(myExecutorService)
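As a runnable variant of the two lines above, the sketch below creates its own pool, runs one Task on it and reports which thread did the work. It assumes scalaz 7.2.x; CustomPoolSketch and threadNameOnOwnPool are hypothetical names, and the pool size of 2 is arbitrary.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scalaz.concurrent.Task

// Hypothetical illustration: run a Task on an explicitly supplied thread pool.
object CustomPoolSketch {
  def threadNameOnOwnPool(): String = {
    val pool: ExecutorService = Executors.newFixedThreadPool(2)
    try {
      // The second parameter list replaces Strategy.DefaultExecutorService.
      val task: Task[String] = Task { Thread.currentThread.getName }(pool)
      task.unsafePerformSync
    } finally {
      // Threads created by Executors.newFixedThreadPool are non-daemon,
      // so the pool is shut down once the work is done.
      pool.shutdown()
    }
  }
}
```

The returned name comes from the JDK's default thread factory (of the form pool-N-thread-M), which confirms that the body ran on the supplied pool rather than on the calling thread.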

Here is an example of specifying the thread pool dynamically:

import java.util.concurrent.{ExecutorService,Executors}
type Delegated[A] = Kleisli[Task,ExecutorService,A]
def delegate: Delegated[ExecutorService] = Kleisli(e => Task.now(e))
//> delegate: => demo.ws.task.Delegated[java.util.concurrent.ExecutorService]
implicit def delegateTaskToPool[A](ta: Task[A]): Delegated[A] = Kleisli(x => ta)
//> delegateTaskToPool: [A](ta: scalaz.concurrent.Task[A])demo.ws.task.Delegated[A]
val tPrg = for {
  p <- delegate
  b <- Task("x")(p)
  c <- Task("y")(p)
} yield c   //> tPrg : scalaz.Kleisli[scalaz.concurrent.Task,java.util.concurrent.ExecutorService,String] = Kleisli(<function1>)
tPrg.run(Executors.newFixedThreadPool(3)).unsafePerformSync
//> res3: String = y

Of course, Task and the Scala Future can be converted into each other:

import scala.concurrent.{Future => sFuture}
import scala.util.{Success,Failure}
import scala.concurrent.ExecutionContext
// Scala Future -> Task: complete the Task callback when the Future completes
def futureToTask[A](fut: sFuture[A])(implicit ec: ExecutionContext): Task[A] =
  Task.async {
    cb =>
      fut.onComplete {
        case Success(a) => cb(a.right)
        case Failure(e) => cb(e.left)
      }
  }
// Task -> Scala Future: complete a Promise from the Task's asynchronous result
def taskToFuture[A](ta: Task[A]): sFuture[A] = {
  val prom = scala.concurrent.Promise[A]()
  ta.unsafePerformAsync {
    case -\/(e) => prom.failure(e)
    case \/-(a) => prom.success(a)
  }
  prom.future
}
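To show the two conversions working together, here is a self-contained round-trip sketch. It restates the conversion functions so that it compiles on its own, assumes scalaz 7.2.x, and uses the global ExecutionContext for the Scala Future side; ConversionSketch and roundTrip are hypothetical names.

```scala
import scala.concurrent.{ExecutionContext, Future => SFuture, Promise}
import scala.util.{Failure, Success}
import scalaz.{-\/, \/-}
import scalaz.concurrent.Task

// Hypothetical illustration: Task -> Future -> Task round trip.
object ConversionSketch {
  def futureToTask[A](fut: SFuture[A])(implicit ec: ExecutionContext): Task[A] =
    Task.async { cb =>
      fut.onComplete {
        case Success(a) => cb(\/-(a))
        case Failure(e) => cb(-\/(e))
      }
    }

  def taskToFuture[A](ta: Task[A]): SFuture[A] = {
    val prom = Promise[A]()
    ta.unsafePerformAsync {
      case -\/(e) => prom.failure(e)
      case \/-(a) => prom.success(a)
    }
    prom.future
  }

  // Adds one inside a Task, doubles inside a Future, then reads the result as a Task.
  def roundTrip(n: Int): Int = {
    import ExecutionContext.Implicits.global
    val viaFuture: SFuture[Int] = taskToFuture(Task.delay(n + 1))
    val viaTask: Task[Int] = futureToTask(viaFuture.map(_ * 2))
    viaTask.unsafePerformSync
  }
}
```

Note that taskToFuture starts the Task immediately, since a Scala Future is already running once it exists, while the Task returned by futureToTask only observes the Future's outcome when it is run.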

Unlike Future, Task adds an exception-handling mechanism.

 

 

 

 

 

 

 
