IO复用


迅速看原理,勇敢看源码

select系统调用

作用:

在一段指定时间内,监听用户感兴趣的文件描述符的可读、可写和异常等事件。

原型:

1
2
#include <sys/select.h>
int select ( int nfds, fd_set* readfds, fde_set* writefds, fd_set* exceptfds, struct timeval* timeout );

函数说明:

  • nfds: 指定被监听的文件描述符的总数,通常为所有文件描述符中的最大值+1
  • readfds、writefds 、exceptfds: 可读、可写和异常等事件对应的文件描述符集合。
  • fd_set结构:仅包含一个整型数组,该数组的每个元素的每一位标记了一个文件描述符。fd_set能容纳的文件描述符数量由FD_SETSIZE指定,这就限制了select能同时处理的文件描述符的总量。

select中的fd_set集合容量的限制为FD_SETSIZE,一般为1024 。修改它,需要重新编译内核。

fd_set相关的位操作:

1
2
3
4
5
6
7
8
9
10
11
12
#include <sys/select.h>
FD_ZERO( fd_set *fdset );
FD_SET( int fd, fd_set *fdset );
FD_CLR( int fd, fd_set *fdset );
int FD_ISSET( int fd, fd_set *fdset );
/*对于fd_set类型通过上面四个宏来操作:
FD_ZERO(fd_set *fdset) 将指定的文件描述符集清空,在对文件描述符集合进行设置前,必须对其进行初始化,如果不清空,由于在系统分配内存空间后,通常并不作清空处理,所以结果是不可知的。
FD_SET(int fd,fd_set *fdset) 用于在文件描述符集合中增加一个新的文件描述符。
FD_CLR(int fd, fd_set *fdset) 用于在文件描述符集合中删除一个文件描述符。
FD_ISSET(int fd,fd_set *fdset) 用于测试指定的文件描述符是否在该集合中。

先调用宏FD_ZERO将指定的fd_set清零,然后调用宏FD_SET将需要测试的fd加入fd_set,接着调用函数select测试fd_set中的所有fd,最后用宏FD_ISSET检查某个fd在函数select调用后,相应位是否仍然为1。*/
  • timeout:设置select的超时时间。这是timeval结构指针,用来告诉内核select等待多久。不过我们不能完全信任select调用返回后的timeout值,比如调用失败时,timeout值是不确定的。timeval结构体如下:
1
2
3
4
5
struct timeval
{
long tv_sec; \\秒数
long tv_usec; \\微秒
}

​ 如果给timeout变量的tv_sec成员和tv_usec成员都传递0,则select将立即返回。如果给timeout传递NULL,则select将一直阻塞,直到某个文件描述符就绪。

返回状态:

  • select成功时返回就绪(可读、可写和异常)文件描述符的总数。
  • 如果在超时时间内没有任何文件描述符就绪,select将返回0。
  • select失败时返回-1并设置errno。
  • 如果select 等待期间,程序接收到信号,则select立即返回-1,并设置errno为EINTR。

文件描述符就绪条件

可读:

  • socket内核接收缓冲区中的字节数大于或等于其低水位标记SO_RCVLOWAT。此时我们可以无阻塞地对该socket,并且读操作返回的字节数大于0。
  • socket通信的对方关闭连接,此时读操作返回0。
  • 监听socket上有新的连接请求。
  • socekt上有未处理的错误,此时我们可以使用getsockopt来读取和清除该错误。

可写:

  • socket内核发送缓冲区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT。此时我们可以无阻塞地写该socket,并且写操作返回的字节数大于0。
  • socket的写操作被关闭。对写操作被关闭的socket执行写操作将出发一个SIGPIPE信号。
  • socket使用非阻塞connect连接成功或者失败之后。
  • socket上有未处理的错误,此时我们可以使用getsockopt来读取和清除该错误。

异常:

  • socket上接收到带外数据。

处理带外数据

socket上接收到普通数据和带外数据都将使select返回,但socket处于不同的就绪状态:前者处于可读状态,后者处于异常状态。

poll系统调用

poll系统调用

作用:和select类型,也是在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪者。

原型:

1
2
#include <poll.h>
int poll ( struct pollfd* fds, nfds_t nfds, int timeout );

函数说明:

  • fds:一个pollfd结构类型的数组,指定我们所感兴趣的文件描述符上发生的可读,可写和异常事件。

pollfd结构:

1
2
3
4
5
6
struct pollfd
{
int fd; /* 文件描述符 */
short events; /* 注册的事件 */
short revents; /* 实际发生的事件,由内核填充 */
}

其中,fd成员指定文件描述符;events 成员告诉poll监听fd上的哪些事件,它是一系列事件的按位或;revents成员则由内核修改,以通知应用程序fd上实际发生了哪些事件。 poll 支持的事件类型如下:

img

img

  • nfds :指定被监听事件集合fds的大小。其类型nfds_t 的定义如下:
1
typedef unsigned long int nfds_t;
  • timeout :指定poll的超时值,单位是毫秒。当timeout 为-1时,poll调用将永远阻塞,直到某个事件发生;当timeout为0时,poll调用将立即返回。

poll系统调用轮询描述符的数量的限制:一个进程所能打开的最大文件描述符有关。可以通过调整内核参数、ulimit -n命令、setrlimit函数。

  • 一个系统所能打开的文件描述符的最大数也是有限制的,跟内存有关,可以通过 /proc/sys/fs/file-max 调整。
  • 一个进程所能打开的文件描述符最大值,可以通过调整内核参数、ulimit -n命令、setrlimit函数。

epoll

特点:

epoll是linux上特有的IO复用函数,它在实现上和使用上与select有很大的差异。首先epoll使用一组函数完成任务,而不是单个函数,其次, epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集。但需要一个额外的文件描述来唯一标识内核中的这个事件表。

这个事件表的文件描述符由如下函数来创建:

1
2
#include <sys/epoll.h>
int epoll_create ( int size );

size参数现在不起作用,只是给内核一个提示,告诉他事件表需要多大,该函数返回的文件描述符将用作其他所有epoll系统调用的第一个参数,以指定要访问的事件表

操作内核事件表:

1
2
#include <sys/epoll.h>
int epoll_ctl ( int epfd, int op, int fd, struct epoll_event *event );

函数说明:

  • fd:要操作的文件描述符
  • op:指定操作类型
  • epfd:内核事件表
  • event:指定事件,它是epoll_event结构指针类型,

epoll_event定义:

1
2
3
4
5
struct epoll_event
{
__unit32_t events; /* epoll事件 */
epoll_data_t data; /* 用户数据 */
};

结构体说明:

events:描述事件类型,和poll支持的事件类型基本相同(两个额外的事件:EPOLLET和EPOLLONESHOT,高效运作的关键)

data成员:存储用户数据

1
2
3
4
5
6
7
typedef union epoll_data
{
void* ptr; /* 指定与fd相关的用户数据 */
int fd; /* 指定事件所从属的目标文件描述符 */
uint32_t u32;
uint64_t u64;
} epoll_data_t;

op参数指定操作类型:

  • EPOLL_CTL_ADD:往事件表中注册fd上的事件
  • EPOLL_CTL_MOD:修改fd上的注册事件
  • EPOLL_CTL_DEL:删除fd上的注册事件

epoll_wait函数

主要接口

作用:在一段超时时间内,等待一组文件描述符上的事件

原型:

1
2
#include <sys/epoll.h>
int epoll_wait ( int epfd, struct epoll_event* events, int maxevents, int timeout );

函数说明:

返回:成功时返回就绪的文件描述符的个数,失败时返回-1并设置errno

参数:

  • timeout:与poll相同

  • maxevents:指定最多监听多少个事件

  • events:检测到事件,将所有就绪的事件从内核事件表(由第一个参数epfd来指定)中复制到它的第二个参数events指向的数组中。

    这个数组只用于输出检测到的就绪事件。而不像select与poll中的数组既用于传入用户注册的事件,又用于输出内核输出的就绪事件。

    所以效率较高。

LT和ET模式

对文件操作符的操作模式:

  • LT:电平触发,默认的工作模式。当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理。
  • ET:边沿触发,高效工作模式。文件描述符注册为EPOLLET事件,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件。

区别:ET模式在很大程度上降低了同一个epoll事件被重复出发的参数,因此效率要比LT模式高。

注意:ET 模式要求socket为非阻塞模式,如果是阻塞的,那么读或写操作将会因为没有后续事件而一直处于阻塞状态(饥渴状态)。LT模式可以是阻塞或者非阻塞。

LT与ET模式的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*************************************************************************
> File Name: 9-3.cpp
> Created Time: Sat 03 Feb 2018 10:35:56 PM PST
************************************************************************/

#include"head.h"

using namespace std;

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

//设置文件描述符为非阻塞模式
int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}

//以两种不同模式将事件注册到epoll中
void addfd(int epollfd, int fd, bool enable_et) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN;
if(enable_et) event.events |= EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}

void lt(epoll_event* events, int number, int epollfd, int listenfd) {
char buf[BUFFER_SIZE];
for(int i = 0; i < number; i ++) {
int sockfd = events[i].data.fd;
/*
此处的作用是?
*/
if(sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd, false);
}

else if(events[i].events & EPOLLIN) {
printf("event trigger once\n");
memset(buf, 0, sizeof(buf));
int ret = recv(sockfd, buf, BUFFER_SIZE, 0);
if(ret <= 0) {
close(sockfd);
continue;
}
printf("get %d bytes of content: %s\n", ret, buf);
}
else printf("something else happened\n");
}
}

void et(epoll_event* events, int number, int epollfd, int listenfd) {
char buf[BUFFER_SIZE];
for(int i = 0; i < number; i ++) {
int sockfd = events[i].data.fd;
if(sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);

addfd(epollfd, connfd, true);
}
else if(events[i].events & EPOLLIN) {
//这段代码不会被重复触发,所以我们循环读取
printf("event trigger once\n");
while(1) {
memset(buf, 0, sizeof(buf));
int ret = recv(sockfd, buf, BUFFER_SIZE, 0);
if(ret < 0) {
//非阻塞模式的I/O,当下面的条件成立表示数据已经全部取走
if((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
printf("read later\n");
break;
}
close(sockfd);
break;
}
else if(ret == 0) close(sockfd);
else printf("get %d bytes of content: %s\n", ret, buf);
}
}
else printf("something else happened\n");
}
}

int main(int argc, char** argv) {
if(argc <= 2) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}

const char* ip = argv[1];
int port = atoi(argv[2]);

int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(port);
inet_pton(AF_INET, ip, &address.sin_addr);

int listenfd = socket(AF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);

ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);

ret = listen(listenfd, 5);
assert(ret != -1);

epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd != -1);

addfd(epollfd, listenfd, true);//设置触发方式

while(1) {
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if(ret < 0) {
printf("epoll failure\n");
break;
}
//lt(events, ret, epollfd, listenfd);
et(events, ret, epollfd, listenfd);
}
close(listenfd);
return 0;
}

测试结果:

ET模式:image-20220210224210698

LT模式:image-20220210224237886

三组I/O复用函数的比较

select和poll都只能工作在相对低效的LT模式,而epoll则可以工作在ET高效模式,并且epoll还支持EPOLLONESHOT事件。该事件能进一步减少可读、可写和异常等事件被触发的次数。

从实现原理上来说,select和poll采用的都是轮询的方式,即每次调用都要扫描整个注册文件描述符集合,并将其中就绪的文件描述符返回给用户程序,因此它们检测就绪事件的算法的时间复杂度是O(n)。epoll则不同,它采用的是回调的方式。内核检测到就绪的文件描述符时,将触发回调函数,回调函数就该文件描述符上对应的事件插入就绪事件队列。内核最后在适当的时机将该就绪事件队列中的内容拷贝到用户空间。因此epoll_wait无须轮询整个文件描述符集合来检测哪些事件已经就绪,其算法时间复杂度是O(1)。

系统调用 select poll epoll
事件集合 用户通过3个参数分别传入感兴趣的可读、可写及异常等事件,内核通过对这些参数的在线修改来反馈其中的就绪事件。这使得用户每次调用select都要重置这3个参数 统一处理所有事件类型,因此只需要一个事件集参数。用户通过pollfd.events传入感兴趣的事件,内核通过修改pollfd.revents反馈其中就绪的事件 内核通过一个事件表直接管理用户感兴趣的所有事件。因此每次调用epoll_wait时,无需反复传入用户感兴趣的事件。epoll_wait系统调用的参数events仅用来反馈就绪的事件
应用程序索引就绪文件描述符的时 间复杂度 O(n) O(n) O(1)
最大支持文件描述符数 一般有最大值限制(FD_SETSIZE 为1024,修改后需重新编译内核) 65535(一个进程所能打开的最大文件描述符数量,ulimit -n或者setrlimit函数 65535(系统能打开的最大文件描述符数量,/proc/sys/fs/file-max
工作模式 LT LT 支持ET高效模式
内核实现和工作效率 采用轮询方式检测就绪事件时间复杂度:O(n) 采用轮询方式检测就绪事件时间复杂度:O(n) 采用回调方式检测就绪事件事件复杂度:O(1)

​ 对于poll函数,内核每次修改的是pollfd结构体的revents成员,而events成员保持不变,因此下次调用poll时应用程序无须重置pollfd类型的事件集参数。由于每次select和poll调用都返回整个用户注册的事件集合(其中包括就绪的和未就绪的的),所以应用程序索引就绪文件描述符的时间复杂度为O(n)。epoll则采用与select和poll完全不同的方式来管理用户注册的事件。它在内核中维护一个事件表,并提供了一个独立的系统调用epoll_ctl来控制往其中添加、删除、修改事件。这样每次epoll_wait调用都直接从该内核事件表中取得用户注册的事件,而无须反复从用户空间读入这些事件。epoll_wait系统调用的events参数仅用来返回就绪的事件,这使得应用程序索引的就绪文件描述符的时间复杂度达到0(1)。

需要说明的是:

​ epoll的效率未必一定比select和poll高。当活动连接比较多的时候,epoll_wait的效率未必比select和poll高,因为此时回调函数被触发的过于频繁。所以,epoll_wait适用于连接数量多,但活动连接较少的情况。

非阻塞式connect

聊天室程序

同时处理TCP与UDP服务

IO多路复用函数机制详谈:


参考文献:

IO多路复用机制详解 - Yeang - 博客园 (cnblogs.com)

epoll 机制详谈:


img

1
2
3
4
5
6
7
8
9
10
struct eventpoll
{
spin_lock_t lock; //对本数据结构的访问
struct mutex mtx; //防止使用时被删除
wait_queue_head_t wq; //sys_epoll_wait() 使用的等待队列
wait_queue_head_t poll_wait; //file->poll()使用的等待队列
struct list_head rdllist; //事件满足条件的链表
struct rb_root rbr; //用于管理所有fd的红黑树
struct epitem *ovflist; //将事件到达的fd进行链接起来发送至用户空间
}
1
2
3
4
5
6
7
8
9
10
11
12
struct epitem
{
struct rb_node rbn; //用于主结构管理的红黑树
struct list_head rdllink; //事件就绪队列
struct epitem *next; //用于主结构体中的链表
struct epoll_filefd ffd; //每个fd生成的一个结构
int nwait;
struct list_head pwqlist; //poll等待队列
struct eventpoll *ep; //该项属于哪个主结构体
struct list_head fllink; //链接fd对应的file链表
struct epoll_event event; //注册的感兴趣的事件,也就是用户空间的epoll_event
}

参考文献:

epoll内核源码详解+自己总结的流程_技术交流_牛客网 (nowcoder.com)

(163条消息) Epoll原理解析_~~ LINUX ~~-CSDN博客_epoll

共享内存技术


阻塞IO,非阻塞IO,IO多路复用,信号驱动 IO,异步IO