欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 产业 > 微服务即时通讯系统的实现(服务端)----(2)

微服务即时通讯系统的实现(服务端)----(2)

2024/12/3 3:11:49 来源:https://blog.csdn.net/m0_65558082/article/details/144042168  浏览:    关键词:微服务即时通讯系统的实现(服务端)----(2)

目录

  • 1. 语音识别子服务的实现
    • 1.1 功能设计
    • 1.2 模块划分
    • 1.3 模块功能示意图
    • 1.4 接口的实现
  • 2. 文件存储子服务的实现
    • 2.1 功能设计
    • 2.2 模块划分
    • 2.3 模块功能示意图
    • 2.4 接口的实现
  • 3. 用户管理子服务的实现
    • 3.1 功能设计
    • 3.2 模块划分
    • 3.3 功能模块示意图
    • 3.4 数据管理
      • 3.4.1 关系数据库的数据管理
      • 3.4.2 内存数据库的数据管理
      • 3.4.3 文档数据库的数据管理
    • 3.5 接口的实现
      • 3.5.1 用户子服务所用到的protobuf接口实现
      • 3.5.2 用户的注册和登录接口实现
      • 3.5.3 获取短信验证码接口实现
      • 3.5.4 手机号的登录和注册接口实现
      • 3.5.5 对用户信息修改接口的实现
      • 3.5.6 搭建Rpc服务和创建用户子服务的工厂类
  • 4. 消息转发子服务的实现
    • 4.1 功能设计
    • 4.2 模块划分
    • 4.3 功能模块示意图
    • 4.4 数据库的数据管理
    • 4.5 接口的实现
  • 5. 服务端小结
    • 5.1 语言识别模块总结
    • 5.2 文件存储模块总结
    • 5.3 用户管理模块总结
    • 5.4 消息转发模块总结

1. 语音识别子服务的实现

1.1 功能设计

(1)语音转换子服务,用于调用语音识别 SDK,进行语音识别,将语音转为文字后返回给网关即可,因此提供的功能性接口只有一个:

  1. 语音消息的文字转换:客户端进行语音消息的文字转换。

1.2 模块划分

(1)以下是语言识别的模块划分:

  1. 参数/配置文件解析模块:基于 gflags 框架直接使用进行参数/配置文件解析。
  2. 日志模块:基于 spdlog 框架封装的模块直接使用进行日志输出。
  3. 服务注册模块:基于 etcd 框架封装的注册模块直接使用进行语音识别子服务的服务注册。
  4. rpc 服务模块:基于 brpc 框架搭建 rpc 服务器。
  5. 语音识别 SDK 模块:基于语音识别平台提供的 sdk 直接使用,完成语音的识别转文字。

1.3 模块功能示意图

(1)如下是模块功能图:

服务器进行服务注册后客户端进行服务发现,语言内容会经过rpc服务器调用语言平台进行语言识别。将识别的结果返回给客户端。

1.4 接口的实现

(1)该模块使用到了语音识别模块封装、服务注册、日志模块。该模块的protobuf接口如下:

syntax = "proto3";
package bite_im;option cc_generic_services = true;message SpeechRecognitionReq {string request_id = 1;              //请求IDbytes speech_content = 2;           //语音数据optional string user_id = 3;        //用户IDoptional string session_id = 4;     //登录会话ID -- 网关进行身份鉴权
}message SpeechRecognitionRsp {string request_id = 1;              //请求IDbool success = 2;                   //请求处理结果标志optional string errmsg = 3;         //失败原因optional string recognition_result = 4;      //识别后的文字数据
}//语音识别Rpc服务及接口的定义
service SpeechService {rpc SpeechRecognition(SpeechRecognitionReq) returns (SpeechRecognitionRsp);
}

(2)让SpeechServiceImpl类去继承bite_im::SpeechService当中的类并且重写该类的功能:

#pragma once
#include <brpc/server.h>
#include <butil/logging.h>
#include "asr.hpp"      // 语音识别模块封装
#include "etcd.hpp"     // 服务注册模块封装
#include "logger.hpp"   // 日志模块封装
#include "speech.pb.h"  // protobuf框架代码namespace MyTest
{class SpeechServiceImpl : public bite_im::SpeechService{public:SpeechServiceImpl(const ASRClient::ptr &asr_client):_asr_client(asr_client){}void SpeechRecognition(google::protobuf::RpcController* controller,const ::bite_im::SpeechRecognitionReq* request,::bite_im::SpeechRecognitionRsp* response,::google::protobuf::Closure* done){LOG_DEBUG("收到语音转文字请求!");brpc::ClosureGuard rpc_guard(done);//1. 取出请求中的语音数据//2. 调用语音sdk模块进行语音识别,得到响应std::string err;std::string res = _asr_client->recognize(request->speech_content(), err);if(res.empty()){LOG_ERROR("{} 语音识别失败!", request->request_id());response->set_request_id(request->request_id());response->set_success(false);response->set_errmsg("语音识别失败:" + err);return;}//3. 组织响应response->set_request_id(request->request_id());response->set_success(true);response->set_recognition_result(res);}~SpeechServiceImpl(){}private:ASRClient::ptr _asr_client;};
}

(3)构建SpeechServer类来搭建Rpc服务。该类包含服务注册、服务发现和语言转换模块:

namespace MyTest
{class SpeechServer {public:using ptr = std::shared_ptr<SpeechServer>;SpeechServer(const ASRClient::ptr asr_client,const Registry::ptr reg_client,const std::shared_ptr<brpc::Server> &rpc_server):_asr_client(asr_client),_reg_client(reg_client),_rpc_server(rpc_server){}// 搭建RPC服务器,并启动服务器void start(){_rpc_server->RunUntilAskedToQuit();}~SpeechServer(){}private:ASRClient::ptr _asr_client;Registry::ptr _reg_client;std::shared_ptr<brpc::Server> _rpc_server;};}

(4)创建工厂类SpeechServerBuilder来实现语言识别子服务的创建以及Rpc服务器的创建:

namespace MyTest
{class SpeechServerBuilder {public://构造语音识别客户端对象void make_asr_object(const std::string &app_id,const std::string &api_key,const std::string &secret_key){_asr_client = std::make_shared<ASRClient>(app_id, api_key, secret_key);}//用于构造服务注册客户端对象void make_reg_object(const std::string &reg_host,const std::string &service_name,const std::string &access_host){_reg_client = std::make_shared<Registry>(reg_host);_reg_client->registry(service_name, access_host);}//构造RPC服务器对象void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads){if(!_asr_client) {LOG_ERROR("还未初始化语音识别模块!");abort();}_rpc_server = std::make_shared<brpc::Server>();SpeechServiceImpl *speech_service = new SpeechServiceImpl(_asr_client);int ret = _rpc_server->AddService(speech_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if(ret == -1){LOG_ERROR("添加Rpc服务失败!");abort();}brpc::ServerOptions options;options.idle_timeout_sec = timeout;options.num_threads = num_threads;ret = _rpc_server->Start(port, &options);if(ret == -1){LOG_ERROR("服务启动失败!");abort();}}SpeechServer::ptr build(){if(!_asr_client) {LOG_ERROR("还未初始化语音识别模块!");abort();}if(!_reg_client) {LOG_ERROR("还未初始化服务注册模块!");abort();}if(!_rpc_server) {LOG_ERROR("还未初始化RPC服务器模块!");abort();}SpeechServer::ptr server = std::make_shared<SpeechServer>(_asr_client, _reg_client, _rpc_server);return server;}private:ASRClient::ptr _asr_client;Registry::ptr _reg_client;std::shared_ptr<brpc::Server> _rpc_server;};
}

(5)实现语音识别子服务的服务器的搭建:

//主要实现语音识别子服务的服务器的搭建
#include "speech_server.hpp"DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");DEFINE_string(registry_host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(base_service, "/service", "服务监控根目录");
DEFINE_string(instance_name, "/speech_service/instance", "当前实例名称");
DEFINE_string(access_host, "127.0.0.1:10001", "当前实例的外部访问地址");DEFINE_int32(listen_port, 10001, "Rpc服务器监听端口");
DEFINE_int32(rpc_timeout, -1, "Rpc调用超时时间");
DEFINE_int32(rpc_threads, 1, "Rpc的IO线程数量");DEFINE_string(app_id, "60694095", "语音平台应用ID");
DEFINE_string(api_key, "PWn6zlsxym8VwpBW8Or4PPGe", "语音平台API密钥");
DEFINE_string(secret_key, "Bl0mn74iyAkr3FzCo5TZV7lBq7NYoms9", "语音平台加密密钥");int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);MyTest::SpeechServerBuilder ssb;ssb.make_asr_object(FLAGS_app_id, FLAGS_api_key, FLAGS_secret_key);ssb.make_rpc_server(FLAGS_listen_port, FLAGS_rpc_timeout, FLAGS_rpc_threads);ssb.make_reg_object(FLAGS_registry_host, FLAGS_base_service + FLAGS_instance_name, FLAGS_access_host);auto server = ssb.build();server->start();return 0;
}

(6)cmake构建代码:

# 1. 添加cmake版本说明
cmake_minimum_required(VERSION 3.1.3)
# 2. 声明工程名称
project(speech_server)set(target "speech_server")
set(test_client "speech_client")# 3. 检测并生成ODB框架代码
#   1. 添加所需的proto映射代码文件名称
set(proto_path ${CMAKE_CURRENT_SOURCE_DIR}/../proto)
set(proto_files speech.proto)
#   2. 检测框架代码文件是否已经生成
set(proto_hxx "")
set(proto_cxx "")
set(proto_srcs "")
foreach(proto_file ${proto_files})
#   3. 如果没有生成,则预定义生成指令 -- 用于在构建项目之间先生成框架代码string(REPLACE ".proto" ".pb.cc" proto_cc ${proto_file})string(REPLACE ".proto" ".pb.h" proto_hh  ${proto_file})if (NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}${proto_cc})add_custom_command(PRE_BUILDCOMMAND protocARGS --cpp_out=${CMAKE_CURRENT_BINARY_DIR} -I ${proto_path} --experimental_allow_proto3_optional ${proto_path}/${proto_file}DEPENDS ${proto_path}/${proto_file}OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc}COMMENT "生成Protobuf框架代码文件:" ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc})endif()list(APPEND proto_srcs ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc})
endforeach()# 4. 获取源码目录下的所有源码文件
set(src_files "")
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/source src_files)
# 5. 声明目标及依赖
add_executable(${target} ${src_files} ${proto_srcs})
# 7. 设置需要连接的库
target_link_libraries(${target} -lgflags -lspdlog -lfmt -lbrpc -lssl -lcrypto -lprotobuf -lleveldb -letcd-cpp-api -lcpprest -lcurl -ljsoncpp)set(test_files "")
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/test test_files)
add_executable(${test_client} ${test_files} ${proto_srcs})
target_link_libraries(${test_client} -lgflags -lspdlog -lfmt -lbrpc -lssl -lcrypto -lprotobuf -lleveldb -letcd-cpp-api -lcpprest -lcurl -ljsoncpp)# 6. 设置头文件默认搜索路径
include_directories(${CMAKE_CURRENT_BINARY_DIR})
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../common)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../third/include)#8. 设置安装路径
INSTALL(TARGETS ${target} ${test_client} RUNTIME DESTINATION bin)

(7)所以语言识别服务的整体流程是:

  1. 接收请求,从请求中取出语音数据。
  2. 基于语音识别 sdk 进行语音识别,获取识别后的文本内容。
  3. 组织响应进行返回。

2. 文件存储子服务的实现

2.1 功能设计

(1)文件管理子服务,主要用于管理用户的头像,以及消息中的文件存储,因此需要提供以下接口:

  1. 文件的上传
    • 单个文件的上传:这个接口基本用于后台部分,收到文件消息后将文件数据转发给文件子服务进行存储。
    • 多个文件的上传:这个接口基本用于后台部分,收到文件消息后将文件数据转发给文件子服务进行存储。
  2. 文件的下载
    • 单个文件的下载:在后台用于获取用户头像文件数据,以及客户端用于获取文件/语音/图片消息的文件数据。
    • 多个文件的下载:在后台用于大批量获取用户头像数据(比如获取用户列表的时候),以及前端的批量文件下载。

2.2 模块划分

(1)以下是文件存储模块划分:

  1. 参数/配置文件解析模块:基于 gflags 框架直接使用进行参数/配置文件解析。
  2. 日志模块:基于 spdlog 框架封装的模块直接使用进行日志输出。
  3. 服务注册模块:基于 etcd 框架封装的注册模块直接使用进行文件存储管理子服务的服务注册。
  4. rpc 服务模块:基于 brpc 框架搭建 rpc 服务器。
  5. 文件操作模块:基于标准库的文件流操作实现文件读写的封装。

2.3 模块功能示意图

(1)下图是本服务的模块图:

2.4 接口的实现

(1)该模块使用了日志模块和服务注册模块。该模块的protobuf接口如下:

syntax = "proto3";
package bite_im;
import "base.proto";option cc_generic_services = true;message GetSingleFileReq {string request_id = 1;string file_id = 2;optional string user_id = 3;optional string session_id = 4;
}
message GetSingleFileRsp {string request_id = 1;bool success = 2;string errmsg = 3; optional FileDownloadData file_data = 4;
}message GetMultiFileReq {string request_id = 1;optional string user_id = 2;optional string session_id = 3;repeated string file_id_list = 4;
}
message GetMultiFileRsp {string request_id = 1;bool success = 2;string errmsg = 3; map<string, FileDownloadData> file_data = 4;//文件ID与文件数据的映射map
}message PutSingleFileReq {string request_id = 1; //请求ID,作为处理流程唯一标识optional string user_id = 2;optional string session_id = 3;FileUploadData file_data = 4;
}
message PutSingleFileRsp {string request_id = 1;bool success = 2;string errmsg = 3;FileMessageInfo file_info = 4; //返回了文件组织的元信息
}message PutMultiFileReq {string request_id = 1;optional string user_id = 2;optional string session_id = 3;repeated FileUploadData file_data = 4;
}
message PutMultiFileRsp {string request_id = 1;bool success = 2;string errmsg = 3; repeated FileMessageInfo file_info = 4;
}service FileService {rpc GetSingleFile(GetSingleFileReq) returns (GetSingleFileRsp);rpc GetMultiFile(GetMultiFileReq) returns (GetMultiFileRsp);rpc PutSingleFile(PutSingleFileReq) returns (PutSingleFileRsp);rpc PutMultiFile(PutMultiFileReq) returns (PutMultiFileRsp);
}

(2)让FileServiceImpl类继承bite_im::FileService重写上面的四个函数。单个文件的上传实现:

  1. 获取文件元数据(大小、文件名、文件内容)。
  2. 为文件分配文件 ID。
  3. 以文件 ID 为文件名打开文件,并写入数据。
  4. 组织响应进行返回。
namespace MyTest
{class FileServiceImpl : public bite_im::FileService{FileServiceImpl(const std::string &storage_path):_storage_path(storage_path){umask(0);mkdir(storage_path.c_str(), 0775);if(_storage_path.back() != '/'){_storage_path.push_back('/');}}void GetSingleFile(google::protobuf::RpcController *controller,const ::bite_im::GetSingleFileReq *request,::bite_im::GetSingleFileRsp *response,::google::protobuf::Closure *done){brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 1. 取出请求中的文件ID(起始就是文件名)std::string fid = request->file_id();std::string filename = _storage_path + fid;// 2. 将文件ID作为文件名,读取文件数据std::string body;bool ret = readFile(filename, body);if(ret == false){response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 读取文件数据失败!", request->request_id());return;}// 3. 组织响应response->set_success(true);response->mutable_file_data()->set_file_id(fid);response->mutable_file_data()->set_file_content(body);}~FileServiceImpl() {}private:std::string _storage_path;};
}

(3)多个文件的上传。其实相较于单文件上传,就是将处理的过程循环进行了而已:

  1. 从请求中获取文件元数据。
  2. 为文件分配文件 ID。
  3. 以文件 ID 为文件名打开文件,并写入数据。
  4. 回到第一步进行下一个文件的处理。
  5. 当所有文件数据存储完毕,组织响应进行返回。
namespace MyTest
{class FileServiceImpl : public bite_im::FileService{void GetMultiFile(google::protobuf::RpcController *controller,const ::bite_im::GetMultiFileReq *request,::bite_im::GetMultiFileRsp *response,::google::protobuf::Closure *done){brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 循环取出请求中的文件ID,读取文件数据进行填充for (int i = 0; i < request->file_id_list_size(); i++){std::string fid = request->file_id_list(i);std::string filename = _storage_path + fid;std::string body;bool ret = readFile(filename, body);if (ret == false){response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 读取文件数据失败!", request->request_id());return;}bite_im::FileDownloadData data;data.set_file_id(fid);data.set_file_content(body);response->mutable_file_data()->insert({fid, data});}response->set_success(true);}};
}

(4)单个文件的下载:

  1. 从请求中获取文件 ID。
  2. 以文件 ID 作为文件名打开文件,获取文件大小,并从中读取文件数据。
  3. 组织响应进行返回。
namespace MyTest
{class FileServiceImpl : public bite_im::FileService{void PutSingleFile(google::protobuf::RpcController *controller,const ::bite_im::PutSingleFileReq *request,::bite_im::PutSingleFileRsp *response,::google::protobuf::Closure *done){brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 1. 为文件生成一个唯一uudi作为文件名 以及 文件IDstd::string fid = uuid();std::string filename = _storage_path + fid;// 2. 取出请求中的文件数据,进行文件数据写入bool ret = writeFile(filename, request->file_data().file_content());if (ret == false){response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 写入文件数据失败!", request->request_id());return;}// 3. 组织响应response->set_success(true);response->mutable_file_info()->set_file_id(fid);response->mutable_file_info()->set_file_size(request->file_data().file_size());response->mutable_file_info()->set_file_name(request->file_data().file_name());}};
}

(5)多个文件的下载其实相较于单文件下载,就是将处理的过程循环进行了而已:

  1. 从请求中获取文件 ID。
  2. 以文件 ID 作为文件名打开文件,获取文件大小,并从中读取文件数据。
  3. 回到第一步进行下一个文件的处理。
  4. 当所有文件数据获取完毕,组织响应进行返回。
namespace MyTest
{class FileServiceImpl : public bite_im::FileService{void PutMultiFile(google::protobuf::RpcController *controller,const ::bite_im::PutMultiFileReq *request,::bite_im::PutMultiFileRsp *response,::google::protobuf::Closure *done){brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());for (int i = 0; i < request->file_data_size(); i++){std::string fid = uuid();std::string filename = _storage_path + fid;bool ret = writeFile(filename, request->file_data(i).file_content());if (ret == false){response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 写入文件数据失败!", request->request_id());return;}bite_im::FileMessageInfo *info = response->add_file_info();info->set_file_id(fid);info->set_file_size(request->file_data(i).file_size());info->set_file_name(request->file_data(i).file_name());}response->set_success(true);}};
}

(6)构建FileServer类来搭建Rpc服务。该类包含服务注册和Rpc服务器:

namespace MyTest
{class FileServer{public:using ptr = std::shared_ptr<FileServer>;FileServer(const Registry::ptr &reg_client,const std::shared_ptr<brpc::Server> &server) :_reg_client(reg_client),_rpc_server(server) {}// 搭建RPC服务器,并启动服务器void start(){_rpc_server->RunUntilAskedToQuit();}~FileServer() {}private:Registry::ptr _reg_client;std::shared_ptr<brpc::Server> _rpc_server;};
}

(7)创建工厂类FileServerBuilder来实现文件存储子服务的创建以及Rpc服务器的创建:

namespace MyTest
{class FileServerBuilder{public:// 用于构造服务注册客户端对象void make_reg_object(const std::string &reg_host,const std::string &service_name,const std::string &access_host){_reg_client = std::make_shared<Registry>(reg_host);_reg_client->registry(service_name, access_host);}// 构造RPC服务器对象void make_rpc_server(uint16_t port, int32_t timeout,uint8_t num_threads, const std::string &path = "./data/"){_rpc_server = std::make_shared<brpc::Server>();FileServiceImpl *file_service = new FileServiceImpl(path);int ret = _rpc_server->AddService(file_service,brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if(ret == -1){LOG_ERROR("添加Rpc服务失败!");abort();}brpc::ServerOptions options;options.idle_timeout_sec = timeout;options.num_threads = num_threads;ret = _rpc_server->Start(port, &options);if(ret == -1){LOG_ERROR("服务启动失败!");abort();}}FileServer::ptr build(){if(!_reg_client){LOG_ERROR("还未初始化服务注册模块!");abort();}if(!_rpc_server){LOG_ERROR("还未初始化RPC服务器模块!");abort();}FileServer::ptr server = std::make_shared<FileServer>(_reg_client, _rpc_server);return server;}private:Registry::ptr _reg_client;std::shared_ptr<brpc::Server> _rpc_server;};
}

(8)实现文件存储子服务的服务器的搭建:

#include "file_server.hpp"//按照流程完成服务器的搭建
//1. 参数解析
//2. 日志初始化
//3. 构造服务器对象,启动服务器DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");DEFINE_string(registry_host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(base_service, "/service", "服务监控根目录");
DEFINE_string(instance_name, "/file_service/instance", "当前实例名称");
DEFINE_string(access_host, "127.0.0.1:10002", "当前实例的外部访问地址");DEFINE_string(storage_path, "./data/", "当前实例的外部访问地址");DEFINE_int32(listen_port, 10002, "Rpc服务器监听端口");
DEFINE_int32(rpc_timeout, -1, "Rpc调用超时时间");
DEFINE_int32(rpc_threads, 1, "Rpc的IO线程数量");int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);MyTest::FileServerBuilder fsb;fsb.make_rpc_server(FLAGS_listen_port, FLAGS_rpc_timeout, FLAGS_rpc_threads, FLAGS_storage_path);fsb.make_reg_object(FLAGS_registry_host, FLAGS_base_service + FLAGS_instance_name, FLAGS_access_host);auto server = fsb.build();server->start();return 0;
}

(9)cmake构建代码:

# 1. 添加cmake版本说明
cmake_minimum_required(VERSION 3.1.3)
# 2. 声明工程名称
project(file_server)set(target "file_server")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g")# 3. 检测并生成ODB框架代码
#   1. 添加所需的proto映射代码文件名称
set(proto_path ${CMAKE_CURRENT_SOURCE_DIR}/../proto)
set(proto_files file.proto base.proto)
#   2. 检测框架代码文件是否已经生成
set(proto_hxx "")
set(proto_cxx "")
set(proto_srcs "")
foreach(proto_file ${proto_files})
#   3. 如果没有生成,则预定义生成指令 -- 用于在构建项目之间先生成框架代码string(REPLACE ".proto" ".pb.cc" proto_cc ${proto_file})string(REPLACE ".proto" ".pb.h" proto_hh  ${proto_file})if (NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}${proto_cc})add_custom_command(PRE_BUILDCOMMAND protocARGS --cpp_out=${CMAKE_CURRENT_BINARY_DIR} -I ${proto_path} --experimental_allow_proto3_optional ${proto_path}/${proto_file}DEPENDS ${proto_path}/${proto_file}OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc}COMMENT "生成Protobuf框架代码文件:" ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc})endif()list(APPEND proto_srcs ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc})
endforeach()# 4. 获取源码目录下的所有源码文件
set(src_files "")
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/source src_files)
# 5. 声明目标及依赖
add_executable(${target} ${src_files} ${proto_srcs})
# 7. 设置需要连接的库
target_link_libraries(${target} -lgflags -lspdlog -lfmt -lbrpc -lssl -lcrypto -lprotobuf -lleveldb -letcd-cpp-api -lcpprest -lcurl -ljsoncpp)set(test_client "file_client")
set(test_files "")
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/test test_files)
add_executable(${test_client} ${test_files} ${proto_srcs})
target_link_libraries(${test_client} -lgtest -lgflags -lspdlog -lfmt -lbrpc -lssl -lcrypto -lprotobuf -lleveldb -letcd-cpp-api -lcpprest -lcurl -ljsoncpp)# 6. 设置头文件默认搜索路径
include_directories(${CMAKE_CURRENT_BINARY_DIR})
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../common)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../third/include)# 8. 设置安装路径
INSTALL(TARGETS ${target} ${test_client} RUNTIME DESTINATION bin)

3. 用户管理子服务的实现

3.1 功能设计

(1)用户管理子服务,主要用于管理用户的数据,以及关于用户信息的各项操作,因此在上述项目功能中,用户子服务需要提供以下接口:

  1. 用户注册:用户输入用户名(昵称),以及密码进行用户名的注册。
  2. 用户登录:用户通过用户名和密码进行登录。
  3. 短信验证码获取:当用户通过手机号注册或登录的时候,需要获取短信验证码。
  4. 手机号注册:用户输入手机号和短信验证码进行手机号的用户注册。
  5. 手机号登录:用户输入手机号和短信验证码进行手机号的用户登录。
  6. 用户信息获取:当用户登录之后,获取个人信息进行展示。
  7. 头像修改:设置用户头像。
  8. 昵称修改:设置用户昵称。
  9. 签名修改:设置用户签名。
  10. 手机号修改:修改用户的绑定手机号。

3.2 模块划分

(1)以下是用户管理模块划分:

  1. 参数/配置文件解析模块:基于 gflags 框架直接使用进行参数/配置文件解析。
  2. 日志模块:基于 spdlog 框架封装的模块直接使用进行日志输出。
  3. 服务注册模块:基于 etcd 框架封装的注册模块直接使用,进行聊天消息存储子服务的注册。
  4. 数据库数据操作模块:基于 odb-mysql 数据管理封装的模块,实现关系型数据库中数据的操作。
    • 用户进行用户名/手机号注册的时候在数据库中进行新增信息。
    • 用户修改个人信息的时候修改数据库中的记录。
    • 用户登录的时候,在数据库中进行用户名密码的验证。
  5. redis 客户端模块:基于 redis++封装的客户端进行内存数据库数据操作
    • 当用户登录的时候需要为用户创建登录会话,会话信息保存在 redis 服务器中。
    • 当用户手机号进行获取/验证验证码的时候,验证码与对应信息保存在 redis 服务器中。
  6. rpc 服务模块:基于 brpc 框架搭建 rpc 服务器。
  7. rpc 服务发现与调用模块:基于 etcd 框架与 brpc 框架封装的服务发现与调用模块,
    • 连接文件管理子服务:获取用户信息的时候,用户头像是通过文件的形式存储在文件子服务中的。
    • 连接消息管理子服务:在打开聊天会话的时候,需要获取最近的一条消息进行展示。
  8. ES 客户端模块:基于 elasticsearch 框架实现访问客户端,向 ES 服务器中存储用户简息,以便于用户的搜索。
  9. 短信平台客户端模块:基于短信平台 SDK 封装使用,用于向用户手机号发送指定验证码。

3.3 功能模块示意图

(1)如下图是该模块的功能图:

3.4 数据管理

3.4.1 关系数据库的数据管理

在关系型数据库中,对于用户子服务来说,总体只进行了一个信息数据的存储与管理,那就是用户信息数据,因此只需要构建好用户信息表,提供好对应的操作即可。

(1)用户数据表:

  • 包含的字段:

    1. 主键 ID:自动生成。
    2. 用户 ID:用户唯一标识。
    3. 用户昵称:用户的昵称,也可用作登录用户名。
    4. 用户签名:用户对自己的描述。
    5. 登录密码:登录验证。
    6. 绑定手机号:用户可以绑定手机号,绑定后可以通过手机号登录。
    7. 用户头像文件 ID:头像文件存储的唯一标识,具体头像数据存储在文件子服务器中。
  • 提供的操作:

    1. 通过昵称获取用户信息。
    2. 通过手机号获取用户信息。
    3. 通过用户 ID 获取用户信息。
    4. 新增用户。
    5. 更新用户信息。

(2)ODB映射数据结构user.hxx的实现:

#pragma once
#include <string>
#include <cstddef> 
#include <odb/nullable.hxx>
#include <odb/core.hxx>namespace MyTest
{#pragma db object table("user")class User{public:User(){}//用户名--新增用户 -- 用户ID, 昵称,密码User(const std::string &uid, const std::string &nickname, const std::string password):_user_id(uid),_nickname(nickname),_password(password){}//手机号--新增用户 -- 用户ID, 手机号, 随机昵称User(const std::string &uid, const std::string &phone):_user_id(uid),_nickname(uid),_phone(phone){}void user_id(const std::string &val) { _user_id = val; }std::string user_id() { return _user_id; }void nickname(const std::string &val) { _nickname = val; }std::string nickname() { if(_nickname){return *_nickname; }return std::string();}void description(const std::string &val) { _description = val; }std::string description(){if(!_description){return std::string();}return *_description; }void password(const std::string &val) { _password = val; }std::string password() { if(!_password){return std::string();}return *_password; }void phone(const std::string &val) { _phone = val; }std::string phone() { if(!_phone){return std::string();}return *_phone; }void avatar_id(const std::string &val) { _avatar_id = val; }std::string avatar_id() { if(!_avatar_id){return std::string();}return *_avatar_id; }private:friend class odb::access;#pragma db id autounsigned long _id;#pragma db type("varchar(64)") index unique std::string _user_id;#pragma db type("varchar(64)") index uniqueodb::nullable<std::string> _nickname; //用户昵称-不一定存在odb::nullable<std::string> _description; //用户签名 - 不一定存在#pragma db type("varchar(64)")odb::nullable<std::string> _password; //用户密码 - 不一定存在#pragma db type("varchar(64)") index uniqueodb::nullable<std::string> _phone; //用户手机号 - 不一定存在#pragma db type("varchar(64)")odb::nullable<std::string> _avatar_id; //用户头像文件ID - 不一定存在};
}

(3)运行如下命令可以通过odb生成mysql代码:

odb -d mysql --std c++11 --generate-query --generate-schema --profile boost/date-time user.hxx # 最后所要填写的取决与文件所在的路径

(4)生成的user.sql代码:

/* This file was generated by ODB, object-relational mapping (ORM)* compiler for C++.*/CREATE DATABASE IF NOT EXISTS `bite_im`;
USE `bite_im`;
DROP TABLE IF EXISTS `user`;CREATE TABLE `user` (`id` BIGINT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,`user_id` varchar(64) NOT NULL,`nickname` varchar(64) NULL,`description` TEXT NULL,`password` varchar(64) NULL,`phone` varchar(64) NULL,`avatar_id` varchar(64) NULL)ENGINE=InnoDB;CREATE UNIQUE INDEX `user_id_i`ON `user` (`user_id`);CREATE UNIQUE INDEX `nickname_i`ON `user` (`nickname`);CREATE UNIQUE INDEX `phone_i`ON `user` (`phone`);

3.4.2 内存数据库的数据管理

(1)会话信息映射键值对:

①映射类型:字符串键值对映射。

②映射字段

  1. 会话 ID(key) - 用户 ID(val) :便于通过会话 ID 查找用户 ID,进行后续操作时的连接身份识别鉴权:
    • 在用户登录的时候新增数据。
    • 在用户登录后的操作时进行有无验证及查询。
    • 该映射数据在用户退出登录的时候删除(目前并未提供实现)。
  2. 用户 ID(key) - 空(val) :这是一个用户登录状态的标记,用于避免同时重复登录:
    • 在用户登录的时候新增数据。
    • 在用户连接断开的时候删除数据。

(2)验证码信息映射键值对:

①映射类型:字符串键值对映射。

②映射字段:

  1. 验证码 ID(key) - 验证码(val) : 用于生成一个验证码 ID 和验证码。
    • 在用户获取短信验证码的时候新增数据。
    • 验证码通过短信平台发送给用户手机。
    • 而验证码 ID 直接响应发送给用户,用户登录的时候通过这两个信息进行验证。
    • 该映射字段需要设置一个 60s 过期自动删除的事件,以及在验证完毕后删除。

(3)对内存数据库redis的封装使用:

#include <sw/redis++/redis.h>
#include <iostream>namespace MyTest
{class RedisClientFactory{public:static std::shared_ptr<sw::redis::Redis> create(const std::string &host,int port,int db,bool keep_alive){sw::redis::ConnectionOptions opts;opts.host = host;opts.port = port;opts.db = db;opts.keep_alive = keep_alive;auto res = std::make_shared<sw::redis::Redis>(opts);return res;}};class Session{public:using ptr = std::shared_ptr<Session>;Session(const std::shared_ptr<sw::redis::Redis> &redis_client):_redis_client(redis_client){}void append(const std::string &ssid, const std::string &uid){_redis_client->set(ssid, uid);}void remove(const std::string &ssid) {_redis_client->del(ssid);}sw::redis::OptionalString uid(const std::string &ssid) {return _redis_client->get(ssid);}private:std::shared_ptr<sw::redis::Redis> _redis_client;};class Status {public:using ptr = std::shared_ptr<Status>;Status(const std::shared_ptr<sw::redis::Redis> &redis_client):_redis_client(redis_client){}void append(const std::string &uid){_redis_client->set(uid, "");}void remove(const std::string &uid){_redis_client->del(uid);}bool exists(const std::string &uid) {auto res = _redis_client->get(uid);if(res){return true;}return false;}private:std::shared_ptr<sw::redis::Redis> _redis_client;};class Codes {public:using ptr = std::shared_ptr<Codes>;Codes(const std::shared_ptr<sw::redis::Redis> &redis_client) :_redis_client(redis_client) {}void append(const std::string &cid, const std::string &code,const std::chrono::milliseconds &t = std::chrono::milliseconds(300000)){_redis_client->set(cid, code, t);}void remove(const std::string &cid){_redis_client->del(cid);}sw::redis::OptionalString code(const std::string &cid){return _redis_client->get(cid);}private:std::shared_ptr<sw::redis::Redis> _redis_client;};
}

3.4.3 文档数据库的数据管理

(1)用户信息的用户 ID,手机号,昵称字段需要在 ES 服务器额外进行一份存储,其目的是因为有用户搜索的功能,用户搜索通常会是一种字符串的模糊匹配方式,用传统的关系型数据库进行模糊匹配效率会极差,因此采用 ES 服务对索引字段进行分词后构建倒排索引,根据关键词进行搜索,效率会大大提升。

(2)对用户索引进行封装:

#include "icsearch.hpp"
#include "user.hxx"
//#include "message.hxx"namespace MyTest
{class ESClientFactory{public:static std::shared_ptr<elasticlient::Client> create(const std::vector<std::string> host_list){return std::make_shared<elasticlient::Client>(host_list);}};class ESUser{public:using ptr = std::shared_ptr<ESUser>;ESUser(const std::shared_ptr<elasticlient::Client> &client) :_es_client(client) {}bool createIndex(){bool ret = ESIndex(_es_client, "user").append("user_id", "keyword", "standard", true).append("nickname").append("phone", "keyword", "standard", true).append("description", "text", "standard", false).append("avatar_id", "keyword", "standard", false).create();if(ret == false){LOG_INFO("用户信息索引创建失败!");return false;}LOG_INFO("用户信息索引创建成功!");return true;}bool appendData(const std::string &uid,const std::string &phone,const std::string &nickname,const std::string &description,const std::string &avatar_id){bool ret = ESInsert(_es_client, "user").append("user_id", uid).append("nickname", nickname).append("phone", phone).append("description", description).append("avatar_id", avatar_id).insert(uid);if(ret == false){LOG_ERROR("用户数据新增/更新失败!");}LOG_INFO("用户数据新增/更新成功!");return true;}std::vector<User> search(const std::string &key, const std::vector<std::string> &uid_list){std::vector<User> res;Json::Value json_user = ESSearch(_es_client, "user").append_should_match("phone.keyword", key).append_should_match("user_id.keyword", key).append_should_match("nickname", key).append_must_not_terms("user_id.keyword", uid_list).search();if(json_user.isArray() == false){LOG_ERROR("用户搜索结果为空,或者结果不是数组类型");return res;}int sz = json_user.size();LOG_DEBUG("检索结果条目数量:{}", sz);for(int i = 0; i < sz; i++){User user;user.user_id(json_user[i]["_source"]["user_id"].asString());user.nickname(json_user[i]["_source"]["nickname"].asString());user.description(json_user[i]["_source"]["description"].asString());user.phone(json_user[i]["_source"]["phone"].asString());user.avatar_id(json_user[i]["_source"]["avatar_id"].asString());res.push_back(user);}return res;}private:// const std::string _uid_key = "user_id";// const std::string _desc_key = "user_id";// const std::string _phone_key = "user_id";// const std::string _name_key = "user_id";// const std::string _avatar_key = "user_id";std::shared_ptr<elasticlient::Client> _es_client;};
}

3.5 接口的实现

3.5.1 用户子服务所用到的protobuf接口实现

syntax = "proto3";
package bite_im;
import "base.proto";
option cc_generic_services = true;//----------------------------
//用户名注册   
message UserRegisterReq {string request_id = 1;string nickname = 2;string password = 3;optional string verify_code_id = 4; //目前客户端实现了本地验证,该字段没用了optional string verify_code = 5;//目前客户端实现了本地验证,该字段没用了
}
message UserRegisterRsp {string request_id = 1;bool success = 2;string errmsg = 3;
}
//----------------------------
//用户名登录 
message UserLoginReq {string request_id = 1;string nickname = 2;string password = 3;optional string verify_code_id = 4;optional string verify_code = 5;
}
message UserLoginRsp {string request_id = 1;bool success = 2;string errmsg = 3;string login_session_id = 4;
}
//----------------------------
//手机号验证码获取
message PhoneVerifyCodeReq {string request_id = 1;string phone_number = 2;
}
message PhoneVerifyCodeRsp {string request_id = 1;bool success = 2;string errmsg = 3;string verify_code_id = 4;
}
//----------------------------
//手机号注册
message PhoneRegisterReq {string request_id = 1;string phone_number = 2;string verify_code_id = 3;string verify_code = 4;
}
message PhoneRegisterRsp {string request_id = 1;bool success = 2;string errmsg = 3;
}
//----------------------------
//手机号登录
message PhoneLoginReq {string request_id = 1;string phone_number = 2;string verify_code_id = 3;string verify_code = 4;
}
message PhoneLoginRsp {string request_id = 1;bool success = 2;string errmsg = 3; string login_session_id = 4;
}
//个人信息获取-这个只用于获取当前登录用户的信息
//  客户端传递的时候只需要填充session_id即可
//其他个人/好友信息的获取在好友操作中完成
message GetUserInfoReq {string request_id = 1;optional string user_id = 2;    // 这个字段是网关进行身份鉴权之后填入的字段optional string session_id = 3; // 进行客户端身份识别的关键字段
}
message GetUserInfoRsp {string request_id = 1;bool success = 2;string errmsg = 3; UserInfo user_info = 4;
}
//内部接口
message GetMultiUserInfoReq {string request_id = 1;repeated string users_id = 2;
}
message GetMultiUserInfoRsp {string request_id = 1;bool success = 2;string errmsg = 3; map<string, UserInfo> users_info = 4;
}
//----------------------------
//用户头像修改 
message SetUserAvatarReq {string request_id = 1;optional string user_id = 2;optional string session_id = 3;bytes avatar = 4;
}
message SetUserAvatarRsp {string request_id = 1;bool success = 2;string errmsg = 3; 
}
//----------------------------
//用户昵称修改 
message SetUserNicknameReq {string request_id = 1;optional string user_id = 2;optional string session_id = 3;string nickname = 4;
}
message SetUserNicknameRsp {string request_id = 1;bool success = 2;string errmsg = 3; 
}
//----------------------------
//用户签名修改 
message SetUserDescriptionReq {string request_id = 1;optional string user_id = 2;optional string session_id = 3;string description = 4;
}
message SetUserDescriptionRsp {string request_id = 1;bool success = 2;string errmsg = 3; 
}
//----------------------------
//用户手机修改 
message SetUserPhoneNumberReq {string request_id = 1;optional string user_id = 2;optional string session_id = 3;string phone_number = 4;string phone_verify_code_id = 5;string phone_verify_code = 6;
}
message SetUserPhoneNumberRsp {string request_id = 1;bool success = 2;string errmsg = 3; 
}service UserService {rpc UserRegister(UserRegisterReq) returns (UserRegisterRsp);rpc UserLogin(UserLoginReq) returns (UserLoginRsp);rpc GetPhoneVerifyCode(PhoneVerifyCodeReq) returns (PhoneVerifyCodeRsp);rpc PhoneRegister(PhoneRegisterReq) returns (PhoneRegisterRsp);rpc PhoneLogin(PhoneLoginReq) returns (PhoneLoginRsp);rpc GetUserInfo(GetUserInfoReq) returns (GetUserInfoRsp);rpc GetMultiUserInfo(GetMultiUserInfoReq) returns (GetMultiUserInfoRsp);rpc SetUserAvatar(SetUserAvatarReq) returns (SetUserAvatarRsp);rpc SetUserNickname(SetUserNicknameReq) returns (SetUserNicknameRsp);rpc SetUserDescription(SetUserDescriptionReq) returns (SetUserDescriptionRsp);rpc SetUserPhoneNumber(SetUserPhoneNumberReq) returns (SetUserPhoneNumberRsp);
}

3.5.2 用户的注册和登录接口实现

(1)用户注册:

  1. 从请求中取出昵称和密码。
  2. 检查昵称是否合法(只能包含字母,数字,连字符-,下划线_,长度限制 3~15 之间)。
  3. 检查密码是否合法(只能包含字母,数字,长度限制 6~15 之间)。
  4. 根据昵称在数据库进行判断是否昵称已存在。
  5. 向数据库新增数据。
  6. 向 ES 服务器中新增用户信息。
  7. 组织响应,进行成功与否的响应即可。
#pragma once
#include <brpc/server.h>
#include <butil/logging.h>#include "data_es.hpp"      // es数据管理客户端封装
#include "data_redis.hpp"      // redis数据管理客户端封装
#include "mysql_user.hpp"      // mysql数据管理客户端封装
#include "etcd.hpp"     // 服务注册模块封装
#include "logger.hpp"   // 日志模块封装
#include "utils.hpp"    // 基础工具接口
#include "dms.hpp"      // 短信平台SDK模块封装
#include "channel.hpp"  // 信道管理模块封装#include "user.pb.h"  // protobuf框架代码
#include "base.pb.h"  // protobuf框架代码
#include "file.pb.h"  // protobuf框架代码namespace MyTest
{class UserServiceImpl : public bite_im::UserService{public:UserServiceImpl(const DMSClient::ptr &dms_client,const std::shared_ptr<elasticlient::Client> &es_client,const std::shared_ptr<odb::core::database> &mysql_client,const std::shared_ptr<sw::redis::Redis> &redis_client,const ServiceManager::ptr &channel_manager,const std::string &file_service_name):_es_user(std::make_shared<ESUser>(es_client)),_mysql_user(std::make_shared<UserTable>(mysql_client)),_redis_session(std::make_shared<Session>(redis_client)),_redis_status(std::make_shared<Status>(redis_client)),_redis_codes(std::make_shared<Codes>(redis_client)),_file_service_name(file_service_name),_mm_channels(channel_manager),_dms_client(dms_client){_es_user->createIndex();}bool nickname_check(std::string &nickname){return nickname.size() < 22;}bool password_check(std::string &password){if(password.size() < 6 || password.size() > 15){LOG_ERROR("密码长度不合法:{}-{}", password, password.size());return false;}for(int i = 0; i < password.size(); i++){if(!((password[i] > 'a' && password[i] < 'z') ||(password[i] > 'A' && password[i] < 'Z') ||(password[i] > '0' && password[i] < '9') ||password[i] == '_' || password[i] == '-')){LOG_ERROR("密码字符不合法:{}", password);return false;}}return true;}virtual void UserRegister(::google::protobuf::RpcController *controller,const ::bite_im::UserRegisterReq *request,::bite_im::UserRegisterRsp *response,::google::protobuf::Closure *done){LOG_DEBUG("收到用户注册请求!");brpc::ClosureGuard rpc_guard(done);//定义一个错误处理函数,当出错的时候被调用auto err_response = [this, response](const std::string &rid, const std::string &errmsg) -> void {response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};//1. 从请求中取出昵称和密码std::string nickname = request->nickname();std::string password  = request->password();//2. 检查昵称是否合法(只能包含字母,数字,连字符-,下划线_,长度限制 3~15 之间)bool ret = nickname_check(nickname);if(ret == false) {LOG_ERROR("{} - 用户名长度不合法!", request->request_id());return err_response(request->request_id(), "用户名长度不合法!");}//3. 检查密码是否合法(只能包含字母,数字,长度限制 6~15 之间)bool ret = password_check(nickname);if(ret == false){LOG_ERROR("{} - 密码格式不合法!", request->request_id());return err_response(request->request_id(), "密码格式不合法!");}//4. 根据昵称在数据库进行判断是否昵称已存在auto user = _mysql_user->select_by_nickname(nickname);if(user){LOG_ERROR("{} - 用户名被占用- {}!", request->request_id(), nickname);return err_response(request->request_id(), "用户名被占用!");}//5. 向数据库新增数据std::string uid = uuid();user = std::make_shared<User>(uid, nickname, password);ret = _mysql_user->insert(user);if(ret == false){LOG_ERROR("{} - Mysql数据库新增数据失败!", request->request_id());return err_response(request->request_id(), "Mysql数据库新增数据失败!");}//6. 向 ES 服务器中新增用户信息ret = _es_user->appendData(uid, "", nickname, "", "");if(ret == false) {LOG_ERROR("{} - ES搜索引擎新增数据失败!", request->request_id());return err_response(request->request_id(), "ES搜索引擎新增数据失败!");}//7. 组织响应,进行成功与否的响应即可。response->set_request_id(request->request_id());response->set_success(true);}};
}

(2)用户登录:

  1. 从请求中取出昵称和密码。
  2. 通过昵称获取用户信息,进行密码是否一致的判断。
  3. 根据 redis 中的登录标记信息是否存在判断用户是否已经登录。
  4. 构造会话 ID,生成会话键值对,向 redis 中添加会话信息以及登录标记信息。
  5. 组织响应,返回生成的会话 ID。
namespace MyTest
{class UserServiceImpl : public bite_im::UserService{public:virtual void UserLogin(::google::protobuf::RpcController *controller,const ::bite_im::UserLoginReq *request,::bite_im::UserLoginRsp *response,::google::protobuf::Closure *done){LOG_DEBUG("收到用户登录请求!");brpc::ClosureGuard rpc_guard(done);auto err_response = [this, response](const std::string &rid, const std::string &errmsg) -> void {response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};//1. 从请求中取出昵称和密码std::string nickname = request->nickname();std::string password  = request->password();//2. 通过昵称获取用户信息,进行密码是否一致的判断auto user = _mysql_user->select_by_nickname(nickname);if(!user || password != user->password()) {LOG_ERROR("{} - 用户名或密码错误 - {}-{}!", request->request_id(), nickname, password);return err_response(request->request_id(), "用户名或密码错误!");}//3. 根据 redis 中的登录标记信息是否存在判断用户是否已经登录。bool ret = _redis_status->exists(user->user_id());if(ret == true) {LOG_ERROR("{} - 用户已在其他地方登录 - {}!", request->request_id(), nickname);return err_response(request->request_id(), "用户已在其他地方登录!");}//4. 构造会话 ID,生成会话键值对,向 redis 中添加会话信息以及登录标记信息std::string ssid = uuid();_redis_session->append(ssid, user->user_id());//5. 添加用户登录信息_redis_status->append(user->user_id());//6. 组织响应,返回生成的会话 IDresponse->set_request_id(request->request_id());response->set_login_session_id(ssid);response->set_success(true);}};
}

3.5.3 获取短信验证码接口实现

(1)实现流程:

  1. 从请求中取出手机号码。
  2. 验证手机号码格式是否正确(必须以 1 开始,第二位 3~9 之间,后边 9 个数字字符)。
  3. 生成 4 位随机验证码。
  4. 基于短信平台 SDK 发送验证码。
  5. 构造验证码 ID,添加到 redis 验证码映射键值索引中。
  6. 组织响应,返回生成的验证码 ID。
namespace MyTest
{class UserServiceImpl : public bite_im::UserService{public:virtual void GetPhoneVerifyCode(::google::protobuf::RpcController *controller,const ::bite_im::PhoneVerifyCodeReq *request,::bite_im::PhoneVerifyCodeRsp *response,::google::protobuf::Closure *done){LOG_DEBUG("收到短信验证码获取请求!");brpc::ClosureGuard rpc_guard(done);auto err_response = [this, response](const std::string &rid,const std::string &errmsg) -> void{response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 1. 从请求中取出手机号码std::string phone = request->phone_number();// 2. 验证手机号码格式是否正确(必须以 1 开始,第二位 3~9 之间,后边 9 个数字字符)bool ret = phone_check(phone);if(ret == false){LOG_ERROR("{} - 手机号码格式错误 - {}!", request->request_id(), phone);return err_response(request->request_id(), "手机号码格式错误!");}// 3. 生成 4 位随机验证码std::string code_id = uuid();std::string code = vcode();// 4. 基于短信平台 SDK 发送验证码ret = _dms_client->send(phone, code);if(ret == false){LOG_ERROR("{} - 短信验证码发送失败 - {}!", request->request_id(), phone);return err_response(request->request_id(), "短信验证码发送失败!");}// 5. 构造验证码 ID,添加到 redis 验证码映射键值索引中_redis_codes->append(code_id, code);// 6. 组织响应,返回生成的验证码 IDresponse->set_request_id(request->request_id());response->set_success(true);response->set_verify_code_id(code_id);LOG_DEBUG("获取短信验证码处理完成!");}};
}

3.5.4 手机号的登录和注册接口实现

(1)手机号注册:

  1. 从请求中取出手机号码和验证码。
  2. 检查注册手机号码是否合法。
  3. 从 redis 数据库中进行验证码 ID-验证码一致性匹配。
  4. 通过数据库查询判断手机号是否已经注册过。
  5. 向数据库新增用户信息。
  6. 向 ES 服务器中新增用户信息。
  7. 组织响应,返回注册成功与否。
namespace MyTest
{class UserServiceImpl : public bite_im::UserService{public:virtual void PhoneRegister(::google::protobuf::RpcController *controller,const ::bite_im::PhoneRegisterReq *request,::bite_im::PhoneRegisterRsp *response,::google::protobuf::Closure *done){LOG_DEBUG("收到手机号注册请求!");brpc::ClosureGuard rpc_guard(done);auto err_response = [this, response](const std::string &rid,const std::string &errmsg) -> void{response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 1. 从请求中取出手机号码和验证码,验证码IDstd::string phone = request->phone_number();std::string code_id = request->verify_code_id();std::string code = request->verify_code();// 2. 检查注册手机号码是否合法bool ret = phone_check(phone);if(ret == false){LOG_ERROR("{} - 手机号码格式错误 - {}!", request->request_id(), phone);return err_response(request->request_id(), "手机号码格式错误!");}// 3. 从 redis 数据库中进行验证码 ID-验证码一致性匹配auto vcode = _redis_codes->code(code_id);if (vcode != code){LOG_ERROR("{} - 验证码错误 - {}-{}!", request->request_id(), code_id, code);return err_response(request->request_id(), "验证码错误!");}// 4. 通过数据库查询判断手机号是否已经注册过auto user = _mysql_user->select_by_phone(phone);if (user){LOG_ERROR("{} - 该手机号已注册过用户 - {}!", request->request_id(), phone);return err_response(request->request_id(), "该手机号已注册过用户!");}// 5. 向数据库新增用户信息std::string uid = uuid();user = std::make_shared<User>(uid, phone);ret = _mysql_user->insert(user);if(ret == false){LOG_ERROR("{} - 向数据库添加用户信息失败 - {}!", request->request_id(), phone);return err_response(request->request_id(), "向数据库添加用户信息失败!");}// 6. 向 ES 服务器中新增用户信息ret = _es_user->appendData(uid, phone, uid, "", "");if(ret == false){LOG_ERROR("{} - ES搜索引擎新增数据失败!", request->request_id());return err_response(request->request_id(), "ES搜索引擎新增数据失败!");}// 7. 组织响应,进行成功与否的响应即可。response->set_request_id(request->request_id());response->set_success(true);}};
}

(2)手机号登录:

  1. 从请求中取出手机号码和验证码 ID,以及验证码。
  2. 检查注册手机号码是否合法。
  3. 从 redis 数据库中进行验证码 ID-验证码一致性匹配。
  4. 根据手机号从数据数据进行用户信息查询,判断用用户是否存在。
  5. 根据 redis 中的登录标记信息是否存在判断用户是否已经登录。
  6. 构造会话 ID,生成会话键值对,向 redis 中添加会话信息以及登录标记信息。
  7. 组织响应,返回生成的会话 ID。
namespace MyTest
{class UserServiceImpl : public bite_im::UserService{public:virtual void PhoneLogin(::google::protobuf::RpcController *controller,const ::bite_im::PhoneLoginReq *request,::bite_im::PhoneLoginRsp *response,::google::protobuf::Closure *done){LOG_DEBUG("收到手机号登录请求!");brpc::ClosureGuard rpc_guard(done);auto err_response = [this, response](const std::string &rid,const std::string &errmsg) -> void{response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 1. 从请求中取出手机号码和验证码 ID,以及验证码。std::string phone = request->phone_number();std::string code_id = request->verify_code_id();std::string code = request->verify_code();// 2. 检查注册手机号码是否合法bool ret = phone_check(phone);if(ret == false){LOG_ERROR("{} - 手机号码格式错误 - {}!", request->request_id(), phone);return err_response(request->request_id(), "手机号码格式错误!");}// 3. 根据手机号从数据数据进行用户信息查询,判断用用户是否存在auto user = _mysql_user->select_by_phone(phone);if(!user){LOG_ERROR("{} - 该手机号未注册用户 - {}!", request->request_id(), phone);return err_response(request->request_id(), "该手机号未注册用户!");}// 4. 从 redis 数据库中进行验证码 ID-验证码一致性匹配auto vcode = _redis_codes->code(code_id);if(vcode != code){LOG_ERROR("{} - 验证码错误 - {}-{}!", request->request_id(), code_id, code);return err_response(request->request_id(), "验证码错误!");}_redis_codes->remove(code_id);// 5. 根据 redis 中的登录标记信息是否存在判断用户是否已经登录。ret = _redis_status->exists(user->user_id());if(ret == true){LOG_ERROR("{} - 用户已在其他地方登录 - {}!", request->request_id(), phone);return err_response(request->request_id(), "用户已在其他地方登录!");}// 6. 构造会话 ID,生成会话键值对,向 redis 中添加会话信息以及登录标记信息std::string ssid = uuid();_redis_session->append(ssid, user->user_id());// 7. 添加用户登录信息_redis_status->append(user->user_id());// 8. 组织响应,返回生成的会话 IDresponse->set_request_id(request->request_id());response->set_login_session_id(ssid);response->set_success(true);}};
}

3.5.5 对用户信息修改接口的实现

(1)获取单个或者多个用户信息:

  1. 从请求中取出用户 ID。
  2. 通过用户 ID,从数据库中查询用户信息
  3. 根据用户信息中的头像 ID,从文件服务器获取头像文件数据,组织完整用户信息。
  4. 组织响应,返回用户信息。
namespace MyTest
{class UserServiceImpl : public bite_im::UserService{public:// 从这一步开始,用户登录之后才会进行的操作virtual void GetUserInfo(::google::protobuf::RpcController *controller,const ::bite_im::GetUserInfoReq *request,::bite_im::GetUserInfoRsp *response,::google::protobuf::Closure *done){LOG_DEBUG("收到获取单个用户信息请求!");brpc::ClosureGuard rpc_guard(done);auto err_response = [this, response](const std::string &rid,const std::string &errmsg) -> void{response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 1. 从请求中取出用户 IDstd::string uid = request->user_id();// 2. 通过用户 ID,从数据库中查询用户信息auto user = _mysql_user->select_by_id(uid);if(!user){LOG_ERROR("{} - 未找到用户信息 - {}!", request->request_id(), uid);return err_response(request->request_id(), "未找到用户信息!");}// 3. 根据用户信息中的头像 ID,从文件服务器获取头像文件数据,组织完整用户信息UserInfo *user_info = response->mutable_user_info();user_info->set_user_id(user->user_id());user_info->set_nickname(user->nickname());user_info->set_description(user->description());user_info->set_phone(user->phone());if(!user->avatar_id().empty()){// 从信道管理对象中,获取到连接了文件管理子服务的channelauto channel = _mm_channels->choose(_file_service_name);if(!channel){LOG_ERROR("{} - 未找到文件管理子服务节点 - {} - {}!",request->request_id(), _file_service_name, uid);return err_response(request->request_id(), "未找到文件管理子服务节点!");}// 进行文件子服务的rpc请求,进行头像文件下载bite_im::FileService_Stub stub(channel.get());bite_im::GetSingleFileReq req;bite_im::GetSingleFileRsp rsp;req.set_request_id(request->request_id());req.set_file_id(user->avatar_id());brpc::Controller cntl;stub.GetSingleFile(&cntl, &req, &rsp, nullptr);if(cntl.Failed() == true || rsp.success() == false){LOG_ERROR("{} - 文件子服务调用失败:{}!", request->request_id(), cntl.ErrorText());return err_response(request->request_id(), "文件子服务调用失败!");}user_info->set_avatar(rsp.file_data().file_content());}// 4. 组织响应,返回用户信息response->set_request_id(request->request_id());response->set_success(true);}virtual void GetMultiUserInfo(::google::protobuf::RpcController *controller,const ::bite_im::GetMultiUserInfoReq *request,::bite_im::GetMultiUserInfoRsp *response,::google::protobuf::Closure *done){LOG_DEBUG("收到批量用户信息获取请求!");brpc::ClosureGuard rpc_guard(done);// 1. 定义错误回调auto err_response = [this, response](const std::string &rid,const std::string &errmsg) -> void{response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 2. 从请求中取出用户ID --- 列表std::vector<std::string> uid_lists;for(int i = 0; i < request->users_id_size(); i++){uid_lists.push_back(request->users_id(i));}// 3. 从数据库进行批量用户信息查询auto users = _mysql_user->select_multi_users(uid_lists);if (users.size() != request->users_id_size()){LOG_ERROR("{} - 从数据库查找的用户信息数量不一致 {}-{}!",request->request_id(), request->users_id_size(), users.size());return err_response(request->request_id(), "从数据库查找的用户信息数量不一致!");}// 4. 批量从文件管理子服务进行文件下载auto channel = _mm_channels->choose(_file_service_name);if (!channel){LOG_ERROR("{} - 未找到文件管理子服务节点 - {}!", request->request_id(), _file_service_name);return err_response(request->request_id(), "未找到文件管理子服务节点!");}bite_im::FileService_Stub stub(channel.get());bite_im::GetMultiFileReq req;bite_im::GetMultiFileRsp rsp;req.set_request_id(request->request_id());for (auto &user : users){if (user.avatar_id().empty())continue;req.add_file_id_list(user.avatar_id());}brpc::Controller cntl;stub.GetMultiFile(&cntl, &req, &rsp, nullptr);if (cntl.Failed() == true || rsp.success() == false){LOG_ERROR("{} - 文件子服务调用失败:{} - {}!", request->request_id(),_file_service_name, cntl.ErrorText());return err_response(request->request_id(), "文件子服务调用失败!");}// 5. 组织响应()for (auto &user : users){auto user_map = response->mutable_users_info(); // 本次请求要响应的用户信息mapauto file_map = rsp.mutable_file_data();        // 这是批量文件请求响应中的mapUserInfo user_info;user_info.set_user_id(user.user_id());user_info.set_nickname(user.nickname());user_info.set_description(user.description());user_info.set_phone(user.phone());user_info.set_avatar((*file_map)[user.avatar_id()].file_content());(*user_map)[user_info.user_id()] = user_info;}response->set_request_id(request->request_id());response->set_success(true);}};
}

(2)设置头像:

  1. 从请求中取出用户 ID 与头像数据
  2. 从数据库通过用户 ID 进行用户信息查询,判断用户是否存在
  3. 上传头像文件到文件子服务,
  4. 将返回的头像文件 ID 更新到数据库中
  5. 更新 ES 服务器中用户信息
  6. 组织响应,返回更新成功与否
namespace MyTest
{class UserServiceImpl : public bite_im::UserService{public:virtual void SetUserAvatar(::google::protobuf::RpcController *controller,const ::bite_im::SetUserAvatarReq *request,::bite_im::SetUserAvatarRsp *response,::google::protobuf::Closure *done){LOG_DEBUG("收到用户头像设置请求!");brpc::ClosureGuard rpc_guard(done);auto err_response = [this, response](const std::string &rid,const std::string &errmsg) -> void{response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 1. 从请求中取出用户 ID 与头像数据std::string uid = request->user_id();// 2. 从数据库通过用户 ID 进行用户信息查询,判断用户是否存在auto user = _mysql_user->select_by_id(uid);if(!user){LOG_ERROR("{} - 未找到用户信息 - {}!", request->request_id(), uid);return err_response(request->request_id(), "未找到用户信息!");}// 3. 上传头像文件到文件子服务,auto channel = _mm_channels->choose(_file_service_name);if(!channel){LOG_ERROR("{} - 未找到文件管理子服务节点 - {}!", request->request_id(), _file_service_name);return err_response(request->request_id(), "未找到文件管理子服务节点!");}bite_im::FileService_Stub stub(channel.get());bite_im::PutSingleFileReq req;bite_im::PutSingleFileRsp rsp;req.set_request_id(request->request_id());req.mutable_file_data()->set_file_name("");req.mutable_file_data()->set_file_size(request->avatar().size());req.mutable_file_data()->set_file_content(request->avatar());brpc::Controller cntl;stub.PutSingleFile(&cntl, &req, &rsp, nullptr);if (cntl.Failed() == true || rsp.success() == false){LOG_ERROR("{} - 文件子服务调用失败:{}!", request->request_id(), cntl.ErrorText());return err_response(request->request_id(), "文件子服务调用失败!");}std::string avatar_id = rsp.file_info().file_id();// 4. 将返回的头像文件 ID 更新到数据库中user->avatar_id(avatar_id);bool ret = _mysql_user->update(user);if(ret == false){LOG_ERROR("{} - 更新数据库用户头像ID失败 :{}!", request->request_id(), avatar_id);return err_response(request->request_id(), "更新数据库用户头像ID失败!");}// 5. 更新 ES 服务器中用户信息ret = _es_user->appendData(user->user_id(), user->phone(),user->nickname(), user->description(), user->avatar_id());if(ret == false){LOG_ERROR("{} - 更新搜索引擎用户头像ID失败 :{}!", request->request_id(), avatar_id);return err_response(request->request_id(), "更新搜索引擎用户头像ID失败!");}// 6. 组织响应,返回更新成功与否response->set_request_id(request->request_id());response->set_success(true);}};
}

(3)设置昵称:

  1. 从请求中取出用户 ID 与新的昵称
  2. 判断昵称格式是否正确
  3. 从数据库通过用户 ID 进行用户信息查询,判断用户是否存在
  4. 将新的昵称更新到数据库中
  5. 更新 ES 服务器中用户信息
  6. 组织响应,返回更新成功与否
namespace MyTest
{class UserServiceImpl : public bite_im::UserService{public:virtual void SetUserNickname(::google::protobuf::RpcController *controller,const ::bite_im::SetUserNicknameReq *request,::bite_im::SetUserNicknameRsp *response,::google::protobuf::Closure *done){LOG_DEBUG("收到用户昵称设置请求!");brpc::ClosureGuard rpc_guard(done);auto err_response = [this, response](const std::string &rid,const std::string &errmsg) -> void{response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 1. 从请求中取出用户 ID 与新的昵称std::string uid = request->user_id();std::string new_nickname = request->nickname();// 2. 判断昵称格式是否正确bool ret = nickname_check(new_nickname);if(ret == false){LOG_ERROR("{} - 用户名长度不合法!", request->request_id());return err_response(request->request_id(), "用户名长度不合法!");}// 3. 从数据库通过用户 ID 进行用户信息查询,判断用户是否存在auto user = _mysql_user->select_by_id(uid);if (!user){LOG_ERROR("{} - 未找到用户信息 - {}!", request->request_id(), uid);return err_response(request->request_id(), "未找到用户信息!");}// 4. 将新的昵称更新到数据库中user->nickname(new_nickname);ret = _mysql_user->update(user);if (ret == false){LOG_ERROR("{} - 更新数据库用户昵称失败 :{}!", request->request_id(), new_nickname);return err_response(request->request_id(), "更新数据库用户昵称失败!");}// 5. 更新 ES 服务器中用户信息ret = _es_user->appendData(user->user_id(), user->phone(),user->nickname(), user->description(), user->avatar_id());if (ret == false){LOG_ERROR("{} - 更新搜索引擎用户昵称失败 :{}!", request->request_id(), new_nickname);return err_response(request->request_id(), "更新搜索引擎用户昵称失败!");}// 6. 组织响应,返回更新成功与否response->set_request_id(request->request_id());response->set_success(true);}};
}

(4)设置签名:

  1. 从请求中取出用户 ID 与新的签名
  2. 从数据库通过用户 ID 进行用户信息查询,判断用户是否存在
  3. 将新的签名更新到数据库中
  4. 更新 ES 服务器中用户信息
  5. 组织响应,返回更新成功与否
namespace MyTest
{class UserServiceImpl : public bite_im::UserService{public:virtual void SetUserDescription(::google::protobuf::RpcController *controller,const ::bite_im::SetUserDescriptionReq *request,::bite_im::SetUserDescriptionRsp *response,::google::protobuf::Closure *done){LOG_DEBUG("收到用户签名设置请求!");brpc::ClosureGuard rpc_guard(done);auto err_response = [this, response](const std::string &rid,const std::string &errmsg) -> void{response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 1. 从请求中取出用户 ID 与新的昵称std::string uid = request->user_id();std::string new_description = request->description();// 3. 从数据库通过用户 ID 进行用户信息查询,判断用户是否存在auto user = _mysql_user->select_by_id(uid);if(!user){LOG_ERROR("{} - 未找到用户信息 - {}!", request->request_id(), uid);return err_response(request->request_id(), "未找到用户信息!");}// 4. 将新的昵称更新到数据库中user->description(new_description);bool ret = _mysql_user->update(user);if (ret == false){LOG_ERROR("{} - 更新数据库用户签名失败 :{}!", request->request_id(), new_description);return err_response(request->request_id(), "更新数据库用户签名失败!");}// 5. 更新 ES 服务器中用户信息ret = _es_user->appendData(user->user_id(), user->phone(),user->nickname(), user->description(), user->avatar_id());if (ret == false){LOG_ERROR("{} - 更新搜索引擎用户签名失败 :{}!", request->request_id(), new_description);return err_response(request->request_id(), "更新搜索引擎用户签名失败!");}// 6. 组织响应,返回更新成功与否response->set_request_id(request->request_id());response->set_success(true);}};
}

(5)设置绑定手机号:

  1. 从请求中取出手机号码和验证码 ID,以及验证码。
  2. 检查注册手机号码是否合法
  3. 从 redis 数据库中进行验证码 ID-验证码一致性匹配
  4. 根据手机号从数据数据进行用户信息查询,判断用用户是否存在
  5. 将新的手机号更新到数据库中
  6. 更新 ES 服务器中用户信息
  7. 组织响应,返回更新成功与否
namespace MyTest
{class UserServiceImpl : public bite_im::UserService{public:virtual void SetUserPhoneNumber(::google::protobuf::RpcController *controller,const ::bite_im::SetUserPhoneNumberReq *request,::bite_im::SetUserPhoneNumberRsp *response,::google::protobuf::Closure *done){LOG_DEBUG("收到用户手机号设置请求!");brpc::ClosureGuard rpc_guard(done);auto err_response = [this, response](const std::string &rid,const std::string &errmsg) -> void{response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 1. 从请求中取出用户 ID 与新的昵称std::string uid = request->user_id();std::string new_phone = request->phone_number();std::string code = request->phone_verify_code();std::string code_id = request->phone_verify_code_id();// 2. 对验证码进行验证auto vcode = _redis_codes->code(code_id);if (vcode != code){LOG_ERROR("{} - 验证码错误 - {}-{}!", request->request_id(), code_id, code);return err_response(request->request_id(), "验证码错误!");}// 3. 从数据库通过用户 ID 进行用户信息查询,判断用户是否存在auto user = _mysql_user->select_by_id(uid);if (!user){LOG_ERROR("{} - 未找到用户信息 - {}!", request->request_id(), uid);return err_response(request->request_id(), "未找到用户信息!");}// 4. 将新的昵称更新到数据库中user->phone(new_phone);bool ret = _mysql_user->update(user);if (ret == false){LOG_ERROR("{} - 更新数据库用户手机号失败 :{}!", request->request_id(), new_phone);return err_response(request->request_id(), "更新数据库用户手机号失败!");}// 5. 更新 ES 服务器中用户信息ret = _es_user->appendData(user->user_id(), user->phone(),user->nickname(), user->description(), user->avatar_id());if (ret == false){LOG_ERROR("{} - 更新搜索引擎用户手机号失败 :{}!", request->request_id(), new_phone);return err_response(request->request_id(), "更新搜索引擎用户手机号失败!");}// 6. 组织响应,返回更新成功与否response->set_request_id(request->request_id());response->set_success(true);}~UserServiceImpl(){}private:ESUser::ptr _es_user;UserTable::ptr _mysql_user;Session::ptr _redis_session;Status::ptr _redis_status;Codes::ptr _redis_codes;//这边是rpc调用客户端相关对象std::string _file_service_name;ServiceManager::ptr _mm_channels;DMSClient::ptr _dms_client;};
}

3.5.6 搭建Rpc服务和创建用户子服务的工厂类

(1)创建UserServer类来搭建RPC服务器:

namespace MyTest
{class UserServer{public:using ptr = std::shared_ptr<UserServer>;UserServer(const Discovery::ptr service_discoverer, const Registry::ptr &reg_client,const std::shared_ptr<elasticlient::Client> &es_client,const std::shared_ptr<odb::core::database> &mysql_client,std::shared_ptr<sw::redis::Redis> &redis_client,const std::shared_ptr<brpc::Server> &server):_service_discoverer(service_discoverer),_registry_client(reg_client),_es_client(es_client),_mysql_client(mysql_client),_redis_client(redis_client),_rpc_server(server){}//搭建RPC服务器,并启动服务器void start() {_rpc_server->RunUntilAskedToQuit();}~UserServer(){}private:Discovery::ptr _service_discoverer;Registry::ptr _registry_client;std::shared_ptr<elasticlient::Client> _es_client;std::shared_ptr<odb::core::database> _mysql_client;std::shared_ptr<sw::redis::Redis> _redis_client;std::shared_ptr<brpc::Server> _rpc_server;};
}

(2)创建工厂类UserServerBuilder来实现用户子服务的创建以及Rpc服务器的创建:

namespace MyTest
{class UserServerBuilder{public://构造es客户端对象void make_es_object(const std::vector<std::string> host_list) {_es_client = ESClientFactory::create(host_list);}void make_dms_object(const std::string &access_key_id,const std::string &access_key_secret){_dms_client = std::make_shared<DMSClient>(access_key_id, access_key_secret);}void make_mysql_object(const std::string &user,const std::string &pswd,const std::string &host,const std::string &db,const std::string &cset,int port,int conn_pool_count){_mysql_client = ODBFactory::create(user, pswd, host, db, cset, port, conn_pool_count);}void make_redis_object(const std::string &host,int port,int db,bool keep_alive){_redis_client = RedisClientFactory::create(host, port, db, keep_alive);}//用于构造服务发现客户端&信道管理对象void make_discovery_object(const std::string &reg_host,const std::string &base_service_name,const std::string &file_service_name){_file_service_name = file_service_name;_mm_channels = std::make_shared<ServiceManager>();_mm_channels->declared(file_service_name);LOG_DEBUG("设置文件子服务为需添加管理的子服务:{}", file_service_name);auto put_cb = std::bind(&ServiceManager::onServiceOnline, _mm_channels.get(), std::placeholders::_1, std::placeholders::_2);auto del_cb = std::bind(&ServiceManager::onServiceOffline, _mm_channels.get(), std::placeholders::_1, std::placeholders::_2);_service_discoverer = std::make_shared<Discovery>(reg_host, base_service_name, put_cb, del_cb);}//用于构造服务注册客户端对象void make_registry_object(const std::string &reg_host,const std::string &service_name,const std::string &access_host) {_registry_client = std::make_shared<Registry>(reg_host);_registry_client->registry(service_name, access_host);}void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads){if(!_es_client){LOG_ERROR("还未初始化ES搜索引擎模块!");abort();}if(!_mysql_client){LOG_ERROR("还未初始化Mysql数据库模块!");abort();}if (!_redis_client){LOG_ERROR("还未初始化Redis数据库模块!");abort();}if (!_mm_channels){LOG_ERROR("还未初始化信道管理模块!");abort();}if (!_dms_client){LOG_ERROR("还未初始化短信平台模块!");abort();}_rpc_server = std::make_shared<brpc::Server>();UserServiceImpl *user_service = new UserServiceImpl(_dms_client, _es_client,_mysql_client, _redis_client, _mm_channels, _file_service_name);int ret = _rpc_server->AddService(user_service,brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if(ret == -1){LOG_ERROR("添加Rpc服务失败!");abort();}brpc::ServerOptions options;options.idle_timeout_sec = timeout;options.num_threads = num_threads;ret = _rpc_server->Start(port, &options);if (ret == -1){LOG_ERROR("服务启动失败!");abort();}}//构造RPC服务器对象UserServer::ptr build(){if(!_service_discoverer) {LOG_ERROR("还未初始化服务发现模块!");abort();}if(!_registry_client) {LOG_ERROR("还未初始化服务注册模块!");abort();}if(!_rpc_server) {LOG_ERROR("还未初始化RPC服务器模块!");abort();}UserServer::ptr server = std::make_shared<UserServer>(_service_discoverer, _registry_client,_es_client, _mysql_client, _redis_client, _rpc_server);return server;}private:Registry::ptr _registry_client;std::shared_ptr<elasticlient::Client> _es_client;std::shared_ptr<odb::core::database> _mysql_client;std::shared_ptr<sw::redis::Redis> _redis_client;std::string _file_service_name;ServiceManager::ptr _mm_channels;Discovery::ptr _service_discoverer;std::shared_ptr<DMSClient> _dms_client;std::shared_ptr<brpc::Server> _rpc_server;};
}

(3)实现用户管理子服务的服务器的搭建:

#include "user_server.hpp"
//主要实现语音识别子服务的服务器的搭建DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");DEFINE_string(registry_host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(instance_name, "/user_service/instance", "当前实例名称");
DEFINE_string(access_host, "127.0.0.1:10003", "当前实例的外部访问地址");DEFINE_int32(listen_port, 10003, "Rpc服务器监听端口");
DEFINE_int32(rpc_timeout, -1, "Rpc调用超时时间");
DEFINE_int32(rpc_threads, 1, "Rpc的IO线程数量");DEFINE_string(base_service, "/service", "服务监控根目录");
DEFINE_string(file_service, "/service/file_service", "文件管理子服务名称");DEFINE_string(es_host, "http://127.0.0.1:9200/", "ES搜索引擎服务器URL");DEFINE_string(mysql_host, "127.0.0.1", "Mysql服务器访问地址");
DEFINE_string(mysql_user, "root", "Mysql服务器访问用户名");
DEFINE_string(mysql_pswd, "123456", "Mysql服务器访问密码");
DEFINE_string(mysql_db, "bite_im", "Mysql默认库名称");
DEFINE_string(mysql_cset, "utf8", "Mysql客户端字符集");
DEFINE_int32(mysql_port, 0, "Mysql服务器访问端口");
DEFINE_int32(mysql_pool_count, 4, "Mysql连接池最大连接数量");DEFINE_string(redis_host, "127.0.0.1", "Redis服务器访问地址");
DEFINE_int32(redis_port, 6379, "Redis服务器访问端口");
DEFINE_int32(redis_db, 0, "Redis默认库号");
DEFINE_bool(redis_keep_alive, true, "Redis长连接保活选项");DEFINE_string(dms_key_id, "LTAI5tKd71CtXeq543QGB8Co", "短信平台密钥ID");
DEFINE_string(dms_key_secret, "hIHCL8ZZ8HTASpTtyrlPx6DuVzAl0t", "短信平台密钥");int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);bite_im::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);bite_im::UserServerBuilder usb;usb.make_dms_object(FLAGS_dms_key_id, FLAGS_dms_key_secret);usb.make_es_object({FLAGS_es_host});usb.make_mysql_object(FLAGS_mysql_user, FLAGS_mysql_pswd, FLAGS_mysql_host, FLAGS_mysql_db, FLAGS_mysql_cset, FLAGS_mysql_port, FLAGS_mysql_pool_count);usb.make_redis_object(FLAGS_redis_host, FLAGS_redis_port, FLAGS_redis_db, FLAGS_redis_keep_alive);usb.make_discovery_object(FLAGS_registry_host, FLAGS_base_service, FLAGS_file_service);usb.make_rpc_server(FLAGS_listen_port, FLAGS_rpc_timeout, FLAGS_rpc_threads);usb.make_registry_object(FLAGS_registry_host, FLAGS_base_service + FLAGS_instance_name, FLAGS_access_host);auto server = usb.build();server->start();return 0;
}

(4)cmake构建代码:

# 1. 添加cmake版本说明
cmake_minimum_required(VERSION 3.1.3)
# 2. 声明工程名称
project(user_server)set(target "user_server")set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g")# 3. 检测并生成ODB框架代码
#   1. 添加所需的proto映射代码文件名称
set(proto_path ${CMAKE_CURRENT_SOURCE_DIR}/../proto)
set(proto_files base.proto user.proto file.proto)
#   2. 检测框架代码文件是否已经生成
set(proto_hxx "")
set(proto_cxx "")
set(proto_srcs "")
foreach(proto_file ${proto_files})
#   3. 如果没有生成,则预定义生成指令 -- 用于在构建项目之间先生成框架代码string(REPLACE ".proto" ".pb.cc" proto_cc ${proto_file})string(REPLACE ".proto" ".pb.h" proto_hh  ${proto_file})if (NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}${proto_cc})add_custom_command(PRE_BUILDCOMMAND protocARGS --cpp_out=${CMAKE_CURRENT_BINARY_DIR} -I ${proto_path} --experimental_allow_proto3_optional ${proto_path}/${proto_file}DEPENDS ${proto_path}/${proto_file}OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc}COMMENT "生成Protobuf框架代码文件:" ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc})endif()list(APPEND proto_srcs ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc})
endforeach()# 3. 检测并生成ODB框架代码
#   1. 添加所需的odb映射代码文件名称
set(odb_path ${CMAKE_CURRENT_SOURCE_DIR}/../odb)
set(odb_files user.hxx)
#   2. 检测框架代码文件是否已经生成
set(odb_hxx "")
set(odb_cxx "")
set(odb_srcs "")
foreach(odb_file ${odb_files})
#   3. 如果没有生成,则预定义生成指令 -- 用于在构建项目之间先生成框架代码string(REPLACE ".hxx" "-odb.hxx" odb_hxx ${odb_file})string(REPLACE ".hxx" "-odb.cxx" odb_cxx ${odb_file})if (NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}${odb_cxx})add_custom_command(PRE_BUILDCOMMAND odbARGS -d mysql --std c++11 --generate-query --generate-schema --profile boost/date-time ${odb_path}/${odb_file}DEPENDS ${odb_path}/${odb_file}OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${odb_cxx}COMMENT "生成ODB框架代码文件:" ${CMAKE_CURRENT_BINARY_DIR}/${odb_cxx})endif()
#   4. 将所有生成的框架源码文件名称保存起来 student-odb.cxx classes-odb.cxxlist(APPEND odb_srcs ${CMAKE_CURRENT_BINARY_DIR}/${odb_cxx})
endforeach()# 4. 获取源码目录下的所有源码文件
set(src_files "")
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/source src_files)
# 5. 声明目标及依赖
add_executable(${target} ${src_files} ${proto_srcs} ${odb_srcs})
# 7. 设置需要连接的库
target_link_libraries(${target} -lgflags -lspdlog -lfmt -lbrpc -lssl -lcrypto -lprotobuf -lleveldb -letcd-cpp-api -lcpprest -lcurl -lodb-mysql -lodb -lodb-boost/usr/lib/x86_64-linux-gnu/libjsoncpp.so.19-lalibabacloud-sdk-core -lcpr -lelasticlient-lhiredis -lredis++)set(test_client "user_client")
set(test_files "")
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/test test_files)
add_executable(${test_client} ${test_files} ${proto_srcs})
target_link_libraries(${test_client} -pthread -lgtest -lgflags -lspdlog -lfmt -lbrpc -lssl -lcrypto -lprotobuf -lleveldb -letcd-cpp-api -lcpprest -lcurl /usr/lib/x86_64-linux-gnu/libjsoncpp.so.19)# 6. 设置头文件默认搜索路径
include_directories(${CMAKE_CURRENT_BINARY_DIR})
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../common)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../odb)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../third/include)#8. 设置安装路径
INSTALL(TARGETS ${target} ${test_client} RUNTIME DESTINATION bin)

4. 消息转发子服务的实现

4.1 功能设计

  • 转发子服务,主要用于针对一条消息内容,组织消息的 ID 以及各项所需要素,然后告诉网关服务器一条消息应该发给谁。
  • 通常消息都是以聊天会话为基础进行发送的,根据会话找到它的所有成员,就是转发的目标。
  • 除此之外,转发子服务将收到的消息,放入消息队列中,由消息存储管理子服务进行消费存储。获取消息转发目标:针对消息内容,组织消息,并告知网关转发目标。

4.2 模块划分

(1)以下是消息转发模块划分:

  1. 参数/配置文件解析模块:基于 gflags 框架直接使用进行参数/配置文件解析。
  2. 日志模块:基于 spdlog 框架封装的模块直接使用进行日志输出。
  3. 服务注册模块:基于 etcd 框架封装的注册模块直接使用进行消息转发服务的服务
    注册。
  4. 数据库数据操作模块:基于 odb-mysql 数据管理封装的模块,从数据库获取会话成员。
  5. 服务发现与调用模块:基于 etcd 框架与 brpc 框架封装的服务发现与调用模块,从用户子服务获取消息发送者的用户信息。
  6. rpc 服务模块:基于 brpc 框架搭建 rpc 服务器。
  7. MQ 发布模块:基于 rabbitmq-client 封装的模块将消息发布到消息队列,让消息存储子服务进行消费,对消息进行存储。

4.3 功能模块示意图

(1)如下图是服务的模块图:

4.4 数据库的数据管理

(1)消息转发数据表:

  • 包含的字段:

    1. 主键 ID:自动生成。
    2. 用户 ID:用户唯一标识。
    3. 会话 ID:需要转发消息的会话。
  • 提供的操作:

    1. 通过用户 ID 获取用户信息。
    2. 通过会话 ID 获取用户信息。

(2)ODB映射数据结构chat_session_member.hxx的实现:

#pragma once
#include <string>
#include <cstddef>
#include <odb/core.hxx>// 聊天会话成员表映射对象
namespace MyTest
{#pragma db object table("chat_session_member")class ChatSessionMember{public:ChatSessionMember() {}ChatSessionMember(const std::string &ssid, const std::string &uid) :_session_id(ssid),_user_id(uid) {}~ChatSessionMember() {}std::string session_id() const { return _session_id; }void session_id(std::string &ssid) { _session_id = ssid; }std::string user_id() const { return _user_id; }void user_id(std::string &uid) { _user_id = uid; }private:friend class odb::access;#pragma db id autounsigned long _id;#pragma db type("varchar(64)") indexstd::string _session_id;#pragma db type("varchar(64)")std::string _user_id;};
}

(3)运行如下命令可以通过odb生成mysql代码:

odb -d mysql --std c++11 --generate-query --generate-schema --profile boost/date-time chat_session_member.hxx # 最后所要填写的取决与文件所在的路径

(4)生成的chat_session_member.sql代码:

/* This file was generated by ODB, object-relational mapping (ORM)* compiler for C++.*/
CREATE DATABASE IF NOT EXISTS `bite_im`;
USE `bite_im`;DROP TABLE IF EXISTS `chat_session_member`;CREATE TABLE `chat_session_member` (`id` BIGINT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,`session_id` varchar(64) NOT NULL,`user_id` varchar(64) NOT NULL)ENGINE=InnoDB;CREATE INDEX `session_id_i`ON `chat_session_member` (`session_id`);

4.5 接口的实现

(1)消息转发服务所用到的protobuf接口如下:

syntax = "proto3";
package bite_im;
import "base.proto";option cc_generic_services = true;//这个用于和网关进行通信
message NewMessageReq {string request_id = 1;  //请求ID -- 全链路唯一标识optional string user_id = 2;optional string session_id = 3;//客户端身份识别信息 -- 这就是消息发送者string chat_session_id = 4;  //聊天会话ID -- 标识了当前消息属于哪个会话,应该转发给谁MessageContent message = 5; // 消息内容--消息类型+内容
}
message NewMessageRsp {string request_id = 1;bool success = 2;string errmsg = 3; 
}//这个用于内部的通信,生成完整的消息信息,并获取消息的转发人员列表
message GetTransmitTargetRsp {string request_id = 1;bool success = 2;string errmsg = 3; MessageInfo message = 4; // 组织好的消息结构 -- repeated string target_id_list = 5; //消息的转发目标列表
}service MsgTransmitService {rpc GetTransmitTarget(NewMessageReq) returns (GetTransmitTargetRsp);
}

(2)获取消息转发目标与消息处理:

  1. 从请求中取出消息内容,会话 ID, 用户 ID
  2. 根据用户 ID 从用户子服务获取当前发送者用户信息
  3. 根据消息内容构造完成的消息结构(分配消息 ID,填充发送者信息,填充消息产
    生时间)
  4. 将消息序列化后发布到 MQ 消息队列中,让消息存储子服务对消息进行持久化存
  5. 从数据库获取目标会话所有成员 ID
  6. 组织响应(完整消息+目标用户 ID),发送给网关,告知网关该将消息发送给谁。
//实现语音识别子服务
#include <brpc/server.h>
#include <butil/logging.h>#include "etcd.hpp"     // 服务注册模块封装
#include "logger.hpp"   // 日志模块封装
#include "rabbitmq.hpp"
#include "channel.hpp"
#include "utils.hpp"
#include "mysql_chat_session_member.hpp"#include "base.pb.h"  // protobuf框架代码
#include "user.pb.h"  // protobuf框架代码
#include "transmite.pb.h"  // protobuf框架代码namespace MyTest
{class TransmiteServiceImpl : public bite_im::MsgTransmitService {public:TransmiteServiceImpl(const std::string &user_service_name,const ServiceManager::ptr &channels,const std::shared_ptr<odb::core::database> &mysql_client,const std::string &exchange_name,const std::string &routing_key,const MQClient::ptr &mq_client):_user_service_name(user_service_name),_mm_channels(channels),_mysql_session_member_table(std::make_shared<ChatSessionMemeberTable>(mysql_client)),_exchange_name(exchange_name),_routing_key(routing_key),_mq_client(mq_client){}void GetTransmitTarget(google::protobuf::RpcController* controller,const ::bite_im::NewMessageReq* request,::bite_im::GetTransmitTargetRsp* response,::google::protobuf::Closure* done) override{brpc::ClosureGuard rpc_guard(done);auto err_response = [this, response](const std::string &rid, const std::string &errmsg) -> void {response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};//从请求中获取关键信息:用户ID,所属会话ID,消息内容std::string rid = request->request_id();std::string uid = request->user_id();std::string chat_ssid = request->chat_session_id();const MessageContent &content = request->message();// 进行消息组织:发送者-用户子服务获取信息,所属会话,消息内容,产生时间,消息IDauto channel = _mm_channels->choose(chat_session_name);if(!channel){LOG_ERROR("{}-{} 没有可供访问的用户子服务节点!", rid, _user_service_name);return err_response(rid, "没有可供访问的用户子服务节点!");}bite_im::UserService_Stub stub(channel.get());bite_im::GetUserInfoReq req;bite_im::GetUserInfoRsp rsp;req.set_request_id(rid);req.set_user_id(uid);brpc::Controller cntl;stub.GetUserInfo(&cntl, &req, &rsp, nullptr);if(cntl.Failed() == true || rsp.success() == false){LOG_ERROR("{} - 用户子服务调用失败:{}!", request->request_id(), cntl.ErrorText());return err_response(request->request_id(), "用户子服务调用失败!");}bite_im::MessageInfo message;message.set_message_id(uuid());message.set_chat_session_id(chat_ssid);message.set_timestamp(time(nullptr));message.mutable_sender()->CopyFrom(rsp.user_info());message.mutable_message()->CopyFrom(content);// 获取消息转发客户端用户列表auto target_list = _mysql_session_member_table->members(chat_ssid);// 将封装完毕的消息,发布到消息队列,待消息存储子服务进行消息持久化bool ret = _mq_client->publish(_exchange_name, message.SerializeAsString(), _routing_key);if(ret == false){LOG_ERROR("{} - 持久化消息发布失败:{}!", request->request_id(), cntl.ErrorText());return err_response(request->request_id(), "持久化消息发布失败:!");}//组织响应response->set_request_id(rid);response->set_success(true);response->mutable_message()->CopyFrom(message);for(const auto &id : target_list) {response->add_target_id_list(id);}}~TransmiteServiceImpl(){}private://用户子服务调用相关信息std::string _user_service_name;ServiceManager::ptr _mm_channels;//聊天会话成员表的操作句柄ChatSessionMemeberTable::ptr _mysql_session_member_table;//消息队列客户端句柄std::string _exchange_name;std::string _routing_key;MQClient::ptr _mq_client;};
}

(3)创建TransmiteServer类来搭建RPC服务器:

namespace MyTest
{class TransmiteServer  {public:using ptr = std::shared_ptr<TransmiteServer>;TransmiteServer(const std::shared_ptr<odb::core::database> &mysql_client,const Discovery::ptr discovery_client,const Registry::ptr reg_client,const std::shared_ptr<brpc::Server> &server):_service_discoverer(discovery_client),_registry_client(reg_client),_mysql_client(mysql_client),_rpc_server(server){}// 搭建RPC服务器,并启动服务器void start(){_rpc_server->RunUntilAskedToQuit();}~TransmiteServer(){}private:Discovery::ptr _service_discoverer; //服务发现客户端Registry::ptr _registry_client; // 服务注册客户端std::shared_ptr<odb::core::database> _mysql_client; //mysql数据库客户端std::shared_ptr<brpc::Server> _rpc_server;};
}

(4)创建工厂类TransmiteServerBuilder来实现消息转发子服务的创建以及Rpc服务器的创建:

namespace MyTest
{class TransmiteServerBuilder {public://构造mysql客户端对象void make_mysql_object(const std::string &user,const std::string &pswd,const std::string &host,const std::string &db,const std::string &cset,int port,int conn_pool_count) {_mysql_client = ODBFactory::create(user, pswd, host, db, cset, port, conn_pool_count);}//用于构造服务发现客户端&信道管理对象void make_discovery_object(const std::string &reg_host,const std::string &base_service_name,const std::string &user_service_name) {_user_service_name = user_service_name;_mm_channels = std::make_shared<ServiceManager>();_mm_channels->declared(user_service_name);LOG_DEBUG("设置用户子服务为需添加管理的子服务:{}", user_service_name);auto put_cb = std::bind(&ServiceManager::onServiceOnline, _mm_channels.get(), std::placeholders::_1, std::placeholders::_2);auto del_cb = std::bind(&ServiceManager::onServiceOffline, _mm_channels.get(), std::placeholders::_1, std::placeholders::_2);_service_discoverer = std::make_shared<Discovery>(reg_host, base_service_name, put_cb, del_cb);}//用于构造服务注册客户端对象void make_registry_object(const std::string &reg_host,const std::string &service_name,const std::string &access_host){_registry_client = std::make_shared<Registry>(reg_host);_registry_client->registry(service_name, access_host);}//用于构造rabbitmq客户端对象void make_mq_object(const std::string &user, const std::string &passwd,const std::string &host,const std::string &exchange_name,const std::string &queue_name,const std::string &binding_key) {_routing_key = binding_key;_exchange_name = exchange_name;_mq_client = std::make_shared<MQClient>(user, passwd, host);_mq_client->declareComponents(exchange_name, queue_name, binding_key);}//构造RPC服务器对象void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads){if(!_mysql_client){LOG_ERROR("还未初始化Mysql数据库模块!");abort();}if(!_mm_channels){LOG_ERROR("还未初始化信道管理模块!");abort();}if(!_mq_client){LOG_ERROR("还未初始化消息队列客户端模块!");abort();}_rpc_server = std::make_shared<brpc::Server>();TransmiteServiceImpl *transmite_service = new TransmiteServiceImpl(_user_service_name, _mm_channels, _mysql_client, _exchange_name, _routing_key, _mq_client);int ret = _rpc_server->AddService(transmite_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if(ret == -1){LOG_ERROR("添加Rpc服务失败!");abort();}brpc::ServerOptions options;options.idle_timeout_sec = timeout;options.num_threads = num_threads;ret = _rpc_server->Start(port, &options);if(ret == -1) {LOG_ERROR("服务启动失败!");abort();}}SpeechServer::ptr build(){if(!_service_discoverer){LOG_ERROR("还未初始化服务发现模块!");abort();}if(!_registry_client){LOG_ERROR("还未初始化服务注册模块!");abort();}if(!_rpc_server){LOG_ERROR("还未初始化RPC服务器模块!");abort();}TransmiteServer::ptr server = std::make_shared<TransmiteServer>(_mysql_client, _service_discoverer, _registry_client, _rpc_server);return server;}private:std::string _user_service_name;ServiceManager::ptr _mm_channels;Discovery::ptr _service_discoverer;std::string _routing_key;std::string _exchange_name;MQClient::ptr _mq_client;Registry::ptr _registry_client; // 服务注册客户端std::shared_ptr<odb::core::database> _mysql_client; //mysql数据库客户端std::shared_ptr<brpc::Server> _rpc_server;};
}

(5)实现消息转发子服务的服务器的搭建:

#include "transmite_server.hpp"//主要实现语音识别子服务的服务器的搭建DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");DEFINE_string(registry_host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(instance_name, "/transmite_service/instance", "当前实例名称");
DEFINE_string(access_host, "127.0.0.1:10004", "当前实例的外部访问地址");DEFINE_int32(listen_port, 10004, "Rpc服务器监听端口");
DEFINE_int32(rpc_timeout, -1, "Rpc调用超时时间");
DEFINE_int32(rpc_threads, 1, "Rpc的IO线程数量");DEFINE_string(base_service, "/service", "服务监控根目录");
DEFINE_string(user_service, "/service/user_service", "用户管理子服务名称");DEFINE_string(mysql_host, "127.0.0.1", "Mysql服务器访问地址");
DEFINE_string(mysql_user, "root", "Mysql服务器访问用户名");
DEFINE_string(mysql_pswd, "123456", "Mysql服务器访问密码");
DEFINE_string(mysql_db, "bite_im", "Mysql默认库名称");
DEFINE_string(mysql_cset, "utf8", "Mysql客户端字符集");
DEFINE_int32(mysql_port, 0, "Mysql服务器访问端口");
DEFINE_int32(mysql_pool_count, 4, "Mysql连接池最大连接数量");DEFINE_string(mq_user, "root", "消息队列服务器访问用户名");
DEFINE_string(mq_pswd, "123456", "消息队列服务器访问密码");
DEFINE_string(mq_host, "127.0.0.1:5672", "消息队列服务器访问地址");
DEFINE_string(mq_msg_exchange, "msg_exchange", "持久化消息的发布交换机名称");
DEFINE_string(mq_msg_queue, "msg_queue", "持久化消息的发布队列名称");
DEFINE_string(mq_msg_binding_key, "msg_queue", "持久化消息的发布队列名称");int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);bite_im::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);bite_im::TransmiteServerBuilder tsb;tsb.make_mq_object(FLAGS_mq_user, FLAGS_mq_pswd, FLAGS_mq_host,FLAGS_mq_msg_exchange, FLAGS_mq_msg_queue, FLAGS_mq_msg_binding_key);tsb.make_mysql_object(FLAGS_mysql_user, FLAGS_mysql_pswd, FLAGS_mysql_host, FLAGS_mysql_db, FLAGS_mysql_cset, FLAGS_mysql_port, FLAGS_mysql_pool_count);tsb.make_discovery_object(FLAGS_registry_host, FLAGS_base_service, FLAGS_user_service);tsb.make_rpc_server(FLAGS_listen_port, FLAGS_rpc_timeout, FLAGS_rpc_threads);tsb.make_registry_object(FLAGS_registry_host, FLAGS_base_service + FLAGS_instance_name, FLAGS_access_host);auto server = tsb.build();server->start();return 0;
}

(6)cmake构建代码:

# 1. 添加cmake版本说明
cmake_minimum_required(VERSION 3.1.3)
# 2. 声明工程名称
project(transmite_server)set(target "transmite_server")set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g")# 3. 检测并生成ODB框架代码
#   1. 添加所需的proto映射代码文件名称
set(proto_path ${CMAKE_CURRENT_SOURCE_DIR}/../proto)
set(proto_files base.proto user.proto transmite.proto)
#   2. 检测框架代码文件是否已经生成
set(proto_hxx "")
set(proto_cxx "")
set(proto_srcs "")
foreach(proto_file ${proto_files})
#   3. 如果没有生成,则预定义生成指令 -- 用于在构建项目之间先生成框架代码string(REPLACE ".proto" ".pb.cc" proto_cc ${proto_file})string(REPLACE ".proto" ".pb.h" proto_hh  ${proto_file})if (NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}${proto_cc})add_custom_command(PRE_BUILDCOMMAND protocARGS --cpp_out=${CMAKE_CURRENT_BINARY_DIR} -I ${proto_path} --experimental_allow_proto3_optional ${proto_path}/${proto_file}DEPENDS ${proto_path}/${proto_file}OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc}COMMENT "生成Protobuf框架代码文件:" ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc})endif()list(APPEND proto_srcs ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc})
endforeach()# 3. 检测并生成ODB框架代码
#   1. 添加所需的odb映射代码文件名称
set(odb_path ${CMAKE_CURRENT_SOURCE_DIR}/../odb)
set(odb_files chat_session_member.hxx)
#   2. 检测框架代码文件是否已经生成
set(odb_hxx "")
set(odb_cxx "")
set(odb_srcs "")
foreach(odb_file ${odb_files})
#   3. 如果没有生成,则预定义生成指令 -- 用于在构建项目之间先生成框架代码string(REPLACE ".hxx" "-odb.hxx" odb_hxx ${odb_file})string(REPLACE ".hxx" "-odb.cxx" odb_cxx ${odb_file})if (NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}${odb_cxx})add_custom_command(PRE_BUILDCOMMAND odbARGS -d mysql --std c++11 --generate-query --generate-schema --profile boost/date-time ${odb_path}/${odb_file}DEPENDS ${odb_path}/${odb_file}OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${odb_cxx}COMMENT "生成ODB框架代码文件:" ${CMAKE_CURRENT_BINARY_DIR}/${odb_cxx})endif()
#   4. 将所有生成的框架源码文件名称保存起来 student-odb.cxx classes-odb.cxxlist(APPEND odb_srcs ${CMAKE_CURRENT_BINARY_DIR}/${odb_cxx})
endforeach()# 4. 获取源码目录下的所有源码文件
set(src_files "")
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/source src_files)
# 5. 声明目标及依赖
add_executable(${target} ${src_files} ${proto_srcs} ${odb_srcs})
# 7. 设置需要连接的库
target_link_libraries(${target} -lgflags -lspdlog -lfmt -lbrpc -lssl -lcrypto -lprotobuf -lleveldb -letcd-cpp-api -lcpprest -lcurl -lodb-mysql -lodb -lodb-boost-lamqpcpp -lev)set(trans_user_client "trans_user_client")
set(trans_user_files ${CMAKE_CURRENT_SOURCE_DIR}/test/user_client.cc)
add_executable(${trans_user_client} ${trans_user_files} ${proto_srcs})
target_link_libraries(${trans_user_client} -pthread -lgtest -lgflags -lspdlog -lfmt -lbrpc -lssl -lcrypto -lprotobuf -lleveldb -letcd-cpp-api -lcpprest -lcurl /usr/lib/x86_64-linux-gnu/libjsoncpp.so.19)set(transmite_client "transmite_client")
set(transmite_files ${CMAKE_CURRENT_SOURCE_DIR}/test/transmite_client.cc)
add_executable(${transmite_client} ${transmite_files} ${proto_srcs})
target_link_libraries(${transmite_client} -pthread -lgtest -lgflags -lspdlog -lfmt -lbrpc -lssl -lcrypto -lprotobuf -lleveldb -letcd-cpp-api -lcpprest -lcurl /usr/lib/x86_64-linux-gnu/libjsoncpp.so.19)# 6. 设置头文件默认搜索路径
include_directories(${CMAKE_CURRENT_BINARY_DIR})
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../common)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../odb)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../third/include)#8. 设置安装路径
INSTALL(TARGETS ${target} ${trans_user_client} ${transmite_client} RUNTIME DESTINATION bin)

5. 服务端小结

5.1 语言识别模块总结

5.2 文件存储模块总结

5.3 用户管理模块总结

5.4 消息转发模块总结

目前已经完成服务器的四个功能子模块,剩下的各个子模块的服务实现,见博客:https://blog.csdn.net/m0_65558082/article/details/144088032?spm=1001.2014.3001.5502。

服务端整体代码链接:https://gitee.com/liu-yechi/new_code/tree/master/chat_system/server。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com