websocket集群部署遇到的一些事

最近刚好有个场景,业务处理一份报告需要关注实时处理的进度。

本来打算使用前端轮训方式,但是考虑到这样效率比较低,也无法精确知道处理进度,就想到用websocket和前端实时交互,进度有更新就通知前端,避免了无用的空轮训请求。

websocket通过session链接,和前端保持链接,将客户的信息存储在内存中,用户每个请求都有一个唯一的uuid,后续通过uuid查找对应客户的session发送websocket。

系统架构图

流程图

import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author 
 * @since 2024/8/26
 */
@Configuration
@EnableAutoConfiguration
@EnableWebSocket
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}


import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @author 
 * @since 2024/8/26
 */
@ServerEndpoint(value = "/websocket/progress")
@Component
@Slf4j
public class WebSocketServer {

    //静态变量,记录当前在线连接数
    private static int onlineCount = 0;
    //存储客户端对应的WebSocket对象
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet();
    //与某个客户端的连接会话
    private Session session;
    //客户端sid
    private String sid = "";

    //建立连接成功后调用的方法
    @OnOpen
    public void onOpen(Session session) {
        //获取当前会话
        this.session = session;

        //存储WebSocket对象
        String sid = getUuid();
        //客户端标识
        this.sid = sid;
        webSocketSet.add(this);
        //在线连接计数
        addOnlineCount();
        try {
            //调用服务端发送信息方法
            sendMessage("连接成功" + this.sid);
        } catch (Exception e) {
            log.error("IO异常:{} ", e);
        }
    }

    private String getUuid() {
        IdWorker idWorker = new IdWorker();
        long l = idWorker.nextId();
        return String.valueOf(l);
    }

    @OnClose
    public void onClose() {
        //移除WebSocket对象
        webSocketSet.remove(this);
        subOnlineCount();
    }

    @OnMessage
    public void onMessage(String message) {
        log.info("WebSocketServer 收到message{}", this.sid);
    }

    @OnError
    public void onError(Throwable error) {
        log.error("onError,{}", error);
    }

    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    public static void sendInfo(String message, String sid) {
        if (sid == null) {
            return;
        }
        for (WebSocketServer item : webSocketSet) {
            try {
                if (item.sid.equals(sid)) {
                    //发给单一用户
                    item.sendMessage(message);
                }
            } catch (IOException e) {
                log.error("发送websocket失败{}", e);
            }
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount += 1;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount -= 1;
    }
}

和前端本地测试的好好的,一点毛病没有,以为万事大吉了。

结果一发布到测试环境,gg了,进度条时好时坏,一开始还没查出来问题,没有往集群的方面想,还以为是哪里不小心自动关闭了websocket,查了n久之后,突然顿悟了,有集群啊,多台服务器,前端链接之后,session是保持了,但是一开始链接的可能是服务器A,后续的请求可能就到服务器B了,那就拿不到客户的session,就无法发送消息了。

找到问题,就得想解决方法了

方法一 固定ip请求,用nginx的ip_hash负载均衡请求,可惜行不通,问了运维那边,ip是动态了,搞不定。

方法二 改网关的负载均衡策略,因为原来的负载均衡策略是均衡分发,改到源地址策略,就会固定ip请求到同一服务器了,和nginx ip_hash类似,但是也行不通,不能改策略。

方法三 最后只剩这个方法,通过rabbitmq广播各个服务器,根据服务器本身存储的session信息(uuid),判断这个消息时候需要处理。

customer自己发送消息,自己监听,发送mq的时候会自动在头尾带上",以及会对字符串中的“引号进行转义,加上\,所以都要进行替换


import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 
 * @since 2024/9/19
 */
@Component
public class WebsocketMqCustomer {

    @RabbitListener(queues = "#{psQueue.name}")
    public void pubsubMqMsg(String message) throws IOException {
        String[] xxxx = message.replace("\\", "").split(";;;");
        WebSocketServer.sendInfo(xxxx[1].substring(0, xxxx[1].length() - 1).replace("\\", ""), xxxx[0].substring(1));
    }

    public static void sendInfo(String message, String sid) {
        if (StringUtils.isNotEmpty(sid)) {
            RabbitTemplate rabbitTemplate = SpringUtil.getBean(RabbitTemplate.class);
            rabbitTemplate.convertAndSend(MainRabbitMqCreateConfig.EXCHANGE_WEBSOCKET, sid, sid + ";;;" + message);
        }
    }
}

还遇见了一个问题,@ServerEndpoint注解添加的类无法通过@Autowired直接注入,因为websocket处理是基于Servlet的,而Servlet容器并不像Spring mvc那样自动管理bean的依赖注入,可以通过Application来获取其他bean。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/885855.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

视频集成与融合项目中需要视频编码,但是分辨率不兼容怎么办?

在众多视频整合项目中&#xff0c;一个显著的趋势是融合多元化的视频资源&#xff0c;以实现统一监管与灵活调度。这一需求促使项目团队不断探索新的集成方案&#xff0c;确保不同来源的视频流能够无缝对接&#xff0c;共同服务于统一的调看与管理平台&#xff0c;进而提升整体…

MobaXterm基本使用 -- 服务器状态、批量操作、显示/切换中文字体、修复zsh按键失灵

监控服务器资源 参考网址&#xff1a;https://www.cnblogs.com/144823836yj/p/12126314.html 显示效果 MobaXterm提供有这项功能&#xff0c;在会话窗口底部&#xff0c;显示服务器资源使用情况 如内存、CPU、网速、磁盘使用等&#xff1a; &#xff08;完整窗口&#xff0…

GAMES101(17~18节,物理材质模型)

材质 BRDF 材质&#xff1a;决定了光线与物体不同的作用方式 BRDF定义了物体材质,包含漫反射和镜面部分 BSDF &#xff08;scattering散射&#xff09; BRDF&#xff08;reflect反射&#xff09; BTDF 光线打击到物体上会向四面八方散射 反射 光线打击到物体上反射出去…

MATLAB案例 | Copula的密度函数和分布函数图

本文介绍各种类型&#xff08;Gaussian、t、Gumbel、Clayton、Frank&#xff09;Copula的密度函数和分布函数图的绘制 完整代码 clc close all clear%% ********************计算Copula的密度函数和分布函数图************************ [Udata,Vdata] meshgrid(linspace(0,1…

C#自定义工具类-数组工具类

目录 数组工具类基本操作 1.排序&#xff1a;升序&#xff0c;降序 2.查找 1&#xff09;查找最值&#xff1a;最大值&#xff0c;最小值 2&#xff09;查找满足条件的单个对象 3&#xff09;查找满足条件的所有对象 4&#xff09;选取数组中所有对象的某一字段 完整代…

查缺补漏----程序查询方式和中断方式计算题

1.程序查询方式 总结下来就是&#xff1a; 必须在外设传输完端口大小的数据时访问端口&#xff0c;以防止数据未被及时读出而丢失。 占CPU总时间&#xff1a;就是某段时间内设备用了多少时钟周期/PCU有多少个时钟周期 CPU的时钟周期数&#xff1a;就看主频&#xff0c;主频表示…

大数据开发--1.1大数据概论

目录 一.大数据的概念 什么是大数据&#xff1f; 二. 大数据的特点 三. 大数据应用场景 四. 大数据分析业务步骤 大数据分析的业务流程&#xff1a; 五.大数据职业规划 职业方向 岗位技术要求 六. 大数据学习路线 一.大数据的概念 什么是大数据&#xff1f; 数据 世界…

Spring Boot技术:构建高效网上购物平台

第3章 系统分析 3.1 可行性分析 在系统开发之初要进行系统可行分析&#xff0c;这样做的目的就是使用最小成本解决最大问题&#xff0c;一旦程序开发满足用户需要&#xff0c;带来的好处也是很多的。下面我们将从技术上、操作上、经济上等方面来考虑这个系统到底值不值得开发。…

车辆重识别(注意力 U-Net:学习在哪些区域寻找胰腺)论文阅读2024/10/01

什么是注意力机制&#xff1f; 什么是加性注意力&#xff1f; 大致说一下流程&#xff1a; 对于一张特征图来说&#xff0c;对于这张图中的每一个像素向量&#xff08;例如a&#xff09;&#xff0c;计算该向量与所有像素向量的相似度&#xff0c;对这些相似度进行激活函数…

【重学 MySQL】四十五、数据库的创建、修改与删除

【重学 MySQL】四十五、数据库的创建、修改与删除 一条数据存储的过程数据输入数据验证数据处理数据存储数据持久化反馈与日志注意事项 标识符命名规则基本规则长度限制保留字与特殊字符命名建议示例 MySQL 中的数据类型创建数据库创建数据库时指定字符集和排序规则 查看数据库…

数据库重建索引的作用?

重建索引是数据库管理中的一个重要操作&#xff0c;主要用于优化数据库性能和提高查询效率。以下是重建索引的一些主要用途&#xff1a; 提高查询性能&#xff1a;随着时间的推移&#xff0c;数据的插入、更新和删除会导致索引碎片化&#xff0c;重建索引可以减少碎片&#xf…

DNS with libevent

DNS with libevent: high-level and low-level functionality libevent提供了少量用于解析DNS名字的API&#xff0c;以及用于实现简单DNS服务器的机制。 我们从用于名字查询的高层机制开始介绍&#xff0c;然后介绍底层机制和服务器机制。 Portable blocking name resolution…

15年408计算机网络

第一题&#xff1a; 解析&#xff1a; 接收方使用POP3向邮件服务器读取邮件&#xff0c;使用的TCP连接&#xff0c;TCP向上层提供的是面向连接的&#xff0c;可靠的数据传输服务。 第二题&#xff1a; 解析&#xff1a;物理层-不归零编码和曼彻斯特编码 编码1&#xff1a;电平在…

CSS中字体图标的使用

引言&#xff1a; 在网页设计当中&#xff0c;会有很多很简洁的图标&#xff0c;比如箭头&#xff0c;照相机&#xff0c;放大镜等 这些大概率都是使用字体图标来完成的&#xff0c;因为字体图标比较简洁高效&#xff0c;不会像图片一样需要向浏览器请求数据。那么字体图标该…

网络协议详解--IPv6

IPv6产生背景 &#xff08;1&#xff09;地址空间的耗尽&#xff1a;因特网呈指数级发展&#xff0c;导致IPv4地址空间几乎耗尽。虽然采用了子网划分、CIDR和NAT地址转换技术&#xff0c;但这没有从根源解决地址耗尽的问题 &#xff08;2&#xff09;IP层安全需求的增长&#x…

Oracle exadata存储节点更换内存操作及报错处理

1.报错信息 在进行Oracle exadata巡检时&#xff0c;发现cell节点有一根内存报错&#xff0c;报错信息如下&#xff1a; 报错内存位置为&#xff1a;CPU1 P1/D2槽位 报错内存信息&#xff1a; 根据报错信息确认内存PN号、大小等息&#xff0c;并将信息反馈公司&#xff0c;及…

Hadoop框架及应用场景说明

Hadoop是一个开源的分布式系统基础架构。由多个组件组成&#xff0c;组件之间协同工作&#xff0c;进行大规模数据集的存储和处理。 本文将探讨Hadoop的架构以及应用场景。 一Hadoop框架 Hadoop的核心组件包含&#xff1a; 1. Hadoop分布式文件系统&#xff08;HDFS&#xff…

Qt --- 常用控件的介绍---Widget属性介绍

一、控件概述 编程&#xff0c;讲究的是站在巨人的肩膀上&#xff0c;而不是从头发明轮子。一个图形化界面上的内容&#xff0c;不需要咱们全都从零区实现&#xff0c;Qt中已经提供了很多内置的控件了&#xff08;按钮&#xff0c;文本框&#xff0c;单选按钮&#xff0c;复选…

【Java SE 题库】移除元素(暴力解法)--力扣

&#x1f525;博客主页&#x1f525;&#xff1a;【 坊钰_CSDN博客 】 欢迎各位点赞&#x1f44d;评论✍收藏⭐ 目录 1. 题目 2. 解法(快慢“指针”) 3. 源码 4. 小结 1. 题目 给你一个数组 nums 和一个值 val&#xff0c;你需要原地移除所有数值等于 val 的元素。元素的顺…

C0007.Clion中添加ui文件及运行的完整步骤

1.创建ui文件 选择Ui文件目录&#xff0c;右击&#xff0c;打开Qt Designer&#xff1b; 创建完成后&#xff0c;保存ui界面&#xff0c;并且命名为test.ui&#xff1b; 2.新建头文件test.h 在include目录中&#xff0c;新建头文件&#xff0c;文件名为test.h 3.新建test.…