更新時間:2022-12-02 10:38:53 來源:動力節(jié)點(diǎn) 瀏覽1847次
Java實(shí)現(xiàn)消息隊(duì)列的方法有哪些?動力節(jié)點(diǎn)小編來告訴大家。
消息隊(duì)列是MQ是一種系統(tǒng)間相互協(xié)作的通信機(jī)制
Broker:消息處理中心,負(fù)責(zé)消息的接收、存儲、轉(zhuǎn)發(fā)等;
Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生和發(fā)送消息到消息處理中心;
Consumer:消息消費(fèi)者,負(fù)責(zé)從消息處理中心獲取消息,并進(jìn)行相應(yīng)的處理。
其結(jié)構(gòu)如下所示:
(1)消息處理中心
作為消息處理中心,至少有一個數(shù)據(jù)容器來保存接收到的消息。這里采用java中隊(duì)列(Queue)的一個子類ArrayBockingQueue來實(shí)現(xiàn)。
如下是消息處理中心Broker的實(shí)現(xiàn):
import java.util.concurrent.ArrayBlockingQueue;
public class Broker {
private final static int MAX_SIZE = 3;
private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);
public static void produce(String msg){
if(messageQueue.offer(msg)){
System.out.println("成功向消息處理中心投遞消息: " + msg + ",當(dāng)前緩存的消息數(shù)量是:"+ messageQueue.size());
} else{
System.out.println("消息處理中心內(nèi)暫存的消息達(dá)到最大負(fù)荷,不能繼續(xù)放入消息!");
}
System.out.println("==============================");
}
public static String consume(){
String msg = messageQueue.poll();
if(msg != null){
System.out.println("已經(jīng)消費(fèi)消息:" + msg + ",當(dāng)前暫存的消息數(shù)量是:" + messageQueue.size());
} else {
System.out.println("消息處理中心內(nèi)沒有消息可供消費(fèi)!");
}
System.out.println("==============================");
return msg;
}
}
有了消息處理中心類后,需要將該類的功能暴露出去,這樣別人才能夠用它來發(fā)送和接收消息。我們定義了BrokerServer類用來對外提供Broker類的服務(wù)。
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class BrokerServer implements Runnable{
public static int SERVICE_PORT = 9999;
private final Socket socket;
public BrokerServer(Socket socket){
this.socket = socket;
}
@Override
public void run(){
try(
BufferedReader in = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream()))
{
while (true){
String str = in.readLine();
if (str == null){
continue;
}
System.out.println("接收到原始數(shù)據(jù): " + str);
if (str.equals("CONSUME")){
String message = Broker.consume();
out.println(message);
out.flush();
}else {
Broker.produce(str);
}
}
} catch (Exception e){
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception{
ServerSocket server = new ServerSocket(SERVICE_PORT);
while(true){
BrokerServer brokerServer = new BrokerServer(server.accept());
new Thread(brokerServer).start();
}
}
}
在java中設(shè)計(jì)服務(wù)其功能的軟件一般少不了套接字(Socket)和線程(Thread),需要通過線程的方式將應(yīng)用啟動起來,而服務(wù)器和應(yīng)用的客戶端需要用Socket進(jìn)行網(wǎng)絡(luò)通信。
(2)客戶端訪問
有了消息處理中心服務(wù)器后,自然需要相應(yīng)客戶端來與之通信,來發(fā)送和接收消息。
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
public class MyClient {
public static void produce(String message) throws Exception{
Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
try(
PrintWriter out = new PrintWriter(socket.getOutputStream())
){
out.println(message);
out.flush();
}
}
public static String consume() throws Exception{
Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
try(
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream())
){
out.println("CONSUME");
out.flush();
String message = in.readLine();
return message;
}
}
}
以上是通用的客戶端訪問代碼,接下來分別看一個生產(chǎn)消息和消費(fèi)消息的示例:
生產(chǎn)消息:
public class ProduceClient {
public static void main(String[] args) throws Exception{
MyClient client = new MyClient();
client.produce("hello World.");
}
}
消費(fèi)消息:
public class ConsumeClient {
public static void main(String[] args) throws Exception{
MyClient client = new MyClient();
String message = client.consume();
System.out.println("獲得的消息為: " + message);
}
}
(3)運(yùn)行效果
開啟BrokerServer服務(wù)
生產(chǎn)消息:ProduceClient
消費(fèi)消息:ConsumeClient
以上就是動力節(jié)點(diǎn)小編介紹的"Java實(shí)現(xiàn)消息隊(duì)列的簡單方法",希望對大家有幫助,想了解更多可查看Java教程。動力節(jié)點(diǎn)在線學(xué)習(xí)教程,針對沒有任何Java基礎(chǔ)的讀者學(xué)習(xí),讓你從入門到精通,主要介紹了一些Java基礎(chǔ)的核心知識,讓同學(xué)們更好更方便的學(xué)習(xí)和了解Java編程,感興趣的同學(xué)可以關(guān)注一下。
相關(guān)閱讀
初級 202925
初級 203221
初級 202629
初級 203743