msg_server发送消息
信令
//service id 0x0003
message IMMsgData{//cmd id: 0x0301required uint32 from_user_id = 1; //消息发送方required uint32 to_session_id = 2; //消息接受方required uint32 msg_id = 3; // 非常重要:由谁产生?答:redis具体见下文required uint32 create_time = 4; required IM.BaseDefine.MsgType msg_type = 5; // 单聊或者群聊required bytes msg_data = 6;optional bytes attach_data = 20;
}message IMMsgDataAck{//cmd id: 0x0302required uint32 user_id = 1; //发送此信令的用户idrequired uint32 session_id = 2; required uint32 msg_id = 3;required IM.BaseDefine.SessionType session_type = 4;
}
流程图:
客户端A(葡萄)发送消息给客户端B(香蕉),信令为CID_MSG_DATA,msg_server收到后调用 CMsgConn::_HandleClientMsgData 函数
void CMsgConn::HandlePdu(CImPdu* pPdu)
{// request authorization checkif (pPdu->GetCommandId() != CID_LOGIN_REQ_USERLOGIN && !IsOpen() && IsKickOff()) {log("HandlePdu, wrong msg. ");throw CPduException(pPdu->GetServiceId(), pPdu->GetCommandId(), ERROR_CODE_WRONG_SERVICE_ID, "HandlePdu error, user not login. ");return;}switch (pPdu->GetCommandId()) {// ...... 省略无关逻辑 case CID_MSG_DATA:_HandleClientMsgData(pPdu);break;case CID_MSG_DATA_ACK:_HandleClientMsgDataAck(pPdu);break;// ...... 省略无关逻辑 default:log("wrong msg, cmd id=%d, user id=%u. ", pPdu->GetCommandId(), GetUserId());break;}
}
void CMsgConn::_HandleClientMsgData(CImPdu* pPdu)
{IM::Message::IMMsgData msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));if (msg.msg_data().length() == 0) {log("discard an empty message, uid=%u ", GetUserId());return;}if (m_msg_cnt_per_sec >= MAX_MSG_CNT_PER_SECOND) {log("!!!too much msg cnt in one second, uid=%u ", GetUserId());return;}if (msg.from_user_id() == msg.to_session_id() && CHECK_MSG_TYPE_SINGLE(msg.msg_type())){log("!!!from_user_id == to_user_id. ");return;}m_msg_cnt_per_sec++;uint32_t to_session_id = msg.to_session_id();uint32_t msg_id = msg.msg_id();uint8_t msg_type = msg.msg_type();string msg_data = msg.msg_data();if (g_log_msg_toggle) {log("HandleClientMsgData, %d->%d, msg_type=%u, msg_id=%u. ", GetUserId(), to_session_id, msg_type, msg_id);}uint32_t cur_time = time(NULL);CDbAttachData attach_data(ATTACH_TYPE_HANDLE, m_handle, 0);msg.set_from_user_id(GetUserId());msg.set_create_time(cur_time);msg.set_attach_data(attach_data.GetBuffer(), attach_data.GetLength());pPdu->SetPBMsg(&msg);// send to DB storage serverCDBServConn* pDbConn = get_db_serv_conn();if (pDbConn) {pDbConn->SendPdu(pPdu);}
}
该函数直接将数据包转发给db_proxy_server,db_proxy_server有一个map来映射信令所对应的处理函数。db_proxy_server收到信令为CID_MSG_DATA,后调用 DB_PROXY::sendMessage。
该函数主要做:
- 创建会话ID,并且两个会话ID独立
nSessionId = CSessionModel::getInstance()->addSession(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE);nPeerSessionId = CSessionModel::getInstance()->getSessionId(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);
- 创建关系ID
uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, true);
- 生成消息ID (msg_id),写入消息到数据库
nMsgId = pMsgModel->getMsgId(nRelateId);
if(nMsgId != INVALID_VALUE)
{pMsgModel->sendMessage(nRelateId, nFromId, nToId, nMsgType, nCreateTime, nMsgId, (string&)msg.msg_data());CSessionModel::getInstance()->updateSession(nSessionId, nNow);CSessionModel::getInstance()->updateSession(nPeerSessionId, nNow);
}
msg_id:两个人之间的映射关系。如果按照时间排列,两个客户端之间的时间可能不一样,所以按照序号生成消息id。
- 每条消息id唯一
- 使用redis 生成消息 id
uint32_t CMessageModel::getMsgId(uint32_t nRelateId)
{uint32_t nMsgId = 0;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if(pCacheConn){string strKey = "msg_id_" + int2string(nRelateId);nMsgId = pCacheConn->incrBy(strKey, 1);pCacheManager->RelCacheConn(pCacheConn);}return nMsgId;
}
- db_proxy_server 回复 msg_server
// message content
m_handler_map.insert(make_pair(uint32_t(CID_MSG_DATA), DB_PROXY::sendMessage));void sendMessage(CImPdu* pPdu, uint32_t conn_uuid){IM::Message::IMMsgData msg;if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength())){uint32_t nFromId = msg.from_user_id();uint32_t nToId = msg.to_session_id();uint32_t nCreateTime = msg.create_time();IM::BaseDefine::MsgType nMsgType = msg.msg_type();uint32_t nMsgLen = msg.msg_data().length();uint32_t nNow = (uint32_t)time(NULL);if (IM::BaseDefine::MsgType_IsValid(nMsgType)){if(nMsgLen != 0){CImPdu* pPduResp = new CImPdu;uint32_t nMsgId = INVALID_VALUE;uint32_t nSessionId = INVALID_VALUE;uint32_t nPeerSessionId = INVALID_VALUE;CMessageModel* pMsgModel = CMessageModel::getInstance();CGroupMessageModel* pGroupMsgModel = CGroupMessageModel::getInstance();if(nMsgType == IM::BaseDefine::MSG_TYPE_GROUP_TEXT) {// ...... 省略无关逻辑 } else if (nMsgType == IM::BaseDefine::MSG_TYPE_GROUP_AUDIO) {// ...... 省略无关逻辑 } else if(nMsgType== IM::BaseDefine::MSG_TYPE_SINGLE_TEXT) {if (nFromId != nToId) {nSessionId = CSessionModel::getInstance()->getSessionId(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);if (INVALID_VALUE == nSessionId) {// 创建会话nSessionId = CSessionModel::getInstance()->addSession(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE);}nPeerSessionId = CSessionModel::getInstance()->getSessionId(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);if(INVALID_VALUE == nPeerSessionId){// 创建会话关系IDnSessionId = CSessionModel::getInstance()->addSession(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE);}uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, true);if(nSessionId != INVALID_VALUE && nRelateId != INVALID_VALUE){nMsgId = pMsgModel->getMsgId(nRelateId);if(nMsgId != INVALID_VALUE){// 写入消息到数据库pMsgModel->sendMessage(nRelateId, nFromId, nToId, nMsgType, nCreateTime, nMsgId, (string&)msg.msg_data());CSessionModel::getInstance()->updateSession(nSessionId, nNow);CSessionModel::getInstance()->updateSession(nPeerSessionId, nNow);}else{log("msgId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);}}else{log("sessionId or relateId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);}}else{log("send msg to self. fromId=%u, toId=%u, msgType=%u", nFromId, nToId, nMsgType);}} else if(nMsgType == IM::BaseDefine::MSG_TYPE_SINGLE_AUDIO) {// ...... 省略无关逻辑 }log("fromId=%u, toId=%u, type=%u, msgId=%u, sessionId=%u", nFromId, nToId, nMsgType, nMsgId, nSessionId);msg.set_msg_id(nMsgId);pPduResp->SetPBMsg(&msg);pPduResp->SetSeqNum(pPdu->GetSeqNum());pPduResp->SetServiceId(IM::BaseDefine::SID_MSG);pPduResp->SetCommandId(IM::BaseDefine::CID_MSG_DATA);CProxyConn::AddResponsePdu(conn_uuid, pPduResp);}else{log("msgLen error. fromId=%u, toId=%u, msgType=%u", nFromId, nToId, nMsgType);}}else{log("invalid msgType.fromId=%u, toId=%u, msgType=%u", nFromId, nToId, nMsgType);}}else{log("parse pb failed");}}
msg_server收到CID_MSG_DATA信令后调用 CDBServConn::_HandleMsgData 函数
void CDBServConn::HandlePdu(CImPdu* pPdu)
{switch (pPdu->GetCommandId()) {// ...... 省略无关逻辑 case CID_MSG_DATA:_HandleMsgData(pPdu);break;// ...... 省略无关逻辑 default:log("db server, wrong cmd id=%d ", pPdu->GetCommandId());}
}
CDBServConn::_HandleMsgData 处理消息有三点
- 首先 ack 客户端
- 然后发送到route_server广播
- 如果有多端登录,也要广播给其他客户端
void CDBServConn::_HandleMsgData(CImPdu *pPdu)
{IM::Message::IMMsgData msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));if (CHECK_MSG_TYPE_GROUP(msg.msg_type())) {s_group_chat->HandleGroupMessage(pPdu);return;}uint32_t from_user_id = msg.from_user_id();uint32_t to_user_id = msg.to_session_id();uint32_t msg_id = msg.msg_id();if (msg_id == 0) {log("HandleMsgData, write db failed, %u->%u.", from_user_id, to_user_id);return;}uint8_t msg_type = msg.msg_type();CDbAttachData attach_data((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());uint32_t handle = attach_data.GetHandle();log("HandleMsgData, from_user_id=%u, to_user_id=%u, msg_id=%u.", from_user_id, to_user_id, msg_id);CMsgConn* pMsgConn = CImUserManager::GetInstance()->GetMsgConnByHandle(from_user_id, attach_data.GetHandle());if (pMsgConn){IM::Message::IMMsgDataAck msg2;msg2.set_user_id(from_user_id);msg2.set_msg_id(msg_id);msg2.set_session_id(to_user_id);msg2.set_session_type(::IM::BaseDefine::SESSION_TYPE_SINGLE);CImPdu pdu;pdu.SetPBMsg(&msg2);pdu.SetServiceId(SID_MSG);pdu.SetCommandId(CID_MSG_DATA_ACK);pdu.SetSeqNum(pPdu->GetSeqNum());pMsgConn->SendPdu(&pdu);}CRouteServConn* pRouteConn = get_route_serv_conn();if (pRouteConn) {pRouteConn->SendPdu(pPdu);}msg.clear_attach_data();pPdu->SetPBMsg(&msg);CImUser* pFromImUser = CImUserManager::GetInstance()->GetImUserById(from_user_id);CImUser* pToImUser = CImUserManager::GetInstance()->GetImUserById(to_user_id);pPdu->SetSeqNum(0);if (pFromImUser) {pFromImUser->BroadcastClientMsgData(pPdu, msg_id, pMsgConn, from_user_id);}if (pToImUser) {pToImUser->BroadcastClientMsgData(pPdu, msg_id, NULL, from_user_id);}IM::Server::IMGetDeviceTokenReq msg3;msg3.add_user_id(to_user_id);msg3.set_attach_data(pPdu->GetBodyData(), pPdu->GetBodyLength());CImPdu pdu2;pdu2.SetPBMsg(&msg3);pdu2.SetServiceId(SID_OTHER);pdu2.SetCommandId(CID_OTHER_GET_DEVICE_TOKEN_REQ);SendPdu(&pdu2);
}
route_server 收到 msg_server的消息转发,信令为CID_MSG_DATA,调用CRouteConn::_BroadcastMsg 广播函数。
void CRouteConn::HandlePdu(CImPdu* pPdu)
{switch (pPdu->GetCommandId()) {// ...... 省略无关逻辑 case CID_MSG_DATA:case CID_SWITCH_P2P_CMD:case CID_MSG_READ_NOTIFY:case CID_OTHER_SERVER_KICK_USER:case CID_GROUP_CHANGE_MEMBER_NOTIFY:case CID_FILE_NOTIFY:case CID_BUDDY_LIST_REMOVE_SESSION_NOTIFY:_BroadcastMsg(pPdu, this);break;// ...... 省略无关逻辑 default:log("CRouteConn::HandlePdu, wrong cmd id: %d ", pPdu->GetCommandId());break;}
}
其实就是广播给所有的msg_server
void CRouteConn::_BroadcastMsg(CImPdu* pPdu, CRouteConn* pFromConn)
{ConnMap_t::iterator it;for (it = g_route_conn_map.begin(); it != g_route_conn_map.end(); it++) {CRouteConn* pRouteConn = (CRouteConn*)it->second;if (pRouteConn != pFromConn) {pRouteConn->SendPdu(pPdu);}}
}
所有的msg_server收到广播后再广播给所有的客户端
void CRouteServConn::_HandleMsgData(CImPdu* pPdu)
{IM::Message::IMMsgData msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));if (CHECK_MSG_TYPE_GROUP(msg.msg_type())) {s_group_chat->HandleGroupMessageBroadcast(pPdu);return;}uint32_t from_user_id = msg.from_user_id();uint32_t to_user_id = msg.to_session_id();uint32_t msg_id = msg.msg_id();log("HandleMsgData, %u->%u, msg_id=%u. ", from_user_id, to_user_id, msg_id);CImUser* pFromImUser = CImUserManager::GetInstance()->GetImUserById(from_user_id);if (pFromImUser){pFromImUser->BroadcastClientMsgData(pPdu, msg_id, NULL, from_user_id);}CImUser* pToImUser = CImUserManager::GetInstance()->GetImUserById(to_user_id);if (pToImUser){pToImUser->BroadcastClientMsgData(pPdu, msg_id, NULL, from_user_id);}
}
msg_server广播给所有的客户端有讲究,会将消息id等封装成 msg_ack_t 结构体塞入m_send_msg_list 发送列表,等到收到对端的CID_MSG_DATA_ACK,再将此msg_ack_t 结构体从发送列表中移除。其实就是业务层的ACK机制,避免丢消息。
typedef struct {uint32_t msg_id;uint32_t from_id;uint64_t timestamp;
} msg_ack_t; // 业务层的ackvoid CImUser::BroadcastClientMsgData(CImPdu* pPdu, uint32_t msg_id, CMsgConn* pFromConn, uint32_t from_id)
{for (map<uint32_t, CMsgConn*>::iterator it = m_conn_map.begin(); it != m_conn_map.end(); it++){CMsgConn* pConn = it->second;if (pConn != pFromConn) {pConn->SendPdu(pPdu);pConn->AddToSendList(msg_id, from_id);}}
}void CMsgConn::AddToSendList(uint32_t msg_id, uint32_t from_id)
{//log("AddSendMsg, seq_no=%u, from_id=%u ", seq_no, from_id);msg_ack_t msg;msg.msg_id = msg_id;msg.from_id = from_id;msg.timestamp = get_tick_count();m_send_msg_list.push_back(msg);g_down_msg_total_cnt++;
}
收到确认 CID_MSG_DATA_ACK信令后,移除该确认结构体
void CMsgConn::_HandleClientMsgDataAck(CImPdu* pPdu)
{IM::Message::IMMsgDataAck msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));IM::BaseDefine::SessionType session_type = msg.session_type();if (session_type == IM::BaseDefine::SESSION_TYPE_SINGLE){uint32_t msg_id = msg.msg_id();uint32_t session_id = msg.session_id();DelFromSendList(msg_id, session_id);}
}void CMsgConn::DelFromSendList(uint32_t msg_id, uint32_t from_id)
{//log("DelSendMsg, seq_no=%u, from_id=%u ", seq_no, from_id);for (list<msg_ack_t>::iterator it = m_send_msg_list.begin(); it != m_send_msg_list.end(); it++) {msg_ack_t msg = *it;if ( (msg.msg_id == msg_id) && (msg.from_id == from_id) ) {m_send_msg_list.erase(it);break;}}
}
至此,消息就算发送成功了。