理解C/C++异步IO编程:IO多路复用技术与Epoll入门
前言
这一篇笔记是很多其他系列的基础部分,也是基于C/C++编程语言和Linux操作系统异步IO实现的一个绕不过去的话题。笔者曾经想将他归类为某一个子话题下,但是发现很多博客项目照样需要使用这个系列作为一个铺垫,索性作为一个单独的博客来聊聊了。
在开始Epoll之前,我们不得不提异步IO和IO多路复用技术,这个跑不掉,不然,我们甚至连为什么epoll
试图被改进的对象poll/select
都没办法理解,笔者认为不理解这个,用不好Epoll的,甚至更不用提叠加上协程封装的相关上层库的理解了。基础不牢,地动山摇啊。
我们遇到了什么问题来搞出来这个了?
单线程程序在执行 I/O(如 read
/recv
)时会阻塞,导致不能同时处理多个文件描述符(socket、pipe、tty 等)。任何一个做过一定规模以上项目的朋友立马就知道问题了——这会导致整个系统的阻塞,如果一个IO操作要阻塞一会,一群IO那的把系统卡死了!而且最大的问题是——这一群IO往往只有零星几个才是真正起作用的——我们在应用层遇到了轮询难题——大量的CPU时间根本没有在解决问题,他们只是在等待IO准备好了才会执行下一步任务!
// 下面整个代码就是我们讨论的场景,当然,代码是伪代码,任何一个做过Socket编程的朋友一眼顶针
void blocking_io() {
int fd1, fd2, fd3;
// ❌ 问题:处理fd1时,fd2、fd3只能干等
recv(fd1, buffer, size, 0); // 阻塞在此!
process_data(fd1);
recv(fd2, buffer, size, 0); // 此时fd2可能早已就绪,甚至可能会阻塞后面的相关事件,吞吐量简直是地狱!
process_data(fd2);
recv(fd3, buffer, size, 0); // fd3继续等待
process_data(fd3);
}
异步 I/O 是增强系统响应性的利器
我相信很多朋友已经迫不及待的念叨这几个解决方案了——多线程/多进程、异步回调、以及 I/O 多路复用。当然,我们一下子把这些解决方案抛给初级的读者,这简直就是在折磨他们的大脑:多线程/多进程、异步回调有相应专门的内容可以学习。这里我们不去展开线程并发或回调编程的细节,而是把注意力集中到I/O 处理机制本身的优化上——也就是通常所说的“异步 I/O”。
对异步有理解的朋友,一下子就能明白什么是异步IO了:我们使用异步的IO处理,就不再需要同步的等待操作系统将任务完成后才会返回函数,而是我们告诉操作系统去做一个IO发起操作,然后我们立马就可以去做我们感兴趣的工作了——比如说密集的CPU计算,或者处理其他IO任务。直到操作系统通知我们事情做完了——在此期间,我们完全可以自由的做其他任何我们感兴趣的工作。这种模式在高并发和低延迟场景下尤其有价值,因为它能显著提高 CPU 与资源的利用率。
当然,标准的异步IO实现分为两种。一类是通过**非阻塞 I/O + 事件循环(多路复用)**来实现的用户态异步——这是很多框架(例如 Node.js、libevent)采用的做法。也是我们马上就要开始聊的一种——不管是poll还是select还是epoll,都是这样的支持。还有另一种,也是最为激进的——将这类操作优化到了内核,由内核或驱动在后台真正并行完成 I/O、并通过完成队列/回调通知用户的内核级异步(例如 Windows 的 IOCP、Linux 的 io_uring)。
关于异步IO的编程任务,libevent有一篇文档说的非常好,您可以自行的先看看:
这里不打算做重复的工作了,您自行参考。
I/O 多路复用(作为异步的一种实现形式)
咱们再说一次——poll/select/Epoll
隶属于IO响应度地狱难题的异步IO解决方案,而且是基于非阻塞(通知型)+ 事件循环(I/O 多路复用)实现的一个基础脚手架。我们遇到的难题是——什么是I/O 多路复用?
很简单,在最上面的最原始的IO代码中,我们离散同步的等待IO完成,这就让我们错过了很多事情,那我们不如这样解决问题——我们不好修改IO本身的同步性,但是我们可以解决IO等待的离散性。IO多路复用因此提出——它的核心思想是:用一个集中式的等待(如 select
/poll
/epoll
)来监视一组描述符,等到其中任意一个“就绪”时再去处理。注意这里的“就绪”意味着操作不会阻塞(比如可以 read
不会立即返回 EAGAIN),但真正的数据搬运仍由应用在收到通知后发起。
嗯?那我想你应该清楚为什么是一个事件循环来处理IO操作了。因为这种机制分明利用事件通知的编程风格更加的自然(观察者)——事件循环等待就绪 → 分发回调或恢复协程 → 应用主动完成 read
/write
。它的优点是实现简单、可移植且在连接数量中等时非常高效。
如果你对上面的文字理解了,你就可以下这两个结论了——
- 多路复用是“就绪通知”(readable/writable)——告诉你“现在可以调用 read/write 了不会立即阻塞”。
- 多路复用本身并不改变内核对 I/O 的执行方式:对于多数网络 I/O,内核仍然在底层做数据收发,只是你被告知何时去做系统调用。
快速的说一下Epoll的前辈Poll和Select
关于Select和Poll,这两个是Epoll的一个前身。但是为了引出来epoll,好像不提不完整。这里用两段话直接带走poll和select,具体的接口在附录中已经补充,您可以快速的查阅后再回来
select
select
通过位图(fd_set
)把一组文件描述符传进内核,内核在返回时把就绪的位重新写回位图;调用者需要传入 nfds
(最大 fd + 1),且受限于固定大小的 FD_SETSIZE
(常见为 1024),每次调用都有用户态/内核态的位图拷贝并且通常需要线性扫描这些位来找出就绪的 fd,因此在描述符较多时可扩展性差。
poll
poll
用一个 struct pollfd
数组替代位图,避免了 FD_SETSIZE
的硬性限制,接口更直观;但每次仍把整个数组从用户态拷贝到内核并由内核线性扫描来检测事件,复杂度仍与监控的 fd 数量成正比(O(n)),在大量空闲连接时开销显著。
好像不太对
你马上注意到select和poll比较令人难堪的点了——select被FD_SETSIZE大小限制,高性能并发场景下显然他甚至连高并发都做不好——监控的IO数量就将我们的代码限制死了;poll则好一些,但是每一次都要拷贝数组,而且伴随成千上万的IO,每一次调用都要拷贝KB级别的数组,太抽象了对不对?
那就对了,不然我们的博客就要改名为——《理解C/C++异步IO编程:Poll/Select入门》了!哪里来epoll参合对不对?
epoll 是什么、为什么要它?
终于到正题了,我们前面铺垫了异步IO和子解决方案IO多路复用,以及poll和select对巨量IO的无力,都让我们转向了今天Linux操作系统下对巨量异步IO处理的最常见解决方案——epoll。他试图解决这样的问题
当连接数从几十、几百扩展到几千或更多时,
select
/poll
的两个主要开销——(1)每次调用都要拷贝/遍历整个描述符集合;(2)用户态不断重建/传输监控结构——会带来大量 CPU 与内存带宽浪费。
我们的新解决方案epoll
是 Linux 提供的高效 I/O 多路复用(I/O multiplexing)接口,用于同时监视大量文件描述符(socket、pipe、eventfd、timerfd 等)的可读/可写/异常事件。也就是说,他跟poll和select仍然是坐一桌的朋友,但是强大的多。
Epoll是如何解决的呢?答案是:
epoll
采用注册/事件通知分离的模型:应用先通过epoll_ctl
向内核注册感兴趣的 fd(内核维护一个兴趣列表),随后调用epoll_wait
只等待“已就绪”的事件集合。内核维护就绪队列并只在有活动时返回实际就绪的 fd(数量与活跃事件 k 成正比),因此避免了对所有 fd 的线性扫描;此外
epoll
支持内核与用户空间通过可选的 mmap/环形缓冲区减少拷贝、并提供边沿触发(edge-triggered)与水平触发(level-triggered)两种语义以便更高效地实现非阻塞 I/O。综合起来,epoll
在大量并发连接下能显著降低 CPU 开销并提高吞吐与可扩展性。其实就是三点——
- 避免每次系统调用都扫一遍所有 fd(避免 O(n) 扫描开销)。
- 只把内核里“就绪”的 fd 返回给用户(Select是管你这关你那的全给你扔回去),减少用户态/内核态的数据拷贝与检查。
- 支持非常多的 FDs(理论上受限于系统资源,而非固定的 FD_SETSIZE)。
扩展:Epoll在做什么?
- 用户通过
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event)
把 fd 注册到 epoll 对象(内核维护一张红黑树 / 结构,当然,可能会变,接口肯定不变)。 - 当某个 fd 的状态变化(可读/可写)时,内核会把该 fd 放到 epoll 的就绪队列中。
epoll_wait(epfd, events, maxevents, timeout)
从这个就绪队列取出就绪 fd,返回给用户。- 注册/取消是一次性的内核操作(不在每次 wait 时都传整个列表),因此避免了大量复制与每次 O(n) 扫描。
epoll API 详解:这是我们关心的内容
我们完全可以这样引入#include <sys/epoll.h>
来引入epoll。我们来列一张表格
函数签名 (Signature) | 描述 (Description) | 备注 (Notes) |
---|---|---|
❌int epoll_create(int size) | 创建一个新的 epoll 实例。 | 旧 API。size 参数现在已无太大意义(历史遗留),不建议使用。 |
✔int epoll_create1(int flags) | 创建一个新的 epoll 实例。 | 推荐使用。支持 flags 参数,例如 EPOLL_CLOEXEC (避免 fork-exec 时文件描述符 FD 泄漏)。 |
✔int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) | 控制(添加、修改或删除)epoll 实例中的文件描述符 fd 。 | op 参数可以是 EPOLL_CTL_ADD (添加), EPOLL_CTL_MOD (修改), 或 EPOLL_CTL_DEL (删除)。 |
✔int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) | 阻塞等待 I/O 事件的发生。 | 阻塞等待事件。将就绪事件写入 events 数组,返回就绪事件的数量,或返回 -1(错误)/0(超时)。对了这里的timeout参数——-1 :阻塞直到事件到来(退化为同步);0 :非阻塞,立即返回;以及>0 :等待指定毫秒数后返回 |
这里涉及的结构体长这样:
struct epoll_event {
uint32_t events; // 监听的事件集合(EPOLLIN, EPOLLOUT, ...)
epoll_data_t data; // 用户数据(union,可放 int fd 或指针)
};
epoll_data_t
是 union,常用 data.fd
或 data.ptr
。
上面的内容就是我们Epoll打交道的核心了。
Epoll支持的两种IO就绪触发方式:ET与LT
Level-Triggered(默认)
- 语义:只要条件仍然成立(例如 socket 还有数据),
epoll_wait
每次都会报告该 fd。 - 易用:与
select/poll
类似,编程简单,不必把 fd 设成非阻塞(尽管推荐非阻塞),适合多数场景。 - 缺点:在大量频繁就绪的场景下可能产生重复通知,但通常这是可以接受的。
while (1) {
n = epoll_wait(...)
for each ev in events:
if (ev.fd is readable)
read_some(); // 读取尽可能多,也可以读一点
}
Edge-Triggered(高性能,但复杂)
- 语义:只有在事件发生“边缘”(从无到有的变化)时触发一次通知。之后如果你没有把数据全部读完/写完,就不会再收到通知,直到下次有新的变化(如更多数据到达)。
- 要点:
- 必须把 fd 设为 非阻塞。否则可能阻塞在读/写上,错过后续通知(或整个线程死锁)。
- 在处理 read/write 时必须循环读写直到
EAGAIN
或EWOULDBLOCK
,确保把数据全部消耗或写尽。
- 优点:在高并发中能显著减少系统调用次数与事件重复处理。
- 缺点:编程复杂度高,容易出 bug(遗漏 EAGAIN 循环会导致连接挂起、CPU 忙循环或永久静默)。
while (1) {
n = epoll_wait(...)
for each ev in events:
if (ev.fd is readable)
while (read(fd, buf, ... ) > 0) { process }
// 直到 EAGAIN
}
从0手撕一个基于Epoll的Linux Socket Echo
到这里,我就不打算重复讨论socket/bind/listen/accept/connect了,这就没趣了。如果你对标准同步的socket编程都不熟悉,还是老实的打打基础比较好。
拉起来一个服务器
我们第一步是起一个服务器开始监听。这里笔者直接给出代码了:
一些辅助的宏和端口:
#define PORT 12345
#define LISTEN_BACKLOG 128
#define BUF_SIZE 4096
// 错误检查宏
#define CHECK(expr, msg) \
if (expr) { \
perror(msg); \
exit(EXIT_FAILURE); \
}
/**
* @brief 设置文件描述符为非阻塞模式
* @param fd 文件描述符
* @return 0 成功,-1 失败
*/
static int set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) {
perror("fcntl(F_GETFL)");
return -1;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl(F_SETFL)");
return -1;
}
return 0;
}
int main() {
int listen_fd, epfd;
struct sockaddr_in addr;
// 1. 创建监听套接字
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
CHECK(listen_fd == -1, "socket");
// 设置 SO_REUSEADDR 避免 TIME_WAIT 导致重启失败
int on = 1;
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
perror("setsockopt(SO_REUSEADDR)");
}
// 2. 绑定地址
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(PORT);
CHECK(bind(listen_fd, (struct sockaddr*)&addr, sizeof(addr)) == -1, "bind");
// 3. 设置非阻塞
CHECK(set_nonblocking(listen_fd) == -1, "set_nonblocking(listen_fd)");
// 4. 监听
CHECK(listen(listen_fd, LISTEN_BACKLOG) == -1, "listen");
...
到这里,如果代码执行无误,一个标准的服务器就被监听起来了。我们需要注意的是——所有的FD需要被设置为非阻塞的IO,表明我们的IO现在成为了通知型IO。
下一步就是初始化我们的epoll。上面提到,初始化一个epoll的办法是——epoll_create1,同时,按照建议,咱们最好还是在Flags上上一个EPOLL_CLOEXEC
// 5. 创建 epoll 实例
epfd = epoll_create1(EPOLL_CLOEXEC); // 推荐使用 EPOLL_CLOEXEC
CHECK(epfd == -1, "epoll_create1");
下一步,就是将我们所有关心的套接字注册到我们的epoll里。
// 6. 注册监听套接字到 epoll
struct epoll_event ev, events[MAX_EVENTS];
ev.events = EPOLLIN | EPOLLET; // 监听可读(新连接),使用 ET 模式
ev.data.fd = listen_fd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {
perror("epoll_ctl: listen_fd");
close(epfd);
close(listen_fd);
exit(EXIT_FAILURE);
}
printf("Epoll Echo Server: Listening on port %d\n", PORT);
可以看到,我们注册的事件是——读取(EPOLLIN
)且使用的是边缘触发。ev的两个赋值其实就是在注册我们关心什么事件,关心的是谁身上要发生的事件。EPOLL_CTL_ADD的是一个操作符,epoll.h中允许三个操作——添加,删除和修改。这里就是添加一个新的套接字到咱们的epoll去。
到这里就差不多了,下面就是一个事件循环来处理这些内容——
// 7. 主事件循环
while (1) {
#define MAX_EVENTS 64
int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
if (n == -1) {
if (errno == EINTR) {
continue; // 被信号中断
}
perror("epoll_wait");
break; // 致命错误,退出循环
}
for (int i = 0; i < n; ++i) {
int fd = events[i].data.fd; // 哪个FD被捕捉到了事件?
uint32_t evs = events[i].events; // 被捕捉到了什么事件?
// 下面就是我们的业务代码了
if (fd == listen_fd) { // 是服务器有反应吗?处理新连接
handle_new_connection(epfd, listen_fd);
} else { // 不是,是连接的Peers要有数据IO交互
handle_client_data(epfd, fd, evs);
}
}
}
// 清理资源
printf("Server shutting down...\n");
close(listen_fd);
close(epfd);
return 0;
处理新连接
业务代码进一步体现我们要如何处理IO了。我们一步步来,先看我们如何处理到来的客户端。
/**
* @brief 处理监听套接字的连接事件
* @param epfd epoll 文件描述符
* @param listen_fd 监听套接字
*/
static void handle_new_connection(int epfd, int listen_fd) {
// 循环 accept 以应对 ET 模式下可能同时到达的多个连接
while (1) {
struct sockaddr_in cli_addr;
socklen_t cli_len = sizeof(cli_addr);
int conn_fd = accept(listen_fd, (struct sockaddr*)&cli_addr, &cli_len);
if (conn_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 所有传入连接已处理完毕
break;
} else if (errno == EINTR) {
// 被信号中断,继续尝试
continue;
} else {
perror("accept");
break;
}
}
...
}
}
accept是建立三次握手的调用,同步后,咱们就可以准备读写数据了。conn_fd是-1有时候可能并不是都是错误,这里的IF判断流更多的是判断到底发生了什么。哪一些处理是可以进一步尝试的。
如果我们通过了,就要设置整个Socket为非阻塞。
if (set_nonblocking(conn_fd) == -1) {
close(conn_fd);
continue;
}
下一步就是将我们的FD设置为EPOLL要感兴趣的套接字
struct epoll_event ev;
// 客户端连接使用 EPOLLIN 和 EPOLLET (边缘触发)
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_fd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ev) == -1) {
perror("epoll_ctl: conn_fd");
close(conn_fd);
} else {
char ip[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &cli_addr.sin_addr, ip, sizeof(ip));
// 把我们的地址打印出来,实际上是IP + 端口,这里可以做日志记录。
printf("Accepted %s:%d -> fd %d\n", ip, ntohs(cli_addr.sin_port), conn_fd);
}
处理数据IO
处理数据IO要复杂一些。我们第一步是检查一下咱们的EPOLL有没有出错——此时客户端的fd已经被注册了,所以我们不用再次注册。而是检查有没有出问题。
/**
* @brief 处理客户端套接字的可读事件(以及错误/挂断)
* @param epfd epoll 文件描述符
* @param fd 客户端套接字
* @param evs 就绪事件掩码
*/
static void handle_client_data(int epfd, int fd, uint32_t evs) {
// 错误或挂断事件,关闭连接
if (evs & (EPOLLERR | EPOLLHUP)) {
printf("Client fd %d error/hangup. Closing.\n", fd);
close(fd);
return;
}
下面我们依次处理到底是可读还是可写了。先说可读的。
// 可读事件 (ET 模式:必须循环读,直到 EAGAIN/EWOULDBLOCK)
if (evs & EPOLLIN) { // &标志位判断法
char buf[BUF_SIZE];
ssize_t total_read = 0;
int closed = 0;
while (1) {
// 注意,这个IO是非阻塞IO,是不会一下子取出来所有数据的,要我们自己做处理
ssize_t count = read(fd, buf + total_read, sizeof(buf) - total_read);
// 没有读到数据,检查问题
if (count == -1) {
// 不用在读了,没有东西可读
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 没有更多数据,正常退出循环
break;
} else if (errno == EINTR) {
// 被信号中断,继续
continue;
} else {
// 其他读取错误
perror("read");
closed = 1;
break;
}
} else if (count == 0) {
// 对端关闭连接
printf("Client fd %d closed by peer\n", fd);
closed = 1;
break;
}
total_read += count;
// 简单处理:如果缓冲区已满,先处理/回显当前数据,然后继续尝试读
// 生产环境中,此处应是更复杂的逻辑
if (total_read >= BUF_SIZE) {
// 避免 over-read 导致的缓冲区溢出(虽然 read 本身会限制,但避免逻辑复杂化)
break;
}
}
if (closed) {
close(fd);
return;
}
。。。
}
我们现在只是读了数据,还要把他送回去呢!
// --- 数据回显逻辑 ---
if (total_read > 0) {
ssize_t written = 0;
// 循环写,直到写完或遇到 EAGAIN
while (written < total_read) {
ssize_t w = write(fd, buf + written, total_read - written);
if (w == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 发送缓冲区已满:理论上应将未发送数据存入一个写缓冲队列,
// 并注册 EPOLLOUT 事件等待下次写就绪。
// 这里仅作简单提示和 EPOLLOUT 注册。
fprintf(stderr, "Warning: Partial write for fd %d. Data not handled: %zd bytes\n",
fd, total_read - written);
struct epoll_event mod_ev;
mod_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
mod_ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &mod_ev);
break;
} else if (errno == EINTR) {
continue;
} else {
perror("write");
close(fd);
return;
}
} else {
written += w;
}
}
}
注意下面的部分写逻辑了嘛?出现部分写的时候,我们delay到下次再写。为此我们就需要注册EPOLLOUT事件,监听写事件。下面的代码就要进一步处理这个事情。
// 处理 EPOLLOUT 事件:仅在上次写入未完成时才会被关注
if (evs & EPOLLOUT) {
// 在实际应用中,此处应处理并发送写缓冲队列中剩余的数据。
// 完成发送后,需要取消对 EPOLLOUT 的关注,以避免不必要的唤醒。
// 假设数据已全部发送或暂时不发送:移除 EPOLLOUT
struct epoll_event mod_ev;
mod_ev.events = EPOLLIN | EPOLLET; // 重新只关注可读
mod_ev.data.fd = fd;
if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &mod_ev) == -1) {
perror("epoll_ctl(MOD/remove EPOLLOUT)");
}
// 如果数据未发完,则不应移除 EPOLLOUT,直到队列清空
}
}
到这里这个demo就结束了,代码放到了附录三。
附录1:补充一下poll/select
select
:接口与用法
你可以在sys/select.h中看到整个接口
#include <sys/select.h>
int select(int nfds,
fd_set *readfds,
fd_set *writefds,
fd_set *exceptfds,
struct timeval *timeout);
nfds
:需要传入的最大文件描述符值加 1,也就是max_fd + 1
。fd_set
:一组位图(file descriptor set),用来指定和返回哪个 fd 关心或已准备好。timeout
:NULL 表示无限等待;{0,0}
表示立即返回(轮询);非 NULL 表示最多等待指定时间。
补充一下:fd_set 的操作宏
FD_ZERO(fd_set *set)
:清空集合FD_SET(int fd, fd_set *set)
:将 fd 加入集合FD_CLR(int fd, fd_set *set)
:从集合中移除 fdFD_ISSET(int fd, fd_set *set)
:测试 fd 是否在集合中
注意:
fd_set
在实现上通常是一个固定大小的位图,默认FD_SETSIZE
常量通常是 1024(在许多系统上),因此select
不适合处理大量 fd。
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(listen_fd, &readfds);
FD_SET(client_fd, &readfds);
int nfds = max(listen_fd, client_fd) + 1;
struct timeval tv = {5, 0}; // 最多等待 5 秒
int ret = select(nfds, &readfds, NULL, NULL, &tv);
if (ret > 0) {
if (FD_ISSET(listen_fd, &readfds)) {
// 新连接
}
if (FD_ISSET(client_fd, &readfds)) {
// 可读数据
}
} else if (ret == 0) {
// 超时
} else {
// 错误(检查 errno)
}
一些注意事项
nfds
必须为最大 FD + 1:如果给错,select
不会检查更大的 fd。select
会修改传入的 fd_set:调用后,fd_set 会只保留那些已就绪的 fd。如果你需要在循环中重复使用,必须在每次调用前重新填充 fd_set(或备份原始集合)。- 受 FD_SETSIZE 限制:用户指标上限通常是 1024(可宏定义重新编译 libc,但通常不可行)。因此
select
不适合高并发大量连接场景。 timeout
以struct timeval
表示,精度到微秒但受系统时钟及调度影响。- 系统调用返回时可能被信号中断(EINTR):必须在上层逻辑中处理(通常是重试或采用特定中断策略)。
- 效率:每次调用要把 fd_set 从用户态复制到内核态(以及返回),当 fd 数量较多时代价高。并且需要遍历 fd_set 来寻找就绪 fd(O(n))。
poll
:接口与用法
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
fds
:指向struct pollfd
数组,每个元素描述一个要监测的 fd 以及关心的事件。
struct pollfd
:
struct pollfd {
int fd; // 文件描述符
short events; // 关心的事件(输入)
short revents; // 实际发生的事件(输出)
};
常用事件位:POLLIN
(可读)、POLLOUT
(可写)、POLLERR
、POLLHUP
、POLLNVAL
。
timeout
以毫秒为单位,-1 表示无限等待,0 表示立即返回。
struct pollfd fds[2];
fds[0].fd = listen_fd; fds[0].events = POLLIN;
fds[1].fd = client_fd; fds[1].events = POLLIN | POLLOUT;
int ret = poll(fds, 2, 5000); // 等待最多 5 秒
if (ret > 0) {
if (fds[0].revents & POLLIN) { /* 新连接 */ }
if (fds[1].revents & POLLIN) { /* 可读 */ }
if (fds[1].revents & POLLOUT) { /* 可写 */ }
}
一些关键点
- 没有 fd 上限(不像 select 的 FD_SETSIZE):
poll
使用动态数组,因此能处理更大量的 fd。但会受限于进程的文件描述符上限(ulimit -n
)。 - 同样是 O(n) 扫描:内核仍需遍历传入的
pollfd
数组来检测事件,且每次调用都要从用户态拷贝到内核态并返回结果。 revents
在返回后置位,调用后你必须检查revents
。poll
在不同平台的行为略有差异(例如 event flag 组合在某些角落案例可能微妙不同),但总体是 POSIX 兼容。- 对大量空闲互斥 fd 的效率问题:如果你监控成千上万的 fd,但大部分都空闲,
poll
每次仍然要遍历完整数组。
快速说一下异同点
- 可扩展性:
select
受限于FD_SETSIZE
(通常 1024),poll
没有这种固定限制(受进程最大 fd 限制)。 - 接口简洁性:
poll
的struct pollfd
更直观,避免 fd_set 位操作。 - 内存复制成本:两者都需要内核/用户空间拷贝,但
poll
的数组通常比select
的位图更高效(尤其在 fd 密集且稀疏场景下)。 - 查找就绪 fd:两者都需要线性扫描已注册的 descriptors(O(n))。这使得它们在连接数非常大时效率不佳(这就是 epoll/kqueue 出现的原因)。
- 信号/中断:两者在遇到信号时都会返回
-1
并把errno
设为EINTR
。
附录2:epoll.h全解
Epoll.h还有一些我们完全可以关心的内容,这里也放到这里。
1) epoll_data_t
(typedef union epoll_data { ... } epoll_data_t;
)
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
- 目的:这是
epoll_event.data
的用户态可用的“槽位”(user data). 内核在返回事件时会把这个data
原样返回给用户,用户可以把它用作句柄/指针等。 - 字段含义:
ptr
:常用来保存用户对象指针(例如连接上下文struct conn *
)。写ev.data.ptr = ctx;
,取ctx = ev.data.ptr;
。fd
:直接保存整型文件描述符(方便一些简单例子或向后兼容)。u32
/u64
:整型替代,可以保存整型 id / 索引 / flags。u64
在 64 位平台上可安全存放指针值(把指针先转换为uint64_t
),但更常见的是直接使用ptr
。
- 可移植性注意:
- 在 32-bit 平台
void*
是 32 位;在 64-bit 平台void*
是 64 位。u64
保证有 64 位空间可用(适合跨位宽转换)。如果你把指针转成u64
再回转ptr
,要注意严格别名和转换安全,但在实践中通常直接用data.ptr
更直观。
- 在 32-bit 平台
- 推荐用法:生产代码常用
data.ptr = my_conn_ptr;
,因为它直接存放地址并可避免额外映射表查找。若需要复用fd
,data.fd
也行(但如果经常 close/reuse fd,指针更安全能唯一标识连接对象)。
struct epoll_event
struct epoll_event {
uint32_t events; /* Epoll events (bitmask) */
epoll_data_t data; /* User data */
} __EPOLL_PACKED;
events
:一个位掩码(bitmask),用于指定/返回哪些事件感兴趣 / 哪些事件发生(例如EPOLLIN|EPOLLET
)。data
:上面epoll_data_t
联合,占据可供用户使用的存储位。__EPOLL_PACKED
:- 宏通常展开为类似
__attribute__((packed))
(或内核定义的等价宏),目的是确保结构体的内存布局在不同架构/编译器间 ABI 稳定(避免编译器插入不可预期的填充字节),便于用户态/内核态按固定布局交互。 - 注意:packed 会影响对齐访问的行为(在某些架构上可能增加未对齐访问成本),但对 epoll API 的 ABI 稳定性是必要的。
- 宏通常展开为类似
- 使用场景:
- 在调用
epoll_ctl(..., &ev)
时传入ev.events
(表示要监视的事件)和ev.data
(表示与该 fd/对象关联的上下文)。 - 在
epoll_wait()
返回时,内核把就绪事件填入struct epoll_event events[]
,并把data
原样返回给用户。
- 在调用
struct epoll_params
struct epoll_params {
uint32_t busy_poll_usecs;
uint16_t busy_poll_budget;
uint8_t prefer_busy_poll;
/* pad the struct to a multiple of 64bits */
uint8_t __pad;
};
- 用途:这是给
epoll
上下文设置 busy-poll(忙轮询)相关参数的结构。可以通过 epoll 的 ioctl(EPIOCSPARAMS
/EPIOCGPARAMS
)来设置或读取这些参数,从而调整 epoll 在网络 I/O 场景下的忙轮询行为。 - 字段解释:
busy_poll_usecs
(单位:微秒):当正在 busy-poll 时,内核在该 epoll 上下文会 busy-poll 多久(microseconds)。在这段时间内内核会轮询网络设备以尽快获取数据,而不是立即返回中断。适用于极低延迟但可接受额外 CPU 的场景。该值不能超过INT_MAX
。busy_poll_budget
(uint16_t):每次 poll 尝试最多处理的“报文/包”数量(budget),防止单次 busy-poll 吃掉太多工作。该值通常受NAPI_POLL_WEIGHT
限制prefer_busy_poll
(uint8_t,布尔值):是否偏好使用 busy-poll。如果设置,epoll_wait 在网络等待时会尝试 busy-poll(当设备/驱动/内核支持时),以降低 I/O 延迟(但提高 CPU 占用)。有关在系统 suspend/IRQ 管理和更复杂交互的内核补丁讨论,可见内核跟踪。__pad
:用于把结构填充到 64 位对齐边界
- 风险/注意:
- busy-poll 可以显著降低网络 I/O 的延迟,但会消耗 CPU(尤其是在高并发下)。设置前需性能测试并确认对系统影响。
- 大多数 busy-poll 参数需要特权(CAP_NET_ADMIN)或受内核限制
enum EPOLL_EVENTS
枚举
名称 | 值(hex / dec) | 语义(何时触发 / 含义) | 典型用法 / 注意事项 |
---|---|---|---|
EPOLLIN | 0x001 / 1 | 文件描述符可读。对 socket:有数据可读,或监听 socket 上有新的连接。 | 最常用的读事件。与 EPOLLET 共用时必须循环读直到 EAGAIN 。 |
EPOLLPRI | 0x002 / 2 | 有紧急数据(out-of-band data,如 TCP urgent)或优先事件。 | 很少用到,针对某些协议的 OOB。 |
EPOLLOUT | 0x004 / 4 | 文件描述符可写(写不会阻塞)。 | 用于在写被 EAGAIN 部分写后再次等待可写;常与写缓冲配合。 |
EPOLLERR | 0x008 / 8 | 错误条件(例如连接被重置)。内核总是报告此标志(即使未在 interest list 中请求)。 | 不用显式注册也会返回,收到后应检查并关闭/处理。 |
EPOLLHUP | 0x010 / 16 | Hang up(对端关闭/挂起)。内核总是报告。 | 与 EPOLLIN 一起出现时通常 read() 返回 0(EOF)。 |
EPOLLRDNORM | 0x040 / 64 | 普通数据可读(规范 POSIX 风格)。 | 与 EPOLLIN 类似。 |
EPOLLRDBAND | 0x080 / 128 | 带外/带标记的数据可读(banded data)。 | 很少用。 |
EPOLLWRNORM | 0x100 / 256 | 普通数据可写(与 EPOLLOUT 语义接近)。 | 同 EPOLLOUT 。 |
EPOLLWRBAND | 0x200 / 512 | 带标记数据可写。 | 很少用。 |
EPOLLMSG | 0x400 / 1024 | 消息边界/异步消息相关(不常见)。 | 大多数应用忽略。 |
EPOLLRDHUP | 0x2000 / 8192 | 对端半关闭(TCP:对端关闭写方向)。 | 常用于区分“短连接 EOF”与其它异常;需要显式注册以得到更细粒度的半关闭通知。 |
EPOLLEXCLUSIVE | 1u << 28 (0x10000000) | 独占唤醒:当多个 epoll 实例都对同一 target 注册时(通常为监听 socket),设置 EPOLLEXCLUSIVE 会在事件到达时只唤醒 其中一个 等待者,从而减少“惊群(thundering herd)”。 | 用于多线程/多进程并发 accept 优化(减少无效唤醒)。注意不适用于所有场景(且与某些语义组合要小心)。(Stack Overflow) |
EPOLLWAKEUP | 1u << 29 (0x20000000) | 与设备休眠/唤醒策略交互的标志:若事件会唤醒设备(例如从 suspend),设置该标志可让设备保持唤醒直到事件被处理(用于 autosleep 场景)。 | 用于需要确保系统不会在事件排队后马上回睡的场景(比如移动设备的 autosleep),需配合驱动/平台支持使用。(man7.org) |
EPOLLONESHOT | 1u << 30 (0x40000000) | 单次触发:当事件触发后,内核会自动把该 fd 从就绪队列中“禁用”(等同于不再自动上报),需要应用通过 epoll_ctl(EPOLL_CTL_MOD, ...) 重新启用(re-arm)。 | 适合把同一个 fd 分配给一个工作线程独占处理,防止多个线程同时处理同一 fd。使用时需要手动 MOD 重新注册。 |
EPOLLET | 1u << 31 (0x80000000) | Edge-Triggered(边缘触发):只有在状态发生变化的“边缘”产生通知(例如从不可读变为可读),而非持续就绪时重复通知(level-triggered)。 | 高性能但复杂:必须把 fd 设为非阻塞并在事件到来时循环读/写直到 EAGAIN 。否则可能漏掉后续数据或死锁。(man7.org) |
附录3——Demo代码
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
// --- 宏定义 ---
#define PORT 12345
#define MAX_EVENTS 64
#define LISTEN_BACKLOG 128
#define BUF_SIZE 4096
// 错误检查宏
#define CHECK(expr, msg) \
if (expr) { \
perror(msg); \
exit(EXIT_FAILURE); \
}
// --- 辅助函数 ---
/**
* @brief 设置文件描述符为非阻塞模式
* @param fd 文件描述符
* @return 0 成功,-1 失败
*/
static int set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) {
perror("fcntl(F_GETFL)");
return -1;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl(F_SETFL)");
return -1;
}
return 0;
}
/**
* @brief 处理监听套接字的连接事件
* @param epfd epoll 文件描述符
* @param listen_fd 监听套接字
*/
static void handle_new_connection(int epfd, int listen_fd) {
// 循环 accept 以应对 ET 模式下可能同时到达的多个连接
while (1) {
struct sockaddr_in cli_addr;
socklen_t cli_len = sizeof(cli_addr);
int conn_fd = accept(listen_fd, (struct sockaddr*)&cli_addr, &cli_len);
if (conn_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 所有传入连接已处理完毕
break;
} else if (errno == EINTR) {
// 被信号中断,继续尝试
continue;
} else {
perror("accept");
break;
}
}
if (set_nonblocking(conn_fd) == -1) {
close(conn_fd);
continue;
}
struct epoll_event ev;
// 客户端连接使用 EPOLLIN 和 EPOLLET (边缘触发)
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_fd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ev) == -1) {
perror("epoll_ctl: conn_fd");
close(conn_fd);
} else {
char ip[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &cli_addr.sin_addr, ip, sizeof(ip));
printf("Accepted %s:%d -> fd %d\n", ip, ntohs(cli_addr.sin_port), conn_fd);
}
}
}
/**
* @brief 处理客户端套接字的可读事件(以及错误/挂断)
* @param epfd epoll 文件描述符
* @param fd 客户端套接字
* @param evs 就绪事件掩码
*/
static void handle_client_data(int epfd, int fd, uint32_t evs) {
// 错误或挂断事件,关闭连接
if (evs & (EPOLLERR | EPOLLHUP)) {
printf("Client fd %d error/hangup. Closing.\n", fd);
close(fd);
return;
}
// 可读事件 (ET 模式:必须循环读,直到 EAGAIN/EWOULDBLOCK)
if (evs & EPOLLIN) {
char buf[BUF_SIZE];
ssize_t total_read = 0;
int closed = 0;
while (1) {
ssize_t count = read(fd, buf + total_read, sizeof(buf) - total_read);
if (count == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 没有更多数据,正常退出循环
break;
} else if (errno == EINTR) {
// 被信号中断,继续
continue;
} else {
// 其他读取错误
perror("read");
closed = 1;
break;
}
} else if (count == 0) {
// 对端关闭连接
printf("Client fd %d closed by peer\n", fd);
closed = 1;
break;
}
total_read += count;
// 简单处理:如果缓冲区已满,先处理/回显当前数据,然后继续尝试读
// 生产环境中,此处应是更复杂的逻辑
if (total_read >= BUF_SIZE) {
break; // 避免 over-read 导致的缓冲区溢出(虽然 read 本身会限制,但避免逻辑复杂化)
}
}
if (closed) {
close(fd);
return;
}
// --- 数据回显逻辑 ---
if (total_read > 0) {
ssize_t written = 0;
// 循环写,直到写完或遇到 EAGAIN
while (written < total_read) {
ssize_t w = write(fd, buf + written, total_read - written);
if (w == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 发送缓冲区已满:理论上应将未发送数据存入一个写缓冲队列,
// 并注册 EPOLLOUT 事件等待下次写就绪。
// 这里仅作简单提示和 EPOLLOUT 注册。
fprintf(stderr, "Warning: Partial write for fd %d. Data not handled: %zd bytes\n",
fd, total_read - written);
struct epoll_event mod_ev;
mod_ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
mod_ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &mod_ev);
break;
} else if (errno == EINTR) {
continue;
} else {
perror("write");
close(fd);
return;
}
} else {
written += w;
}
}
}
}
// 处理 EPOLLOUT 事件:仅在上次写入未完成时才会被关注
if (evs & EPOLLOUT) {
// 在实际应用中,此处应处理并发送写缓冲队列中剩余的数据。
// 完成发送后,需要取消对 EPOLLOUT 的关注,以避免不必要的唤醒。
// 假设数据已全部发送或暂时不发送:移除 EPOLLOUT
struct epoll_event mod_ev;
mod_ev.events = EPOLLIN | EPOLLET; // 重新只关注可读
mod_ev.data.fd = fd;
if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &mod_ev) == -1) {
perror("epoll_ctl(MOD/remove EPOLLOUT)");
}
// 如果数据未发完,则不应移除 EPOLLOUT,直到队列清空
}
}
// --- 主函数 ---
int main() {
int listen_fd, epfd;
struct sockaddr_in addr;
// 1. 创建监听套接字
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
CHECK(listen_fd == -1, "socket");
// 设置 SO_REUSEADDR 避免 TIME_WAIT 导致重启失败
int on = 1;
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
perror("setsockopt(SO_REUSEADDR)");
}
// 2. 绑定地址
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(PORT);
CHECK(bind(listen_fd, (struct sockaddr*)&addr, sizeof(addr)) == -1, "bind");
// 3. 设置非阻塞
CHECK(set_nonblocking(listen_fd) == -1, "set_nonblocking(listen_fd)");
// 4. 监听
CHECK(listen(listen_fd, LISTEN_BACKLOG) == -1, "listen");
// 5. 创建 epoll 实例
epfd = epoll_create1(EPOLL_CLOEXEC); // 推荐使用 EPOLL_CLOEXEC
CHECK(epfd == -1, "epoll_create1");
struct epoll_event ev, events[MAX_EVENTS];
// 6. 注册监听套接字到 epoll
ev.events = EPOLLIN | EPOLLET; // 监听可读(新连接),使用 ET 模式
ev.data.fd = listen_fd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {
perror("epoll_ctl: listen_fd");
close(epfd);
close(listen_fd);
exit(EXIT_FAILURE);
}
printf("Epoll Echo Server: Listening on port %d\n", PORT);
// 7. 主事件循环
while (1) {
int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
if (n == -1) {
if (errno == EINTR) {
continue; // 被信号中断
}
perror("epoll_wait");
break; // 致命错误,退出循环
}
for (int i = 0; i < n; ++i) {
int fd = events[i].data.fd;
uint32_t evs = events[i].events;
if (fd == listen_fd) {
handle_new_connection(epfd, listen_fd);
} else {
handle_client_data(epfd, fd, evs);
}
}
}
// 清理资源
printf("Server shutting down...\n");
close(listen_fd);
close(epfd);
return 0;
}