Skip to main content

Netty+springboot实现IM即时通讯服务端

📣 📣 📣 📢📢📢 ☀️☀️ 你好啊!小伙伴,我是小冷。是一个兴趣驱动自学练习两年半的的 Java 工程师。 📒 一位十分喜欢将知识分享出来的 Java 博主 ⭐️⭐️⭐️,擅长使用 Java 技术开发 web 项目和工具 📒 文章内容丰富:覆盖大部分 java 必学技术栈,前端,计算机基础,容器等方面的文章 📒 如果你也对 Java 感兴趣,关注小冷吧,一起探索 Java 技术的生态与进步,一起讨论 Java 技术的使用与学习 ✏️ 高质量技术专栏专栏链接: 微服务数据结构netty单点登录SSMSpringCloudAlibaba😝 公众号 😝想全栈的小冷,分享一些技术上的文章,以及解决问题的经验 ⏩当前专栏Netty 实战系列专栏代码地址: Netty 练手项目仓库地址

IM 即时通讯系统

复用 web-im 开源项目的前端代码

地址: https://giuhub.com/javanf/web-im

使用时 安装 node 启动服务端

我们重写的时候只需要修改app.vue 中的 WebSocket 的连接地址即可

image-20220420133353118

修改之后 用 npm run dev 启动项目即可,同时也将服务端启动

此时我们只需要发送一个信息

image-20220420133804750

就可以看到前端传给我们的数据格式了

image-20220420133821948

数据分析

  1. 此时我们将 websocket demo 整合到了 springboot 中 确保客户端和服务端可以正常的通信
  2. 分析客户端的数据结构 根据不同的逻辑返回对应的数据 “ 数据是启动项目的第一步”

当前回传的功能分析

  1. 创建昵称登录
  2. 登陆后可以查看在线用户 和与已存在的群组
  3. 可以和其他用户一对一聊天
  4. 可以创建群组和加入群组 让后发送消息 可以一对多聊天

处理方式区别

按照处理方式的不同 可以分为操作类别 (操作用户 操作群组等) 消息类别 (一对一 一对多)

请求逻辑划分

可以分为

【用户登录】(创造链接)、【用户注销】(断开连接)

【创建群组】【加入群组】

【发送消息】(消息内部划分 私聊 &群聊)

数据模型设计

  1. 用户 : 昵称 nickname 和 id
  2. 群组 : 群组 id 群组名称 name 用户列表
  3. 消息(可以设计单独模型)

此时接续分析我们客户端发送给我们的 msg

msg:{"uid":"web_im_1650432464367","type":1,"nickname":"1","bridge":[],"groupId":""}

bridge : 【uid , otheruid】 不为空代表一对一消息 uid 发送给 other Uid 的消息,

为空代表一对多消息 需要 groupId

此时我们还需要考虑连接类型 从客户端 server/index.js 中就可以发现

      // 创建连接
case 1:
......
// 注销
case 2:
......
// 创建群
case 10:
.......
// 加入群
case 20:
......
// 发送消息
default:
......

由此可以分析出 消息 type 状态码的几种类型

type类型 :

1 : 创建连接

2 : 断开连接

10 : 创建群

20:加入群

默认(100):发消息

接口设计

数据模型

只展示字段 GETTER/SETTER 等自行添加

用户模型

public class UserModel {
private String uid;
private String nickName;
//状态 1 在线 0 离线
private int status;
}

群组模型

public class GroupModel {
private String id;
private String name;
//用户群组
private List<UserModel> users;
}

请求模型

@Data
public class ReqModel {
//请求类型
private int type;
//用户 id
private String uid;
//用户昵称
private String nickname;
//群组id
private String groupId;
//群组名称
private String groupName;
//消息内容
private String msg;
//群发列表
private List<String> bridge;
}

请求结果

@Data
public class RespModel {
//响应类型
private int type;
//日期
private String date;
//用户 id
private String uid;
//用户昵称
private String nickname;
//状态
private int status;
//返回联系人和群组列表
private List<UserModel> users;
private List<GroupModel> groups;
private String groupId;

//消息内容
private String msg;
//群发列表
private List<String> bridge;
}

列表存储

//存放本地数据
public class LocalData {
//存储连接的通道 分发消息使用
public static final List<UserModel> userlist = new ArrayList<>();
//存储通道和用户id的映射关系 用来获取消息通知的通道
public static final Map<String, Channel> channelUserUrl = new HashMap<>();
//在线用户列表
public static final List<GroupModel> grouplist = new ArrayList<>();
//群组列表
public static final ChannelGroup channellist = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

//拿到存储连接通道列表中的通道
public static List<Channel> getAllChannels() {
List<Channel> channels = new ArrayList<>();
Iterator<Channel> iterator = channellist.iterator();
while (iterator.hasNext()) {
Channel channel = iterator.next();
channels.add(channel);
}
return channels;
}

//通过群组id 取出对应的群组
public static GroupModel getGroupById(String id) {
for (GroupModel groupModel : grouplist) {
if (groupModel.getId().equals(id)) {
return groupModel;
}
}
return null;
}
}

用户上线 下线逻辑

根据前端客户端传回来的格式 我们可以设计出四个模型和定义请求响应逻辑

{
"uid": "web_im_1650112539438",
"type": 1,
"nickname": "冷环渊",
"bridge": [],
"groupId": ""
}
设置枚举处理类型

请求类型

@Getter
public enum ReqType {
//枚举内容
CONN(1, "建立连接"),
CANCEL(2, "断开连接"),
ADD_GROUP(10, "创建群组"),
JOIN_GROUP(20, "加入群组"),
SEND_MSG(100, "发送消息");

//编号
private int num;
//信息
private String desc;

ReqType(int num, String desc) {
this.num = num;
this.desc = desc;
}

// 增加一个根据数值遍历枚举类型
public static ReqType getTypeByNum(int num) {
ReqType[] reqTypes = ReqType.values();
for (ReqType reqType : reqTypes) {
if (num == reqType.getNum()) {
return reqType;
}
}
return ReqType.SEND_MSG;
}
}

响应类型

@Getter
public enum RespType {
//处理分类
OPERA(1, "操作类处理"),
MSG(2, "消息类处理");
//编号
private int num;
//信息
private String desc;

RespType(int num, String desc) {
this.num = num;
this.desc = desc;
}
}

service 与 实现类

接口

public interface ChatService {
//新增用户
void addUser(ReqModel reqModel, RespModel respModel);

//用户下线
void delUser(ReqModel reqModel, RespModel respModel);

//新增群组
void addGroup(ReqModel reqModel, RespModel respModel);

//加入群组
void joinGroup(ReqModel reqModel, RespModel respModel);

//发送群组消息
void sendGroupMsg(ReqModel reqModel, RespModel respModel);

//细聊
void sendPrivateMsg(ReqModel reqModel, RespModel respModel);
}


用户上线提示

    @Override
public void addUser(ReqModel reqModel, RespModel respModel) {
respModel.setMsg(reqModel.getNickname() + " : 加入聊天室");
UserModel userModel = new UserModel(reqModel.getUid(), reqModel.getNickname(), 1);
LocalData.userlist.add(userModel);
respModel.setUsers(LocalData.userlist);
respModel.setGroups(LocalData.grouplist);
}

用户下线提示

    @Override
public void delUser(ReqModel reqModel, RespModel respModel) {
respModel.setMsg(reqModel.getNickname() + " : 离开聊天室");
UserModel userModel = null;
//遍历在线用户的列表
for (int i = 0; i < LocalData.userlist.size(); i++) {
UserModel temp = LocalData.userlist.get(i);
if (temp.getUid().equals(reqModel.getUid())) {
userModel = temp;
break;
}
}
//在用户列表删除要下线的用户
LocalData.userlist.remove(userModel);
respModel.setUsers(LocalData.userlist);
respModel.setGroups(LocalData.grouplist);
}

通道处理器

    @Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("msg : " + msg.text());
// 获取请求数据 解析json形式
ReqModel model = new Gson().fromJson(msg.text(), ReqModel.class);
RespModel respModel = new RespModel();
//获取当前时间
LocalDateTime now = LocalDateTime.now();
String date = now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
respModel.setDate(date);
respModel.setUid(model.getUid());
respModel.setNickname(model.getNickname());
//色湖之bridge 初始值 要默认空值
List<String> defaultList = new ArrayList<>();
respModel.setBridge(defaultList);
// 默认类型
respModel.setType(RespType.OPERA.getNum());
//判断请求类型
ReqType type = ReqType.getTypeByNum(model.getType());
switch (type) {
case CONN:
System.out.println(model.getNickname() + ": 用户上线了");
//记录并返回在线用户列表 以及已经创建的群组列表
chatService.addUser(model, respModel);
break;
case CANCEL:
System.out.println(model.getNickname() + ": 用户下线了");
chatService.delUser(model, respModel);
break;
case ADD_GROUP:
break;
case JOIN_GROUP:
break;
case SEND_MSG:
respModel.setType(RespType.MSG.getNum());
break;
default:
}

System.out.println(new Gson().toJson(respModel));
List<Channel> channels = LocalData.getAllChannels();
notifyChannels(channels, respModel);

}
//分发信息
private void notifyChannels(List<Channel> channels, RespModel respModel) {
for (Channel channel : channels) {
TextWebSocketFrame resp = new TextWebSocketFrame(new Gson().toJson(respModel));
channel.writeAndFlush(resp);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//下线
LocalData.channellist.remove(ctx.channel());
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//上线
LocalData.channellist.add(ctx.channel());
}

响应效果

msg : {"uid":"web_im_1650112539438","type":1,"nickname":"冷环渊","bridge":[],"groupId":""}

冷环渊: 用户上线了

{"type":1,"date":"2022-04-22 23:36:35","uid":"web_im_1650112539438","nickname":"冷环渊","status":0,

"users":[
{"uid":"web_im_1650619948362","nickName":"222","status":1},
{"uid":"web_im_1650112539438","nickName":"冷环渊","status":1}
],

"groups":[],"msg":"冷环渊 : 加入聊天室","bridge":[]}

创建群聊

msg : {
"uid":"web_im_1650112539438","type":10,"nickname":"冷环渊","groupName":"就将计就计","bridge":[]
}
   case ADD_GROUP:
System.out.println(model.getNickname() + "用户创建了群组" + model.getGroupName());
chatService.addGroup(model, respModel);
break;

当时判断到枚举类型到创建群组的时候,我们就需要在 localdata 的群组集合中加入一个新建的群组并且将创建的用户加入到群组中

实现 chatService 接口中的新增群组方法

    @Override
public void addGroup(ReqModel reqModel, RespModel respModel) {
respModel.setMsg(reqModel.getNickname() + ":创建了群 :" + reqModel.getGroupName());
// 把创建者加入到群组的成员列表中
UserModel self = new UserModel(reqModel.getUid(), reqModel.getNickname());
List<UserModel> users = new ArrayList<>();
users.add(self);
//此处 开源项目设计的是 UID由客户端创建 GroupID由服务端创建。思考 一般来说id都是服务端创建的
String groupId = "group_" + reqModel.getUid() + "_" + reqModel.getGroupName();
GroupModel groupModel = new GroupModel(groupId, reqModel.getGroupName(), users);
LocalData.grouplist.add(groupModel);
respModel.setGroups(LocalData.grouplist);
}

在之后 点击创建群组就可以发现 群组新增成功

群聊操作

加入群聊

加入群聊的思路也是类似的

        case JOIN_GROUP:
System.out.println(model.getNickname() + "用户加入了群组" + model.getGroupName());
chatService.joinGroup(model, respModel);
break;

当请求的类型是加入群组的时候 我们需要将当前用户加入到对应的群组 users 中

实现 chatService 接口中的新增群组方法

   @Override
public void joinGroup(ReqModel reqModel, RespModel respModel) {
respModel.setMsg(reqModel.getNickname() + ":加入了群 :" + reqModel.getGroupName());
for (GroupModel groupModel : LocalData.grouplist) {
if (groupModel.getId().equals(reqModel.getGroupId())) {
UserModel self = new UserModel(reqModel.getUid(), reqModel.getNickname());
groupModel.getUsers().add(self);
}
}
respModel.setGroups(LocalData.grouplist);
}

信息发送处理

思路

首先根据请求的类型我们发现,客户端发起信息处理请求的时候 是依靠 bridgeGroupid来判断是群聊还是私聊的

        case SEND_MSG:
//识别响应类型 消息类型是更改
respModel.setType(RespType.MSG.getNum());

//判断一对一消息还是一对多消息
if (model.getBridge().size() == 0) {
// 一对多
chatService.sendGroupMsg(model, respModel);
} else {
chatService.sendPrivateMsg(model, respModel);
}
break;

实现对应的接口方法

    //发送群组消息
@Override
public void sendGroupMsg(ReqModel reqModel, RespModel respModel) {
respModel.setMsg(reqModel.getMsg());
respModel.setGroupId(reqModel.getGroupId());
respModel.setStatus(1);
}

//发送一对一消息
@Override
public void sendPrivateMsg(ReqModel reqModel, RespModel respModel) {
respModel.setMsg(reqModel.getMsg());
respModel.setBridge(reqModel.getBridge());
respModel.setGroupId("");
respModel.setStatus(1);
}

发送通道处理

一对一

一对一的时候 bridge 数组的 第 0 位 就是我我们自身 第一位 就是我们需要发送消息的人,

接下来只需要根据用户的 id 来获取到对应的通道,之后创建集合 使用分发方法

        //    根据一对一 或者一对多的类型来找到接受通知的用户
if (model.getBridge().size() > 0) {
// 代表一对一,只需要通知自身和需要接受消息的用户
String selfId = model.getBridge().get(0);
Channel selfChannel = LocalData.channelUserUrl.get(selfId);

//接受信息的通道
String otherId = model.getBridge().get(1);
Channel otherChannel = LocalData.channelUserUrl.get(otherId);

List<Channel> channels = new ArrayList<>();
channels.add(selfChannel);
channels.add(otherChannel);
notifyChannels(channels, respModel);
return;
}

一对多

通过群 id 来获取群对象 之后遍历群的 user 表 根据用户 id 来获取通道 分发

//    一对多群组消息
List<Channel> channels = new ArrayList<>();
// 通过群id来找到群对象 获取用户列表 根据列表uid 获取对应的通道
GroupModel groupModel = LocalData.getGroupById(model.getGroupId());
for (UserModel userModel : groupModel.getUsers()) {
Channel channel = LocalData.channelUserUrl.get(userModel.getUid());
channels.add(channel);
}
notifyChannels(channels, respModel);

可以改经的点

使用 bridge 作为一对一或者一对多的判断比较繁琐,可以通过状态码来判断:

type 200 代表一对多

type 100 代表私聊

WebSocket 协议处理器(最后整合的部分)

// 泛型 代表的是处理数据的单位
// TextWebSocketFrame 是文本信息帧
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Autowired
private ChatService chatService;

@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("msg : " + msg.text());
// 获取请求数据 解析json形式
ReqModel model = new Gson().fromJson(msg.text(), ReqModel.class);
RespModel respModel = new RespModel();

//设置用户信息
respModel.setUid(model.getUid());
respModel.setNickname(model.getNickname());

//获取当前时间
LocalDateTime now = LocalDateTime.now();
String date = now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
respModel.setDate(date);

//bridge 初始值 要默认空值
List<String> defaultList = new ArrayList<>();
respModel.setBridge(defaultList);
// 默认类型
respModel.setType(RespType.OPERA.getNum());
//判断请求类型
ReqType type = ReqType.getTypeByNum(model.getType());
switch (type) {
case CONN:
System.out.println(model.getNickname() + ": 用户上线了");
//记录并返回在线用户列表 以及已经创建的群组列表
//记录用户和通道的关联关系
LocalData.channelUserUrl.put(model.getUid(), ctx.channel());
chatService.addUser(model, respModel);
break;
case CANCEL:
System.out.println(model.getNickname() + ": 用户下线了");
LocalData.channelUserUrl.remove(LocalData.channelUserUrl.get(model.getUid()));
chatService.delUser(model, respModel);
break;
case ADD_GROUP:
System.out.println(model.getNickname() + "用户创建了群组" + model.getGroupName());
chatService.addGroup(model, respModel);
break;
case JOIN_GROUP:
System.out.println(model.getNickname() + "用户加入了群组" + model.getGroupName());
chatService.joinGroup(model, respModel);
break;
case SEND_MSG:
//识别响应类型 消息类型是更改
respModel.setType(RespType.MSG.getNum());

//判断一对一消息还是一对多消息
if (model.getBridge().size() == 0) {
// 一对多
chatService.sendGroupMsg(model, respModel);
} else {
chatService.sendPrivateMsg(model, respModel);
}
break;
default:
}

System.out.println(new Gson().toJson(respModel));
if (respModel.getType() == RespType.OPERA.getNum()) {
List<Channel> channels = LocalData.getAllChannels();
notifyChannels(channels, respModel);
return;
}
// 根据一对一 或者一对多的类型来找到接受通知的用户
if (model.getBridge().size() > 0) {
// 代表一对一,只需要通知自身和需要接受消息的用户
String selfId = model.getBridge().get(0);
Channel selfChannel = LocalData.channelUserUrl.get(selfId);

//接受信息的通道
String otherId = model.getBridge().get(1);
Channel otherChannel = LocalData.channelUserUrl.get(otherId);

List<Channel> channels = new ArrayList<>();
channels.add(selfChannel);
channels.add(otherChannel);
notifyChannels(channels, respModel);
return;
}
// 一对多群组消息
List<Channel> channels = new ArrayList<>();
// 通过群id来找到群对象 获取用户列表 根据列表uid 获取对应的通道
GroupModel groupModel = LocalData.getGroupById(model.getGroupId());
for (UserModel userModel : groupModel.getUsers()) {
Channel channel = LocalData.channelUserUrl.get(userModel.getUid());
channels.add(channel);
}
notifyChannels(channels, respModel);
}

private void notifyChannels(List<Channel> channels, RespModel respModel) {
for (Channel channel : channels) {
TextWebSocketFrame resp = new TextWebSocketFrame(new Gson().toJson(respModel));
channel.writeAndFlush(resp);
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//下线
LocalData.channellist.remove(ctx.channel());
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//上线
LocalData.channellist.add(ctx.channel());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
}

总结至此 IM 即时通讯系统完结

  1. 使用 http 协议 整合 Netty 和 springBoot 实现项目后端
  2. 学习阅读开源项目的源码和数据交换格式来设计数据模型和接口开发思路
  3. 熟悉 netty 操作 服务器编写思路