摘要:
hanne1imServer2订阅redisChanne2imServer3订阅redisChanne3imServer4订阅redisChanne4业务方(webApi)端根据接收方的clientId后四位16进制与节点总数取模,定位到对应的redisChannel,进行redis->publish操作将消息定位到相应的imServer。每个imServer管理着对应的终端连接,当接收到r
前言
ImCore 是一款 .NETCore 下利用 WebSocket 实现的简易、高性能、集群即时通讯组件,支持点对点通讯、群聊通讯、上线下线事件消息等众多实用性功能。
rncc;16}创建MQClientInstance实例:1publicMQClientInstance(ClientConfigclientConfig,intinstanceIndex,Strin
开源地址:https://github.com/2881099/im ,求 star~~
快速开始
dotnet add package ImCore
<String,MQConsumerInner>entry=it.next();60MQConsumerInnerimpl=entry.getValue();61if(impl!=null
IM服务端
public void Configure(IApplicationBuilder app)
{
app.UseImServer(new ImServerOptions
{
Redis = new CSRedis.CSRedisClient("127.0.0.1:6379,poolsize=5"),
Servers = new[] { "127.0.0.1:6001" }, //集群配置
Server = "127.0.0.1:6001"
});
}
一套永远不需要迭代更新的IM服务端
回调处理?我们可以这样设定,所有用户的主动行为走业务方(webApi),imServer只负责即时消息推送。什么意思?用户A向好友B发送消息:客户端请求业务方(webApi)接口,由业务方(webAp
WebApi业务端
public void Configure(IApplicationBuilder app)
{
//...
ImHelper.Initialization(new ImClientOptions
{
Redis = new CSRedis.CSRedisClient("127.0.0.1:6379,poolsize=5"),
Servers = new[] { "127.0.0.1:6001" }
});
ImHelper.EventBus(
t => Console.WriteLine(t.clientId + "上线了"),
t => Console.WriteLine(t.clientId + "下线了"));
}
PrevConnectServer |
(clientId, string) |
在终端准备连接 WebSocket 前调用 |
SendMessage |
(发送者, 接收者, 消息内容, 是否回执) |
发送消息 |
GetClientListByOnline |
- |
返回所有在线clientId |
EventBus |
(上线委托, 离线委托) |
socket上线与下线事件 |
JoinChan |
(clientId, 频道名) |
加入 |
LeaveChan |
(clientId, 频道名) |
离开 |
GetChanClientList |
(频道名) |
获取群聊频道所有clientId |
GetChanList |
- |
获取所有群聊频道和在线人数 |
GetChanListByClientId |
(clientId) |
获取用户参与的所有群聊频道 |
GetChanOnline |
(频道名) |
获取群聊频道的在线人数 |
SendChanMessage |
(clientId, 频道名, 消息内容) |
发送群聊消息,所有在线的用户将收到消息 |
说明:clientId 应该与 webApi的用户id相同,或者有关联。
f(prev!=null){10instance=prev;11log.warn("ReturnedPreviousMQClientInstanceforclientId:[{}]",clientId
Html5终端
本方案支持集群分区,前端连接 websocket 前,应该先请求 webApi 获得地址(ImHelper.PrevConnectServer)。
24return"ThreadLocalIndex{"+25"threadLocalIndex="+threadLocalIndex.get()+26"}";27}28}通过Threa
运行示例
运行环境:.NETCore 2.1 + redis-server 2.8
oFromNameServer(topic,1000*3);23}24if(topicRouteData!=null){25TopicRouteDataold=this.topicRouteTable
下载Redis-x64-2.8.2402.zip,点击 start.bat 运行;
cd imServer && dotnet run
AULT_PRODUCER_GROUP产生冲突1publicstaticfinalStringDEFAULT_PRODUCER_GROUP="DEFAULT_PRODUCER";接下来实例化mQCli
cd web && dotnet run
alse;3privatebooleanhaveTopicRouterInfo=false;4privateList<MessageQueue>messageQueueList=newAr
打开多个浏览器,访问 http://127.0.0.1:5000 发送群消息

设计思路
imServer 是 websocket 服务中心,可部署多实例,按clientId分区管理socket连接;
QProducer.getProducerGroup(),this);12if(!registerOK){13this.serviceState=ServiceState.CREATE_JUST;14
webApi 或其他应用端,使用 ImHelper 调用相关方法(如:SendMessage、群聊相关方法);
stalready.",group);9returnfalse;10}1112returntrue;13}在MQClientInstance初始化时,会创建producerTable、consumer
消息发送利用了 redis 订阅发布技术。每个 imServer 订阅相应的频道,收到消息,指派 websocket 向终端(如浏览器)发送消息;
tione){48log.error("ScheduledTaskpersistAllConsumerOffsetexception",e);49}50}51},1000*10,this.client
1、可缓解并发推送消息过多的问题;
查:1privatebooleanisBrokerAddrExistInTopicRouteTable(finalStringaddr){2Iterator<Entry<String,To
2、可解决连接数过多的问题;
();30}3132log.info("theproducer[{}]startOK.sendMessageWithVIPChannel={}",this.defaultMQProducer.getP
客户端连接流程:client -> websocket -> imserver
motingClient进行更新,而此时的remotingClient也就是刚才创建的NettyRemotingClientNettyRemotingClient的updateNameServerAd
imserver 订阅消息:client <- imserver <- redis channel
==this.clientConfig.getNamesrvAddr()){3this.scheduledExecutorService.scheduleAtFixedRate(newRunnable
推送消息流程:web1 -> sendmsg方法 -> redis channel -> imserver
67Iterator<Entry<String,HashMap<Long,String>>>itBrokerTable=this.brokerAddrTable.e
imserver 充当消息转发,及维护连接中心,代码万年不变不需要重启维护;
tcher){4try{5traceDispatcher.start(this.getNamesrvAddr());6}catch(MQClientExceptione){7log.warn("tra
WebSocket
比较笨的办法是浏览器端使用websocket,其他端socket,这种混乱的设计非常难维护。
nnewThread(r,"NettyClientPublicExecutor_"+this.threadIndex.incrementAndGet());22}23});2425this.event
强烈建议所有端都使用websocket协议,adorid/ios/h5/小程序全部支持websocket客户端。
sponse返回,在这里接收到进行处理,封装成TopicRouteData在invokeSync方法中采用懒加载的方式,尝试获取已经建立好连接的Channel,若是没有,则需要通过bootstrap的
业务与通讯协议
im系统一般涉及【我的好友】、【我的群】、【历史消息】等等。。
SSAGE_WITH_VIP_CHANNEL_PROPERTY和vipChannelEnabled:决定是否使用VIP通道,即高优先级回到DefaultMQProducer的构造方法,其会创建Defa
那么,imServer与业务方(webApi)该保持何种关系呢?
tConfigclientConfig){4this.clientConfig=clientConfig;5topAddressing=newTopAddressing(MixAll.getWSAdd
用户A向好友B发送消息,分析一下:
取名称地址HttpTinyClient的httpGet方法:1staticpublicHttpResulthttpGet(Stringurl,List<String>headers,Lis
- 需要判断B是否为A好友;
- 需要判断A是否有权限;
- 等等。。
诸如此类业务判断会很复杂,我们试想一下,如果使用imServer做业务协议,它是不是会变成巨无霸难以维护?
returngetTopicRouteInfoFromNameServer(topic,timeoutMillis,true);4}56publicTopicRouteDatagetTopicRout
又比如获取历史聊天记录,难道客户端要先websocket.send("gethistory"),再在onmessage里定位回调处理?
efaultMQProducer(finalStringproducerGroup){6this(producerGroup,null);7}89publicDefaultMQProducer(fin
我们可以这样设定,所有用户的主动行为走业务方(webApi),imServer只负责即时消息推送。什么意思?
r()!=null){11this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());12
用户A向好友B发送消息:客户端请求业务方(webApi)接口,由业务方(webApi)后端向imServer发起推送请求,imServer收到指令后,向前端用户B的websocket发送数据,用户B收到了消息。
nnameSrvAddr;16}这里首先根据topAddressing的fetchNSAddr方法获取名称服务地址,若是获取到了,则判断是否需要更新名称服务列表以及原来的nameSrvAddrtopA
获取历史消息:客户端请求业务方(webApi)接口,返回json(历史消息)
verException",e);76}77}finally{78this.lockNamesrv.unlock();79}80}else{81log.warn("updateTopicRouteIn
回执:用户A如何知道消息发送状态(成功或失败或不在线)?imServer端向用户B发送消息时,把状态以消息的方式推给用户A即可(按上面的逻辑),具体请看源码吧。。。
数据。事件消息IM系统比较常用的有上线、下线,在imServer层才能准确捕捉事件,但业务代码就不合适在这上面编写了。采用redis发布订阅技术,将上线、下线等事件向指定频道发布,业务方(webApi
发送消息
采用 redis 轻量级的订阅发布功能,实现消息缓冲发送。
tance这里在实例化MQClientInstance时,并没有直接传入clientConfig,而是通过cloneClientConfig方法复制了一份,来保证安全性:1publicClientCo
集群分区
单个imServer实例支持多少个客户端连接,两千个没问题?
dr()){8this.mQClientAPIImpl.fetchNameServerAddr();9}10//Startrequest-responsechannel11this.mQClientA
如果在线用户有10万人,怎么办???
lishInfo(topic,topicRouteData);43publishInfo.setHaveTopicRouterInfo(true);44Iterator<Entry<Str
比如部署4个imServer:
rver订阅相应的频道,收到消息,指派websocket向终端(如浏览器)发送消息;1、可缓解并发推送消息过多的问题;2、可解决连接数过多的问题;客户端连接流程:client->websocke
imServer1 订阅 redisChanne1
imServer2 订阅 redisChanne2
imServer3 订阅 redisChanne3
imServer4 订阅 redisChanne4
Manager(){9}1011publicstaticMQClientManagergetInstance(){12returninstance;13}14}其中factoryTable是所有生产者
业务方(webApi)端根据接收方的clientId后四位16进制与节点总数取模,定位到对应的redisChannel,进行redis->publish操作将消息定位到相应的imServer。
his.brokerName=brokerName;13this.queueId=queueId;14}15}可以看到这是一个简单的pojo,其封装了topic,brokerName以及queueId
每个 imServer 管理着对应的终端连接,当接收到 redis 订阅消息后,向对应的终端连接推送数据。
r){7try{8if(this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS,TimeUnit.MILLISECONDS)){9try{10TopicRouteDa
事件消息
IM 系统比较常用的有上线、下线,在 imServer 层才能准确捕捉事件,但业务代码就不合适在这上面编写了。
,//集群配置Server="127.0.0.1:6001"});}一套永远不需要迭代更新的IM服务端WebApi业务端publicvoidConfigure(IApplicati
采用 redis 发布订阅技术,将上线、下线等事件向指定频道发布,业务方(webApi) 通过 ImHelper.EventBus 方法进行订阅捕捉。
27this.serviceState=ServiceState.RUNNING;28break;首先更改serviceState状态为START_FAILED,防止中途的失败checkConfig方

结束语
谢谢支持!
>topicList=newHashSet<String>();34//Consumer5{6Iterator<Entry<String,MQConsumerInner&
发表评论