欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 社会 > 3.1.3 MYSQL连接池

3.1.3 MYSQL连接池

2025/4/1 1:30:50 来源:https://blog.csdn.net/weixin_52423494/article/details/146600390  浏览:    关键词:3.1.3 MYSQL连接池

文章目录

  • 3.1.3 MYSQL连接池
    • 1.维持管理固定数量的链接,复用连接资源
    • 2. MYSQL网络模型
    • 3. MYSQL连接驱动使用
    • 4. 同步连接池
      • 1. 执行一个sql语句,怎么拿到数据库返回值?
      • 2. MySQL 执行流程
      • 3. 同步连接池的使用场景
      • 4. 连接池的线程管理
      • 5. 为什么服务器初始化要同步,不用异步连接池
    • 5. 异步连接池
      • 1. 使用场景
      • 2. 核心特点
      • 3. 线程池机制
      • 4. 线程安全保障
      • 5. 任务调度方式
    • 5. 连接池实现
      • 1. 连接池类型
      • 2. SQL 处理流程
      • 3. C++11 连接池异步任务管理

3.1.3 MYSQL连接池

1.维持管理固定数量的链接,复用连接资源

  1. MySQL 连接池的核心是通过 MySQL 驱动 来管理数据库连接。MySQL 服务器端和客户端(如应用程序)需要通过 特定的协议 进行通信,而这个协议需要处理 编码(encode)解码(decode),以保证数据的正确传输。

  2. 连接池对应着MYSQL里面的一个database,一个连接池通常对应一个 database,多个database就要多个连接池,可以访问database中的任何表(有权限)

  3. 在数据库应用中,每次建立和释放数据库连接的成本很高。连接池的主要作用是:

    • 预先创建 一定数量的数据库连接,避免频繁创建销毁连接带来的开销。
    • 复用已有的连接,提高数据库访问效率。
    • 控制最大连接数,防止数据库服务器过载

2. MYSQL网络模型

  1. 主线程的作用
    MySQL 服务器采用 事件驱动 的 Reactor 模型 来管理客户端连接,其主线程的主要任务包括:

    1. 监听新的客户端连接(listenfd 监听文件描述符)
    2. 检查已有连接是否有新数据(select 或其他 I/O 多路复用机制,如 epoll)
    3. 将新的连接分配给工作线程
  2. 同一条连接收到多个请求,串行执行。不同连接之间的请求是可以并行执行的,因为它们是由不同的工作线程处理的


3. MYSQL连接驱动使用

  1. ubuntu安装libmysqlclient-dev
  2. 加载mysql.h以及静态库或者动态库
  3. 初始化连接mysql_library_init
  4. 使用mysql连接驱动与MYSQL进行交互(执行SQL语句)
  5. 释放连接资源mysql_library_end

4. 同步连接池

1. 执行一个sql语句,怎么拿到数据库返回值?

同步 vs. 异步,执行 SQL 语句时:
- 同步 (Synchronous):调用 mysql_query() 直接等待 MySQL 返回结果。
- 异步 (Asynchronous):在 线程池 或 事件驱动 机制中执行 SQL,调用线程不会被阻塞

2. MySQL 执行流程

特征:使用阻塞的io
mysql_query:

  1. sql通过MYSQL协议打包
  2. 数据send/write
  3. read会阻塞线程等待MYSQL的返回
  4. 确定是一个完整的回应包之后,协议解析,将结果返回
    所以是一个耗时操作,需要优化

在这里插入图片描述

3. 同步连接池的使用场景

适用于服务器初始化

  1. 配置加载:如拉取用户权限、系统参数、缓存预加载等。
  2. 数据预处理:如初始化 热点数据,避免后续频繁数据库查询。
  3. 启动阶段任务:必须确保 数据加载完成 后,服务器才能提供服务

4. 连接池的线程管理

  1. 当前线程获取数据库连接
    数据库连接池 维护 一定数量的数据库连接,避免频繁创建/销毁连接的开销。
    线程 从连接池获取可用连接,执行 SQL 语句,完成后释放回池
  2. 线程安全
    加锁管理连接池,防止多个线程同时访问数据库时发生竞争。
    允许多个线程/协程访问数据库,但最大连接数受 数据库服务器配置 限制
对比项线程(Thread)协程(Coroutine)
调度方式操作系统内核 负责调度用户态 调度(应用层控制)
切换成本(涉及内核态切换)(仅寄存器、堆栈切换)
并行性可以利用多核 CPU单线程运行,但可以高效调度
适用场景CPU 计算密集型(如多线程计算、数据处理)I/O 密集型(如爬虫、数据库连接池)
资源占用线程占用大量系统资源(栈、寄存器)协程占用资源少(轻量级任务)
创建销毁开销较大(需要系统调用)较小(在用户态创建/销毁)
编程模型传统 多线程编程,容易产生 锁竞争、死锁异步编程,代码更复杂,但性能更高

5. 为什么服务器初始化要同步,不用异步连接池

  1. 服务器启动必须确保所有依赖数据已准备好
    同步执行可以保证数据完整性
    同步连接池会等待 SQL 查询执行完毕,确保所有关键数据已经正确加载。
    如果使用异步连接池,数据库查询可能在服务已启动后仍未完成,导致数据未准备好,影响服务的稳定性。

  2. 需要阻塞等待初始化完成
    避免并发问题
    服务器初始化阶段不需要高并发处理,只需要保证数据正确性。
    如果用异步连接池,某些数据可能在服务器启动后才返回,导致访问时出现未初始化数据错误

5. 异步连接池

在这里插入图片描述

1. 使用场景

适用于服务器正常运行后,所有涉及数据库查询的业务逻辑都应该采用异步连接池,以提高并发能力和响应速度。

2. 核心特点

核心线程数少:可以是一个或少量几个线程,通过事件驱动处理多个请求。

单位时间内请求数受限:核心线程主要用于处理数据库返回值,而非阻塞等待数据库响应。

高吞吐量:避免阻塞,每个线程可以同时管理多个数据库查询。

3. 线程池机制

  1. 线程池管理任务队列:

任务(SQL 查询请求)进入任务队列;
线程池从任务队列中取出任务执行。

2 .发生阻塞的情况:
任务队列为空 → 线程等待新任务;
等待数据库返回 → 线程会切换处理其他任务,不会阻塞等待。

4. 线程安全保障

多线程环境必须保证线程安全:

采用**互斥锁(Mutex)**防止资源竞争;

采用**原子变量(Atomic)**保证操作的原子性。

5. 任务调度方式

具体执行的SQL 任务与数据库连接无关,可以动态分配连接。
允许多个连接同时访问数据库,提高并发处理能力。

5. 连接池实现

1. 连接池类型

同步连接池:请求阻塞等待数据库返回结果。
异步连接池:请求非阻塞,线程继续处理其他任务,数据库返回结果后进行回调处理。

2. SQL 处理流程

SQL 语句在数据库执行前,会经过以下几个阶段:

  1. 输入 SQL 语句:用户提交 SQL 查询。

  2. 解析阶段:
    词法分析(Lexical Analysis):将 SQL 语句拆分为关键字、标识符、运算符等。
    语法分析(Syntax Analysis):构建 SQL 语法树,检查 SQL 是否符合语法规则。

  3. 查询优化:

过滤器(Filter):剔除无效的 SQL 语句,提升查询效率。
优化器(Optimizer):选择最优的执行计划,提高查询性能。
存储引擎执行 SQL:根据执行计划,调用存储引擎执行查询。

  1. 返回结果:数据库执行完成后,返回查询结果。

3. C++11 连接池异步任务管理

在 C++11 及以上版本,异步任务管理可借助 Promise-Future 机制 实现:
promise 任务执行后存储结果
future 读取 promise 传递的结果

#include <iostream>
#include <future>
#include <thread>int database_query() {std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟 SQL 查询耗时return 42; // 查询结果
}int main() {std::promise<int> prom;std::future<int> fut = prom.get_future();std::thread t([&prom]() {int result = database_query();prom.set_value(result); // 传递查询结果});std::cout << "Waiting for database query result..." << std::endl;std::cout << "Query result: " << fut.get() << std::endl; // 获取 SQL 结果t.join();return 0;
}

运行流程
主线程创建 promise 并获取 future。
新线程执行 database_query(),查询完成后调用 prom.set_value(result) 传递查询结果。
主线程阻塞等待 fut.get() 获取 SQL 查询结果

#include <vector>
#include <memory>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <mysql.h>/*** @brief 数据库连接池类模板* @tparam T 数据库类型*/
template <class T>
class DatabaseWorkerPool {
public:/*** @brief 构造函数*/DatabaseWorkerPool();/*** @brief 析构函数*/~DatabaseWorkerPool();/*** @brief 设置连接信息* @param infoString 连接字符串* @param asyncThreads 异步连接数* @param synchThreads 同步连接数*/void SetConnectionInfo(const std::string& infoString, uint8 asyncThreads, uint8 synchThreads);/*** @brief 打开连接池* @return 错误码,0表示成功*/uint32 Open();/*** @brief 关闭连接池*/void Close();/*** @brief 获取一个空闲连接* @return 数据库连接指针*/T* GetFreeConnection();/*** @brief 执行查询* @param sql SQL语句* @return 查询结果*/QueryResult Query(const char* sql);private:/*** @brief 打开指定数量的连接* @param type 连接类型(同步/异步)* @param numConnections 连接数量* @return 错误码,0表示成功*/uint32 OpenConnections(int type, uint8 numConnections);/*** @brief 获取数据库名称* @return 数据库名称*/const char* GetDatabaseName() const;// 连接类型枚举enum {IDX_SYNCH = 0,  // 同步连接IDX_ASYNC = 1,  // 异步连接IDX_MAX         // 最大索引};// 连接信息结构体struct MySQLConnectionInfo {std::string host;std::string user;std::string password;std::string database;uint16 port;std::string ssl;};std::unique_ptr<MySQLConnectionInfo> _connectionInfo;  // 连接信息std::vector<std::unique_ptr<T>> _connections[IDX_MAX]; // 连接池(同步和异步)uint8 _async_threads;  // 异步连接数uint8 _synch_threads;  // 同步连接数std::mutex _mutex;     // 互斥锁std::condition_variable _condition; // 条件变量
};// 实现部分template <class T>
DatabaseWorkerPool<T>::DatabaseWorkerPool(): _async_threads(0), _synch_threads(0) {// 检查MySQL库是否线程安全if (!mysql_thread_safe()) {throw std::runtime_error("MySQL library is not thread-safe");}
}template <class T>
DatabaseWorkerPool<T>::~DatabaseWorkerPool() {Close();
}template <class T>
void DatabaseWorkerPool<T>::SetConnectionInfo(const std::string& infoString, uint8 asyncThreads, uint8 synchThreads) {// 解析连接字符串并存储_connectionInfo = std::make_unique<MySQLConnectionInfo>();// 这里简化处理,实际应该解析infoString_connectionInfo->host = "localhost";_connectionInfo->user = "root";_connectionInfo->password = "";_connectionInfo->database = "test";_connectionInfo->port = 3306;_async_threads = asyncThreads;_synch_threads = synchThreads;
}template <class T>
uint32 DatabaseWorkerPool<T>::Open() {if (!_connectionInfo) {throw std::runtime_error("Connection info was not set!");}// 打开异步连接uint32 error = OpenConnections(IDX_ASYNC, _async_threads);if (error) return error;// 打开同步连接error = OpenConnections(IDX_SYNCH, _synch_threads);return error;
}template <class T>
uint32 DatabaseWorkerPool<T>::OpenConnections(int type, uint8 numConnections) {std::lock_guard<std::mutex> lock(_mutex);for (uint8 i = 0; i < numConnections; ++i) {// 创建新连接auto connection = std::make_unique<T>();// 初始化连接if (!connection->Open(_connectionInfo.get())) {return 1; // 简化错误处理}// 添加到连接池_connections[type].push_back(std::move(connection));}return 0;
}template <class T>
void DatabaseWorkerPool<T>::Close() {std::lock_guard<std::mutex> lock(_mutex);// 关闭所有连接for (auto& connList : _connections) {connList.clear();}
}template <class T>
T* DatabaseWorkerPool<T>::GetFreeConnection() {std::unique_lock<std::mutex> lock(_mutex);// 优先从同步连接池获取for (auto& conn : _connections[IDX_SYNCH]) {if (conn->TryLock()) {return conn.get();}}// 如果没有可用的同步连接,等待_condition.wait(lock, [this]() {for (auto& conn : _connections[IDX_SYNCH]) {if (conn->TryLock()) {return true;}}return false;});// 再次尝试获取for (auto& conn : _connections[IDX_SYNCH]) {if (conn->TryLock()) {return conn.get();}}return nullptr; // 不应该执行到这里
}template <class T>
QueryResult DatabaseWorkerPool<T>::Query(const char* sql) {auto connection = GetFreeConnection();if (!connection) {return QueryResult(nullptr);}// 执行查询ResultSet* result = connection->Query(sql);connection->Unlock();_condition.notify_one(); // 通知其他等待的线程if (!result || !result->GetRowCount() || !result->NextRow()) {delete result;return QueryResult(nullptr);}return QueryResult(result);
}template <class T>
const char* DatabaseWorkerPool<T>::GetDatabaseName() const {return _connectionInfo ? _connectionInfo->database.c_str() : "";
}
Client DatabaseWorkerPool ConnectionPool MySQLConnection SetConnectionInfo() 存储连接配置 Open() OpenConnections(IDX_ASYNC) 创建连接对象 连接成功 OpenConnections(IDX_SYNCH) 创建连接对象 连接成功 Query(SQL) GetFreeConnection() 返回连接 等待(_condition.wait) alt [有可用连接] [无可用连接] Execute(SQL) 返回结果 ReleaseConnection() _condition.notify_one() 返回结果 Close() 清理所有连接 Client DatabaseWorkerPool ConnectionPool MySQLConnection

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com