java实现简单的消息队列
目录
java实现简单的消息队列
设计一个简单的消息队列
简单的消息中间件
代码部分
消息中间件
package com.test;
import java.util.concurrent.ArrayBlockingQueue;
public class Broker {
//队列存储消息的最大数量
private final static int MAX_SIZE = 3;
//保存消息数据的容器
private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);
//生产消息
public static void produce(String msg){
System.out.println("[消息中间件]=============================================");
if(messageQueue.offer(msg)){
System.out.println("[消息中间件]收到消息:" + msg + ",暂存消息:" + messageQueue.size() + "条。");
}else{
System.out.println("[消息中间件]消息已满!");
}
System.out.println("[消息中间件]=============================================");
}
//消费消息
public static String consume(){
System.out.println("[消息中间件]=============================================");
String msg = messageQueue.poll();
if(msg != null){
System.out.println("[消息中间件]消费消息:" + msg + ",暂存消息:" + messageQueue.size() + "条。");
}else{
System.out.println("[消息中间件]消息已空!");
}
System.out.println("[消息中间件]=============================================");
return msg;
}
}
消息中间件服务器
package com.test;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class BrokerServer implements Runnable{
public static int SERVICE_PORT = 9999;
private final Socket socket;
public BrokerServer(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream());
while (socket.getInputStream().available() != 0){
String str = in.readLine();
if(str == null){
continue;
}
System.out.println("[服务器]收到消息:" + str);
if("CONSUME".equals(str)){
String message = Broker.consume();
out.println(message);
out.flush();
}else{
Broker.produce(str);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(SERVICE_PORT);
while (true){
BrokerServer brokerServer = new BrokerServer(server.accept());
new Thread(brokerServer).start();
}
}
}
消息中间件客户端
package com.test;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
public class MqClient {
public static void produce(String message) throws Exception {
Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
PrintWriter out = new PrintWriter(socket.getOutputStream());
out.println(message);
out.flush();
}
public static String consume() throws Exception {
Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream());
out.println("CONSUME");
out.flush();
String message = in.readLine();
return message;
}
}
生产者客户端
package com.test;
public class ProduceClient {
public static void main(String[] args) throws Exception {
MqClient mqClient = new MqClient();
mqClient.produce("Linux is very much!");
}
}
消费者客户端
package com.test;
public class ConsumeClient {
public static void main(String[] args) throws Exception {
MqClient mqClient = new MqClient();
String message = mqClient.consume();
System.out.println("消费消息:" + message);
}
}
演示效果
生产消息
消息中间件
生产者
消费消息
消息中间件
消息者
过度生产消息
消息中间件
过度消费消息
消息中间件
消费者
总结
本文采取一个由数组结构组成的有界阻塞队列 ArrayBlockingQueue,套接字通讯,自定义简单的消息中间件。