您现在的位置是:主页 > news > 开发一个微信小程序价格/宁波seo哪家好快速推广

开发一个微信小程序价格/宁波seo哪家好快速推广

admin2025/6/14 13:27:52news

简介开发一个微信小程序价格,宁波seo哪家好快速推广,网页制作教程免费下载,国内最好的在线网站建设简单消息队列的实现 wj振藩 分类专栏: 消息中间件 分布式消息中间件实践 在消息队列的完整使用场景中至少包含三个角色: 消息处理中心:负责消息的接收、存储、转发等生产者: 负责产生和发送消息到消息处理中心负责从消息处理中心获取消息…

开发一个微信小程序价格,宁波seo哪家好快速推广,网页制作教程免费下载,国内最好的在线网站建设简单消息队列的实现 wj振藩 分类专栏: 消息中间件 分布式消息中间件实践 在消息队列的完整使用场景中至少包含三个角色: 消息处理中心:负责消息的接收、存储、转发等生产者: 负责产生和发送消息到消息处理中心负责从消息处理中心获取消息…

简单消息队列的实现

wj振藩

分类专栏: 消息中间件

分布式消息中间件实践
在消息队列的完整使用场景中至少包含三个角色:

  • 消息处理中心:负责消息的接收、存储、转发等
  • 生产者: 负责产生和发送消息到消息处理中心
  • 负责从消息处理中心获取消息,并进行相应的处理

先看消息处理中心的代码:
处理中心类 Broker.java

import java.util.concurrent.ArrayBlockingQueue;public class Broker {//队列储存消息的最大容量private final static int MAX_SIZE = 3;//保存消息的数据容器private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);//生产消息public static void produce(String msg){if(messageQueue.offer(msg)){System.out.println("向队列添加了消息:"+msg);}else{System.out.println("队列已满");}}//消费消息public static String consume(){String msg = messageQueue.poll();if(msg != null){System.out.println("取出了队列的消息:"+msg+";队列剩余消息数:"+messageQueue.size());}else{System.out.println("队列是空的");}return msg;}
}

把处理中心暴露给外部访问的服务类 BrokerServer.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;public class BrokerServer implements Runnable {public static int SERVICE_PORT = 9999;private final Socket socket;public BrokerServer(Socket socket){this.socket = socket;}@Overridepublic void run() {try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())){while(true){String str = in.readLine();if(str==null){continue;}System.out.println("接收到数据:"+str);if(str.equals("CONSUME")){String msg = Broker.consume();out.println(msg);out.flush();}else{Broker.produce(str);}}}catch (IOException e) {e.printStackTrace();}}
}

Main.java 启动服务

import java.io.IOException;
import java.net.ServerSocket;public class Main {public static void main(String[] args) throws IOException {ServerSocket socket = new ServerSocket(BrokerServer.SERVICE_PORT);while(true){BrokerServer server = new BrokerServer(socket.accept());new Thread(server).start();}}
}

 

然后看与之通信进行发送和就收消息的客户端:
MqClient.java 与消息处理中心之间通信的客户端类

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;public class MqClient {private final static int SERVER_PORT = 9999;//生产消息public static void produce(String msg) throws IOException {Socket socket = new Socket(InetAddress.getLocalHost(),SERVER_PORT);try(PrintWriter out = new PrintWriter(socket.getOutputStream())){out.println(msg);out.flush();}}//消费消息public static String consume() throws IOException {Socket socket = new Socket(InetAddress.getLocalHost(),SERVER_PORT);try(BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())){//先向消息队列发送字符串"CONSUME"表示消费out.println("CONSUME");out.flush();//再从消息队列中获取一条消息String msg = in.readLine();return  msg;}}
}

 

Main.java 启动客户端,进行发送和消费操作

import java.io.IOException;
import java.util.Scanner;public class Main {public static void main(String[] args) throws IOException {System.out.println("请输入1或2选择写入还是取出:");System.out.println("1.写入消息  2.消费消息");int in;int i = 1;while(( in = new Scanner(System.in).nextInt())!=-1){if(in==1){//写入消息MqClient mqClient = new MqClient();mqClient.produce("Hello world-"+i);i++;}else if(in==2){MqClient mqClient = new MqClient();String msg = mqClient.consume();System.out.println("获取的消息是:"+msg);}else{System.out.println("请输入正确的选项");}System.out.println("请输入1或2选择写入还是取出:");System.out.println("1.写入消息  2.消费消息");}}
}