本文主要探讨mysql连接池的实现。
readme
*****************************************************mysql连接池
*****************************************************概述:高并发情况下,大量TCP三次握手、MySQL Server连接认证、MySQL Server关闭连接回收资源和TCP四次挥手耗费显著,本项目主要用连接池的方式提高mysql的访问瓶颈.主要文件:run.sh 项目运行脚本clean.sh 编译清理脚本CMakeLists.txt 编译文件log.h 调试信息打印接口connection.h mysql操作库connection.cpp mysql操作库实现connection_pool.h mysql连接池库connection_pool.cpp mysql连接池库实现mysql.cnf mysql配置文件main.cpp mysql连接池测试文件编译运行:main.cpp中有连接池测试范例,也可自行修改范例sh run.sh 运行该指令可编译运行项目输出测试范例结果
项目文件结构
安转mysql-8.0
apt install -y mysql-server-8.0 libmysqlclient-devsystemctl start mysqlsystemctl enable mysqlsystemctl status mysqlmysql_secure_installationmysql -u root -palter user "root"@"localhost" IDENTIFIED BY '123456';mysql -u root -p=123456############## mysql.h位于/usr/include/mysql############## 也可通过mysql源码编译安装 wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.41.tar.gz
项目运行结果
run.sh
#!/bin/bashif [ -f ./Makefile ]
thenmake clean
ficmake .makeecho "---------------------------------"./pro
clean.sh
#!/bin/bashrm -rf CMakeCache.txt CMakeFiles Makefile cmake_install.cmake *.o pro
CMakeLists.txt
###################################
#
# mysql连接池编译连接文件
#
#
####################################最低版本要求
CMAKE_MINIMUM_REQUIRED(VERSION 2.20) #设置g++编译器
SET(CMAKE_CXX_COMPILER "g++-11") #添加编译选项
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g")#设置工程名
PROJECT(CLASS) #打印消息
MESSAGE(STATUS "mysql connection pool") #查找动态库
find_library(MYSQL_LIBRARY mysqlclient NAMES mysqlclient PATHS "/usr/lib/x86_64-linux-gnu/" ) #生成可执行文件
ADD_EXECUTABLE(pro main.cpp connection.cpp connection_pool.cpp) #链库
target_link_libraries(pro ${MYSQL_LIBRARY})
log.h
/** 日志调试打印* 格式:时间 文件:函数:行号:str*/#ifndef __LOG__
#define __LOG__#define LOG_PRINT(str) \std::cout << __TIMESTAMP__ << " " << __FILE__ << ":" << __FUNCTION__ << ":" \<< __LINE__ << ":" << str << std::endl;
#endif
connection.h
/** mysql相关操作:* 初始化连接数据库* 释放数据库连接* 连接数据库* 数据增删改* 数据查询* 数据库连接时长反馈* 数据库连接时长刷新*/#include <mysql/mysql.h>
#include <ctime>
#include <string>
#include <iostream>class Connection
{public://初始化数据库连接Connection();//释放数据库连接~Connection();//连接数据库bool connect_mysql(std::string ip, unsigned int port, std::string user, \std::string password, std::string dbname);//数据增删改bool idu_mysql(std::string sql);//数据查询bool query_mysql(std::string sql);//数据库连接时长刷新void refresh_alive_time(){alive_time = clock();}//数据库连接时长反馈std::clock_t get_alive_time(){return clock() - alive_time;}private:MYSQL *conn; //数据库连接句柄std::clock_t alive_time;//数据库连接存活时间 //
}; //end of class Connection
connection.cpp
/** mysql相关操作:* 初始化连接数据库* 释放数据库连接* 连接数据库* 数据增删改* 数据查询*/#include "connection.h"
#include "log.h"//初始化数据连接
Connection::Connection()
{conn = mysql_init(nullptr);if(conn == nullptr)LOG_PRINT("mysql init error");
}//释放数据库连接
Connection::~Connection()
{if(conn != nullptr)mysql_close(conn);
}//数据库连接
bool Connection::connect_mysql(std::string ip, unsigned int port, std::string user, \std::string password, std::string dbname)
{MYSQL *con = mysql_real_connect(conn,ip.c_str(), user.c_str(),password.c_str(), dbname.c_str(), port, nullptr, 0);return con != nullptr;
}//数据库增删改
bool Connection::idu_mysql(std::string sql)
{if((mysql_query(conn, sql.c_str()))){LOG_PRINT("idu error:" + sql);return false;}return true;
}//数据库查询
bool Connection::query_mysql(std::string sql)
{if(mysql_query(conn, sql.c_str())){LOG_PRINT("select error:" + sql);return false;}MYSQL_RES *ret = mysql_store_result(conn);if(ret == nullptr){LOG_PRINT("select error:" + sql);return false;}MYSQL_ROW row;MYSQL_FIELD* filed;while((filed = mysql_fetch_field(ret)))std::cout << filed->name << "\t";std::cout << std::endl;while((row = mysql_fetch_row(ret))){for(int i = 0; i < mysql_num_fields(ret); i++){std::cout << row[i] << "\t";}std::cout << std::endl;}mysql_free_result(ret);return true;
}
connection_pool.h
/** mysql连接池相关:* 获取连接池实例* 获取空闲连接* 读取mysql配置* 生产连接* 回收超时链接*/#include <memory>
#include <queue>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <thread>
#include <list>#include "connection.h"
#include "log.h"class Connection_Pool
{public://获取连接池实例static Connection_Pool *get_pool();//获取空闲链接std::shared_ptr<Connection> get_connection();private://懒汉单例(构造私有化)Connection_Pool();//读取mysql配置bool load_mysql_cnf();//生产新链接void manufacture_connection();//回收超时链接void recycle_timeout_connection();std::string ip;unsigned int port;std::string user;std::string password;std::string dbname;unsigned int init_conn_size; //连接池初始链接量unsigned int max_conn_size; //连接池最大连接量unsigned int wait_conn_alive_max_timeout; //未使用连接存活最大时时间unsigned int get_max_timeout; //获取连接池链接等待的最大时间std::queue<Connection*> conn_queue; //mysql链接队列std::mutex queue_mutex; //队列安全互斥锁std::atomic_uint conn_cnt; //连接池的连接量std::condition_variable cv; //线程间通信
};
connection_pool.cpp
/** mysql连接池相关:* 获取连接池实例* 获取空闲连接* 读取mysql配置* 生产连接* 回收超时链接*/
#include <fstream>
#include <sstream>
#include <map>
#include <functional>#include "connection_pool.h"Connection_Pool* Connection_Pool::get_pool()
{static Connection_Pool conn_pool;return &conn_pool;
}//获取空闲连接
std::shared_ptr<Connection> Connection_Pool::get_connection()
{std::unique_lock<std::mutex> lock(queue_mutex);while(conn_queue.empty()){if(std::cv_status::timeout == cv.wait_for(lock,std::chrono::milliseconds(get_max_timeout))){if(conn_queue.empty()){LOG_PRINT("get connection timeout...");return nullptr;}}}//自定链接义删除器,使用完归还到队列,未定义则会调用析构关闭连接std::shared_ptr<Connection> con(conn_queue.front(),[&](Connection *c){std::unique_lock<std::mutex> lock(queue_mutex);c->refresh_alive_time();conn_queue.push(c);});conn_queue.pop();cv.notify_all();return con;
}bool Connection_Pool::load_mysql_cnf()
{std::map<std::string,std::string> config;std::string line;std::ifstream file("mysql.cnf");if(!file.is_open()){LOG_PRINT("open mysql.cnf error");return false;}while(getline(file,line)){std::istringstream is_line(line);std::string key;if(getline(is_line,key,'=')){std::string value;if(getline(is_line,value))config[key] = value;}}ip = config["ip"];port = atoi(config["port"].c_str());user = config["user"];password = config["password"];dbname = config["dbname"];init_conn_size = atoi(config["init_conn_size"].c_str());max_conn_size = atoi(config["max_conn_size"].c_str());wait_conn_alive_max_timeout = atoi(config["wait_conn_alive_max_timeout"].c_str());get_max_timeout = atoi(config["conn_pool_timeout"].c_str());return true;
}Connection_Pool::Connection_Pool()
{//加载mysql配置if(!load_mysql_cnf()){LOG_PRINT("load mysql config fail");}//创建初始链接for(int i = 0; i < init_conn_size; i++){Connection *con = new Connection();con->connect_mysql(ip, port, user, password, dbname); //链接mysqlcon->refresh_alive_time(); //刷新链接时间conn_cnt++; //链接池链接量++}//生产连接线程std::thread manu_conn(std::bind(&Connection_Pool::manufacture_connection,this));manu_conn.detach();//回收超时链接线程std::thread recycle_conn(std::thread(std::bind(&Connection_Pool::recycle_timeout_connection,this)));recycle_conn.detach();
}void Connection_Pool::manufacture_connection()
{for(;;){std::unique_lock<std::mutex> lock(queue_mutex);while(!conn_queue.empty()){cv.wait(lock); //未使用链接线程不为空则等待使用}//建立链接数小于最大链接数构建新链接if(conn_cnt < max_conn_size){Connection *con = new Connection();con->connect_mysql(ip, port, user, password, dbname); //链接mysqlcon->refresh_alive_time(); //刷新链接时间conn_cnt++; //链接池链接量++conn_queue.push(con);}cv.notify_all();}
}void Connection_Pool::recycle_timeout_connection()
{for(;;){std::this_thread::sleep_for(std::chrono::seconds(wait_conn_alive_max_timeout));std::unique_lock<std::mutex> lock(queue_mutex);while(conn_cnt > init_conn_size){//对头元素为存活时间最大的链接,若队头链接时间小于最大存活时间则整个队列时间均小于Connection *con = conn_queue.front();if(con->get_alive_time() >= (wait_conn_alive_max_timeout *1000)){conn_queue.pop();conn_cnt--;delete con; //调用析构关闭连接}else{break;}}}
}
mysql.cnf
ip=localhost
port=3306
user=root
password=123456
dbname=test
init_conn_size=10
max_conn_size=1024
#秒
wait_conn_alive_max_timeout=60
#毫秒
get_max_timeout=100
main.cpp
#include <cstring>
#include <ctime>
#include "connection_pool.h"int main()
{clock_t begin = clock();char sql1[1024] = { 0 };char sql2[1024] = { 0 };char sql3[1024] = { 0 };char sql4[1024] = { 0 };Connection_Pool *cp = Connection_Pool::get_pool();std::thread t1([&sql1,&cp]() {for (int i = 0; i < 2500; ++i){std::shared_ptr<Connection> sp ;while(1){sp = cp->get_connection();if(sp != nullptr)break;}memset(sql1,0,1024);sprintf(sql1,"insert into user(id,name,saving_comming) values(%d,'cxb1',1000)",i);sp->idu_mysql(sql1);}});std::thread t2([&sql2,&cp]() {for (int i = 0; i < 2500; ++i){std::shared_ptr<Connection> sp ;while(1){sp = cp->get_connection();if(sp != nullptr)break;}memset(sql2,0,1024);sprintf(sql2,"insert into user(id,name,saving_comming) values(%d,'cxb2',1000)",i);sp->idu_mysql(sql2);}});std::thread t3([&sql3,&cp]() {std::shared_ptr<Connection> sp ;while(1){sp = cp->get_connection();if(sp != nullptr)break;}for (int i = 0; i < 2500; ++i){memset(sql3,0,1024);sprintf(sql3,"insert into user(id,name,saving_comming) values(%d,'cxb3',1000)",i);sp->idu_mysql(sql3);}});std::thread t4([&sql4,&cp](){std::shared_ptr<Connection> sp ;while(1){sp = cp->get_connection();if(sp != nullptr)break;}for (int i = 0; i < 2500; ++i){memset(sql4,0,1024);sprintf(sql4,"insert into user(id,name,saving_comming) values(%d,'cxb4',1000)",i);sp->idu_mysql(sql4);}});t1.join();t2.join();t3.join();t4.join();clock_t end = clock();std::cout << "time:" << (static_cast<double>(end - begin) / CLOCKS_PER_SEC) << "s" << std::endl;return 0;
}