您好,登錄后才能下訂單哦!
好程序員大數據學習路線分享Actor學習筆記,在scala中她能實現很強大的功能,他是基于并發機制的一個事件模型
我們現在學的scala2.10.x版本就是之前的Actor
?
同步:在主程序上排隊執行的任務,只有前一個任務執行完畢后,才能執行下一個任務
異步:指不進入主程序,而進入"任務對列"的任務,只有等主程序任務執行完畢,"任務對列"開始請求主程序,請求任務執行,該任務會進入主程序
?
java
共享變量 -- 加鎖
會出現鎖死問題
scala
Actor不共享數據
沒有鎖的概念
Actor通信之間需要message(通信)
Aactor執行順序
1.首先調用start()方法啟動Actor
2.調用start()方法后act()方法會被執行
3.Actor之間進行發送消息
Actor發送消息的三種方式
! -> 發送異步消息,沒有返回值
!? -> 發送同步消息,有返回值,會有線程等待
!! -> 發送異步消息,有返回值,返回值類型Future[Any](用來獲取異步操作結果)
?
Actor并行執行
//注意,這兩個actor會并行執行,當其中一個for循環結束后,actor結束 object ActorDemo01 { ??def main(args: Array[String]): Unit = { ????MyActor1.start() ????MyActor2.start() ??} } ? object MyActor1 extends Actor{ ??override def act(): Unit = { ????for (i <- 1 to 10){ ??????println(s"actor => $i") ??????Thread.sleep(2000) ????} ??} ? ??object MyActor2 extends Actor{ ????override def act(): Unit = { ??????for (i <- 1 to 5){ ????????println(s"actor2 => $i") ????????Thread.sleep(2000) ??????} ????} ??} } |
用Actor不斷接受消息
執行第一種方式,異步
object ActorDemo02 { ??def main(args: Array[String]): Unit = { ????val actor: MyActor = new MyActor ????actor.start() ? ????//并行執行 ????actor ! "start" ?// !->異步 ????actor ! "stop" ????println("發送完成") ? ??} } ? class MyActor extends Actor{ ??override def act(): Unit = { ????while (true){ ??//死循環 ??????receive { ??//接收 ????????case "start" => { ??????????println("starting") ??????????Thread.sleep(1000) ??????????println("started") ????????} ????????case "stop" => { ??????????println("stopping") ??????????Thread.sleep(1000) ??????????println("stopped") ????????} ??????} ????} ??} } |
第二種方式:利用react來代替receive,也就是說react線程可復用,比receive更高效
object ActorDemo03 { ??def main(args: Array[String]): Unit = { ????val actor: MyActor3 = new MyActor3 ????actor.start() ????actor ! "start" ????actor ! "stop" ????println("成功了") ??} } ? class MyActor3 extends Actor{ ??override def act(): Unit = { ????loop { ??????react{ ????????case "start" =>{ ??????????println("starting") ??????????Thread.sleep(1000) ??????????println("sarted") ????????} ????????case "stop" =>{ ??????????println("stoppting") ??????????Thread.sleep(1000) ??????????println("stopped") ????????} ??????} ????} ??} } |
結合樣例類練習Actor發送消息
//創建樣例類 case class AsyncMsg(id: Int, msg: String) case class SyncMsg(id: Int, msg: String) case class ReplyMsg(id: Int, msg: String) ? object ActorDemo01 extends Actor { ??override def act(): Unit = { ????while (true) { ??????receive { ????????case "start" => println("starting...") ????????case AsyncMsg(id, msg) => ????????{ ??????????println(s"id:$id,msg:$msg") ??????????sender ! ReplyMsg(1,"sucess") ?//接收到消息后返回響應消息 ????????} ????????case SyncMsg(id,msg) => { ??????????println(s"id:$id,msg:$msg") ??????????sender ! ReplyMsg(2,"sucess") ????????} ??????} ????} ??} } ? object ActorTest{ ??def main(args: Array[String]): Unit = { ? ????val actor: Actor = ActorDemo01.start() ? // ???//異步發送消息,沒有返回值 // ???actor ! AsyncMsg(3,"heihei") // ???println("異步消息發送完成,沒有返回值") ? // ???//同步發送消息,有返回值 // ???val text: Any = actor !? SyncMsg(4,"OK") // ???println(text) // ???println("同步消息發送成功") ? ????//異步發送消息,有返回值,返回類型為Future[Any] ????val reply: Future[Any] = actor !! SyncMsg(5,"OK is 不存在的") ????Thread.sleep(2000) ????if (reply.isSet){ ??????val applyMsg: Any = reply.apply() ??????println(applyMsg) ????}else{ ??????println("Nothing") ????} ??} } |
Actor并行化的wordcount
class Task extends Actor { ? ??override def act(): Unit = { ????loop { ??????react { ????????case SubmitTask(fileName) => { ??????????val contents = Source.fromFile(new File(fileName)).mkString ??????????val arr = contents.split("\r\n") ??????????val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.length) ??????????//val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)) ??????????sender ! ResultTask(result) ????????} ????????case StopTask => { ??????????exit() ????????} ??????} ????} ??} } ? object WorkCount { ??def main(args: Array[String]) { ????val files = Array("c://words.txt", "c://words.log") ? ????val replaySet = new mutable.HashSet[Future[Any]] ????val resultList = new mutable.ListBuffer[ResultTask] ? ????for(f <- files) { ??????val t = new Task ??????val replay = t.start() !! SubmitTask(f) ??????replaySet += replay ????} ? ????while(replaySet.size > 0){ ??????val toCumpute = replaySet.filter(_.isSet) ??????for(r <- toCumpute){ ????????val result = r.apply() ????????resultList += result.asInstanceOf[ResultTask] ????????replaySet.remove(r) ??????} ??????Thread.sleep(100) ????} ????val finalResult = resultList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2)) ????println(finalResult) ??} } ? case class SubmitTask(fileName: String) case object StopTask case class ResultTask(result: Map[String, Int]) |
?
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。