3.3 Linux io_uring
随着硬件设备性能的发展,Linux 内核一个新的异步 IO 技术逐渐发展起来,io_uring。它总结了对 aio 的抱怨,发展成为一组真正异步、面向任何 IO 类型的异步接口 1。
io_uring 的使用可参考 Lord of the io_uring 系列文章。对于用户库,建议使用更加友好的 liburing
。实际上有些 io_uring 的使用细节和文档,liburing
工程反而更加全面。
1 基本原理与基本接口
io_uring 使用的心智模型较为简洁,开发者可同样地理解为 “提交-执行-收割”。正如其名,分为两个环形队列
- 提交队列(SQ)用于提交请求,完成队列(CQ)用于通知请求完成情况
- 环形缓冲区在内核和用户空间共享
- 用户应用写入单个或者多个提交事件,包含了操作信息、数据指针等,提交到 SQ 尾部
- 用户应用使用系统调用提交请求
- 内核处理完毕后,将完成事件放到 CQ 尾部
- 用户应用从 CQ 头部读取完成事件
2 Ring! Ring! Ring! 环形缓冲区
无锁竞争 在单生产者-单消费者模型中,环形缓冲区是一个常见的高性能设计。在 io_uring 中:
- 提交队列(SQ):应用是生产者(提交IO请求),内核是消费者(消费请求)。
- 完成队列(CQ):内核是生产者(生成完成事件),应用是消费者(处理完成事件)。
使用环形缓冲区来消除锁竞争。通过分离生产者和消费者的指针(如头尾指针),双方无需共享锁,仅需内存屏障(Memory Barrier)保证可见性,大幅降低同步开销。
零拷贝 通过环形缓冲区,移动指针而不是数据复制。
延迟确定性 插入和删除操作在 O1 时间完成。
反压控制 生产者能感知缓冲区深度,在必要时候阻塞/丢弃数据。
3 io_depth
进入到异步 I/O 后,有一个重要的概念 io_depth
需要关注。读者可能已在硬盘性能测试套件 fio
中见过这类参数。借用 fio
手册的定义:
iodepth=int
Number of I/O units to keep in flight against the file
即发出去正在执行的 io 数量。可以理解为生产-消费模型,我们提交一系列 IO 到异步队列中,这个 “深度” 指的就是我们视角提交出去,还未取得结果的 IO 数量。
为什么异步 I/O 才会关注 io_depth?
一般在同步 IO 中,每个线程同时只能提交一个 IO 请求,取得结果前线程被阻塞。因此同步 IO 的 io_depth 只能为 1。同步 IO 我们一般关注 in-flight 线程数量。
实际系统中,io_depth 可以作为观测系统压力的指标。观测到 io_depth 上升,意味着磁盘消费 IO 小于用户生产 IO 的速度,系统开始反压 (back pressure)。必须考虑引入合理的限流措施,并分析磁盘性能是否符合预期。
io_depth 越高越好吗?
在现代 SSD 硬件中,可以并行执行 IO 请求,因此适当的 io_depth (比如 16,64) 有利于充分利用磁盘性能。另外,OS 层面可以对 IO 进行一定合并,提升性能。
深度过大,效果提升逐渐不明显,一方面 OS 可能限制最大深度;磁盘达到性能极限后,也不会因为请求增大而大幅提升。
4 Code Snippet
我们重构上篇文章的线程池 IO 例程,改为使用 io_uring 方式提交 IO 请求、收割 IO 请求。
首先封装一些异步 IO 用的 context 结构体。
struct UringData {
void *user_ctx;
OpType op_type;
};
struct UserContext {
enum class Type { WRITE, READ } type;
int fd = -1;
off_t offset = -1;
std::shared_ptr<std::array<char, IO_BLOCK_SIZE>> buffer;
ssize_t write_result = -1;
ssize_t fsync_result = -1;
ssize_t read_result = -1;
bool write_completed = false;
bool fsync_completed = false;
bool read_completed = false;
UringData write_data{this, OpType::WRITE};
UringData fsync_data{this, OpType::FSYNC};
UringData read_data{this, OpType::READ};
};
UringIO 类,模拟用户的 io 请求,异步 io 请求提交和收割。其中,我们将 write io 和 fsync link 到一起,保证逻辑顺序。
class UringIO {
public:
explicit UringIO(unsigned int depth) : ring_(), depth_(depth) {
int ret = io_uring_queue_init(depth, &ring_, 0);
if (ret < 0) {
throw std::runtime_error("io_uring_queue_init failed: " + std::string(strerror(-ret)));
}
}
~UringIO() { io_uring_queue_exit(&ring_); }
// simulate user io
void simulate_user_rpc(int fd) {
std::vector<std::unique_ptr<UserContext>> write_ctxs;
std::vector<std::unique_ptr<UserContext>> read_ctxs;
unsigned int inflight = 0;
int write_count = 0;
int read_count = 0;
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(0, IO_COUNT - 1);
auto start_time = std::chrono::high_resolution_clock::now();
// write: seq write + FSYNC
while (write_count < IO_COUNT || inflight > 0) {
// submit write
while (write_count < IO_COUNT && inflight <= depth_ - 2) {
auto ctx = std::make_unique<UserContext>();
ctx->type = UserContext::Type::WRITE;
ctx->fd = fd;
ctx->offset = write_count * IO_BLOCK_SIZE;
ctx->buffer = std::make_shared<std::array<char, IO_BLOCK_SIZE>>();
char content_char = 'A' + (write_count % 26);
memset(ctx->buffer->data(), content_char, IO_BLOCK_SIZE);
// get SQE for write
struct io_uring_sqe *write_sqe = io_uring_get_sqe(&ring_);
if (!write_sqe)
break;
io_uring_prep_write(write_sqe, fd, ctx->buffer->data(), IO_BLOCK_SIZE, ctx->offset);
io_uring_sqe_set_data(write_sqe, &ctx->write_data);
// get SQE for fsync
struct io_uring_sqe *fsync_sqe = io_uring_get_sqe(&ring_);
if (!fsync_sqe)
break;
io_uring_prep_fsync(fsync_sqe, fd, IORING_FSYNC_DATASYNC);
io_uring_sqe_set_data(fsync_sqe, &ctx->fsync_data);
fsync_sqe->flags |= IOSQE_IO_LINK; // link write+fsync
// submit
int ret = io_uring_submit(&ring_);
if (ret < 0) {
std::cerr << "io_uring_submit failed: " << strerror(-ret) << std::endl;
break;
}
inflight += 2;
write_ctxs.push_back(std::move(ctx));
write_count++;
}
// fetch competition
if (inflight > 0) {
struct io_uring_cqe *cqe;
int ret = io_uring_wait_cqe(&ring_, &cqe);
if (ret < 0) {
std::cerr << "io_uring_wait_cqe failed: " << strerror(-ret) << std::endl;
break;
}
UringData *data = static_cast<UringData *>(io_uring_cqe_get_data(cqe));
UserContext *ctx = static_cast<UserContext *>(data->user_ctx);
switch (data->op_type) {
case OpType::WRITE:
ctx->write_result = cqe->res;
ctx->write_completed = true;
break;
case OpType::FSYNC:
ctx->fsync_result = cqe->res;
ctx->fsync_completed = true;
break;
case OpType::READ:
ctx->read_result = cqe->res;
ctx->read_completed = true;
break;
}
io_uring_cqe_seen(&ring_, cqe);
inflight--;
}
}
// read: rand write
while (read_count < IO_COUNT || inflight > 0) {
// submit read
while (read_count < IO_COUNT && inflight < depth_) {
int block_num = dis(gen);
off_t offset = block_num * IO_BLOCK_SIZE;
auto ctx = std::make_unique<UserContext>();
ctx->type = UserContext::Type::READ;
ctx->fd = fd;
ctx->offset = offset;
ctx->buffer = std::make_shared<std::array<char, IO_BLOCK_SIZE>>();
struct io_uring_sqe *read_sqe = io_uring_get_sqe(&ring_);
if (!read_sqe)
break;
io_uring_prep_read(read_sqe, fd, ctx->buffer->data(), IO_BLOCK_SIZE, offset);
io_uring_sqe_set_data(read_sqe, &ctx->read_data);
int ret = io_uring_submit(&ring_);
if (ret < 0) {
std::cerr << "io_uring_submit failed: " << strerror(-ret) << std::endl;
break;
}
inflight++;
read_ctxs.push_back(std::move(ctx));
read_count++;
}
// fetch compelation
if (inflight > 0) {
struct io_uring_cqe *cqe;
int ret = io_uring_wait_cqe(&ring_, &cqe);
if (ret < 0) {
std::cerr << "io_uring_wait_cqe failed: " << strerror(-ret) << std::endl;
break;
}
UringData *data = static_cast<UringData *>(io_uring_cqe_get_data(cqe));
UserContext *ctx = static_cast<UserContext *>(data->user_ctx);
if (data->op_type == OpType::READ) {
ctx->read_result = cqe->res;
ctx->read_completed = true;
}
io_uring_cqe_seen(&ring_, cqe);
inflight--;
}
}
auto end_time = std::chrono::high_resolution_clock::now();
// verify data
bool write_all_success = true;
for (auto &ctx : write_ctxs) {
if (ctx->write_result != static_cast<ssize_t>(IO_BLOCK_SIZE)) {
write_all_success = false;
std::cerr << "Write failed at offset " << ctx->offset << ": expected "
<< IO_BLOCK_SIZE << ", got " << ctx->write_result << std::endl;
}
if (ctx->fsync_result != 0) {
write_all_success = false;
std::cerr << "Fsync failed at offset " << ctx->offset << ": "
<< strerror(-ctx->fsync_result) << std::endl;
}
}
bool read_all_success = true;
for (auto &ctx : read_ctxs) {
if (ctx->read_result != static_cast<ssize_t>(IO_BLOCK_SIZE)) {
read_all_success = false;
std::cerr << "Read failed at offset " << ctx->offset << ": expected "
<< IO_BLOCK_SIZE << ", got " << ctx->read_result << std::endl;
} else {
int block_num = ctx->offset / IO_BLOCK_SIZE;
char expected_char = 'A' + (block_num % 26);
for (size_t i = 0; i < IO_BLOCK_SIZE; ++i) {
if ((*ctx->buffer)[i] != expected_char) {
read_all_success = false;
std::cerr << "Data corruption at offset " << ctx->offset + i
<< ": expected " << expected_char << ", got " << (*ctx->buffer)[i]
<< std::endl;
break;
}
}
}
}
// statics
if (write_all_success && read_all_success) {
std::cout << "All IO operations completed successfully!" << std::endl;
std::cout << "Total IO operations: " << IO_COUNT * 2 << std::endl;
std::chrono::duration<double> elapsed = end_time - start_time;
std::cout << "Elapsed time: " << elapsed.count() << " seconds" << std::endl;
std::cout << "IOPS: " << static_cast<int>(IO_COUNT * 2 / elapsed.count()) << std::endl;
double throughput = (IO_COUNT * 2 * IO_BLOCK_SIZE) / (elapsed.count() * 1024 * 1024);
std::cout << "Throughput: " << throughput << " MB/s" << std::endl;
} else {
std::cout << "IO operations completed with errors" << std::endl;
}
}
private:
struct io_uring ring_;
unsigned int depth_;
};
main 入口
int main(int argc, char *argv[]) {
const std::string test_file = "io_uring_test.bin";
unsigned int io_depth = 32;
if (argc > 1) {
try {
io_depth = std::stoul(argv[1]);
if (io_depth < 2) {
std::cerr << "IO depth must be at least 2, using default 32" << std::endl;
io_depth = 32;
}
} catch (const std::exception &e) {
std::cerr << "Invalid IO depth argument, using default 32: " << e.what() << std::endl;
}
}
std::cout << "Using IO depth: " << io_depth << std::endl;
int fd = open(test_file.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0644);
if (fd == -1) {
std::cerr << "Failed to open file: " << strerror(errno) << std::endl;
return 1;
}
try {
UringIO uring_io(io_depth);
uring_io.simulate_user_rpc(fd);
} catch (const std::exception &e) {
std::cerr << "Error: " << e.what() << std::endl;
close(fd);
return 1;
}
close(fd);
return 0;
}
5 运行结果与讨论
在上述程序中,我们故意设计了 4K 的顺序写 IO 模式。设置不同的深度,在一块 SATA SSD 上压测,结果如下。
队列深度 | 总操作数 | 耗时(秒) | IOPS | 吞吐量(MB/s) |
---|---|---|---|---|
2 | 1,000,000 | 159.223 | 6,280 | 24.5332 |
4 | 1,000,000 | 81.0652 | 12,335 | 48.1865 |
8 | 1,000,000 | 71.1472 | 14,055 | 54.9038 |
16 | 1,000,000 | 67.4389 | 14,828 | 57.9228 |
32 | 1,000,000 | 63.9078 | 15,647 | 61.1233 |
64 | 1,000,000 | 53.269 | 18,772 | 73.3307 |
128 | 1,000,000 | 41.6389 | 24,016 | 93.8125 |
256 | 1,000,000 | 34.4474 | 29,029 | 113.397 |
512 | 1,000,000 | 33.1311 | 30,183 | 117.903 |
1024 | 1,000,000 | 30.0471 | 33,281 | 130.004 |
随着队列深度增加,IOPS持续提升(2→1024:6,280→33,281)。在128深度后提升幅度减小,512→1024 仅提升约 7%(图中 x 轴为 log 缩放)。我们的 4K 小 IO 顺序写,队列深度越大,系统的排队整流效应越好。
注意,这个例子只是为了简便,使用单线程操作 io,这不是必须的。设计者需要根据自己的需要合理安排线程模型(比如考虑使用的线程框架、rpc 框架,考虑 rpc+io 的 cpu 本地性等)。