2.4 The Thread Pool Pattern

1 Motivation

In the previous section, we saw that synchronous I/O calls block the calling thread. During that time the CPU sits idle; this is what we usually call an I/O-bound task. (The opposite is a compute-bound task, which keeps the CPU busy with useful computation.)

This idling also reduces the number of I/O operations submitted to the storage device at any one time, making it hard to saturate the hardware, especially SSDs.

A natural pattern thus emerges: if synchronous I/O blocks one thread, use a group of them to raise overall concurrency. In other words, hand all synchronous I/O off to a pool of threads.

Next we build a demo program: one group of I/O threads, plus one extra thread that simulates a user's RPC read/write logic.

Figure: division of labor among the demo program's threads

2 Code Snippet

2.1 A Simple Thread Pool Implementation

We define an IOThreadPool class whose constructor creates the worker threads with std::thread. The RPC simulation thread submits task functions into a queue via enqueue; worker threads pop tasks off the queue and execute them.

// Headers for all snippets in this section.
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstring>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

class IOThreadPool {
  public:
    explicit IOThreadPool(size_t num_threads) : stop(false) {
        for (size_t i = 0; i < num_threads; ++i) {

            // create worker thread
            workers.emplace_back([this] {
                // worker loop
                while (true) {
                    std::function<void()> task;

                    // fetch task
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                                             [this] { return this->stop || !this->tasks.empty(); });

                        if (this->stop && this->tasks.empty())
                            return;

                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    // do task
                    task();
                }
            });
        }
    }

    void enqueue(std::function<void()> task) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks.push(std::move(task));
        }
        condition.notify_one();
    }

    ~IOThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &worker : workers) {
            worker.join();
        }
    }

  private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    std::atomic<bool> stop;
};
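
Before wiring the pool into the RPC simulation, here is a minimal usage sketch (demo_usage and the value 42 are illustrative only, not part of the original program): submit a lambda via enqueue, then block on a std::promise until a worker has executed it.

#include <future>

// Hypothetical minimal usage of IOThreadPool.
int demo_usage() {
    IOThreadPool pool(2); // two worker threads
    std::promise<int> done;
    pool.enqueue([&done] {
        done.set_value(42); // pretend this were a blocking IO call
    });
    return done.get_future().get(); // waits until the worker has run the task
}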

2.2 Simulating the User RPC Thread

We create one thread to simulate the execution of user read/write RPCs. It prepares buffers in memory, submits I/O tasks, waits for all I/O to complete, and finally verifies the data.

Here I submit lambdas directly; the IO thread pool's worker threads execute them. When the I/O inside a lambda completes, it signals a synchronization primitive to mark that I/O as done.

Note: I call fsync after each write to guarantee durability. This lengthens the time each I/O blocks, which makes the effect of the I/O thread pool more visible.
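
The snippets reference an IOResult struct and the constants BLOCK_SIZE and IO_COUNT, none of which appear in this excerpt. A minimal reconstruction from how they are used below (4 KiB blocks and 500,000 operations per phase match the benchmark output later) might look like:

// Reconstructed from usage; not shown in the original excerpt.
constexpr size_t BLOCK_SIZE = 4096;  // 4 KiB per IO
constexpr int IO_COUNT = 500000;     // 2 * IO_COUNT = 1,000,000 total ops

struct IOResult {
    std::mutex mutex;
    std::condition_variable cv;
    int fd = -1;
    off_t offset = 0;
    ssize_t result = -1;    // bytes transferred, or -1 on failure
    bool completed = false; // "finished", whether or not it succeeded
    std::shared_ptr<std::array<char, BLOCK_SIZE>> buffer;
};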


void simulate_user_rpc(IOThreadPool &pool, int fd) {

    std::vector<std::shared_ptr<IOResult>> write_results;
    auto start_time = std::chrono::high_resolution_clock::now();

    // sequential write
    for (int i = 0; i < IO_COUNT; ++i) {
        auto write_buf = std::make_shared<std::array<char, BLOCK_SIZE>>();
        char content_char = 'A' + (i % 26);
        memset(write_buf->data(), content_char, BLOCK_SIZE);

        off_t offset = i * BLOCK_SIZE;
        auto result = std::make_shared<IOResult>();
        write_results.push_back(result);

        pool.enqueue([fd, offset, write_buf, result]() mutable {
            ssize_t ret = pwrite(fd, write_buf->data(), BLOCK_SIZE, offset);
            // Report fsync failure through the result code. `completed` must
            // always end up true, or the waiter below would block forever.
            if (ret == BLOCK_SIZE && fsync(fd) == -1) {
                std::cerr << "fsync failed" << std::endl;
                ret = -1;
            }

            {
                std::lock_guard<std::mutex> lock(result->mutex);
                result->fd = fd;
                result->offset = offset;
                result->result = ret;
                result->completed = true;
                result->buffer = std::move(write_buf);
            }
            result->cv.notify_one();
        });
    }

    // random read
    std::vector<std::shared_ptr<IOResult>> read_results;

    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(0, IO_COUNT - 1);

    for (int i = 0; i < IO_COUNT; ++i) {
        int block_num = dis(gen);
        off_t offset = block_num * BLOCK_SIZE;
        auto read_buf = std::make_shared<std::array<char, BLOCK_SIZE>>();
        auto result = std::make_shared<IOResult>();
        read_results.push_back(result);

        pool.enqueue([fd, offset, read_buf, result]() mutable {
            ssize_t bytes_read = pread(fd, read_buf->data(), BLOCK_SIZE, offset);

            {
                std::lock_guard<std::mutex> lock(result->mutex);
                result->result = bytes_read;
                result->fd = fd;
                result->offset = offset;
                result->completed = true;
                result->buffer = std::move(read_buf);
            }

            result->cv.notify_one();
        });
    }

    // wait for all write operations to complete
    bool write_all_success = true;
    for (auto &result : write_results) {
        std::unique_lock<std::mutex> lock(result->mutex);
        result->cv.wait(lock, [&result] { return result->completed; });

        if (result->result != BLOCK_SIZE) {
            write_all_success = false;
            std::cerr << "IO operation failed with return: " << result->result << std::endl;
        }
    }

    // wait for all read operations to complete
    bool read_all_success = true;
    for (auto &result : read_results) {
        std::unique_lock<std::mutex> lock(result->mutex);
        result->cv.wait(lock, [&result] { return result->completed; });

        if (result->result != BLOCK_SIZE) {
            read_all_success = false;
            std::cerr << "IO operation failed with return: " << result->result << std::endl;
        } else {
            // check data integrity
            char expected_char = 'A' + (result->offset / BLOCK_SIZE) % 26;
            std::vector<char> compare_buffer(BLOCK_SIZE, expected_char);
            if (std::memcmp(result->buffer->data(), compare_buffer.data(), BLOCK_SIZE) != 0) {
                read_all_success = false;
                std::cerr << "Data integrity check failed at offset: " << result->offset
                          << std::endl;
            }
        }
    }

    auto end_time = std::chrono::high_resolution_clock::now();

    if (write_all_success && read_all_success) {
        std::cout << "All IO operations completed successfully!" << std::endl;
        std::cout << "Total IO operations: " << IO_COUNT * 2 << std::endl;
        std::chrono::duration<double> elapsed = end_time - start_time;
        std::cout << "Elapsed time: " << elapsed.count() << " seconds" << std::endl;
        std::cout << "IOPS: " << IO_COUNT * 2 / elapsed.count() << std::endl;
        std::cout << "Throughput: "
                  << static_cast<double>(IO_COUNT * 2 * BLOCK_SIZE) / elapsed.count() /
                         (1024 * 1024)
                  << " MB/s" << std::endl;
    } else {
        std::cout << "Some IO operations failed!" << std::endl;
    }
}
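
A note on the synchronization design: here every IO carries its own mutex and condition variable, and the submitter waits on each in turn. An alternative (a sketch only, not what the code above does) is to count down a shared counter and wait once per batch, trading per-IO wakeups for a single batch-completion signal:

// Hypothetical batch waiter: one condition variable for a whole batch of IOs.
struct BatchWaiter {
    std::mutex m;
    std::condition_variable cv;
    size_t pending;
    explicit BatchWaiter(size_t n) : pending(n) {}

    // Called by each worker when its IO finishes.
    void done_one() {
        std::lock_guard<std::mutex> lock(m);
        if (--pending == 0)
            cv.notify_one();
    }

    // Called once by the submitter to wait for the whole batch.
    void wait_all() {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return pending == 0; });
    }
};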

2.3 Initialize and Run

int main(int argc, char *argv[]) {
    const std::string test_file = "io_pool_test.bin";
    size_t num_io_threads = 4;

    if (argc > 1) {
        try {
            num_io_threads = std::stoul(argv[1]);
            if (num_io_threads == 0) {
                std::cerr << "Thread count must be greater than 0, using "
                             "default value 4"
                          << std::endl;
                num_io_threads = 4;
            }
        } catch (const std::exception &e) {
            std::cerr << "Invalid thread count argument, using default value 4: " << e.what()
                      << std::endl;
        }
    }

    std::cout << "Using IO thread pool size: " << num_io_threads << std::endl;

    int fd = open(test_file.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0644);
    if (fd == -1) {
        std::cerr << "Failed to open file" << std::endl;
        return 1;
    }

    try {
        IOThreadPool pool(num_io_threads);
        simulate_user_rpc(pool, fd);

    } catch (const std::exception &e) {
        std::cerr << "Error: " << e.what() << std::endl;
        close(fd);
        return 1;
    }

    close(fd);
    return 0;
}

Let's run it:

➜  snip git:(master) ✗ g++ -Wall -Wextra -g -o 04 ./04_io_thread_pool.cpp
➜  snip git:(master) ✗ ./04 8                                            
Using IO thread pool size: 8
All IO operations completed successfully!
Total IO operations: 1000000
Elapsed time: 89.6841 seconds
IOPS: 11150.3
Throughput: 43.5557 MB/s

3 Performance and Discussion

I ran the program with various thread counts on the same machine, against both a SATA SSD and an NVMe SSD, using a 4 KiB I/O size and focusing on IOPS.

SATA SSD

Device Type  Threads  Total Ops  Elapsed (s)  IOPS      Throughput (MB/s)
SATA SSD     2        1,000,000  33.7835      29,600.2  115.626
SATA SSD     8        1,000,000  25.2971      39,530.3  154.415
SATA SSD     64       1,000,000  26.4295      37,836.5  147.799
SATA SSD     128      1,000,000  29.108       34,354.8  134.199
SATA SSD     256      1,000,000  34.9604      28,603.8  111.733
SATA SSD     512      1,000,000  43.883       22,787.8  89.015

NVMe SSD

Device Type  Threads  Total Ops  Elapsed (s)  IOPS      Throughput (MB/s)
NVMe SSD     2        1,000,000  30.7639      32,505.7  126.975
NVMe SSD     8        1,000,000  15.191       65,828.4  257.142
NVMe SSD     64       1,000,000  12.6304      79,173.9  309.273
NVMe SSD     128      1,000,000  20.4492      48,901.6  191.022
NVMe SSD     256      1,000,000  43.5615      22,956.1  89.672
NVMe SSD     512      1,000,000  55.7664      17,931.9  70.047
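
As a sanity check, throughput in these tables follows directly from IOPS and the 4 KiB block size:

    Throughput (MB/s) = IOPS x BLOCK_SIZE / 2^20
    e.g. 39,530.3 x 4096 / 1,048,576 ≈ 154.4 MB/s, matching the SATA SSD row at 8 threads.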

As the thread count grows from 2, performance improves at first, but beyond a certain point it declines again. Compared with single-threaded sequential I/O, the thread pool pattern clearly raises the system's I/O performance. Delightful!

But developers should keep the original motivation in mind: synchronous I/O cannot saturate the hardware, so we compensate with more threads. As the thread count grows, the cost of context switching starts to show, and the bottleneck gradually shifts to thread switching and inter-thread synchronization. The operating system's CPU usage rises accordingly.

The SATA SSD peaks at around 8 threads in this model, which we tentatively take as meeting the hardware's expected performance. The NVMe SSD does best at 64 threads, yet we have good reason to suspect it is still far from the device's true capability; what holds us back is more likely the cost of thread switching and synchronization (lock efficiency? NUMA effects? sys CPU burned by heavy context switching?). A single I/O thread can only have one I/O in flight at a time, and we cannot grow the thread count without bound.

I firmly believe that choosing an appropriate I/O model requires evaluating the system's actual requirements against the scale of the hardware. For example:

  • System requirements: is the priority high scalability, or raw performance?
  • Hardware scale: HDD or SSD? What is the CPU-to-disk ratio per machine?