这两个月有点累,我就不做代码解释了。直接上代码!另外这篇文章代码有一些待优化的地方,如下
WebSocketConfigurer.java
package cc.tanblog.online.config;
import cc.tanblog.online.socket.MyWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import javax.annotation.Resource;
/**
* @Description
* @Author Ocean_IT
* @Email 14312400@qq.com
* @Data 2022/8/16 9:42
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
/**
* 注入拦截器
*/
@Resource
private MyHandshakeInterceptor myHandshakeInterceptor;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
webSocketHandlerRegistry
//添加myHandler消息处理对象,和websocket访问地址
.addHandler(myHandler(), "/ws")
//设置允许跨域访问
.setAllowedOrigins("*")
//添加拦截器可实现用户链接前进行权限校验等操作
.addInterceptors(myHandshakeInterceptor);
}
@Bean
public WebSocketHandler myHandler() {
return new MyWebSocketHandler();
}
}
MyHandshakeInterceptor.java
package cc.tanblog.online.config;
import cc.tanblog.online.dao.CreateRoomDao;
import cc.tanblog.online.dao.UserDao;
import cc.tanblog.online.dao.VideoroomDao;
import cc.tanblog.online.entity.User;
import cc.tanblog.online.entity.Videoroom;
import cc.tanblog.online.util.TokenUtils;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.List;
import java.util.Map;
/**
* @Description
* @Author Ocean_IT
* @Email 14312400@qq.com
* @Data 2022/8/16 14:31
*/
@Component
public class MyHandshakeInterceptor implements HandshakeInterceptor {
private static final Log logger = LogFactory.getLog(GlobalExceptionHandler.class);
@Autowired
private UserDao userDao;
@Autowired
private VideoroomDao videoroomDao;
/**
* 握手之前,若返回false,则不建立链接 *
*
* @param request
* @param response
* @param wsHandler
* @param attributes
* @return
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse
response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
//将用户id放入socket处理器的会话(WebSocketSession)中
ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request;
try{
//获取参数
User user = userDao.get_user(TokenUtils.getOpenid(serverHttpRequest.getServletRequest().getHeader("token")));
attributes.put("md5_openid",user.getOpenid());
attributes.put("uName",user.getUName());
}catch(Exception e){
logger.error("ws链接token认证失败");
return false;
}
List<Videoroom> videoroom = videoroomDao.get_videoroom(Long.valueOf(serverHttpRequest.getServletRequest().getParameter("vrid")));
// 如果查询结果总数等于0则房间号验证失败
if(videoroom.size() == 0){
logger.error("ws链接未查询到房间");
return false;
}
attributes.put("vr_id", videoroom.get(0).getVrId().toString());
logger.info("开始握手");
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse
response, WebSocketHandler wsHandler, Exception exception) {
logger.info("握手完成");
}
}
MyWebSocketHandler.java
package cc.tanblog.online.socket;
import cc.tanblog.online.dao.CreateRoomDao;
import cc.tanblog.online.dao.UserDao;
import cc.tanblog.online.entity.CreateRoom;
import cc.tanblog.online.entity.SocketClient;
import cc.tanblog.online.service.SocketService;
import cc.tanblog.online.util.SocketUtils;
import cn.hutool.json.JSONUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Description
* @Author Ocean_IT
* @Email 14312400@qq.com
* @Data 2022/8/16 14:34
*/
public class MyWebSocketHandler extends TextWebSocketHandler {
@Autowired
private SocketService socketService;
@Autowired
private UserDao userDao;
@Autowired
private CreateRoomDao createRoomDao;
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static AtomicInteger onlineNum = new AtomicInteger();
//concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
private static ConcurrentHashMap<String, WebSocketSession> sessionPools = new ConcurrentHashMap<>();
// 房间人数记录
public static HashMap roomNum = new HashMap();
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws IOException {
//TODO: 接收消息
SocketClient socketClient = JSONUtil.toBean(message.getPayload(), SocketClient.class);
String md5_openid = session.getAttributes().get("md5_openid").toString();
String vrid = session.getAttributes().get("vr_id").toString();
String msg ="";
switch (socketClient.getType()){
case "BARRAGE" :
//XXX:{"type":"BARRAGE","msg":"弹幕消息"}
msg = SocketUtils.BARRAGE_MSG(session.getAttributes().get("md5_openid").toString(),userDao.get_user(md5_openid),socketClient.getMsg());
sendMessageToAll_room(msg,session.getAttributes().get("vr_id").toString());
break;
case "PROGRESS":
//XXX: {"type":"PROGRESS","num":"进度条数字"}
List<CreateRoom> createRoomList = createRoomDao.get_createRoomList(md5_openid);
if(createRoomList.size() == 0){
msg = SocketUtils.DEFAULT_MSG();
session.sendMessage(new TextMessage(msg));
}else if(createRoomList.get(0).getVrId().toString().equals(vrid)){
System.out.println("----------");
msg = SocketUtils.PROGRESS_MSG(session.getAttributes().get("md5_openid").toString(),socketClient.getNum().toString());
// sendMessageToAll_room(msg,session.getAttributes().get("vr_id").toString());
sendMessageToAll_roomDelMy(msg,session.getAttributes().get("vr_id").toString(),session);
}
break;
case "PING":
//xxx:{"type":"PING"}
msg = SocketUtils.PONG_MSG();
session.sendMessage(new TextMessage(msg));
break;
default:
msg = SocketUtils.DEFAULT_MSG();
session.sendMessage(new TextMessage(msg));
break;
}
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 用户首次建立连接
String md5_openid = session.getAttributes().get("md5_openid").toString();
//关闭重复连接
if(sessionPools.get(md5_openid)!=null){
sessionPools.get(md5_openid).close();
}
sessionPools.put(md5_openid, session);
String room = session.getAttributes().get("vr_id").toString();
addOnlineCount(room);
// 发送房间信息(私发)
session.sendMessage(new TextMessage(SocketUtils.SYSTEM_MSG(socketService.get_videoroomInfo(Long.valueOf(room)),userDao.get_user(md5_openid))));
// 房间欢迎广播(指定房间,需要获取用户信息)
sendMessageToAll_room(
SocketUtils.WELCOME_MSG("欢迎 " + session.getAttributes().get("uName").toString() + "加入房间!",userDao.get_user(md5_openid),roomNum.get(room).toString()),
session.getAttributes().get("vr_id").toString()
);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status)
throws Exception {
System.out.println("断开连接!");
sessionPools.remove(session.getAttributes().get("md5_openid").toString());
String room = session.getAttributes().get("vr_id").toString();
subOnlineCount(room);
String msg = session.getAttributes().get("uName").toString() + "已退出房间!";
sendMessageToAll_room(SocketUtils.QUIT_MSG(msg,roomNum.get(room).toString()),session.getAttributes().get("vr_id").toString());
}
/**
* 添加链接人数
*/
public static void addOnlineCount(String room) {
onlineNum.incrementAndGet();
if(roomNum.containsKey(room)){
roomNum.put(room,Integer.parseInt(String.valueOf(roomNum.get(room)))+1);
}else{
roomNum.put(room,1);
}
}
/**
* 移除链接人数
*/
public static void subOnlineCount(String room) {
onlineNum.decrementAndGet();
roomNum.put(room,Integer.parseInt(String.valueOf(roomNum.get(room)))-1);
}
/**
* 指定聊天室群发
*/
private static void sendMessageToAll_room(String msg,String vrId) {
sessionPools.forEach((id, session) -> {
try {
if(session.getAttributes().get("vr_id").toString().equals(vrId)){
session.sendMessage(new TextMessage(msg));
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
/**
* 指定聊天室群发
*/
private static void sendMessageToAll_roomDelMy(String msg,String vrId,WebSocketSession session1) {
sessionPools.forEach((id, session) -> {
try {
if(session.getAttributes().get("vr_id").toString().equals(vrId)&&session1.getAttributes().get("md5_openid").equals(session.getAttributes().get("md5_openid"))!=true){
session.sendMessage(new TextMessage(msg));
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
SocketUtils.java
package cc.tanblog.online.util;
import cc.tanblog.online.entity.User;
import cn.hutool.json.JSONUtil;
import java.util.HashMap;
import java.util.Map;
/**
* @Description
* @Author Ocean_IT
* @Email 14312400@qq.com
* @Data 2022/10/8 20:12
*/
public class SocketUtils {
// 系统消息
public static String SYSTEM_MSG(Object data, User user){
Map<String,Object> map = new HashMap<>();
map.put("type","SYSTEM");
map.put("data",data);
map.put("userinfo",user);
return JSONUtil.toJsonStr(map);
}
// 欢迎消息
public static String WELCOME_MSG(String msg, User user,String roomNum){
Map<String,Object> map = new HashMap<>();
map.put("type","WELCOME");
map.put("msg",msg);
map.put("userinfo",user);
map.put("roomPersonCount",roomNum);
return JSONUtil.toJsonStr(map);
}
// 退出房间消息
public static String QUIT_MSG(String msg,String roomNum){
Map<String,Object> map = new HashMap<>();
map.put("type","QUIT");
map.put("msg",msg);
map.put("roomPersonCount",roomNum);
return JSONUtil.toJsonStr(map);
}
// 弹幕消息
public static String BARRAGE_MSG(String to,User user,String msg){
Map<String,Object> map = new HashMap<>();
map.put("type","BARRAGE");
map.put("msg",msg);
map.put("to",to);
map.put("userinfo",user);
return JSONUtil.toJsonStr(map);
}
// 进度条
public static String PROGRESS_MSG(String to,String num){
Map<String,Object> map = new HashMap<>();
map.put("type","PROGRESS");
map.put("to",to);
map.put("num",num);
return JSONUtil.toJsonStr(map);
}
// 心跳包
public static String PONG_MSG(){
Map<String,Object> map = new HashMap<>();
map.put("type","PONG");
return JSONUtil.toJsonStr(map);
}
// 默认
public static String DEFAULT_MSG(){
Map<String,Object> map = new HashMap<>();
map.put("type","ERROR");
map.put("msg","你发送的请求错误!");
return JSONUtil.toJsonStr(map);
}
}