您好,登錄后才能下訂單哦!
??Scala中的actor能夠實現并行編程的強大功能,他是基于事件模型的并發機制,scala是運用消息(message)的發送、接收來實現多線程的(Java是使用共享內存實現的)。使用 Scala 能夠更容易地實現多線程應用的開發。
??一個actor是一個容器,它包含狀態,行為,信箱,子actor和監督策略。所有這些包含在一個actorReference(Actor引用)里。一個actor需要與外界隔離才能從actor模型中獲益,所以actor是以actor引用的形式展現給外界的。
???? - Java 中的并發編程基本上滿足了事件之間相互獨立,但是事件不能夠同時發生的場景的需要。
???? - Java 中的并發編程是基于共享數據和加鎖的一種機制,即會有一個共享的數據,然后有 若干個線程去訪問這個共享的數據(主要是對這個共享的數據進行修改),同時 Java 利用加鎖 的機制(即 synchronized)來確保同一時間只有一個線程對我們的共享數據進行訪問,進而保 證共享數據的一致性。
???? - Java 中的并發編程存在資源爭奪和死鎖等多種問題,因此程序越大問題越麻煩。
???? - Scala 中的 Actor 是一種不共享數據,依賴于消息傳遞的一種并發編程模式,避免了死鎖、資源爭奪等情況。在具體實 現的過程中,Scala 中的 Actor 會不斷的循環自己的郵箱,并通過 receive 偏函數進行消息的模式匹配并進行相應的處理。
???? - 如果 Actor A 和 Actor B 要相互溝通的話,首先 A 要給 B 傳遞一個消息,B 會有一個收件箱,然后 B 會不斷的循環自己的收件箱,若看見 A 發過來的消息,B 就會解析 A 的消息并執行,處理完之后就有可能將處理的結果通過郵件的方式發送給 A。
??對于 Java,我們都知道它的多線程實現需要對共享資源(變量、對象等)使用 synchronized 關鍵字進行代碼塊同步、對象鎖互斥等等。而且,常常一大塊的 try…catch 語句塊中加上 wait 方法、notify 方法、notifyAll 方法是讓人很頭疼的。原因就在于 Java 中多數使用的是可變狀態的對象資源,對這些資源進行共享來實現多線程編程的話,控制好資源競爭與防止對象狀態被意外修改是非常重要的,而對象狀態的不變性也是較難以保證的。而在 Scala 中, 我們可以通過復制不可變狀態的資源(即對象,Scala 中一切都是對象,連函數、方法也是) 的一個副本,再基于 Actor 的消息發送、接收機制進行并行編程。
pom.xml:
<properties>
<scala.version>2.11.8</scala.version>
<scala.actors.version>2.11.8</scala.actors.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-actors -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>${scala.actors.version}</version>
</dependency>
</dependencies>
object Base_Actor {
def main(args: Array[String]): Unit = {
//調用start方法
new MyActor1().start()
new MyActor2().start()
}
}
////繼承Actor類,相當于Java中的Thread
class MyActor1 extends Actor{
//重新 act 方法,相當于Java中的run方法
override def act(): Unit = {
println("hahaha")
}
}
//繼承Actor類,相當于Java中的Thread
class MyActor2 extends Actor{
//重新 act 方法,相當于Java中的run方法
override def act(): Unit = {
println("hello world")
}
}
// 注意:上面分別調用了兩個單例對象的 start()方法,他們的 act()方法會被執行,相同與在 Java 中開啟了兩個線程,線程的 run()方法會被執行,這兩個 Actor 是并行執行的。
?
??Akka 基于 Actor 模型,提供了一個用于構建可擴展的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程序的平臺。
????????
??Actor 模型:在計算機科學領域,Actor 模型是一個并行計算(Concurrent Computation)模型, 它把 actor 作為并行計算的基本元素來對待:為響應一個接收到的消息,一個 actor 能夠自己做出一些決策,如創建更多的 actor,或發送更多的消息,或者確定如何去響應接收到的下一個消息。
??Actor是Akka中最核心的概念,它是一個封裝了狀態和行為的對象,Actor之間可以通過交換消息的方式進行通信,每一個actor都有自己的收件箱。通過Actor能夠簡化鎖及線程管理,可以非常容易的開發出正確的并發程序和并行系統。
??Actor 特性:? 提供了一種高級抽象,能夠簡化在并發(Concurrency)/并行(Parallelism)應用場景下 的編程開發? 提供了異步非阻塞的、高性能的事件驅動編程模型? 超級輕量級事件處理(每 GB 堆內存幾百萬 Actor)
??- ActorSystem:在 Akka 中,ActorSystem 是一個重量級的結構,他需要分配多個線程,所以在實際應用中, ActorSystem 通常是一個單例對象,我們可以使用這個 ActorSystem 的 actorOf 方法創建很多 Actor。
??- Actor:在 Akka 中,Actor 負責通信,在 Actor 中有一些重要的生命周期方法。
??- preStart()方法:該方法在 Actor 對象構造方法執行后執行,整個 Actor 生命周期中僅執行一次。
??- receive()方法:該方法在 Actor 的 preStart 方法執行完成后執行,用于接收消息,會被反復執行。
??????
pom.xml
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<scala.version>2.11.8</scala.version>
<scala.actors.version>2.11.8</scala.actors.version>
<akka.version>2.4.17</akka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-actors -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>${scala.actors.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
</dependencies>
master:
import java.text.SimpleDateFormat
import java.util._
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
class MyMaster extends Actor{
def doHell(): Unit ={
println("我是master,我接受到了worker的消息!")
}
/**
* 這就是一個偏函數, 用來處理當前這個actor對象接收到的所有的消息
*/
override def receive: Receive = {
case "hello" =>{
doHell
//用以發送信息到相應的worker,!表示 異步無返回值
sender() ! "hi"
}
case "getNow" =>{
doHell
sender() ! new SimpleDateFormat("yyyy-MM-dd").format(new Date())
}
}
}
object MyMaster{
def main(args: Array[String]): Unit = {
//1.構建一個:ActorSystem
val strConfig=
"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = localhost
|akka.remote.netty.tcp.port = 9527
""".stripMargin
val config: Config = ConfigFactory.parseString(strConfig)
val myMaster: ActorSystem = ActorSystem("ActorSystemMaster",config)
//2.通過actorsystem創建actor
myMaster.actorOf(Props(new MyMaster()),"MasterActor")
}
}
worker:
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
class MyWorker extends Actor{
override def preStart(): Unit = {
val hostname="localhost"
val serveractorsystem="ActorSystemMaster"
val serveractor="MasterActor"
val port="9527"
//在創建worker actor之前向master發送一個消息
val master=context.actorSelection(s"akka.tcp://${serveractorsystem}@${hostname}:${port}/user/${serveractor}")
val message="getNow"
//獲得master相關對象,向master發送信息
master ! message
}
//處理相應的來自master返回的信息
override def receive: Receive = {
case date:String => {
println("時間日期:"+date)
}
case "hi" =>{
println("我是worker,接收到master發送過來的結果: hi")
}
}
}
object MyWorker{
def main(args: Array[String]): Unit = {
//1.構建一個:ActorSystem
val strConfig:String=
"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = localhost
""".stripMargin
val config: Config = ConfigFactory.parseString(strConfig)
val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem",config)
workerActorSystem.actorOf(Props(new MyWorker()),"workerActor")
}
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。