文章目录
- 3.1.3 MYSQL连接池
- 1.维持管理固定数量的链接,复用连接资源
- 2. MYSQL网络模型
- 3. MYSQL连接驱动使用
- 4. 同步连接池
- 1. 执行一个sql语句,怎么拿到数据库返回值?
- 2. MySQL 执行流程
- 3. 同步连接池的使用场景
- 4. 连接池的线程管理
- 5. 为什么服务器初始化要同步,不用异步连接池
- 5. 异步连接池
- 1. 使用场景
- 2. 核心特点
- 3. 线程池机制
- 4. 线程安全保障
- 5. 任务调度方式
- 5. 连接池实现
- 1. 连接池类型
- 2. SQL 处理流程
- 3. C++11 连接池异步任务管理
3.1.3 MYSQL连接池
1.维持管理固定数量的链接,复用连接资源
-
MySQL 连接池的核心是通过 MySQL 驱动 来管理数据库连接。MySQL 服务器端和客户端(如应用程序)需要通过 特定的协议 进行通信,而这个协议需要处理 编码(encode) 和 解码(decode),以保证数据的正确传输。
-
连接池对应着MYSQL里面的一个database,一个连接池通常对应一个 database,多个database就要多个连接池,可以访问database中的任何表(有权限)
-
在数据库应用中,每次建立和释放数据库连接的成本很高。连接池的主要作用是:
- 预先创建 一定数量的数据库连接,避免频繁创建销毁连接带来的开销。
- 复用已有的连接,提高数据库访问效率。
- 控制最大连接数,防止数据库服务器过载
2. MYSQL网络模型
-
主线程的作用
MySQL 服务器采用 事件驱动 的 Reactor 模型 来管理客户端连接,其主线程的主要任务包括:- 监听新的客户端连接(listenfd 监听文件描述符)
- 检查已有连接是否有新数据(select 或其他 I/O 多路复用机制,如 epoll)
- 将新的连接分配给工作线程
-
同一条连接收到多个请求,串行执行。不同连接之间的请求是可以并行执行的,因为它们是由不同的工作线程处理的
3. MYSQL连接驱动使用
- ubuntu安装libmysqlclient-dev
- 加载mysql.h以及静态库或者动态库
- 初始化连接mysql_library_init
- 使用mysql连接驱动与MYSQL进行交互(执行SQL语句)
- 释放连接资源mysql_library_end
4. 同步连接池
1. 执行一个sql语句,怎么拿到数据库返回值?
同步 vs. 异步,执行 SQL 语句时:
- 同步 (Synchronous):调用 mysql_query() 直接等待 MySQL 返回结果。
- 异步 (Asynchronous):在 线程池 或 事件驱动 机制中执行 SQL,调用线程不会被阻塞
2. MySQL 执行流程
特征:使用阻塞的io
mysql_query:
- sql通过MYSQL协议打包
- 数据send/write
- read会阻塞线程等待MYSQL的返回
- 确定是一个完整的回应包之后,协议解析,将结果返回
所以是一个耗时操作,需要优化
3. 同步连接池的使用场景
适用于服务器初始化
- 配置加载:如拉取用户权限、系统参数、缓存预加载等。
- 数据预处理:如初始化 热点数据,避免后续频繁数据库查询。
- 启动阶段任务:必须确保 数据加载完成 后,服务器才能提供服务
4. 连接池的线程管理
- 当前线程获取数据库连接
数据库连接池 维护 一定数量的数据库连接,避免频繁创建/销毁连接的开销。
线程 从连接池获取可用连接,执行 SQL 语句,完成后释放回池 - 线程安全
加锁管理连接池,防止多个线程同时访问数据库时发生竞争。
允许多个线程/协程访问数据库,但最大连接数受 数据库服务器配置 限制
对比项 | 线程(Thread) | 协程(Coroutine) |
---|---|---|
调度方式 | 操作系统内核 负责调度 | 用户态 调度(应用层控制) |
切换成本 | 高(涉及内核态切换) | 低(仅寄存器、堆栈切换) |
并行性 | 可以利用多核 CPU | 单线程运行,但可以高效调度 |
适用场景 | CPU 计算密集型(如多线程计算、数据处理) | I/O 密集型(如爬虫、数据库连接池) |
资源占用 | 线程占用大量系统资源(栈、寄存器) | 协程占用资源少(轻量级任务) |
创建销毁开销 | 较大(需要系统调用) | 较小(在用户态创建/销毁) |
编程模型 | 传统 多线程编程,容易产生 锁竞争、死锁 | 异步编程,代码更复杂,但性能更高 |
5. 为什么服务器初始化要同步,不用异步连接池
-
服务器启动必须确保所有依赖数据已准备好
同步执行可以保证数据完整性
同步连接池会等待 SQL 查询执行完毕,确保所有关键数据已经正确加载。
如果使用异步连接池,数据库查询可能在服务已启动后仍未完成,导致数据未准备好,影响服务的稳定性。 -
需要阻塞等待初始化完成
避免并发问题
服务器初始化阶段不需要高并发处理,只需要保证数据正确性。
如果用异步连接池,某些数据可能在服务器启动后才返回,导致访问时出现未初始化数据错误
5. 异步连接池
1. 使用场景
适用于服务器正常运行后,所有涉及数据库查询的业务逻辑都应该采用异步连接池,以提高并发能力和响应速度。
2. 核心特点
核心线程数少:可以是一个或少量几个线程,通过事件驱动处理多个请求。
单位时间内请求数受限:核心线程主要用于处理数据库返回值,而非阻塞等待数据库响应。
高吞吐量:避免阻塞,每个线程可以同时管理多个数据库查询。
3. 线程池机制
- 线程池管理任务队列:
任务(SQL 查询请求)进入任务队列;
线程池从任务队列中取出任务执行。
2 .发生阻塞的情况:
任务队列为空 → 线程等待新任务;
等待数据库返回 → 线程会切换处理其他任务,不会阻塞等待。
4. 线程安全保障
多线程环境必须保证线程安全:
采用**互斥锁(Mutex)**防止资源竞争;
采用**原子变量(Atomic)**保证操作的原子性。
5. 任务调度方式
具体执行的SQL 任务与数据库连接无关,可以动态分配连接。
允许多个连接同时访问数据库,提高并发处理能力。
5. 连接池实现
1. 连接池类型
同步连接池:请求阻塞等待数据库返回结果。
异步连接池:请求非阻塞,线程继续处理其他任务,数据库返回结果后进行回调处理。
2. SQL 处理流程
SQL 语句在数据库执行前,会经过以下几个阶段:
-
输入 SQL 语句:用户提交 SQL 查询。
-
解析阶段:
词法分析(Lexical Analysis):将 SQL 语句拆分为关键字、标识符、运算符等。
语法分析(Syntax Analysis):构建 SQL 语法树,检查 SQL 是否符合语法规则。 -
查询优化:
过滤器(Filter):剔除无效的 SQL 语句,提升查询效率。
优化器(Optimizer):选择最优的执行计划,提高查询性能。
存储引擎执行 SQL:根据执行计划,调用存储引擎执行查询。
- 返回结果:数据库执行完成后,返回查询结果。
3. C++11 连接池异步任务管理
在 C++11 及以上版本,异步任务管理可借助 Promise-Future 机制 实现:
promise 任务执行后存储结果
future 读取 promise 传递的结果
#include <iostream>
#include <future>
#include <thread>int database_query() {std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟 SQL 查询耗时return 42; // 查询结果
}int main() {std::promise<int> prom;std::future<int> fut = prom.get_future();std::thread t([&prom]() {int result = database_query();prom.set_value(result); // 传递查询结果});std::cout << "Waiting for database query result..." << std::endl;std::cout << "Query result: " << fut.get() << std::endl; // 获取 SQL 结果t.join();return 0;
}
运行流程
主线程创建 promise 并获取 future。
新线程执行 database_query(),查询完成后调用 prom.set_value(result) 传递查询结果。
主线程阻塞等待 fut.get() 获取 SQL 查询结果
#include <vector>
#include <memory>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <mysql.h>/*** @brief 数据库连接池类模板* @tparam T 数据库类型*/
template <class T>
class DatabaseWorkerPool {
public:/*** @brief 构造函数*/DatabaseWorkerPool();/*** @brief 析构函数*/~DatabaseWorkerPool();/*** @brief 设置连接信息* @param infoString 连接字符串* @param asyncThreads 异步连接数* @param synchThreads 同步连接数*/void SetConnectionInfo(const std::string& infoString, uint8 asyncThreads, uint8 synchThreads);/*** @brief 打开连接池* @return 错误码,0表示成功*/uint32 Open();/*** @brief 关闭连接池*/void Close();/*** @brief 获取一个空闲连接* @return 数据库连接指针*/T* GetFreeConnection();/*** @brief 执行查询* @param sql SQL语句* @return 查询结果*/QueryResult Query(const char* sql);private:/*** @brief 打开指定数量的连接* @param type 连接类型(同步/异步)* @param numConnections 连接数量* @return 错误码,0表示成功*/uint32 OpenConnections(int type, uint8 numConnections);/*** @brief 获取数据库名称* @return 数据库名称*/const char* GetDatabaseName() const;// 连接类型枚举enum {IDX_SYNCH = 0, // 同步连接IDX_ASYNC = 1, // 异步连接IDX_MAX // 最大索引};// 连接信息结构体struct MySQLConnectionInfo {std::string host;std::string user;std::string password;std::string database;uint16 port;std::string ssl;};std::unique_ptr<MySQLConnectionInfo> _connectionInfo; // 连接信息std::vector<std::unique_ptr<T>> _connections[IDX_MAX]; // 连接池(同步和异步)uint8 _async_threads; // 异步连接数uint8 _synch_threads; // 同步连接数std::mutex _mutex; // 互斥锁std::condition_variable _condition; // 条件变量
};// 实现部分template <class T>
DatabaseWorkerPool<T>::DatabaseWorkerPool(): _async_threads(0), _synch_threads(0) {// 检查MySQL库是否线程安全if (!mysql_thread_safe()) {throw std::runtime_error("MySQL library is not thread-safe");}
}template <class T>
DatabaseWorkerPool<T>::~DatabaseWorkerPool() {Close();
}template <class T>
void DatabaseWorkerPool<T>::SetConnectionInfo(const std::string& infoString, uint8 asyncThreads, uint8 synchThreads) {// 解析连接字符串并存储_connectionInfo = std::make_unique<MySQLConnectionInfo>();// 这里简化处理,实际应该解析infoString_connectionInfo->host = "localhost";_connectionInfo->user = "root";_connectionInfo->password = "";_connectionInfo->database = "test";_connectionInfo->port = 3306;_async_threads = asyncThreads;_synch_threads = synchThreads;
}template <class T>
uint32 DatabaseWorkerPool<T>::Open() {if (!_connectionInfo) {throw std::runtime_error("Connection info was not set!");}// 打开异步连接uint32 error = OpenConnections(IDX_ASYNC, _async_threads);if (error) return error;// 打开同步连接error = OpenConnections(IDX_SYNCH, _synch_threads);return error;
}template <class T>
uint32 DatabaseWorkerPool<T>::OpenConnections(int type, uint8 numConnections) {std::lock_guard<std::mutex> lock(_mutex);for (uint8 i = 0; i < numConnections; ++i) {// 创建新连接auto connection = std::make_unique<T>();// 初始化连接if (!connection->Open(_connectionInfo.get())) {return 1; // 简化错误处理}// 添加到连接池_connections[type].push_back(std::move(connection));}return 0;
}template <class T>
void DatabaseWorkerPool<T>::Close() {std::lock_guard<std::mutex> lock(_mutex);// 关闭所有连接for (auto& connList : _connections) {connList.clear();}
}template <class T>
T* DatabaseWorkerPool<T>::GetFreeConnection() {std::unique_lock<std::mutex> lock(_mutex);// 优先从同步连接池获取for (auto& conn : _connections[IDX_SYNCH]) {if (conn->TryLock()) {return conn.get();}}// 如果没有可用的同步连接,等待_condition.wait(lock, [this]() {for (auto& conn : _connections[IDX_SYNCH]) {if (conn->TryLock()) {return true;}}return false;});// 再次尝试获取for (auto& conn : _connections[IDX_SYNCH]) {if (conn->TryLock()) {return conn.get();}}return nullptr; // 不应该执行到这里
}template <class T>
QueryResult DatabaseWorkerPool<T>::Query(const char* sql) {auto connection = GetFreeConnection();if (!connection) {return QueryResult(nullptr);}// 执行查询ResultSet* result = connection->Query(sql);connection->Unlock();_condition.notify_one(); // 通知其他等待的线程if (!result || !result->GetRowCount() || !result->NextRow()) {delete result;return QueryResult(nullptr);}return QueryResult(result);
}template <class T>
const char* DatabaseWorkerPool<T>::GetDatabaseName() const {return _connectionInfo ? _connectionInfo->database.c_str() : "";
}