前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >十一. websocker服务与ws请求头拦截器

十一. websocker服务与ws请求头拦截器

作者头像
用户8988577
发布2022-12-27 15:59:41
1.1K0
发布2022-12-27 15:59:41
举报
文章被收录于专栏:言云纪

这两个月有点累,我就不做代码解释了。直接上代码!另外这篇文章代码有一些待优化的地方,如下

  1. 服务逻辑类的switch块待改成反射
  2. 房主退出连接应该将房间权限赋给其他人

ws配置

WebSocketConfigurer.java

代码语言:javascript
复制
package cc.tanblog.online.config;

import cc.tanblog.online.socket.MyWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

import javax.annotation.Resource;

/**
 * @Description
 * @Author Ocean_IT
 * @Email 14312400@qq.com
 * @Data 2022/8/16 9:42
 */

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    /**
     * 注入拦截器
     */
    @Resource
    private MyHandshakeInterceptor myHandshakeInterceptor;


    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {

        webSocketHandlerRegistry
                //添加myHandler消息处理对象,和websocket访问地址
                .addHandler(myHandler(), "/ws")
                //设置允许跨域访问
                .setAllowedOrigins("*")
                //添加拦截器可实现用户链接前进行权限校验等操作
                .addInterceptors(myHandshakeInterceptor);
    }

    @Bean
    public WebSocketHandler myHandler() {
        return new MyWebSocketHandler();

    }

}

ws请求头拦截器

MyHandshakeInterceptor.java

代码语言:javascript
复制
package cc.tanblog.online.config;

import cc.tanblog.online.dao.CreateRoomDao;
import cc.tanblog.online.dao.UserDao;
import cc.tanblog.online.dao.VideoroomDao;
import cc.tanblog.online.entity.User;
import cc.tanblog.online.entity.Videoroom;
import cc.tanblog.online.util.TokenUtils;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;


import java.util.List;
import java.util.Map;

/**
 * @Description
 * @Author Ocean_IT
 * @Email 14312400@qq.com
 * @Data 2022/8/16 14:31
 */

@Component
public class MyHandshakeInterceptor implements HandshakeInterceptor {
    private static final Log logger = LogFactory.getLog(GlobalExceptionHandler.class);
    @Autowired
    private UserDao userDao;
    @Autowired
    private VideoroomDao videoroomDao;
    /**
     * 握手之前,若返回false,则不建立链接 *
     *
     * @param request
     * @param response
     * @param wsHandler
     * @param attributes
     * @return
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse
            response, WebSocketHandler wsHandler, Map<String, Object> attributes) {

        //将用户id放入socket处理器的会话(WebSocketSession)中
        ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request;

        try{
            //获取参数
            User user = userDao.get_user(TokenUtils.getOpenid(serverHttpRequest.getServletRequest().getHeader("token")));
            attributes.put("md5_openid",user.getOpenid());
            attributes.put("uName",user.getUName());
        }catch(Exception e){
            logger.error("ws链接token认证失败");
            return false;
        }

        List<Videoroom> videoroom = videoroomDao.get_videoroom(Long.valueOf(serverHttpRequest.getServletRequest().getParameter("vrid")));
        // 如果查询结果总数等于0则房间号验证失败
        if(videoroom.size() == 0){
            logger.error("ws链接未查询到房间");
            return false;
        }
        attributes.put("vr_id", videoroom.get(0).getVrId().toString());
        logger.info("开始握手");
        return true;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse
            response, WebSocketHandler wsHandler, Exception exception) {
        logger.info("握手完成");
    }
}

websocket服务逻辑

MyWebSocketHandler.java

代码语言:javascript
复制
package cc.tanblog.online.socket;


import cc.tanblog.online.dao.CreateRoomDao;
import cc.tanblog.online.dao.UserDao;
import cc.tanblog.online.entity.CreateRoom;
import cc.tanblog.online.entity.SocketClient;
import cc.tanblog.online.service.SocketService;
import cc.tanblog.online.util.SocketUtils;
import cn.hutool.json.JSONUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Description
 * @Author Ocean_IT
 * @Email 14312400@qq.com
 * @Data 2022/8/16 14:34
 */

public class MyWebSocketHandler extends TextWebSocketHandler {
    @Autowired
    private SocketService socketService;
    @Autowired
    private UserDao userDao;
    @Autowired
    private CreateRoomDao createRoomDao;

    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static AtomicInteger onlineNum = new AtomicInteger();
    //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
    private static ConcurrentHashMap<String, WebSocketSession> sessionPools = new ConcurrentHashMap<>();
    // 房间人数记录
    public static HashMap roomNum = new HashMap();
    @Override
    public void handleTextMessage(WebSocketSession session, TextMessage message) throws IOException {
    //TODO: 接收消息
        SocketClient socketClient = JSONUtil.toBean(message.getPayload(), SocketClient.class);
        String md5_openid = session.getAttributes().get("md5_openid").toString();
        String vrid = session.getAttributes().get("vr_id").toString();
        String msg ="";
        switch (socketClient.getType()){
            case "BARRAGE" :
                //XXX:{"type":"BARRAGE","msg":"弹幕消息"}
                msg = SocketUtils.BARRAGE_MSG(session.getAttributes().get("md5_openid").toString(),userDao.get_user(md5_openid),socketClient.getMsg());
                sendMessageToAll_room(msg,session.getAttributes().get("vr_id").toString());
                break;
            case "PROGRESS":
                //XXX: {"type":"PROGRESS","num":"进度条数字"}
                List<CreateRoom> createRoomList = createRoomDao.get_createRoomList(md5_openid);
                if(createRoomList.size() == 0){
                    msg = SocketUtils.DEFAULT_MSG();
                    session.sendMessage(new TextMessage(msg));
                }else if(createRoomList.get(0).getVrId().toString().equals(vrid)){
                    System.out.println("----------");
                    msg = SocketUtils.PROGRESS_MSG(session.getAttributes().get("md5_openid").toString(),socketClient.getNum().toString());
//                    sendMessageToAll_room(msg,session.getAttributes().get("vr_id").toString());
                    sendMessageToAll_roomDelMy(msg,session.getAttributes().get("vr_id").toString(),session);
                }
                break;
            case "PING":
                //xxx:{"type":"PING"}
                msg = SocketUtils.PONG_MSG();
                session.sendMessage(new TextMessage(msg));
                break;
            default:
                msg = SocketUtils.DEFAULT_MSG();
                session.sendMessage(new TextMessage(msg));
                break;
        }

    }

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        // 用户首次建立连接
        String md5_openid = session.getAttributes().get("md5_openid").toString();
        //关闭重复连接
        if(sessionPools.get(md5_openid)!=null){
            sessionPools.get(md5_openid).close();
        }
        sessionPools.put(md5_openid, session);
        String room = session.getAttributes().get("vr_id").toString();
        addOnlineCount(room);
        // 发送房间信息(私发)
        session.sendMessage(new TextMessage(SocketUtils.SYSTEM_MSG(socketService.get_videoroomInfo(Long.valueOf(room)),userDao.get_user(md5_openid))));
        // 房间欢迎广播(指定房间,需要获取用户信息)
        sendMessageToAll_room(
                SocketUtils.WELCOME_MSG("欢迎 " + session.getAttributes().get("uName").toString() + "加入房间!",userDao.get_user(md5_openid),roomNum.get(room).toString()),
                session.getAttributes().get("vr_id").toString()
        );
    }


    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status)
            throws Exception {
        System.out.println("断开连接!");
        sessionPools.remove(session.getAttributes().get("md5_openid").toString());
        String room = session.getAttributes().get("vr_id").toString();
        subOnlineCount(room);
        String msg = session.getAttributes().get("uName").toString() + "已退出房间!";
        sendMessageToAll_room(SocketUtils.QUIT_MSG(msg,roomNum.get(room).toString()),session.getAttributes().get("vr_id").toString());
    }

    /**
     * 添加链接人数
     */
    public static void addOnlineCount(String room) {
        onlineNum.incrementAndGet();
        if(roomNum.containsKey(room)){
            roomNum.put(room,Integer.parseInt(String.valueOf(roomNum.get(room)))+1);
        }else{
            roomNum.put(room,1);
        }
    }

    /**
     * 移除链接人数
     */
    public static void subOnlineCount(String room) {
        onlineNum.decrementAndGet();
        roomNum.put(room,Integer.parseInt(String.valueOf(roomNum.get(room)))-1);
    }

    /**
     * 指定聊天室群发
     */
    private static void sendMessageToAll_room(String msg,String vrId) {
        sessionPools.forEach((id, session) -> {
            try {
                if(session.getAttributes().get("vr_id").toString().equals(vrId)){
                    session.sendMessage(new TextMessage(msg));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

    /**
     * 指定聊天室群发
     */
    private static void sendMessageToAll_roomDelMy(String msg,String vrId,WebSocketSession session1) {
        sessionPools.forEach((id, session) -> {
            try {
                if(session.getAttributes().get("vr_id").toString().equals(vrId)&&session1.getAttributes().get("md5_openid").equals(session.getAttributes().get("md5_openid"))!=true){
                    session.sendMessage(new TextMessage(msg));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

}

websocket静态方法工具类

SocketUtils.java

代码语言:javascript
复制
package cc.tanblog.online.util;

import cc.tanblog.online.entity.User;
import cn.hutool.json.JSONUtil;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description
 * @Author Ocean_IT
 * @Email 14312400@qq.com
 * @Data 2022/10/8 20:12
 */

public class SocketUtils {
    //    系统消息
    public static String SYSTEM_MSG(Object data, User user){
        Map<String,Object> map = new HashMap<>();
        map.put("type","SYSTEM");
        map.put("data",data);
        map.put("userinfo",user);
        return JSONUtil.toJsonStr(map);
    }
    //  欢迎消息
    public static String WELCOME_MSG(String msg, User user,String roomNum){
        Map<String,Object> map = new HashMap<>();
        map.put("type","WELCOME");
        map.put("msg",msg);
        map.put("userinfo",user);
        map.put("roomPersonCount",roomNum);
        return JSONUtil.toJsonStr(map);
    }
    // 退出房间消息
    public static String QUIT_MSG(String msg,String roomNum){
        Map<String,Object> map = new HashMap<>();
        map.put("type","QUIT");
        map.put("msg",msg);
        map.put("roomPersonCount",roomNum);
        return JSONUtil.toJsonStr(map);
    }
    //  弹幕消息
    public static String BARRAGE_MSG(String to,User user,String msg){
        Map<String,Object> map = new HashMap<>();
        map.put("type","BARRAGE");
        map.put("msg",msg);
        map.put("to",to);
        map.put("userinfo",user);
        return JSONUtil.toJsonStr(map);
    }
    //    进度条
    public static String PROGRESS_MSG(String to,String num){
        Map<String,Object> map = new HashMap<>();
        map.put("type","PROGRESS");
        map.put("to",to);
        map.put("num",num);
        return JSONUtil.toJsonStr(map);
    }
    // 心跳包
    public static String PONG_MSG(){
        Map<String,Object> map = new HashMap<>();
        map.put("type","PONG");
        return JSONUtil.toJsonStr(map);
    }
    // 默认
    public static  String DEFAULT_MSG(){
        Map<String,Object> map = new HashMap<>();
        map.put("type","ERROR");
        map.put("msg","你发送的请求错误!");
        return JSONUtil.toJsonStr(map);
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-11-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ws配置
  • ws请求头拦截器
  • websocket服务逻辑
  • websocket静态方法工具类
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档