Counting the 10 most frequent words in a large file (Scala Akka version)

Source: repost

After writing this in C#, I rewrote it with Akka. Consider it a summary of my recent Scala learning.

I'm a beginner myself, so please go easy on me!

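At the core of Akka is the actor. The actor model is a concurrency model based on message passing: each actor keeps its own private state, processes the messages in its mailbox one at a time, and interacts with other actors only by sending them messages.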
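To make the idea concrete without any Akka dependency, here is a toy sketch in plain Scala. It is not how Akka is implemented, and the names `ToyCounterActor`, `Add`, `Get` and `Stop` are hypothetical: a single thread drains a private mailbox and is the only code that touches the mutable counts, so other threads can interact with it purely by sending messages.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Messages for the toy actor (hypothetical names, not Akka types)
sealed trait CounterMsg
final case class Add(word: String) extends CounterMsg
final case class Get(replyTo: LinkedBlockingQueue[Map[String, Int]]) extends CounterMsg
case object Stop extends CounterMsg

/** A toy "actor": one thread takes messages from a private mailbox, one at a time. */
final class ToyCounterActor {
  private val mailbox = new LinkedBlockingQueue[CounterMsg]()
  // Mutable state, but only ever read or written by the `worker` thread below
  private val counts = mutable.HashMap.empty[String, Int]

  private val worker = new Thread(new Runnable {
    def run(): Unit = {
      var running = true
      while (running) {
        mailbox.take() match {
          case Add(w)       => counts(w) = counts.getOrElse(w, 0) + 1
          case Get(replyTo) => replyTo.put(counts.toMap) // reply with an immutable copy
          case Stop         => running = false
        }
      }
    }
  })
  worker.start()

  /** Fire-and-forget send, in the spirit of Akka's `!`. */
  def !(msg: CounterMsg): Unit = mailbox.put(msg)

  /** Wait for the mailbox thread to finish after a Stop message. */
  def awaitTermination(): Unit = worker.join()
}
```

Because the mailbox is FIFO, a `Get` sent after a series of `Add`s sees all of them; this is the same ordering property the master relies on later when it asks each worker for its result.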

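I developed it in IntelliJ IDEA and used Maven for dependency management.

Here is my design:

First, the message classes, using case classes as message bodies: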

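```scala
/**
 * Message types: case classes / case objects used as message bodies
 */
sealed trait WordMessage

/**
 * Tells the master actor to start
 */
case object MasterStartMessage extends WordMessage

/**
 * Work for a word-extraction actor
 * @param ws iterator over the words in one buffer
 */
case class BufferMessage(ws: WordStream) extends WordMessage

/**
 * Tells a word-extraction actor it is done; it replies with a Result
 */
case object CompleteMessage extends WordMessage

/**
 * Result message
 * @param map word counts (word -> number of occurrences)
 */
case class Result(map: scala.collection.mutable.HashMap[String, Int]) extends WordMessage
```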

Next, the worker actor:

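```scala
import akka.actor.Actor
import scala.collection.mutable

/**
 * Word-extraction actor
 */
class BufferWorker extends Actor {

  // This worker's partial counts; only this actor ever touches it
  private val map = mutable.HashMap.empty[String, Int]

  override def receive: Receive = {
    // Count every word in the buffer
    case BufferMessage(words) =>
      words.foreach(w => map(w) = map.getOrElse(w, 0) + 1)

    // Finished: reply to the master with a copy of the counts,
    // so the actor's internal table is never shared
    case CompleteMessage =>
      sender() ! Result(map.clone())
  }
}
```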
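Each worker counts into its own table and the master then adds the tables together. Since addition is associative and commutative, the merged totals equal a single-pass count no matter how the buffers were distributed among workers (provided no word is cut at a buffer boundary). A stdlib-only sketch of just this count-then-merge step, using hypothetical names `PartialCounts`, `countWords` and `mergeInto`:

```scala
import scala.collection.mutable

object PartialCounts {
  /** What one worker does: count the words it was given into a local table. */
  def countWords(words: Iterator[String]): mutable.HashMap[String, Int] = {
    val counts = mutable.HashMap.empty[String, Int]
    words.foreach(w => counts(w) = counts.getOrElse(w, 0) + 1)
    counts
  }

  /** What the master does with each reply: add the partial counts into the total. */
  def mergeInto(total: mutable.HashMap[String, Int],
                partial: scala.collection.Map[String, Int]): Unit =
    partial.foreach { case (w, c) => total(w) = total.getOrElse(w, 0) + c }
}
```

For example, splitting `a b a c b a` into pairs, counting each pair separately and merging gives `a -> 3, b -> 2, c -> 1`, the same as counting the whole sequence at once.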

Then the master actor:

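```scala
import java.io.InputStreamReader
import java.nio.file.{Files, Paths, StandardOpenOption}

import akka.actor.{Actor, ActorRef, Props}
import akka.pattern.ask
import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router}
import akka.util.Timeout

import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._

/**
 * Master actor (classic Akka API, 2.4 or later)
 * @param fileName   file to extract words from
 * @param nrOfWorker number of worker actors
 */
class Master(fileName: String, nrOfWorker: Int) extends Actor {

  // Merged result hash table
  private val map = mutable.HashMap.empty[String, Int]

  // Worker actors
  private val actorList: Vector[ActorRef] =
    Vector.fill(nrOfWorker)(context.actorOf(Props[BufferWorker]))

  implicit val timeout: Timeout = Timeout(20.seconds)

  // Round-robin router over the workers (replaces RoundRobinRouter, removed in Akka 2.4)
  private val workerRouter = Router(RoundRobinRoutingLogic(), actorList.map(ActorRefRoutee(_)))

  override def receive: Receive = {
    case MasterStartMessage =>
      // Read the file buffer by buffer and hand each buffer to the next worker
      val reader = new InputStreamReader(Files.newInputStream(Paths.get(fileName), StandardOpenOption.READ))
      try {
        var isRead = 0
        while (isRead != -1) {
          val buffer = new Array[Char](32 * 1024 * 1024)
          isRead = reader.read(buffer)
          // read() returns -1 at end of file; don't send that empty buffer.
          // A word that straddles two buffers is still counted as two fragments.
          if (isRead > 0) workerRouter.route(BufferMessage(new WordStream(buffer)), self)
        }
      } finally reader.close()

      for (worker <- actorList) {
        // Ask each worker for its counts and block until it replies. Messages from one
        // sender to one receiver arrive in order, so CompleteMessage is handled only
        // after every BufferMessage previously sent to that worker.
        val result = Await.result((worker ? CompleteMessage).mapTo[Result], timeout.duration)
        result.map.foreach { case (word, count) => map(word) = map.getOrElse(word, 0) + count }
      }

      println(map.toList.sortBy(x => -x._2).take(10))
      println(System.currentTimeMillis())
      context.system.terminate() // on Akka 2.3 and earlier: context.system.shutdown()
  }
}
```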
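The master cuts the file at buffer boundaries without looking at the content, so a word that straddles two buffers ends up counted as two fragments (for example `comp` + `are`). One way around this, sketched here with plain java.io and the hypothetical names `Chunker` and `forEachChunk`, is to hold back the trailing run of letters of each block and prepend it to the next block:

```scala
import java.io.Reader

object Chunker {
  private def isLetter(c: Char): Boolean =
    (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')

  /**
   * Reads `reader` in blocks of `chunkSize` chars and passes each chunk to `emit`,
   * never cutting a word in half: a trailing run of letters is held back and
   * prepended to the next block.
   */
  def forEachChunk(reader: Reader, chunkSize: Int)(emit: String => Unit): Unit = {
    require(chunkSize > 0, "chunkSize must be positive")
    val buf = new Array[Char](chunkSize)
    var carry = "" // possibly unfinished word from the end of the previous block
    var n = reader.read(buf)
    while (n != -1) {
      val text = carry + new String(buf, 0, n)
      // Walk back from the end to where the trailing run of letters begins
      var cut = text.length
      while (cut > 0 && isLetter(text.charAt(cut - 1))) cut -= 1
      if (cut > 0) emit(text.substring(0, cut))
      carry = text.substring(cut)
      n = reader.read(buf)
    }
    if (carry.nonEmpty) emit(carry) // input ended in the middle of a word
  }
}
```

Each emitted chunk could then be wrapped as `new WordStream(chunk.toCharArray)` and routed to a worker as before. Chunks can be slightly longer than `chunkSize` because of the carried-over prefix.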

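Next is the word iterator class. I originally wanted to write it with tail recursion, but didn't want to spend the time, so I used a while loop: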

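```scala
/**
 * Word iterator: yields each run of ASCII letters in the buffer.
 * A freshly allocated buffer is zero-filled, so an unused tail ('\0') is skipped as a separator.
 */
class WordStream(buffer: Array[Char]) extends Iterator[String] {

  private var index = 0

  // Look one word ahead so hasNext is exact and next() never returns null
  private var nextWord: String = advance()

  private def isLetter(c: Char): Boolean =
    (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')

  /** Scan to the next run of letters; returns null when the buffer is exhausted. */
  private def advance(): String = {
    while (index < buffer.length && !isLetter(buffer(index))) index += 1
    if (index >= buffer.length) null
    else {
      val start = index
      while (index < buffer.length && isLetter(buffer(index))) index += 1
      new String(buffer, start, index - start)
    }
  }

  override def hasNext: Boolean = nextWord != null

  override def next(): String = {
    if (nextWord == null) throw new NoSuchElementException("no more words")
    val word = nextWord
    nextWord = advance()
    word
  }
}
```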
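The tail-recursive variant mentioned above is not much longer. Here is a sketch with the hypothetical name `TailRecWords.words`; unlike the iterator it collects every word of the buffer into a List, so it holds them all in memory at once. The `@tailrec` annotation makes the compiler reject the method unless the recursion can be turned into a loop.

```scala
import scala.annotation.tailrec

object TailRecWords {
  private def isLetter(c: Char): Boolean =
    (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')

  /** Collects the ASCII-letter words in buf(0 until length) using tail recursion instead of while. */
  def words(buf: Array[Char], length: Int): List[String] = {
    // i: current index; start: index where the current word began, or -1 outside a word
    @tailrec
    def loop(i: Int, start: Int, acc: List[String]): List[String] =
      if (i >= length) {
        // Flush a word that runs right up to the end of the buffer
        if (start >= 0) (new String(buf, start, i - start) :: acc).reverse else acc.reverse
      } else if (isLetter(buf(i))) {
        loop(i + 1, if (start >= 0) start else i, acc)
      } else if (start >= 0) {
        loop(i + 1, -1, new String(buf, start, i - start) :: acc)
      } else {
        loop(i + 1, -1, acc)
      }

    loop(0, -1, Nil)
  }
}
```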

Starting it up:

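```scala
import akka.actor.{ActorSystem, Props}

object AkkaTest extends App {

  // Start time; the master prints the end time when it finishes
  println(System.currentTimeMillis())

  val system = ActorSystem("word")

  val master = system.actorOf(Props(new Master("D:\\t8.shakespeare.txt", 8)), name = "MasterName")

  master ! MasterStartMessage
}
```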
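To sanity-check the actor version's top 10, and to have a baseline for the timing, a plain single-threaded count is handy. A sketch using scala.io.Source, with the hypothetical names `SequentialBaseline`, `topWords` and `topWordsInFile`; it tokenizes on runs of ASCII letters to match WordStream, and it assumes a UTF-8 (or plain ASCII) input file:

```scala
import scala.collection.mutable
import scala.io.Source

object SequentialBaseline {
  private val NonLetters = "[^a-zA-Z]+"

  /** Counts the words in `lines` and returns the k most frequent (ties in arbitrary order). */
  def topWords(lines: Iterator[String], k: Int): List[(String, Int)] = {
    val counts = mutable.HashMap.empty[String, Int]
    for (line <- lines; w <- line.split(NonLetters) if w.nonEmpty)
      counts(w) = counts.getOrElse(w, 0) + 1
    counts.toList.sortBy(x => -x._2).take(k)
  }

  /** Same count over a whole file, read line by line. */
  def topWordsInFile(path: String, k: Int): List[(String, Int)] = {
    val src = Source.fromFile(path, "UTF-8")
    try topWords(src.getLines(), k) finally src.close()
  }
}
```

Calling `SequentialBaseline.topWordsInFile("D:\\t8.shakespeare.txt", 10)` on the same file should print the same top 10 as the actor version, up to the order of ties; any difference in counts would point at words split across buffer boundaries.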

 

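That's it: a summary of my Scala learning so far. On a 2 GB file it finishes in under 20 seconds, going by the start and end timestamps the program prints.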

Code download: http://files.cnblogs.com/files/qugangf/ArticleETL.rar

