更新時(shí)間:2022-10-18 11:14:27 來源:動(dòng)力節(jié)點(diǎn) 瀏覽1977次
這是一個(gè) Java、Maven、Akka 項(xiàng)目,演示了如何設(shè)置一個(gè)基本的 Akka 集群。
這個(gè)項(xiàng)目是一系列項(xiàng)目中的一個(gè),它從一個(gè)簡(jiǎn)單的 Akka Cluster 項(xiàng)目開始,逐步構(gòu)建到事件溯源和命令查詢職責(zé)分離的示例。
該項(xiàng)目系列由以下 GitHub 存儲(chǔ)庫(kù)組成:
akka-java-cluster (這個(gè)項(xiàng)目)
akka-java-集群感知
akka-java-cluster-singleton
akka-java-集群分片
akka-java-cluster-persistence
akka-java-cluster-persistence-query
每個(gè)項(xiàng)目都可以獨(dú)立于其他項(xiàng)目進(jìn)行克隆、構(gòu)建和運(yùn)行。
根據(jù)Akka 文檔,“ Akka Cluster 提供了一種容錯(cuò)分散的基于點(diǎn)對(duì)點(diǎn)的集群成員服務(wù),沒有單點(diǎn)故障或單點(diǎn)瓶頸。它使用 gossip 協(xié)議和自動(dòng)故障檢測(cè)器來做到這一點(diǎn)。
Akka 集群允許構(gòu)建分布式應(yīng)用程序,其中一個(gè)應(yīng)用程序或服務(wù)跨越多個(gè)節(jié)點(diǎn)。"
Akka 文檔中的上述段落包含了許多最初可能難以理解的概念。考慮一些僅在兩句話中就被拋棄的術(shù)語,例如“容錯(cuò)”、“去中心化”、“點(diǎn)對(duì)點(diǎn)”和“無單點(diǎn)故障”。最后一句幾乎是隨便說的“一個(gè)應(yīng)用程序或服務(wù)跨越多個(gè)節(jié)點(diǎn)”。等待; 什么?應(yīng)用程序或服務(wù)如何跨越多個(gè)節(jié)點(diǎn)?
答案是 Akka 提供了一個(gè)抽象層,該層由參與者在參與者系統(tǒng)中相互交互組成。Akka 是一個(gè)演員模型的實(shí)現(xiàn)。演員模型” (維基百科)將“演員”視為并發(fā)計(jì)算的通用原語。為了響應(yīng)它接收到的消息,演員可以:做出本地決策,創(chuàng)建更多演員,發(fā)送更多消息,并確定如何響應(yīng)收到的下一條消息。參與者可以修改自己的私有狀態(tài),但只能通過消息相互影響(避免需要任何鎖)。
Akka Actor 通過異步消息相互通信。Akka Actor 系統(tǒng)在 Java 虛擬機(jī)上運(yùn)行,??并且使用 Akka 集群,單個(gè) Actor 系統(tǒng)可以在邏輯上跨越多個(gè)網(wǎng)絡(luò) JVM。這個(gè)網(wǎng)絡(luò)化的參與者系統(tǒng)抽象層使參與者可以跨節(jié)點(diǎn)集群透明地與每個(gè)參與者進(jìn)行通信。一種思考方式是,從演員的角度來看,他們生活在一個(gè)演員系統(tǒng)中,演員系統(tǒng)在一個(gè)或多個(gè)節(jié)點(diǎn)上運(yùn)行的事實(shí)在很大程度上隱藏在抽象層中。
Akka Actor 是用 Java 或 Scala 實(shí)現(xiàn)的。您可以將參與者創(chuàng)建為 Java 或 Scala 類。有兩種實(shí)現(xiàn)actor的方法,無類型和有類型。這個(gè) Akka Java 集群示例項(xiàng)目系列中使用了無類型的 actor。
對(duì)于那些有興趣深入了解 Actor 如何工作以及如何實(shí)現(xiàn)的細(xì)節(jié)的人來說,關(guān)于Actors的 Akka 文檔部分 是一個(gè)很好的起點(diǎn)。
我們將要查看的第一個(gè) Actor 名為 ClusterListenerActor。該參與者設(shè)置為接收有關(guān)集群事件的消息。當(dāng)節(jié)點(diǎn)加入和離開集群時(shí),此參與者會(huì)收到有關(guān)這些事件的消息。然后將這些接收到的消息寫入記錄器。
ClusterListenerActor 提供了集群活動(dòng)的簡(jiǎn)單視圖。以下是日志輸出的示例:
03:20:29.569 INFO cluster-akka.actor.default-dispatcher-4 akka.tcp://cluster@127.0.0.1:2551/user/clusterListener - MemberUp(Member(address = akka.tcp://cluster@127.0.0.1:2553, status = Up)) sent to Member(address = akka.tcp://cluster@127.0.0.1:2551, status = Up)
03:20:29.570 INFO cluster-akka.actor.default-dispatcher-4 akka.tcp://cluster@127.0.0.1:2551/user/clusterListener - 1 (LEADER) (OLDEST) Member(address = akka.tcp://cluster@127.0.0.1:2551, status = Up)
03:20:29.570 INFO cluster-akka.actor.default-dispatcher-4 akka.tcp://cluster@127.0.0.1:2551/user/clusterListener - 2 Member(address = akka.tcp://cluster@127.0.0.1:2552, status = Up)
03:20:29.570 INFO cluster-akka.actor.default-dispatcher-4 akka.tcp://cluster@127.0.0.1:2551/user/clusterListener - 3 Member(address = akka.tcp://cluster@127.0.0.1:2553, status = Joining)
讓我們從完整的 ClusterListenerActor 源文件開始。請(qǐng)注意,此 actor 是作為擴(kuò)展基于 Akka 的類的單個(gè) Java 類實(shí)現(xiàn)的。
package cluster;
import akka.actor.AbstractLoggingActor;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.Member;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;
class ClusterListenerActor extends AbstractLoggingActor {
private final Cluster cluster = Cluster.get(context().system());
private Cancellable showClusterStateCancelable;
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ShowClusterState.class, this::showClusterState)
.matchAny(this::logClusterEvent)
.build();
}
private void showClusterState(ShowClusterState showClusterState) {
log().info("{} sent to {}", showClusterState, cluster.selfMember());
logClusterMembers(cluster.state());
showClusterStateCancelable = null;
}
private void logClusterEvent(Object clusterEventMessage) {
log().info("{} sent to {}", clusterEventMessage, cluster.selfMember());
logClusterMembers();
}
@Override
public void preStart() {
log().debug("Start");
cluster.subscribe(self(), ClusterEvent.initialStateAsEvents(),
ClusterEvent.ClusterDomainEvent.class);
}
@Override
public void postStop() {
log().debug("Stop");
cluster.unsubscribe(self());
}
static Props props() {
return Props.create(ClusterListenerActor.class);
}
private void logClusterMembers() {
logClusterMembers(cluster.state());
if (showClusterStateCancelable == null) {
showClusterStateCancelable = context().system().scheduler().scheduleOnce(
Duration.ofSeconds(15),
self(),
new ShowClusterState(),
context().system().dispatcher(),
null);
}
}
private void logClusterMembers(CurrentClusterState currentClusterState) {
Optional<Member> old = StreamSupport.stream(currentClusterState.getMembers().spliterator(), false)
.reduce((older, member) -> older.isOlderThan(member) ? older : member);
Member oldest = old.orElse(cluster.selfMember());
StreamSupport.stream(currentClusterState.getMembers().spliterator(), false)
.forEach(new Consumer<Member>() {
int m = 0;
@Override
public void accept(Member member) {
log().info("{} {}{}{}", ++m, leader(member), oldest(member), member);
}
private String leader(Member member) {
return member.address().equals(currentClusterState.getLeader()) ? "(LEADER) " : "";
}
private String oldest(Member member) {
return oldest.equals(member) ? "(OLDEST) " : "";
}
});
}
private static class ShowClusterState {
@Override
public String toString() {
return ShowClusterState.class.getSimpleName();
}
}
}
這個(gè)類是一個(gè)簡(jiǎn)單的actor實(shí)現(xiàn)的例子。然而,這個(gè)actor的獨(dú)特之處在于它訂閱了Akka系統(tǒng)來接收集群事件消息。
@Override
public void preStart() {
log().debug("Start");
cluster.subscribe(self(), ClusterEvent.initialStateAsEvents(),
ClusterEvent.ClusterDomainEvent.class);
}
Actor 被設(shè)置為接收集群事件消息。當(dāng)這些消息到達(dá)時(shí),參與者調(diào)用編寫的方法來記錄事件并記錄集群的當(dāng)前狀態(tài)。
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ShowClusterState.class, this::showClusterState)
.matchAny(this::logClusterEvent)
.build();
}
隨著集群中的每個(gè)節(jié)點(diǎn)啟動(dòng),ClusterListenerActor 的一個(gè)實(shí)例也會(huì)啟動(dòng)。然后,參與者記錄每個(gè)節(jié)點(diǎn)中發(fā)生的集群事件。您可以再次從每個(gè)節(jié)點(diǎn)的角度檢查來自每個(gè)集群節(jié)點(diǎn)的日志,以查看集群事件并查看集群節(jié)點(diǎn)的狀態(tài)。
在這個(gè)項(xiàng)目中,我們將從一個(gè)基于 Akka、Java 和 Maven 的示例的基本模板開始,其中包含運(yùn)行 Akka 集群的代碼和配置。Maven POM 文件使用兩個(gè)插件,一個(gè)用于使用mvn:exec命令運(yùn)行代碼,另一個(gè)插件構(gòu)建一個(gè)自包含 JAR 文件用于使用java -jar命令運(yùn)行代碼。
當(dāng)項(xiàng)目代碼被執(zhí)行時(shí),動(dòng)作在Runner類main方法中開始。
public static void main(String[] args) {
if (args.length == 0) {
startupClusterNodes(Arrays.asList("2551", "2552", "0"));
} else {
startupClusterNodes(Arrays.asList(args));
}
}
該main方法調(diào)用startupClusterNodes傳遞給它一個(gè)端口列表的方法。如果未提供參數(shù),則使用默認(rèn)的三個(gè)端口集。
private static void startupClusterNodes(List<String> ports) {
System.out.printf("Start cluster on port(s) %s%n", ports);
ports.forEach(port -> {
ActorSystem actorSystem = ActorSystem.create("cluster", setupClusterNodeConfig(port));
AkkaManagement.get(actorSystem).start();
actorSystem.actorOf(ClusterListenerActor.props(), "clusterListener");
addCoordinatedShutdownTask(actorSystem, CoordinatedShutdown.PhaseClusterShutdown());
actorSystem.log().info("Akka node {}", actorSystem.provider().getDefaultAddress());
});
}
這些startupClusterNodes方法循環(huán)通過端口列表。為每個(gè)端口創(chuàng)建一個(gè)參與者系統(tǒng)。
ActorSystem actorSystem = ActorSystem.create("cluster", setupClusterNodeConfig(port));
創(chuàng)建一個(gè)演員系統(tǒng)時(shí)會(huì)發(fā)生很多事情。許多決定如何運(yùn)行actor系統(tǒng)的細(xì)節(jié)是通過配置設(shè)置定義的。這個(gè)項(xiàng)目包括一個(gè)application.conf配置文件,它位于src/main/resources目錄中。最關(guān)鍵的配置設(shè)置之一定義了參與者系統(tǒng)主機(jī)和端口。當(dāng)參與者系統(tǒng)在集群中運(yùn)行時(shí),配置還定義了每個(gè)節(jié)點(diǎn)將如何定位和加入集群。在這個(gè)項(xiàng)目中,節(jié)點(diǎn)使用所謂的種子節(jié)點(diǎn)加入集群。
cluster {
seed-nodes = [
"akka.tcp://cluster@127.0.0.1:2551",
"akka.tcp://cluster@127.0.0.1:2552"]
}
讓我們來看看這個(gè)項(xiàng)目的集群?jiǎn)?dòng)場(chǎng)景。在此示例中,一個(gè) JVM 啟動(dòng)時(shí)沒有運(yùn)行時(shí)參數(shù)。當(dāng)不帶參數(shù)調(diào)用Runner類main方法時(shí),默認(rèn)是在端口 2551、2552 和端口 0 上創(chuàng)建三個(gè)參與者系統(tǒng)(零端口會(huì)導(dǎo)致隨機(jī)選擇非零端口號(hào))。
由于每個(gè)參與者系統(tǒng)都是在特定端口上創(chuàng)建的,因此它會(huì)查看種子節(jié)點(diǎn)配置設(shè)置。如果參與者系統(tǒng)的端口是種子節(jié)點(diǎn)之一,它知道它將與其他種子節(jié)點(diǎn)聯(lián)系以形成集群。如果參與者系統(tǒng)的端口不是種子節(jié)點(diǎn)之一,它將嘗試聯(lián)系種子節(jié)點(diǎn)之一。非種子節(jié)點(diǎn)需要向其中一個(gè)種子節(jié)點(diǎn)宣布自己并要求加入集群。
下面是使用默認(rèn)端口 2551、2552 和 0 的示例啟動(dòng)場(chǎng)景。在端口 2551 上創(chuàng)建了一個(gè)參與者系統(tǒng);查看配置它知道它是一個(gè)種子節(jié)點(diǎn)。端口 2551 上的種子節(jié)點(diǎn)參與者系統(tǒng)嘗試聯(lián)系端口 2552 上的參與者系統(tǒng),即另一個(gè)種子節(jié)點(diǎn)。當(dāng)創(chuàng)建端口 2552 上的參與者系統(tǒng)時(shí),它會(huì)經(jīng)歷相同的過程,在這種情況下,2552 會(huì)嘗試與 2551 聯(lián)系并加入。當(dāng)在隨機(jī)端口(例如端口 24242)上創(chuàng)建第三個(gè)參與者系統(tǒng)時(shí),它會(huì)從配置中知道它不是種子節(jié)點(diǎn),在這種情況下,它會(huì)嘗試與種子參與者系統(tǒng)之一進(jìn)行通信,宣布自己并加入集群。
您可能已經(jīng)注意到,在上面的示例中,三個(gè)參與者系統(tǒng)是在單個(gè) JVM 中創(chuàng)建的。雖然每個(gè) JVM 運(yùn)行多個(gè)參與者系統(tǒng)是可以接受的,但更常見的情況是每個(gè) JVM 運(yùn)行一個(gè)參與者系統(tǒng)。
讓我們看一個(gè)稍微現(xiàn)實(shí)一點(diǎn)的例子。使用提供的akka腳本啟動(dòng)一個(gè)三節(jié)點(diǎn)集群。
./akka cluster start 3
每個(gè)節(jié)點(diǎn)都在單獨(dú)的 JVM 中運(yùn)行。在這里,我們有三個(gè)參與者系統(tǒng),它們?cè)谌齻€(gè) JVM 中獨(dú)立啟動(dòng)。這三個(gè)參與者系統(tǒng)遵循與之前相同的啟動(dòng)場(chǎng)景,結(jié)果它們形成了一個(gè)集群。
當(dāng)然,最常見的場(chǎng)景是每個(gè)參與者系統(tǒng)都是在不同的 JVM 中創(chuàng)建的,每個(gè) JVM 都運(yùn)行在不同的服務(wù)器、虛擬服務(wù)器或容器上。同樣,相同的啟動(dòng)過程發(fā)生在各個(gè)參與者系統(tǒng)通過網(wǎng)絡(luò)找到彼此并形成集群的地方??。
讓我們回到創(chuàng)建actor系統(tǒng)的那一行代碼。
ActorSystem actorSystem = ActorSystem.create("cluster", setupClusterNodeConfig(port));
從這個(gè)簡(jiǎn)短的描述中,您可以看到在actor系統(tǒng)抽象層中發(fā)生了很多事情,而這個(gè)啟動(dòng)過程的總結(jié)只是冰山一角,這是抽象層應(yīng)該做的,它們隱藏了復(fù)雜性。
一旦多個(gè)參與者系統(tǒng)加入一個(gè)集群,從在這個(gè)虛擬參與者系統(tǒng)中運(yùn)行的參與者的角度來看,它們會(huì)形成一個(gè)單一的虛擬參與者系統(tǒng)。當(dāng)然,單個(gè)參與者實(shí)例物理上駐留在特定 JVM 內(nèi)的特定集群節(jié)點(diǎn)中,但在接收和發(fā)送參與者消息時(shí),節(jié)點(diǎn)邊界是透明的并且?guī)缀跸Я恕U沁@種透明性是構(gòu)建“一個(gè)應(yīng)用程序或服務(wù)跨越多個(gè)節(jié)點(diǎn)”的基礎(chǔ)。
此外,通過添加更多節(jié)點(diǎn)來擴(kuò)展集群的靈活性是消除單點(diǎn)瓶頸的機(jī)制。當(dāng)集群中的現(xiàn)有節(jié)點(diǎn)無法處理當(dāng)前負(fù)載時(shí),可以添加更多節(jié)點(diǎn)來擴(kuò)展容量。失敗也是如此。一個(gè)或多個(gè)節(jié)點(diǎn)的丟失并不意味著整個(gè)集群出現(xiàn)故障。可以替換故障節(jié)點(diǎn),并且可以將在故障節(jié)點(diǎn)上運(yùn)行的參與者重新定位到其他節(jié)點(diǎn)。
希望本概述能夠闡明 Akka 如何提供“無單點(diǎn)故障或單點(diǎn)瓶頸”以及“ Akka 集群如何允許構(gòu)建分布式應(yīng)用程序,其中一個(gè)應(yīng)用程序或服務(wù)跨越多個(gè)節(jié)點(diǎn)。 ”
git clone https://github.com/mckeeh3/akka-java-cluster.git
cd akka-java-cluster
mvn clean package
Maven 命令構(gòu)建項(xiàng)目并創(chuàng)建一個(gè)自包含的可運(yùn)行 JAR。
該項(xiàng)目包含一組腳本,可用于啟動(dòng)和停止單個(gè)集群節(jié)點(diǎn)或啟動(dòng)和停止節(jié)點(diǎn)集群。
提供主腳本./akka以運(yùn)行節(jié)點(diǎn)集群或啟動(dòng)和停止單個(gè)節(jié)點(diǎn)。用于./akka node start [1-9] | stop啟動(dòng)和停止單個(gè)節(jié)點(diǎn)以及./akka cluster start [1-9] | stop啟動(dòng)和停止節(jié)點(diǎn)集群。clusterand start 選項(xiàng)將node在端口 2551 到 2559 上啟動(dòng) Akka 節(jié)點(diǎn)。stdinand輸出都使用文件命名約定stderr發(fā)送到目錄中的文件。/tmp/tmp/-N.log
在端口 2551 上啟動(dòng)節(jié)點(diǎn) 1,在端口 2552 上啟動(dòng)節(jié)點(diǎn) 2。
./akka node start 1
./akka node start 2
在端口 2553 上停止節(jié)點(diǎn) 3。
./akka node stop 3
在端口 2551、2552、2553 和 2554 上啟動(dòng)一個(gè)由四個(gè)節(jié)點(diǎn)組成的集群。
./akka cluster start 4
停止所有當(dāng)前正在運(yùn)行的集群節(jié)點(diǎn)。
./akka cluster stop
您可以使用該./akka cluster start [1-9]腳本啟動(dòng)多個(gè)節(jié)點(diǎn),然后使用./akka node start [1-9]and./akka node stop [1-9] 啟動(dòng)和停止單個(gè)節(jié)點(diǎn)。
使用該./akka node tail [1-9]命令tail -f創(chuàng)建節(jié)點(diǎn) 1 到 9 的日志文件。
該命令使用Akka 管理 擴(kuò)展 Cluster Http Management./akka cluster status以 JSON 格式顯示當(dāng)前正在運(yùn)行的集群的狀態(tài) 。
以下 Maven 命令在端口 2551、2552 和 radmonly 選擇的端口上運(yùn)行帶有 3 個(gè) Akka 演員系統(tǒng)的signle JVM。
mvn exec:java
使用 CTRL-C 停止。
要在特定端口上運(yùn)行,請(qǐng)使用以下-D選項(xiàng)傳入命令行參數(shù)。
mvn exec:java -Dexec.args="2551"
默認(rèn)無參數(shù)等價(jià)于以下內(nèi)容。
mvn exec:java -Dexec.args="2551 2552 0"
運(yùn)行測(cè)試的一種常見方法是在多個(gè)命令窗口中啟動(dòng)單個(gè) JVM。這模擬了運(yùn)行多節(jié)點(diǎn) Akka 集群。例如,在 4 個(gè)命令窗口中運(yùn)行以下 4 個(gè)命令。
mvn exec:java -Dexec.args="2551" > /tmp/$(basename $PWD)-1.log
mvn exec:java -Dexec.args="2552" > /tmp/$(basename $PWD)-2.log
mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-3.log
mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-4.log
這將運(yùn)行一個(gè) 4 節(jié)點(diǎn) Akka 集群,在端口 2551 和 2552 上啟動(dòng) 2 個(gè)節(jié)點(diǎn),這些節(jié)點(diǎn)是配置的集群種子節(jié)點(diǎn)和application.conf文件。以及隨機(jī)選擇的端口號(hào)上的 2 個(gè)節(jié)點(diǎn)。可選重定向> /tmp/$(basename $PWD)-4.log是基于項(xiàng)目目錄名稱將日志輸出推送到文件名的示例。
為方便起見,在 Linux 命令 shell 中定義以下別名。
alias p1='cd ~/akka-java/akka-java-cluster'
alias p2='cd ~/akka-java/akka-java-cluster-aware'
alias p3='cd ~/akka-java/akka-java-cluster-singleton'
alias p4='cd ~/akka-java/akka-java-cluster-sharding'
alias p5='cd ~/akka-java/akka-java-cluster-persistence'
alias p6='cd ~/akka-java/akka-java-cluster-persistence-query'
alias m1='clear ; mvn exec:java -Dexec.args="2551" > /tmp/$(basename $PWD)-1.log'
alias m2='clear ; mvn exec:java -Dexec.args="2552" > /tmp/$(basename $PWD)-2.log'
alias m3='clear ; mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-3.log'
alias m4='clear ; mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-4.log'
p1-6 別名命令是 cd 到六個(gè)項(xiàng)目目錄之一的快捷方式。m1-4 別名命令使用適當(dāng)?shù)亩丝趩?dòng)和 Akka 節(jié)點(diǎn)。Stdout 也被重定向到 /tmp 目錄。
相關(guān)閱讀
0基礎(chǔ) 0學(xué)費(fèi) 15天面授
有基礎(chǔ) 直達(dá)就業(yè)
業(yè)余時(shí)間 高薪轉(zhuǎn)行
工作1~3年,加薪神器
工作3~5年,晉升架構(gòu)
提交申請(qǐng)后,顧問老師會(huì)電話與您溝通安排學(xué)習(xí)