在学习本章前,需要对基本的IO操作(socket等)有所了解。
在IO模型上,主要可以分为阻塞IO(BIO)、同步非阻塞(NIO)和异步非阻塞(AIO)三种。
基本定义:
同步和阻塞之间的关系:
以常见的TCP编程为例,其BIO下的通信流程如下图所示:
在上图中:
accept
、 read
、 write
均是阻塞型IO:
accept
等待客户端的连接时,需要为每一个客户端单独创建一个线程,随后会阻塞在 accept
系统调用中,直到有新的客户端连接到达。read
、 write
函数进行读写数据操作时,该线程会阻塞到该系统调用,直到读取/写入请求完成。在基本的BIO模型中,用户空间-内核空间的交互如下图所示:
设想一个场景:
accept
系统调用中等待客户端连接才可以完成此并发需求。但是这么多的线程必然会导致:对上一章节末尾的场景进行分析,其效率问题主要集中在了:
BSD socket最开始是为了BSD系统(类Unix系统)设计的,而类Unix系统的设计理念之一就是"一切皆文件",因此BSD为socket等文件设计了如下的 select
解决方案:
select
函数,该函数可以同时监听多个文件的读缓冲区可读、写缓冲区可写、异常事件。基本示意图如下:
在上图中:
listen
时,将socket设置为非阻塞。此时socket也是文件。accept
。accept
得到的文件设置为非阻塞,并加入listen集合。本函数在Linux、FreeBSD、Windows、Mac OS上均有实现:
int select(int nfds, fd_set *readfds,
fd_set *writefds,
fd_set *exceptfds,
struct timeval *timeout);
int nfds
:需要使用 select
委托内核查询的三个集合中的最大fd号+1。fd_set *readfds
:
fd_set *writefds
:
fd_set *exceptfds
:
struct timeval *timeout
:
其中需要注意的是:
select
默认可以监视的最大fd数量为1024select
默认可以监视的最大fd数量为1024此处仅简单列出参数含义用于大致对比不同的解决方案,具体定义见各操作系统的socket开发笔记。
poll解决方案由System V引入。
poll解决方案与select基本一致,其只是没有了1024的最大文件数的限制,以及传参的形式有所区别。
本函数在Linux、FreeBSD上均有实现:
struct pollfd{
int fd;
short events;
short revents;
};
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd *fds
:为 struct pollfd
构成的数组, struct pollfd
的结构体成员:
fd
:需要监听的fd号events
:需要使用poll等待的事件,可以使用位运算,详见下表。revents
:发生的事件nfds_t nfds
:指定 fds
的数组大小int timeout
:等待的毫秒数,其中:
timeout = -1
:永久等待,直到事件发生timeout = 0
:立即返回timeout > 0
:等待xxmsrevents
域不为0的文件描述符个数short events
和 short revents
可以是:WSAPoll
函数,作用与形式类似。select
中的1024上限,但是当数量过多时仍然会导致运行速度降低。epoll在Linux 2.5.45中引入。
epoll是Linux中独有的一种IO多路复用方案。该方案的使用比上述两个方案更为复杂,但是由于其内部使用的是红黑树的数据结构,其效率相较于 select
和 poll
更为高效。
epoll方案的使用步骤为:
int epoll_create(int size);
创建epoll实例,其中 size
参数表示 epoll
实例可以管理的最大文件描述符数。int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
添加到epoll实例中。int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
函数等待事件。具体实现可见Linux用户态socket笔记。
select |
poll |
epoll |
|
---|---|---|---|
操作方式 | |||
底层数据结构 | 数组 | 链表 | 哈希表 |
IO效率 | |||
最大连接数 | 1024(x86) 或 2048(x64) | 无上限 | 无上限 |
fd拷贝 | 每次调用 select 都要完成fd集合的用户-内核-用户的两次拷贝。 |
每次调用 poll 都要完成fd集合的用户-内核-用户的两次拷贝。 |
调用 epoll_ctl 时拷贝进内核并保存,随后 epoll_wait 时不拷贝。 |
之所以同样是IO多路复用,却要搞出三种方案,是因为 select
和 poll
几乎是由两个Unix团体同时引入。
这些IO多路复用模型也是非阻塞型IO。
IO多路复用解决了线程数过多的问题,允许使用一个线程监听和服务多个IO事件。但是仍然没有解决非阻塞IO的轮询问题。在多数情况下的 select
操作是无效的。
在进行程序设计时,通常会有 "回调函数" 的设计思路。而信号驱动型IO就可以简单的描述为 "使用回调函数进行IO事件处理" 的一种IO模型。
基本的使用流程为:
sigaction
或 signal
注册需要监听的信号及其回调函数。通常推荐使用前者。但是这样的IO设计引入了一些新的问题:
SIGINT
(交互式信号,如 ctrl+c
中断)等常用IO进行注册和使用。在拥有了非阻塞的IO模型后(注意非阻塞IO和NIO的区别),就已经可以实现异步请求了:
int service(){
// 发起数据库读取请求
db_promise = nio_db_read();
// 发起文件下载请求
dl_promise = nio_file_download();
// 轮询等待IO完成
while(!nio_db_read_complete() || !nio_file_download_complete());
// do sth...
return 0;
}
但是上述的异步请求伪代码只是实现了同时请求多个IO操作,并在后续的 while
中进行轮询,浪费了大量的CPU时间。
因此可以引入一个 promise
对象:
Promise
类:class Promise() {
public:
// 非阻塞查询
bool IsReady() const;
// 阻塞等待
void Wait() const;
static void WaitForAll(const std::vector<SimplePromise<T>*>& promises);
// 回调函数, 返回值为参数内回调函数的返回值
std::optional<R> Then(const std::function<R(const T&)>& callback);
}
上述的Promise类可以基于IO多路复用等方式实现。
则原先的代码可以如下编写:
int service(){
// 发起数据库读取请求
Promise db_promise = aio_db_read_async();
// 发起文件下载请求
Promise dl_promise = aio_file_download_async();
// 阻塞等待多个Promise
Promise::WaitForAll({db_promise, dl_promise});
// do sth...
return 0;
}
考虑如下的任务流:
则明显地,此时可以考虑使用嵌套回调函数完成:
int service() {
Promise taskA_Promise =
task_a_step1_async().Then([](){
return task_a_step2_async().Then([]{
return task_a_step3_async().Then([]{
return task_a_step4_async();
})
})
});
Promise taskB_Promise =
task_b_step1_async().Then([](){
return task_b_step2_async().Then([]{
return task_b_step3_async();
})
})
});
Promise taskC_Promise =
task_c_step1_async().Then([](){
return task_c_step2_async();
});
Promise::WaitForAll({taskA_Promise, taskB_Promise, taskC_Promise});
// Do sth.
return 0;
}
但是这样调用会存在 callback hell
问题,为了解决问题从而引入了await/async方案。
该方案可以理解为语言为异步IO模型提供了一种异步编程的简化,其主要特性如下:
Promise
:
async
关键字用于修饰函数,被该关键字修饰的函数会立刻返回一个 Promise
对象。await
用于等待 Promise
对象完成。async
和 await
的代码通常被编译器转换成状态机,即意味着编译器会自动将开发者所写的连贯逻辑编译为使用状态机控制的代码。则上述代码可以转化为:
class Program
{
static async Task Main(string[] args)
{
// 启动任务A和任务B
Task taskA = TaskA();
Task taskB = TaskB();
Task taskC = TaskC();
// 等待任务A和任务B都完成
await Task.WhenAll(taskA, taskB, taskC);
// 执行任务C
await TaskD();
}
static async Task TaskA()
{
await StepA1();
await StepA2();
await StepA3();
await StepA4();
}
static async Task TaskB()
{
await StepB1();
await StepB2();
await StepB3();
}
static async Task TaskC()
{
await StepC1();
await StepC2();
}
}