一个简易聊天室程序


很多服务器要一边处理网络连接一边处理用户输入,比如聊天室程序,这样的就可以用I/O复用来实现,
我们以一个poll实现的聊天室程序来举例说明一下

客户端程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include"head.h"
#define _GNU_SOURCE 1
using namespace std;
#define BUFFER_SIZE 64


/*
很多服务器要一边处理网络连接一边处理用户输入,比如聊天室程序,这样的就可以用I/O复用来实现,
我们以一个poll实现的聊天室程序来举例说明一下:
*/

int main(int argc, char** argv) {
if(argc <= 2) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);

struct sockaddr_in server_address;
bzero(&server_address, sizeof(server_address));
server_address.sin_family = AF_INET;
server_address.sin_port = htons(port);
inet_pton(AF_INET, ip, &server_address.sin_addr);

int sockfd = socket(AF_INET, SOCK_STREAM, 0);
assert(sockfd >= 0);
if(connect(sockfd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) {
printf("connection failed\n");
close(sockfd);
return 1;
}
/*
所谓的文件描述符是一个低级的正整数。
最前面的三个文件描述符(0,1,2)分别与标准输入(stdin),标准输出(stdout)和标准错误(stderr)对应。
因此,函数 scanf() 使用 stdin,而函数 printf() 使用 stdout。
你可以用不同的文件描述符改写默认的设置并重定向进程的 I/O 到不同的文件。
*/
pollfd fds[2];
fds[0].fd = 0; //标准输入的文件描述符
fds[0].events = POLLIN;
fds[0].revents = 0;

fds[1].fd = sockfd;
fds[1].events = POLLIN | POLLRDHUP;
fds[1].revents = 0;
char read_buf[BUFFER_SIZE];
int pipefd[2];
int ret = pipe(pipefd);
assert(ret != -1);

while(1) {
ret = poll(fds, 2, -1);
if(ret < 0) {
printf("poll failure\n");
break;
}

if(fds[1].revents & POLLRDHUP) {
printf("server close the connection\n");
break;
}

else if(fds[1].revents & POLLIN) {
//接收数据
memset(read_buf, 0, sizeof(read_buf));
recv(fds[1].fd, read_buf, BUFFER_SIZE, 0);
printf("%s\n", read_buf);
}

if(fds[0].revents & POLLIN) {
/*使用splice函数将用户输入的数据写到socketfd上(零拷贝)*/
ret = splice(0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
ret = splice(pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
}
}
close(sockfd);
return 0;
}

服务端程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
 
#define _GNU_SOURCE 1
#include"head.h"
using namespace std;

#define USER_LIMIT 5
#define BUFFER_SIZE 64
#define FD_LIMIT 65535


/* ### 程序解析 ###
* 本程序使用client_data来保存客户的数据,包括客户的socket地址,客户发来的信息,以及用于将数据发出的发送缓冲区。
* 在users数组中,保存了每个客户连接的相关信息,并且采用连接socket文件描述符来进行索引。
*
* 本程序使用的I/O复用技术是poll系统调用,所以在poll成功返回时,不得不遍历整个事件集/描述符集(可以采用poll返回值进行优化),
* 其中包括了就绪事件和非就绪事件,所以,在一定程度上降低了服务器效率。
* 并且,本程序没有使用并发手段,所以服务器整个处理过程都是串行工作,造成服务器效率低下。
*
* 另外,在处理方式上也比较低效:
* 当某个客户连接发来数据,服务器接收了此数据之后,服务器就暂时清除其他所有客户连接(即除了发来数据的客户连接)上的可读事件,
* 然后在这些客户连接上设置可写事件。 当再一次调用poll时,服务器将会处理所有的可写事件,处理完毕之后就清除所有的可写事件,
* 并且重置之前被清除的可读事件。 以此循环,直到所有客户下线。
*/


struct client_data {
sockaddr_in address;
char* write_buf;
char buf[BUFFER_SIZE];
};

int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}

int main(int argc, char** argv) {
if(argc <= 2) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);

int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(port);
inet_pton(AF_INET, ip, &address.sin_addr);

int listenfd = socket(AF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);

ret = listen(listenfd, 5);
assert(ret != -1);


/* 用已经建立连接的socket fd来索引相应的客户端数据 */
client_data* users = new client_data[FD_LIMIT];

pollfd fds[USER_LIMIT + 1];
int user_counter = 0;
for(int i = 1; i <= USER_LIMIT; i ++) {
fds[i].fd = -1;
fds[i].events = 0;
}
fds[0].fd = listenfd;
fds[0].events = POLLIN | POLLERR;
fds[0].revents = 0;

while(1) {
ret = poll(fds, user_counter + 1, -1);
if(ret < 0) {
printf("poll failure\n");
break;
}

/* poll机制中,要遍历所有注册事件的文件描述符 */
/* 由于timeout==-1永久阻塞,直至事件发生,所以poll不会返回0 */
for(int i = 0; i < user_counter + 1;i ++) {
if((fds[i].fd == listenfd) && (fds[i].revents & POLLIN)) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);

if(connfd < 0) {
printf("errno is: %d\n", errno);
continue;
}

if(user_counter >= USER_LIMIT) { /* 用户过多,则关闭连接 */
const char* info = "too many users\n";
printf("%s", info);
send(connfd, info, sizeof(info), 0);
close(connfd);
continue;
}

/* 对于新的可接受的连接,同时修改fds和users数组。 */
user_counter ++;
users[connfd].address = client_address;
setnonblocking(connfd);
fds[user_counter].fd = connfd;
fds[user_counter].events = POLLIN | POLLERR | POLLRDHUP;
fds[user_counter].revents = 0;
printf("comes a new user, now have %d users\n", user_counter);
}
else if(fds[i].revents & POLLERR) {
printf("get an error from %d\n", fds[i].fd);
char errors[100];
memset(errors, 0, sizeof(errors));
socklen_t length = sizeof(errors);
if(getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length) < 0) {
printf("get socket option failed\n");
}
continue;
}
else if(fds[i].revents &POLLRDHUP) {
users[fds[i].fd] = users[fds[user_counter].fd]; //将最后一个用户数据存储区移动到前面
close(fds[i].fd);
fds[i] = fds[user_counter]; //修改描述符集
user_counter --;
/* 必须让i减1,因为通过以上操作已经将最后一个socket fd替换到当前位置i,此时的i还未检查 */
i--;
printf("a client left\n");
}
else if(fds[i].revents & POLLIN) { //有数据可读
int connfd = fds[i].fd;
memset(users[connfd].buf, '\0', BUFFER_SIZE);
ret = recv(connfd, users[connfd].buf, BUFFER_SIZE, 0);
if(ret < 0) { ///* 如果读操作出错,就关闭连接 */
if(errno != EAGAIN) {
close(connfd);
users[fds[i].fd] = users[fds[user_counter].fd];
fds[i] = fds[user_counter];
user_counter --;
i --;
}
}
else if(ret == 0) {} //收到EOF

else {
/* 如果接收到客户数据,则通知其他socket连接准备写数据 */
for(int j = 1; j <= user_counter; j ++) {
if(fds[j].fd == connfd) continue;
fds[j].events |= ~POLLIN; /* 暂时取消其他客户连接的可读事件 */
fds[j].events |= POLLOUT;
users[fds[j].fd].write_buf = users[connfd].buf; /* 指明要发给其他客户的数据所在发送缓冲区的位置 */
}
}
}
else if(fds[i].revents & POLLOUT) { //有数据可写
int connfd = fds[i].fd;
if(!users[connfd].write_buf) continue;
ret = send(connfd, users[connfd].write_buf, strlen(users[connfd].write_buf), 0);
users[connfd].write_buf = NULL;

fds[i].events |= ~POLLOUT;
fds[i].events |= POLLIN;
}
}
}
delete []users;
close(listenfd);
return 0;
}