2.4 The Thread Pool Pattern (Thread Pool)
1 Motivation
In the previous section we saw that synchronous IO calls block the calling thread. While the call is in flight, the CPU sits idle; this is what we usually call an I/O-bound workload. (The opposite is a compute-bound workload, which keeps the CPU busy doing useful work.)
This idling also means fewer IO operations are outstanding against the storage device at any moment, which makes it hard to saturate the hardware, especially SSD media.
A natural pattern follows: if one thread blocks, use a group of threads to raise the overall concurrency, and hand all synchronous IO off to that group.
Next we implement a small example program as a demonstration: one group of IO threads, plus one extra thread that simulates a user's RPC read/write logic.
Figure: how the example program divides work among its threads
2 Code Snippet
2.1 A Simple Thread Pool Implementation
We define an IOThreadPool class that creates its worker threads with std::thread in the constructor. The RPC simulation thread submits task functions into a queue via enqueue; the worker threads pop tasks off the queue and execute them.
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

class IOThreadPool {
public:
    explicit IOThreadPool(size_t num_threads) : stop(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            // create worker thread
            workers.emplace_back([this] {
                // worker loop
                while (true) {
                    std::function<void()> task;
                    // fetch task
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, [this] {
                            return this->stop || !this->tasks.empty();
                        });
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    // do task
                    task();
                }
            });
        }
    }

    // Submit a task; one of the worker threads will pick it up.
    void enqueue(std::function<void()> task) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks.push(std::move(task));
        }
        condition.notify_one();
    }

    // Drain remaining tasks, then join all workers.
    ~IOThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &worker : workers) {
            worker.join();
        }
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    std::atomic<bool> stop;
};
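A minimal usage sketch, assuming the IOThreadPool class above (the function usage_sketch is made up for illustration and is not part of the demo program). It shows that enqueue is fire-and-forget and returns no future, so a caller that needs to know when a task has finished must provide its own synchronization; the IOResult objects in the next section do exactly that.

#include <iostream>

// Hypothetical usage sketch of IOThreadPool, not part of the demo program.
// enqueue() is fire-and-forget, so completion has to be signaled explicitly.
void usage_sketch() {
    IOThreadPool pool(2); // two worker threads

    std::mutex m;
    std::condition_variable cv;
    bool done = false;

    pool.enqueue([&] {
        std::cout << "task running on a worker thread" << std::endl;
        {
            std::lock_guard<std::mutex> lock(m);
            done = true;
        }
        cv.notify_one();
    });

    // Block until the worker reports completion.
    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock, [&] { return done; });
    // ~IOThreadPool drains the queue and joins the workers here.
}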
2.2 Simulating the User RPC Thread
We create a thread to simulate the execution of user read/write RPCs. It prepares buffers in memory, submits IO tasks, waits for all of them to complete, and finally verifies the data.
Here I submit lambda functions directly; they are executed by the IO thread pool's worker threads. When the IO inside a lambda finishes, it sets a synchronization primitive to mark that IO as complete.
Note: I call fsync after each write to guarantee durability. This makes each IO block for longer, which highlights the effect of the IO thread pool.
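The snippet below refers to an IOResult struct and the constants IO_COUNT and BLOCK_SIZE, which the article does not list. Here is a minimal sketch of what they might look like, assuming 4 KiB blocks and 500,000 operations per phase (matching the 1,000,000 total operations in the run output); the exact definitions in the original program may differ.

#include <array>
#include <chrono>
#include <condition_variable>
#include <cstring>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <vector>
#include <fcntl.h>
#include <unistd.h>

// Assumed constants: 4 KiB blocks, 500,000 writes plus 500,000 reads.
constexpr int IO_COUNT = 500000;
constexpr size_t BLOCK_SIZE = 4096;

// Per-IO completion record shared between the RPC thread and a worker.
struct IOResult {
    std::mutex mutex;
    std::condition_variable cv;
    bool completed = false;                               // set by the worker when the IO is done
    int fd = -1;
    off_t offset = 0;
    ssize_t result = 0;                                   // return value of pread/pwrite
    std::shared_ptr<std::array<char, BLOCK_SIZE>> buffer; // keeps the data buffer alive
};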
void simulate_user_rpc(IOThreadPool &pool, int fd) {
    std::vector<std::shared_ptr<IOResult>> write_results;
    auto start_time = std::chrono::high_resolution_clock::now();

    // sequential write
    for (int i = 0; i < IO_COUNT; ++i) {
        auto write_buf = std::make_shared<std::array<char, BLOCK_SIZE>>();
        char content_char = 'A' + (i % 26);
        memset(write_buf->data(), content_char, BLOCK_SIZE);
        off_t offset = i * BLOCK_SIZE;
        auto result = std::make_shared<IOResult>();
        write_results.push_back(result);
        pool.enqueue([fd, offset, write_buf, result] {
            ssize_t ret = pwrite(fd, write_buf->data(), BLOCK_SIZE, offset);
            auto completed = true;
            if (fsync(fd) == -1) {
                std::cerr << "fsync failed" << std::endl;
                completed = false;
            }
            {
                std::lock_guard<std::mutex> lock(result->mutex);
                result->fd = fd;
                result->offset = offset;
                result->result = ret;
                result->completed = completed;
                result->buffer = std::move(write_buf);
            }
            result->cv.notify_one();
        });
    }

    // random read
    std::vector<std::shared_ptr<IOResult>> read_results;
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(0, IO_COUNT - 1);
    for (int i = 0; i < IO_COUNT; ++i) {
        int block_num = dis(gen);
        off_t offset = block_num * BLOCK_SIZE;
        auto read_buf = std::make_shared<std::array<char, BLOCK_SIZE>>();
        auto result = std::make_shared<IOResult>();
        read_results.push_back(result);
        pool.enqueue([fd, offset, read_buf, result] {
            ssize_t bytes_read = pread(fd, read_buf->data(), BLOCK_SIZE, offset);
            {
                std::lock_guard<std::mutex> lock(result->mutex);
                result->result = bytes_read;
                result->fd = fd;
                result->offset = offset;
                result->completed = true;
                result->buffer = std::move(read_buf);
            }
            result->cv.notify_one();
        });
    }

    // wait for all write operations to complete
    bool write_all_success = true;
    for (auto &result : write_results) {
        std::unique_lock<std::mutex> lock(result->mutex);
        result->cv.wait(lock, [&result] { return result->completed; });
        if (result->result != BLOCK_SIZE) {
            write_all_success = false;
            std::cerr << "IO operation failed with return: " << result->result << std::endl;
        }
    }

    // wait for all read operations to complete
    bool read_all_success = true;
    for (auto &result : read_results) {
        std::unique_lock<std::mutex> lock(result->mutex);
        result->cv.wait(lock, [&result] { return result->completed; });
        if (result->result != BLOCK_SIZE) {
            read_all_success = false;
            std::cerr << "IO operation failed with return: " << result->result << std::endl;
        } else {
            // check data integrity
            char expected_char = 'A' + (result->offset / BLOCK_SIZE) % 26;
            std::vector<char> compare_buffer(BLOCK_SIZE, expected_char);
            if (std::memcmp(result->buffer->data(), compare_buffer.data(), BLOCK_SIZE) != 0) {
                read_all_success = false;
                std::cerr << "Data integrity check failed at offset: " << result->offset
                          << std::endl;
            }
        }
    }

    auto end_time = std::chrono::high_resolution_clock::now();
    if (write_all_success && read_all_success) {
        std::cout << "All IO operations completed successfully!" << std::endl;
        std::cout << "Total IO operations: " << IO_COUNT * 2 << std::endl;
        std::chrono::duration<double> elapsed = end_time - start_time;
        std::cout << "Elapsed time: " << elapsed.count() << " seconds" << std::endl;
        std::cout << "IOPS: " << IO_COUNT * 2 / elapsed.count() << std::endl;
        std::cout << "Throughput: "
                  << static_cast<double>(IO_COUNT * 2 * BLOCK_SIZE) / elapsed.count() /
                         (1024 * 1024)
                  << " MB/s" << std::endl;
    } else {
        std::cout << "Some IO operations failed!" << std::endl;
    }
}
2.3 Initialization and Running
int main(int argc, char *argv[]) {
    const std::string test_file = "io_pool_test.bin";
    size_t num_io_threads = 4;
    if (argc > 1) {
        try {
            num_io_threads = std::stoul(argv[1]);
            if (num_io_threads == 0) {
                std::cerr << "Thread count must be greater than 0, using "
                             "default value 4"
                          << std::endl;
                num_io_threads = 4;
            }
        } catch (const std::exception &e) {
            std::cerr << "Invalid thread count argument, using default value 4: " << e.what()
                      << std::endl;
        }
    }
    std::cout << "Using IO thread pool size: " << num_io_threads << std::endl;

    int fd = open(test_file.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0644);
    if (fd == -1) {
        std::cerr << "Failed to open file" << std::endl;
        return 1;
    }

    try {
        IOThreadPool pool(num_io_threads);
        simulate_user_rpc(pool, fd);
    } catch (const std::exception &e) {
        std::cerr << "Error: " << e.what() << std::endl;
        close(fd);
        return 1;
    }

    close(fd);
    return 0;
}
Let's run it:
➜ snip git:(master) ✗ g++ -Wall -Wextra -g -o 04 ./04_io_thread_pool.cpp
➜ snip git:(master) ✗ ./04 8
Using IO thread pool size: 8
All IO operations completed successfully!
Total IO operations: 1000000
Elapsed time: 89.6841 seconds
IOPS: 11150.3
Throughput: 43.5557 MB/s
3 Performance and Discussion
I ran the test with different thread counts against a SATA SSD and an NVMe SSD in the same machine, using a 4 KiB IO size and focusing on IOPS.
SATA SSD
| Device | Threads | Total Ops | Elapsed (s) | IOPS | Throughput (MB/s) |
|---|---|---|---|---|---|
| SATA SSD | 2 | 1,000,000 | 33.7835 | 29,600.2 | 115.626 |
| SATA SSD | 8 | 1,000,000 | 25.2971 | 39,530.3 | 154.415 |
| SATA SSD | 64 | 1,000,000 | 26.4295 | 37,836.5 | 147.799 |
| SATA SSD | 128 | 1,000,000 | 29.108 | 34,354.8 | 134.199 |
| SATA SSD | 256 | 1,000,000 | 34.9604 | 28,603.8 | 111.733 |
| SATA SSD | 512 | 1,000,000 | 43.883 | 22,787.8 | 89.015 |
NVMe SSD
| Device | Threads | Total Ops | Elapsed (s) | IOPS | Throughput (MB/s) |
|---|---|---|---|---|---|
| NVMe SSD | 2 | 1,000,000 | 30.7639 | 32,505.7 | 126.975 |
| NVMe SSD | 8 | 1,000,000 | 15.191 | 65,828.4 | 257.142 |
| NVMe SSD | 64 | 1,000,000 | 12.6304 | 79,173.9 | 309.273 |
| NVMe SSD | 128 | 1,000,000 | 20.4492 | 48,901.6 | 191.022 |
| NVMe SSD | 256 | 1,000,000 | 43.5615 | 22,956.1 | 89.672 |
| NVMe SSD | 512 | 1,000,000 | 55.7664 | 17,931.9 | 70.047 |
As the thread count grows from 2, performance improves steadily, but once the pool gets too large it starts to fall again. Compared with single-threaded sequential reads and writes, the thread pool pattern clearly improves our system's IO performance. Quite satisfying.
But developers should always keep our original motivation in mind: synchronous IO cannot saturate the hardware, so we pile on threads to compensate. As the thread count grows, the cost of context switching starts to show, and the bottleneck gradually shifts to thread switching and inter-thread synchronization. At that point the operating system's CPU consumption rises as well.
We observe that under the thread pool model the SATA SSD performs best at around 8 threads, which we tentatively take as reaching the hardware's expected performance. We also have reason to suspect that although the NVMe SSD does best at 64 threads, it is still far from its hardware capability. What holds us back is more likely the cost of switching and synchronizing between threads (lock contention, NUMA effects, the sys CPU usage caused by heavy thread switching, and so on). A single IO thread can only have one IO in flight at a time, so with N workers and an average per-IO latency of t the pool can sustain at most roughly N/t IOPS, and we cannot grow the thread count indefinitely.
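One rough way to check whether thread switching is indeed becoming the bottleneck is to sample the process's context-switch counters before and after the benchmark. Below is a minimal sketch using the POSIX getrusage call; the helper name report_context_switches is made up for illustration and is not part of the demo program.

#include <sys/resource.h>
#include <iostream>

// Print the process's voluntary and involuntary context switch counts.
// Voluntary switches typically come from blocking (IO, condition variables);
// involuntary switches come from the scheduler preempting runnable threads.
void report_context_switches(const char *label) {
    struct rusage usage;
    if (getrusage(RUSAGE_SELF, &usage) == 0) {
        std::cout << label
                  << " voluntary ctx switches: " << usage.ru_nvcsw
                  << ", involuntary ctx switches: " << usage.ru_nivcsw << std::endl;
    }
}

// Usage sketch: call it around the benchmark, e.g.
//   report_context_switches("before");
//   simulate_user_rpc(pool, fd);
//   report_context_switches("after");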
I firmly believe that choosing a suitable IO model requires evaluating our system's actual requirements and the scale of the hardware. For example:
- System requirements: is the priority high scalability, or raw performance?
- Hardware profile: HDD or SSD? What is the CPU-to-disk ratio per machine?