欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 能源 > TeamTalk消息服务器学习

TeamTalk消息服务器学习

2024/10/24 19:20:49 来源:https://blog.csdn.net/COOL_jack/article/details/141689418  浏览:    关键词:TeamTalk消息服务器学习

msg_server发送消息

信令

//service id  0x0003
// Chat message carried by CID_MSG_DATA.
message IMMsgData {
	// cmd id:		0x0301
	required uint32 from_user_id = 1;				// message sender
	required uint32 to_session_id = 2;				// message receiver
	required uint32 msg_id = 3;						// very important: who generates it? Answer: redis (see the getMsgId code further down)
	required uint32 create_time = 4;
	required IM.BaseDefine.MsgType msg_type = 5;	// single chat or group chat
	required bytes msg_data = 6;
	optional bytes attach_data = 20;
}

// Acknowledgement for a chat message, carried by CID_MSG_DATA_ACK.
message IMMsgDataAck {
	// cmd id:		0x0302
	required uint32 user_id = 1;			// id of the user sending this signal
	required uint32 session_id = 2;
	required uint32 msg_id = 3;
	required IM.BaseDefine.SessionType session_type = 4;
}

流程图:
(此处原为流程图图片,转载时未保留)

客户端A(葡萄)发送消息给客户端B(香蕉),信令为 CID_MSG_DATA。msg_server 收到后,由 CMsgConn::HandlePdu 按命令号分发,调用 CMsgConn::_HandleClientMsgData 函数。

/**
 * Dispatches a PDU received on a client connection to the matching handler.
 *
 * @param pPdu the received PDU; its command id selects the handler.
 * @throws CPduException when the PDU is not a login request, the connection
 *         is not open and the connection has been kicked off.
 */
void CMsgConn::HandlePdu(CImPdu* pPdu)
{
    // Request authorization check. The exception leaves the function, so no
    // `return` is needed after the throw.
    if (pPdu->GetCommandId() != CID_LOGIN_REQ_USERLOGIN && !IsOpen() && IsKickOff()) {
        log("HandlePdu, wrong msg. ");
        throw CPduException(pPdu->GetServiceId(), pPdu->GetCommandId(), ERROR_CODE_WRONG_SERVICE_ID, "HandlePdu error, user not login. ");
    }

    switch (pPdu->GetCommandId()) {
        // ...... unrelated logic omitted
        case CID_MSG_DATA:
            _HandleClientMsgData(pPdu);
            break;
        case CID_MSG_DATA_ACK:
            _HandleClientMsgDataAck(pPdu);
            break;
        // ...... unrelated logic omitted
        default:
            log("wrong msg, cmd id=%d, user id=%u. ", pPdu->GetCommandId(), GetUserId());
            break;
    }
}
/**
 * Handles a CID_MSG_DATA PDU sent by a client: validates it, stamps the
 * server-side fields and forwards it to the DB storage server.
 *
 * The message is dropped (with a log line) when the payload is empty, when
 * the per-second message counter has reached MAX_MSG_CNT_PER_SECOND, or when
 * a single-chat message is addressed to its own sender.
 *
 * @param pPdu the received PDU; its body is re-serialized in place before
 *             being forwarded.
 */
void CMsgConn::_HandleClientMsgData(CImPdu* pPdu)
{
    IM::Message::IMMsgData msg;
    // NOTE(review): CHECK_PB_PARSE_MSG presumably bails out of the function on
    // a parse failure — confirm against the macro definition.
    CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));

    if (msg.msg_data().length() == 0) {
        log("discard an empty message, uid=%u ", GetUserId());
        return;
    }

    if (m_msg_cnt_per_sec >= MAX_MSG_CNT_PER_SECOND) {
        log("!!!too much msg cnt in one second, uid=%u ", GetUserId());
        return;
    }

    if (msg.from_user_id() == msg.to_session_id() && CHECK_MSG_TYPE_SINGLE(msg.msg_type())) {
        log("!!!from_user_id == to_user_id. ");
        return;
    }

    m_msg_cnt_per_sec++;

    // Only the values needed for logging are extracted; the payload itself is
    // not copied because nothing below reads it.
    uint32_t to_session_id = msg.to_session_id();
    uint32_t msg_id = msg.msg_id();
    uint8_t msg_type = msg.msg_type();

    if (g_log_msg_toggle) {
        log("HandleClientMsgData, %d->%d, msg_type=%u, msg_id=%u. ", GetUserId(), to_session_id, msg_type, msg_id);
    }

    // Overwrite the client-supplied sender id and creation time with the
    // server's values, and attach this connection's handle so the reply can be
    // matched back to the originating connection.
    uint32_t cur_time = time(NULL);
    CDbAttachData attach_data(ATTACH_TYPE_HANDLE, m_handle, 0);
    msg.set_from_user_id(GetUserId());
    msg.set_create_time(cur_time);
    msg.set_attach_data(attach_data.GetBuffer(), attach_data.GetLength());
    pPdu->SetPBMsg(&msg);

    // send to DB storage server
    CDBServConn* pDbConn = get_db_serv_conn();
    if (pDbConn) {
        pDbConn->SendPdu(pPdu);
    }
}

该函数补全发送方 ID、创建时间和附加数据(连接句柄)后,将数据包转发给 db_proxy_server。db_proxy_server 用一个 map 把信令映射到对应的处理函数;收到信令 CID_MSG_DATA 后,调用 DB_PROXY::sendMessage。

该函数主要做:

  1. 创建会话ID,并且两个会话ID独立
// Session as seen from the sender (nFromId -> nToId).
nSessionId = CSessionModel::getInstance()->addSession(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE);
// Session as seen from the receiver (nToId -> nFromId); it has its own id.
nPeerSessionId = CSessionModel::getInstance()->getSessionId(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);
  1. 创建关系ID
// Look up the relation id shared by this pair of users.
// NOTE(review): the trailing `true` is presumably a create-if-missing flag — confirm against CRelationModel.
uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, true);
  1. 生成消息ID (msg_id),写入消息到数据库
// Allocate the next message id for this relation.
nMsgId = pMsgModel->getMsgId(nRelateId);
if (nMsgId != INVALID_VALUE) {
    // Persist the message, then refresh both sides' sessions with the current time.
    pMsgModel->sendMessage(nRelateId, nFromId, nToId, nMsgType, nCreateTime, nMsgId, (string&)msg.msg_data());
    CSessionModel::getInstance()->updateSession(nSessionId, nNow);
    CSessionModel::getInstance()->updateSession(nPeerSessionId, nNow);
}

msg_id:同一对用户(同一个 nRelateId)之间的消息序号。如果按时间排序,两个客户端的本地时间可能不一致,所以改为按递增序号生成消息 id。

  1. 每条消息id唯一
  2. 使用redis 生成消息 id
/**
 * Allocates the next message id for a relation by incrementing the counter
 * stored under the key "msg_id_<nRelateId>" on the "unread" cache connection.
 *
 * @param nRelateId relation id identifying the pair of users.
 * @return the value returned by incrBy, or 0 when no cache connection could
 *         be obtained.
 */
uint32_t CMessageModel::getMsgId(uint32_t nRelateId)
{
    CacheManager* pCacheManager = CacheManager::getInstance();
    CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");
    if (!pCacheConn) {
        return 0;
    }

    string strKey = "msg_id_" + int2string(nRelateId);
    uint32_t nMsgId = pCacheConn->incrBy(strKey, 1);
    // Hand the connection back to the manager once the increment is done.
    pCacheManager->RelCacheConn(pCacheConn);
    return nMsgId;
}
  1. db_proxy_server 回复 msg_server

// message content
m_handler_map.insert(make_pair(uint32_t(CID_MSG_DATA), DB_PROXY::sendMessage));void sendMessage(CImPdu* pPdu, uint32_t conn_uuid){IM::Message::IMMsgData msg;if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength())){uint32_t nFromId = msg.from_user_id();uint32_t nToId = msg.to_session_id();uint32_t nCreateTime = msg.create_time();IM::BaseDefine::MsgType nMsgType = msg.msg_type();uint32_t nMsgLen = msg.msg_data().length();uint32_t nNow = (uint32_t)time(NULL);if (IM::BaseDefine::MsgType_IsValid(nMsgType)){if(nMsgLen != 0){CImPdu* pPduResp = new CImPdu;uint32_t nMsgId = INVALID_VALUE;uint32_t nSessionId = INVALID_VALUE;uint32_t nPeerSessionId = INVALID_VALUE;CMessageModel* pMsgModel = CMessageModel::getInstance();CGroupMessageModel* pGroupMsgModel = CGroupMessageModel::getInstance();if(nMsgType == IM::BaseDefine::MSG_TYPE_GROUP_TEXT) {// ...... 省略无关逻辑 } else if (nMsgType == IM::BaseDefine::MSG_TYPE_GROUP_AUDIO) {// ...... 省略无关逻辑 } else if(nMsgType== IM::BaseDefine::MSG_TYPE_SINGLE_TEXT) {if (nFromId != nToId) {nSessionId = CSessionModel::getInstance()->getSessionId(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);if (INVALID_VALUE == nSessionId) {// 创建会话nSessionId = CSessionModel::getInstance()->addSession(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE);}nPeerSessionId = CSessionModel::getInstance()->getSessionId(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);if(INVALID_VALUE ==  nPeerSessionId){// 创建会话关系IDnSessionId = CSessionModel::getInstance()->addSession(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE);}uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, true);if(nSessionId != INVALID_VALUE && nRelateId != INVALID_VALUE){nMsgId = pMsgModel->getMsgId(nRelateId);if(nMsgId != INVALID_VALUE){// 写入消息到数据库pMsgModel->sendMessage(nRelateId, nFromId, nToId, nMsgType, nCreateTime, nMsgId, (string&)msg.msg_data());CSessionModel::getInstance()->updateSession(nSessionId, 
nNow);CSessionModel::getInstance()->updateSession(nPeerSessionId, nNow);}else{log("msgId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);}}else{log("sessionId or relateId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);}}else{log("send msg to self. fromId=%u, toId=%u, msgType=%u", nFromId, nToId, nMsgType);}} else if(nMsgType == IM::BaseDefine::MSG_TYPE_SINGLE_AUDIO) {// ...... 省略无关逻辑 }log("fromId=%u, toId=%u, type=%u, msgId=%u, sessionId=%u", nFromId, nToId, nMsgType, nMsgId, nSessionId);msg.set_msg_id(nMsgId);pPduResp->SetPBMsg(&msg);pPduResp->SetSeqNum(pPdu->GetSeqNum());pPduResp->SetServiceId(IM::BaseDefine::SID_MSG);pPduResp->SetCommandId(IM::BaseDefine::CID_MSG_DATA);CProxyConn::AddResponsePdu(conn_uuid, pPduResp);}else{log("msgLen error. fromId=%u, toId=%u, msgType=%u", nFromId, nToId, nMsgType);}}else{log("invalid msgType.fromId=%u, toId=%u, msgType=%u", nFromId, nToId, nMsgType);}}else{log("parse pb failed");}}

msg_server收到CID_MSG_DATA信令后调用 CDBServConn::_HandleMsgData 函数

/**
 * Dispatches a PDU received from the DB server connection by command id.
 *
 * @param pPdu the received PDU.
 */
void CDBServConn::HandlePdu(CImPdu* pPdu)
{
    switch (pPdu->GetCommandId()) {
        // ...... unrelated logic omitted
        case CID_MSG_DATA:
            _HandleMsgData(pPdu);
            break;
        // ...... unrelated logic omitted
        default:
            // Unknown command id: only logged.
            log("db server, wrong cmd id=%d ", pPdu->GetCommandId());
            break;
    }
}

CDBServConn::_HandleMsgData 处理消息有三点

  1. 首先 ack 客户端
  2. 然后发送到route_server广播
  3. 如果有多端登录,也要广播给其他客户端
/**
 * Handles the CID_MSG_DATA reply coming back from the DB server.
 *
 * Group messages are delegated to s_group_chat. For single-chat messages the
 * function, in order:
 *   1. acks the originating client connection (CID_MSG_DATA_ACK),
 *   2. forwards the PDU to the route server,
 *   3. strips the attach data and broadcasts the message to the sender's
 *      other connections and to the receiver's connections on this server,
 *   4. sends a device-token request for the receiver on this connection.
 * A msg_id of 0 is treated as a failed DB write and the message is dropped.
 *
 * @param pPdu the PDU received from the DB server; its body and sequence
 *             number are modified in place.
 */
void CDBServConn::_HandleMsgData(CImPdu *pPdu)
{
    IM::Message::IMMsgData msg;
    // NOTE(review): CHECK_PB_PARSE_MSG presumably bails out of the function on
    // a parse failure — confirm against the macro definition.
    CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));

    if (CHECK_MSG_TYPE_GROUP(msg.msg_type())) {
        s_group_chat->HandleGroupMessage(pPdu);
        return;
    }

    uint32_t from_user_id = msg.from_user_id();
    uint32_t to_user_id = msg.to_session_id();
    uint32_t msg_id = msg.msg_id();
    if (msg_id == 0) {
        log("HandleMsgData, write db failed, %u->%u.", from_user_id, to_user_id);
        return;
    }

    // The attach data carries the handle of the connection that sent the message.
    CDbAttachData attach_data((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());
    log("HandleMsgData, from_user_id=%u, to_user_id=%u, msg_id=%u.", from_user_id, to_user_id, msg_id);

    // 1. Ack the sending connection, reusing the request's sequence number.
    CMsgConn* pMsgConn = CImUserManager::GetInstance()->GetMsgConnByHandle(from_user_id, attach_data.GetHandle());
    if (pMsgConn) {
        IM::Message::IMMsgDataAck msg2;
        msg2.set_user_id(from_user_id);
        msg2.set_msg_id(msg_id);
        msg2.set_session_id(to_user_id);
        msg2.set_session_type(::IM::BaseDefine::SESSION_TYPE_SINGLE);
        CImPdu pdu;
        pdu.SetPBMsg(&msg2);
        pdu.SetServiceId(SID_MSG);
        pdu.SetCommandId(CID_MSG_DATA_ACK);
        pdu.SetSeqNum(pPdu->GetSeqNum());
        pMsgConn->SendPdu(&pdu);
    }

    // 2. Forward to the route server (the PDU still carries the attach data here).
    CRouteServConn* pRouteConn = get_route_serv_conn();
    if (pRouteConn) {
        pRouteConn->SendPdu(pPdu);
    }

    // 3. Strip the attach data before delivering to clients.
    msg.clear_attach_data();
    pPdu->SetPBMsg(&msg);
    CImUser* pFromImUser = CImUserManager::GetInstance()->GetImUserById(from_user_id);
    CImUser* pToImUser = CImUserManager::GetInstance()->GetImUserById(to_user_id);
    pPdu->SetSeqNum(0);
    if (pFromImUser) {
        // Skip pMsgConn: the originating connection already got its ack.
        pFromImUser->BroadcastClientMsgData(pPdu, msg_id, pMsgConn, from_user_id);
    }
    if (pToImUser) {
        pToImUser->BroadcastClientMsgData(pPdu, msg_id, NULL, from_user_id);
    }

    // 4. Request the receiver's device token, carrying the message as attach data.
    IM::Server::IMGetDeviceTokenReq msg3;
    msg3.add_user_id(to_user_id);
    msg3.set_attach_data(pPdu->GetBodyData(), pPdu->GetBodyLength());
    CImPdu pdu2;
    pdu2.SetPBMsg(&msg3);
    pdu2.SetServiceId(SID_OTHER);
    pdu2.SetCommandId(CID_OTHER_GET_DEVICE_TOKEN_REQ);
    SendPdu(&pdu2);
}

route_server 收到 msg_server的消息转发,信令为CID_MSG_DATA,调用CRouteConn::_BroadcastMsg 广播函数。

/**
 * Dispatches a PDU received by route_server by command id. Every command id
 * listed below is relayed unchanged through _BroadcastMsg.
 *
 * @param pPdu the received PDU.
 */
void CRouteConn::HandlePdu(CImPdu* pPdu)
{
    switch (pPdu->GetCommandId()) {
        // ...... unrelated logic omitted
        case CID_MSG_DATA:
        case CID_SWITCH_P2P_CMD:
        case CID_MSG_READ_NOTIFY:
        case CID_OTHER_SERVER_KICK_USER:
        case CID_GROUP_CHANGE_MEMBER_NOTIFY:
        case CID_FILE_NOTIFY:
        case CID_BUDDY_LIST_REMOVE_SESSION_NOTIFY:
            // `this` is passed as the source so it is skipped by the broadcast.
            _BroadcastMsg(pPdu, this);
            break;
        // ...... unrelated logic omitted
        default:
            log("CRouteConn::HandlePdu, wrong cmd id: %d ", pPdu->GetCommandId());
            break;
    }
}

其实就是广播给除发送方以外的所有 msg_server

/**
 * Sends a PDU to every connection in g_route_conn_map except the one it
 * came from.
 *
 * @param pPdu      the PDU to relay.
 * @param pFromConn the source connection, which is skipped.
 */
void CRouteConn::_BroadcastMsg(CImPdu* pPdu, CRouteConn* pFromConn)
{
    for (ConnMap_t::iterator iter = g_route_conn_map.begin(); iter != g_route_conn_map.end(); ++iter) {
        CRouteConn* pPeer = (CRouteConn*)iter->second;
        if (pPeer == pFromConn) {
            continue;
        }
        pPeer->SendPdu(pPdu);
    }
}

各 msg_server 收到广播后,再转发给本机上发送方和接收方用户的所有客户端连接

/**
 * Handles a CID_MSG_DATA PDU relayed by the route server.
 *
 * Group messages are delegated to s_group_chat. Single-chat messages are
 * delivered to every connection of the sender and of the receiver that this
 * server knows about.
 *
 * @param pPdu the relayed PDU.
 */
void CRouteServConn::_HandleMsgData(CImPdu* pPdu)
{
    IM::Message::IMMsgData msg;
    // NOTE(review): CHECK_PB_PARSE_MSG presumably bails out of the function on
    // a parse failure — confirm against the macro definition.
    CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));

    if (CHECK_MSG_TYPE_GROUP(msg.msg_type())) {
        s_group_chat->HandleGroupMessageBroadcast(pPdu);
        return;
    }

    uint32_t from_user_id = msg.from_user_id();
    uint32_t to_user_id = msg.to_session_id();
    uint32_t msg_id = msg.msg_id();
    log("HandleMsgData, %u->%u, msg_id=%u. ", from_user_id, to_user_id, msg_id);

    // No connection is excluded here (NULL).
    CImUser* pSender = CImUserManager::GetInstance()->GetImUserById(from_user_id);
    if (pSender) {
        pSender->BroadcastClientMsgData(pPdu, msg_id, NULL, from_user_id);
    }

    CImUser* pReceiver = CImUserManager::GetInstance()->GetImUserById(to_user_id);
    if (pReceiver) {
        pReceiver->BroadcastClientMsgData(pPdu, msg_id, NULL, from_user_id);
    }
}

msg_server广播给所有的客户端有讲究,会将消息id等封装成 msg_ack_t 结构体塞入m_send_msg_list 发送列表,等到收到对端的CID_MSG_DATA_ACK,再将此msg_ack_t 结构体从发送列表中移除。其实就是业务层的ACK机制,避免丢消息。

typedef struct {uint32_t msg_id;uint32_t from_id;uint64_t timestamp;
} msg_ack_t; // 业务层的ackvoid CImUser::BroadcastClientMsgData(CImPdu* pPdu, uint32_t msg_id, CMsgConn* pFromConn, uint32_t from_id)
{for (map<uint32_t, CMsgConn*>::iterator it = m_conn_map.begin(); it != m_conn_map.end(); it++){CMsgConn* pConn = it->second;if (pConn != pFromConn) {pConn->SendPdu(pPdu);pConn->AddToSendList(msg_id, from_id);}}
}void CMsgConn::AddToSendList(uint32_t msg_id, uint32_t from_id)
{//log("AddSendMsg, seq_no=%u, from_id=%u ", seq_no, from_id);msg_ack_t msg;msg.msg_id = msg_id;msg.from_id = from_id;msg.timestamp = get_tick_count();m_send_msg_list.push_back(msg);g_down_msg_total_cnt++;
}

收到确认 CID_MSG_DATA_ACK信令后,移除该确认结构体

void CMsgConn::_HandleClientMsgDataAck(CImPdu* pPdu)
{IM::Message::IMMsgDataAck msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));IM::BaseDefine::SessionType session_type = msg.session_type();if (session_type == IM::BaseDefine::SESSION_TYPE_SINGLE){uint32_t msg_id = msg.msg_id();uint32_t session_id = msg.session_id();DelFromSendList(msg_id, session_id);}
}void CMsgConn::DelFromSendList(uint32_t msg_id, uint32_t from_id)
{//log("DelSendMsg, seq_no=%u, from_id=%u ", seq_no, from_id);for (list<msg_ack_t>::iterator it = m_send_msg_list.begin(); it != m_send_msg_list.end(); it++) {msg_ack_t msg = *it;if ( (msg.msg_id == msg_id) && (msg.from_id == from_id) ) {m_send_msg_list.erase(it);break;}}
}

至此,消息就算发送成功了。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com