目录

Java客户端调用Websocket服务端,基于Springboot

Java客户端调用Websocket服务端,基于Springboot

目录


WebSocket服务端

简单说一下WebSocket,本身就是一种基于TCP的有状态的双向通信协议,可以实现即时通讯、消息推送等需要长连接的业务场景。

创建Springboot工程,引入WebSocket依赖

本示例引入的pom依赖如下:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

配置WebSocketConfig

代码如下

package com.websocket.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**

- WebSocket 配置类
- @author Neo
  */
  @Configuration
  public class WebSocketConfig {

      @Bean
      public ServerEndpointExporter serverEndpointExporter(){

          return new ServerEndpointExporter();

      }

  }

配置 WebSocketServer

代码如下

package com.websocket.server;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;

/**

- WebSocket 服务类
- @author Neo
  */
  @Slf4j
  @ServerEndpoint(value = "/ws/server")
  @Component
  public class WebSocketServer {

      //存储每一个客户端会话信息的线程安全的集合
      private static final CopyOnWriteArraySet<Session> sessions = new CopyOnWriteArraySet<>();
      //使用线程安全的计数器,记录在线数
      private static final AtomicInteger onlineCount = new AtomicInteger(0);

      /**
       * 连接成功时调用的方法
       * @param session
       */
      @OnOpen
      public void onOpen(Session session) {
          //存储会话信息
          sessions.add(session);
          //计数+1
          int cnt = onlineCount.incrementAndGet();
          //打印日志
          log.info("有连接加入,当前连接数为:" + cnt);
          //给客户端发消息
          this.sendMessage(session, "连接成功");
      }

      /**
       * 连接关闭时调用的方法
       * @param session
       */
      @OnClose
      public void onClose(Session session) {
          //删除会话信息
          sessions.remove(session);
          //计数-1
          int cnt = onlineCount.decrementAndGet();
          //打印日志
          log.info("有连接关闭,当前连接数为:" + cnt);
      }

      /**
       * 收到客户端消息时调用的方法
       * @param message
       * @param session
       */
      @OnMessage
      public void onMessage(String message, Session session) {
          //打印日志
          log.info("来自客户端的消息:" + message);
          //给客户端发消息
          this.sendMessage(session, "收到消息,消息内容:" + message);
      }

      /**
       * 出现异常时调用的方法
       * @param session
       * @param error
       */
      @OnError
      public void onError(Session session, Throwable error) {
          //打印日志
          log.error("发生错误:Session ID:" + error.getMessage() + session.getId());
      }

      /**
       * 发送消息
       * @param session
       * @param message
       */
      public void sendMessage(Session session, String message) {
          try {
              //发送消息
              session.getBasicRemote().sendText("SID:::" + session.getId() + ":::" + message);
          } catch (IOException e) {
              //打印日志
              log.error("发送消息出错:" + e.getMessage());
          }
      }

      /**
       * 群发消息
       * 这个方法可以升级为消息推送的工具,给在线的客户端弹个广告啥的
       * @param message
       * @throws IOException
       */
      public void sendMessage(String message) throws IOException {
          for (Session session : sessions) {
              //判断连接是否开着
              if(session.isOpen()){
                  //一个一个发
                  this.sendMessage(session, message);
              }
          }
      }

      /**
       * 给指定的客户端发消息
       * 这个方法可以升级为即时通讯的工具,例如客户端A、客户端B、服务端WS
       * 首先,客户端要有身份id与WS的session进行绑定
       * 然后,A要给B发送消息,先获取B的身份id,然后将消息和B的身份id发给WS
       * WS收到A的消息后,拿着B的身份id去查询B与WS的会话信息,找到了,就把A的消息发送给B
       * @param sessionId
       * @param message
       * @throws IOException
       */
      public void sendMessage(String sessionId,String message) throws IOException {
          Session session = null;
          //遍历找会话信息
          for (Session s : sessions) {
              if(s.getId().equals(sessionId)){
                  session = s;
                  break;
              }
          }
          if(session!=null){
              //找到了,发消息
              this.sendMessage(session, message);
          } else{
              //打印日志
              log.warn("没有找到指定的会话:" + sessionId);
          }
      }

  }

到这里,WebSocket 的服务端就配置完成了。

Java 客户端

这里的 java 客户端,其实就是用 java 代码,去建立与 websocket 服务端连接的,java 工程

创建 Springboot 工程,引入 java 客户端依赖

pom 依赖如下:

<dependencies>
  <dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <optional>true</optional>
  </dependency>
  <dependency>
  <groupId>org.java-websocket</groupId>
  <artifactId>Java-WebSocket</artifactId>
  <version>1.4.0</version>
  </dependency>
  <dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
  </dependency>
  </dependencies>

配置 JavaClient

代码如下:

package com.websocket.javaclient;

import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.stereotype.Component;

import java.net.URI;
import java.net.URISyntaxException;

@Slf4j
@Component
public class JavaClient {

    /**
     * 获取客户端连接实例
     * @param uri
     * @return
     */
    public WebSocketClient getClient(String uri){

        try {

            //创建客户端连接对象
            WebSocketClient client = new WebSocketClient(new URI(uri),new Draft_6455()) {

                /**
                 * 建立连接调用
                 * @param serverHandshake
                 */
                @Override
                public void onOpen(ServerHandshake serverHandshake) {
                    log.info("建立连接");
                }

                /**
                 * 收到服务端消息调用
                 * @param s
                 */
                @Override
                public void onMessage(String s) {
                    log.info("收到来自服务端的消息:::" + s);
                }

                /**
                 * 断开连接调用
                 * @param i
                 * @param s
                 * @param b
                 */
                @Override
                public void onClose(int i, String s, boolean b) {
                    log.info("关闭连接:::" + "i = " + i + ":::s = " + s +":::b = " + b);
                }

                /**
                 * 连接报错调用
                 * @param e
                 */
                @Override
                public void onError(Exception e) {
                    log.error("报错了:::" + e.getMessage());
                }

            };

            //请求与服务端建立连接
            client.connect();

            //判断连接状态,0为请求中  1为已建立  其它值都是建立失败
            while(client.getReadyState().ordinal() == 0){

                try {

                    Thread.sleep(200);

                }catch (Exception e){

                    log.warn("延迟操作出现问题,但并不影响功能");

                }

                log.info("连接中。。。");

            }

            //连接状态不再是0请求中,判断建立结果是不是1已建立
            if (client.getReadyState().ordinal() == 1){

                return client;

            }

        }catch (URISyntaxException e){

            log.error(e.getMessage());
        }

        return null;
    }

}

创建连接

这里创建连接的方式采用了 web 接口访问的方式,其实可以直接用 main 函数跑,没办法,web 职业病

代码如下:

package com.websocket.controller;

import com.websocket.javaclient.JavaClient;
import org.java_websocket.client.WebSocketClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/jc")
public class JavaClientController {

    @Autowired
    private JavaClient javaClient;

    @GetMapping(value = "hello")
    public void hello(String str){

        WebSocketClient client = this.javaClient.getClient("ws://localhost:8080/ws/server");

        if (client != null){

            client.send(str);

        }

    }

}

测试效果

测试 1:

https://i-blog.csdnimg.cn/blog_migrate/bb6a613cc84bddbd04017cde2a6bdbcb.png

客户端:

https://i-blog.csdnimg.cn/blog_migrate/db76d0340bedd103a88686f8d86129a3.png

服务端:

https://i-blog.csdnimg.cn/blog_migrate/e4e66e01a69b37a9d2a0d9d63e0043da.png

测试 2:

https://i-blog.csdnimg.cn/blog_migrate/e9306ea4d33acaf131fe825c33b98e00.png

客户端:

https://i-blog.csdnimg.cn/blog_migrate/4966d862ddd17224ac02955640e41cef.png

服务端:

https://i-blog.csdnimg.cn/blog_migrate/7b0f8ea1d1f874cc2c22ebfae98fd152.png