[转帖]深入了解epoll模型 -- 开卷有益

深入,了解,epoll,模型,开卷有益 · 浏览次数 : 0

小编点评

# Generate Content Function ```c void generate_content(void) { // ... Your code to generate content here ... // ... Important part, handle events and signals ... // ... Get event ready list, from eventpoll_callback function ... // ... Process and handle events from ready list ... // ... Signal completion, set flag for wait queue ... // ... Wait for completion, release lock and queue ... // ... ... ... ... // ... Call ep_poll_safewake to process events ... } ``` # Explanation * This function performs the essential tasks of generating content. * It gets the event ready list, from the `eventpoll_callback` function. * It processes and handles events from the ready list. * It signals completion and waits for it to finish. * It waits for completion, releases the lock and queue. * It calls `ep_poll_safewake` to process events. # Important Notes * This function requires `waitqueue_active` and `waitqueue_signal` functions. * It also requires `eventpoll_callback` function to provide event data. * The specific event handling and signal processing depends on your code.

正文

https://cloud.tencent.com/developer/article/1992927?areaSource=&traceId=

 

希望打开此篇对你有所帮助。

文章目录

上网一搜epoll,基本是这样的结果出来:《多路转接I/O – epoll模型》,万变不离这个标题。 但是呢,不变的事物,我们就更应该抓出其中的重点了。 多路、转接、I/O、模型。 别急,先记住这几个词,我比较喜欢你们看我文章的时候带着问题。

什么是epoll?或者说,它和select有什么判别?

什么是select

有的朋友可能对select也不是很了解啊,我这里稍微科普一下:网络连接,服务器也是通过文件描述符来管理这些连接上来的客户端,既然是供连接的服务器,那就免不了要接收来自客户端的消息。那么多台客户端,消息那么的多,要是漏了一条两条重要消息,那也不要用TCP了,那怎么办?

前辈们就是有办法,轮询,轮询每个客户端文件描述符,查看他们是否带着消息,如果带着,那就处理一下;如果没带着,那就一边等着去。这就是select,轮询,颇有点领导下基层的那种感觉哈。

但是这个select的轮询呐,会有个问题,明眼人一下就能想到,那即是耗费资源啊,耗费什么资源,时间呐,慢呐(其实也挺快了,不过相对epoll来说就是慢)。 再认真想一下,还浪费什么资源,系统资源。有的客户端呐,占着那啥玩意儿不干那啥事儿,这种客户端呐,还不少。这也怪不得人家,哪儿有客户端时时刻刻在发消息,要是有,那就要小心是不是恶意攻击了。那把这么一堆偶尔动一下的客户端的文件描述符一直攥手里,累不累?能一次攥多少个?就像一个老板,一直想着下去巡视,那他可以去当车间组长了哈哈哈。

所以,select的默认上限一般是1024(FD_SETSIZE),当然我们可以手动去改,但是人家给个1024自然有人家的道理,改太大的话系统在这一块的负载就大了。 那句话怎么说的来着,你每次对系统的索取,其实都早已明码标价!哈哈哈。。。

所以,我们选用epoll模型。

讲到这里啊,如果对 select 不了解的朋友可以打开百度搜一下了,因为我们接下来要讲的内容需要你对 select 有点了解的。

为什么select最大只允许1024?

以前从没想过select和poll,一出来关注的就是epoll,后来明白了,对于select,也是有很多细节在里面的。源码之前,了无秘密!!!

#define FD_SETSIZE 1024
#define NFDBITS (8 * sizeof(unsigned long))
#define __FDSET_LONGS (FD_SETSIZE/NFDBITS)

typedef struct {
  unsigned long fds_bits[__FDSET_LONGS];
} fd_set;

为什么源码要写成1024? 我觉得是历史遗留原因吧,不信你看一下你服务器在不改动的情况下能允许的最大文件描述符数。

如果有人再问:那 select 就只能监听1024个?那我就要说道说道了,谁说有1024?select 性能就这样了,再往上开意义不大,真要开,编源码去嘛。

什么是epoll

epoll接口是为解决Linux内核处理大量文件描述符而提出的方案。该接口属于Linux下多路I/O复用接口中select/poll的增强。其经常应用于Linux下高并发服务型程序,特别是在大量并发连接中只有少部分连接处于活跃下的情况 (通常是这种情况),在该情况下能显著的提高程序的CPU利用率。

前面说,select就像亲自下基层视察的老板,那么epoll这个老板就要显得精明的多了。他可不亲自下基层,他找了个美女秘书,他只要盯着他的秘书看就行了,呸,他只需要听取他的秘书的汇报就行了。汇报啥呢?基层有任何消息,跟秘书说,秘书汇总之后一次性交给老板来处理。这样老板的时间不就大大的提高了嘛。

epoll设计思路简介

  • (1)epoll在Linux内核中构建了一个文件系统,该文件系统采用红黑树来构建,红黑树在增加和删除上面的效率极高,因此是epoll高效的原因之一。有兴趣可以百度红黑树了解,但在这里你只需知道其算法效率超高即可。
  • (2)epoll提供了两种触发模式,水平触发(LT)和边沿触发(ET)。当然,涉及到I/O操作也必然会有阻塞和非阻塞两种方案。目前效率相对较高的是 epoll+ET+非阻塞I/O 模型,在具体情况下应该合理选用当前情形中最优的搭配方案。
  • (3)epoll所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于1024,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以下面语句查看,一般来说这个数目和系统内存关系很大。
系统最大打开文件描述符数
cat /proc/sys/fs/file-max
进程最大打开文件描述符数
ulimit -n

修改这个配置:

sudo vi /etc/security/limits.conf
写入以下配置,soft软限制,hard硬限制

*                soft    nofile          65536
*                hard    nofile          100000

select、epoll原理图

(图片来源:https://blog.csdn.net/nanxiaotao/article/details/90612404):

缺点:

1)单进程可以打开fd有限制;

(2)对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低;

(3)用户空间和内核空间的复制非常消耗资源;

poll与select不同的地方:采用链表的方式替换原有fd_set数据结构,而使其没有连接数的限制。

(一图胜千言)

epoll支持的最大链接数是进程最大可打开的文件的数目。

epoll一定优于select吗?什么时候不是?

尽管如此,epoll 的性能并不必然比 select 高,对于 fd 数量较少并且 fd IO 都非常繁忙的情况 select 在性能上反而有优势。

听说把 epoll 和mmap扯上关系的都是骗子?

呃呃呃,其实源码翻一番也没看到mmap嘛,找不到源码的话这里:https://download.csdn.net/download/xumin330774233/10396875

其实我一开始也挺纳闷儿的,这是有多少个fd啊,要映射?

epoll的工作流程?

epol_create

在epoll文件系统建立了个file节点,并开辟epoll自己的内核高速cache区,建立红黑树,建立一个双向链表,用于存储准备就绪的事件。

epoll_create传入一个size参数,size参数只要>0即可,没有任何意义。epoll_create调用函数sys_epoll_create1实现eventpoll的初始化。sys_epoll_create1通过ep_alloc生成一个eventpoll对象,并初始化eventpoll的三个等待队列,wait,poll_wait以及rdlist (ready的fd list)。同时还会初始化被监视fs的rbtree 根节点。

1、首先创建一个struct eventpoll对象; 2、然后分配一个未使用的文件描述符; 3、然后创建一个struct file对象,将file中的struct file_operations *f_op设置为全局变量eventpoll_fops,将void *private指向刚创建的eventpoll对象ep; 4、然后设置eventpoll中的file指针; 5、最后将文件描述符添加到当前进程的文件描述符表中,并返回给用户

epoll_create返回的fd是什么?

这个模块在内核初始化时(操作系统启动)注册了一个新的文件系统,叫"eventpollfs"(在eventpoll_fs_type结构里),然后挂载此文件系统。另外还创建两个内核cache(在内核编程中,如果需要频繁分配小块内存,应该创建kmem_cahe来做“内存池”),分别用于存放struct epitem和eppoll_entry。这个内核高速cache区,就是建立连续的物理内存页,就是物理上分配好你想要的size的内存对象,每次使用时都是使用空闲的已分配好的内存。

现在想想epoll_create为什么会返回一个新的fd?

因为它就是在这个叫做"eventpollfs"的文件系统里创建了一个新文件!返回的就是这个文件的fd索引。完美地遵行了Linux一切皆文件的特色。

epoll_ctl

1、epoll_ctl()首先判断op是不是删除操作,如果不是则将event参数从用户空间拷贝到内核中。 2、接下来判断用户是否设置了EPOLLEXCLUSIVE标志,这个标志是4.5版本内核才有的,主要是为了解决同一个文件描述符同时被添加到多个epoll实例中造成的“惊群”问题,详细描述可以看这里。 这个标志的设置有一些限制条件,比如只能是在EPOLL_CTL_ADD操作中设置,而且对应的文件描述符本身不能是一个epoll实例。 3、接下来从传入的文件描述符开始,一步步获得struct file对象,再从struct file中的private_data字段获得struct eventpoll对象。(那这个头那个头的都在这里面了) 4、如果要添加的文件描述符本身也代表一个epoll实例,那么有可能会造成死循环,内核对此情况做了检查,如果存在死循环则返回错误。 5、接下来会从epoll实例的红黑树里寻找和被监控文件对应的epollitem对象,如果不存在,也就是之前没有添加过该文件,返回的会是NULL。 6、ep_find()函数本质是一个红黑树查找过程,红黑树查找和插入使用的比较函数是ep_cmp_ffd(),先比较struct file对象的地址大小,相同的话再比较文件描述符大小。struct file对象地址相同的一种情况是通过dup()系统调用将不同的文件描述符指向同一个struct file对象。 7、接下来会根据操作符op的不同做不同的处理,这里我们只看op等于EPOLL_CTL_ADD时的添加操作。首先会判断上一步操作中返回的epollitem对象地址是否为NULL,不是NULL说明该文件已经添加过了,返回错误,否则调用ep_insert()函数进行真正的添加操作。在添加文件之前内核会自动为该文件增加POLLERR和POLLHUP事件。 8、ep_insert()函数中,首先判断epoll实例中监视的文件数量是否已超过限制,没问题则为待添加的文件创建一个epollitem对象。

9、接下来是比较重要的操作:将epollitem对象添加到被监视文件的等待队列上去。等待队列实际上就是一个回调函数链表,定义在/include/linux/wait.h文件中。因为不同文件系统的实现不同,无法直接通过struct file对象获取等待队列,因此这里通过struct file的poll操作,以回调的方式返回对象的等待队列,这里设置的回调函数是ep_ptable_queue_proc。 10、结构体ep_queue的作用是能够在poll的回调函数中取得对应的epollitem对象,这种做法在Linux内核里非常常见。 11、在回调函数ep_ptable_queue_proc中,内核会创建一个struct eppoll_entry对象,然后将等待队列中的回调函数设置为ep_poll_callback()。也就是说,当被监控文件有事件到来时,比如socker收到数据时,ep_poll_callback()会被回调。 12、ep_item_poll()调用完成之后,会将epitem中的fllink字段添加到struct file中的f_ep_links链表中,这样就可以通过struct file找到所有对应的struct epollitem对象。 13、然后就是将epollitem插入到红黑树中。 14、最后再更新下状态就返回了,插入操作也就完成了。

(在这个实现时,将用户空间epoll_event拷贝到内核中,后续可以将其转化为epitem作为节点存入红黑树中,从eventpoll的红黑树中查找fd所对应的epitem实例(二分搜索),根据传入的op参数行为进行switch判断,对红黑树进行不同的操作。对于ep_insert,首先设置了对应的回调函数,然后调用被监控文件的poll方法(每个支持poll的设备驱动程序都要调用),其实就是在poll里调用了回调函数,这个回调函数实际上不是真正的回调函数,真正的回调函数(ep_poll_callback)在该函数内调用,这个回调函数只是创建了struct eppoll_entry,将真正回调函数和epitem关联起来,之后将其加入设备等待队列。当设备就绪,唤醒等待队列上的等待者,调用对应的真正的回调函数,这个回调函数实际上就是将红黑树上收到event的epitem插入到它的就绪队列中并唤醒调用epoll_wait进程。在ep_insert中还将epitem插入到eventpoll中的红黑树上,然后还会去判断当前插入的event是否是刚好发生,如果是直接将其加入就绪队列,然后唤醒epoll_wait。)

epoll_wait

观察就绪列表里面有没有数据,并进行提取和清空就绪列表,非常高效。

ep_poll_callback函数主要的功能是将被监视文件的等待事件就绪时,将文件对应的epitem实例添加到就绪队列中,当用户调用epoll_wait()时,内核会将就绪队列中的事件报告给用户。

在epoll_wait主要是调用了ep_poll,在ep_poll里直接判断就绪链表有无数据,有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。当有数据时,还需要将内核就绪事件拷贝到传入参数的events中的用户空间,就绪链表中的数据一旦拷贝就没有了,所以这里要区分LT和ET,如果是LT有可能会将后续的重新放入就绪链表。

ps:我们在调用ep_send_events_proc()将就绪队列中的事件拷贝给用户的期间,新就绪的events被挂载到eventpoll.ovflist所以我们需要遍历eventpoll.ovflist将所有已就绪的epitem 重新挂载到就绪队列中,等待下一次epoll_wait()进行交付…

LT和ET实现的区别?

由epoll_wait进行实现,如果是LT模式,它发现socket上还有未处理的事件,则在清理就绪列表后,重新把句柄放回刚刚清空的就绪列表。

LT(level triggered) 是 缺省 的工作方式 ,并且同时支持 block 和 no-block socket. 在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的 fd 进行 IO 操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的 select/poll 都是这种模型的代表.

ET(edge-triggered) 是高速工作方式 ,只支持 no-block socket 。在这种模式下,当描述符从未就绪变为就绪时,内核通过 epoll 告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了 ( 比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个 EWOULDBLOCK 错误)。但是请注意,如果一直不对这个 fd 作 IO 操作 ( 从而导致它再次变成未就绪 ) ,内核不会发送更多的通知 (only once), 不过在 TCP 协议中, ET 模式的加速效用仍需要更多的 benchmark 确认。

epoll 工作在 ET 模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读 / 阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用 ET 模式的 epoll 接口,在后面会介绍避免可能的缺陷。

  • 基于非阻塞文件句柄
  • 只有当 read(2) 或者 write(2) 返回 EAGAIN 时才需要挂起,等待。但这并不是说每次 read() 时都需要循环读,直到读到产生一个 EAGAIN 才认为此次事件处理完成,当 read() 返回的读到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。

红黑树中每个成员由描述符值和所要监控的文件描述符指向的文件表项的引用等组成。

关键数据结构

// epoll的核心实现,对应于一个epoll描述符  
struct eventpoll {  
    spinlock_t lock;  
    struct mutex mtx;  
    wait_queue_head_t wq; // sys_epoll_wait() 等待在这里  
    // f_op->poll()  使用的, 被其他事件通知机制利用的wait_address  
    wait_queue_head_t poll_wait;  
    //已就绪的需要检查的epitem 列表 
    struct list_head rdllist;  
    //保存所有加入到当前epoll的文件对应的epitem  
    struct rb_root rbr;  
    // 当正在向用户空间复制数据时, 产生的可用文件  
    struct epitem *ovflist;  
    /* The user that created the eventpoll descriptor */  
    struct user_struct *user;  
    struct file *file;  
    //优化循环检查,避免循环检查中重复的遍历
    int visited;  
    struct list_head visited_list_link;  
}  
// 对应于一个加入到epoll的文件  
struct epitem {  
    // 挂载到eventpoll 的红黑树节点  
    struct rb_node rbn;  
    // 挂载到eventpoll.rdllist 的节点  
    struct list_head rdllink;  
    // 连接到ovflist 的指针  
    struct epitem *next;  
    /* 文件描述符信息fd + file, 红黑树的key */  
    struct epoll_filefd ffd;  
    /* Number of active wait queue attached to poll operations */  
    int nwait;  
    // 当前文件的等待队列(eppoll_entry)列表  
    // 同一个文件上可能会监视多种事件,  
    // 这些事件可能属于不同的wait_queue中  
    // (取决于对应文件类型的实现),  
    // 所以需要使用链表  
    struct list_head pwqlist;  
    // 当前epitem 的所有者  
    struct eventpoll *ep;  
    /* List header used to link this item to the "struct file" items list */  
    struct list_head fllink;  
    /* epoll_ctl 传入的用户数据 */  
    struct epoll_event event;  
};  
// 与一个文件上的一个wait_queue_head 相关联,因为同一文件可能有多个等待的事件,
//这些事件可能使用不同的等待队列  
struct eppoll_entry {  
    // List struct epitem.pwqlist  
    struct list_head llink;  
    // 所有者  
    struct epitem *base;  
    // 添加到wait_queue 中的节点  
    wait_queue_t wait;  
    // 文件wait_queue 头  
    wait_queue_head_t *whead;  
}; 

红黑树在epoll中的作用

哈希表. 空间因素,可伸缩性. (1)频繁增删. 哈希表需要预估空间大小, 这个场景下无法做到. 间接影响响应时间,假如要resize,原来的数据还得移动.即使用了一致性哈希算法, 也难以满足非阻塞的timeout时间限制.(时间不稳定) (2) 百万级连接,哈希表有镂空的空间,太浪费内存.

跳表. 慢于红黑树. 空间也高. 红黑树. 经验判断,内核的其他地方如防火墙也使用红黑树,实践上看性能最优.

AVL树.平衡二叉树能够保证在最坏的情况下也能达到lgN,要实现这一目标,我们就要保证在插入完成后始终保持平衡状态。在一棵具有N个节点的树中,我们希望该树的高度能够维持在lgN左右,这样我们就能保证只需要lgN次比较操作就可以查找到想要的值。不幸的是,每次插入元素之后维持树的平衡状态太昂贵。所以就出现一些新的数据结构来保证在最坏的情况下插入和查找效率都能保证在对数的时间复杂度内完成。

文件描述符是以什么方式挂载在红黑树的节点中?是int吗?

/* 文件描述符信息fd + file, 红黑树的key */  
struct epoll_filefd ffd; 

epoll惊群了解多少?

和虚假唤醒有点像。

考虑如下场景: 主进程创建socket, bind, listen之后,fork出多个子进程,每个子进程都开始循环处理(accept)这个socket。每个进程都阻塞在accpet上,当一个新的连接到来时,所有的进程都会被唤醒,但其中只有一个进程会accept成功,其余皆失败,重新休眠。这就是accept惊群。

那么这个问题真的存在吗?

事实上,历史上,Linux 的 accpet 确实存在惊群问题,但现在的内核都解决该问题了。即,当多个进程/线程都阻塞在对同一个 socket 的 accept 调用上时,当有一个新的连接到来,内核只会唤醒一个进程,其他进程保持休眠,压根就不会被唤醒。

如上所述,accept 已经不存在惊群问题,但 epoll 上还是存在惊群问题。即,如果多个进程/线程阻塞在监听同一个 listening socket fd 的 epoll_wait 上,当有一个新的连接到来时,所有的进程都会被唤醒。

accept 确实应该只能被一个进程调用成功,内核很清楚这一点。但 epoll 不一样,他监听的文件描述符,除了可能后续被 accept 调用外,还有可能是其他网络 IO 事件的,而其他 IO 事件是否只能由一个进程处理,是不一定的,内核不能保证这一点,这是一个由用户决定的事情,例如可能一个文件会由多个进程来读写。所以,对 epoll 的惊群,内核则不予处理。

epoll是线程安全的吗?

简要结论就是epoll是通过锁来保证线程安全的, epoll中粒度最小的自旋锁ep->lock(spinlock)用来保护就绪的队列, 互斥锁ep->mtx用来保护epoll的重要数据结构红黑树。

先来看一下epoll的核心数据结构

struct eventpoll {
	...
	/* 一个自旋锁 */
	spinlock_t lock;
		
	/* 一个互斥锁 */
	struct mutex mtx;

	/* List of ready file descriptors */
	/* 就绪fd队列 */
	struct list_head rdllist;

	/* RB tree root used to store monitored fd structs */
	/* 红黑树 */
	struct rb_root_cached rbr;
	...
};

epoll_ctl()

以下代码即为epoll_ctl()接口的实现, 当需要根据不同的operation通过ep_insert() 或者ep_remove()等接口对epoll自身的数据结构进行操作时都提前获得了ep->mex锁.

/ * epoll_ctl() 接口 */
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
	struct epoll_event __user *, event)
{
	...
	/* 获得 mtx 锁 */
	mutex_lock_nested(&ep->mtx, 0);
	...

	epi = ep_find(ep, tf.file, fd);

	error = -EINVAL;
	switch (op) {
	case EPOLL_CTL_ADD:
		/* 
		 * 通过ep_insert()接口来完成EPOLL_CTL_ADD的操作
		 * /
		if (!epi) {
			epds.events |= POLLERR | POLLHUP;
			error = ep_insert(ep, &epds, tf.file, fd, full_check);
		} else
			error = -EEXIST;
		if (full_check)
			clear_tfile_check_list();
		break;
	case EPOLL_CTL_DEL:
		/* 
		 * 通过ep_insert()接口来完成EPOLL_CTL_ADD的操作
		 * /
		if (epi)
			error = ep_remove(ep, epi);
		else
			error = -ENOENT;
		break;
	case EPOLL_CTL_MOD:
		/* 
		 * 通过ep_insert()接口来完成EPOLL_CTL_ADD的操作
		 * /
		if (epi) {
			if (!(epi->event.events & EPOLLEXCLUSIVE)) {
				epds.events |= POLLERR | POLLHUP;
				error = ep_modify(ep, epi, &epds);
			}
		} else
			error = -ENOENT;
		break;
	}
	if (tep != NULL)
		mutex_unlock(&tep->mtx);
	mutex_unlock(&ep->mtx);		/* 释放mtx锁 */
	...
}

epoll_wait()

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
		int, maxevents, int, timeout)
{
	...
	/* Time to fish for events ... */
	/* 
	 * 可以看到epoll_wait()是通过ep_poll()来等待就绪事件的.
	 * /
	error = ep_poll(ep, events, maxevents, timeout);
	...
}
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
	int pwake = 0;
	unsigned long flags;
	struct epitem *epi = ep_item_from_wait(wait);
	struct eventpoll *ep = epi->ep;
	int ewake = 0;
	
	/* 获得自旋锁 ep->lock来保护就绪队列
	 * 自旋锁ep->lock在ep_poll()里被释放
	 * /
	spin_lock_irqsave(&ep->lock, flags);

	/* If this file is already in the ready list we exit soon */
	/* 在这里将就绪事件添加到rdllist */
	if (!ep_is_linked(&epi->rdllink)) {
		list_add_tail(&epi->rdllink, &ep->rdllist);
		ep_pm_stay_awake_rcu(epi);
	}
	...
}

epoll API

回头来看看API吧。

头文件

#include<sys/epoll.h>

创建句柄

int epoll_create(int size);

创建一个epoll句柄,参数size用于告诉内核监听的文件描述符个数,跟内存大小有关。 返回epoll 文件描述符

epoll控制函数

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event ); // 成功,返0,失败返-1

控制某个epoll监控的文件描述符上的事件:注册,修改,删除

参数释义: epfd:为epoll的句柄 op:表示动作,用3个宏来表示 ··· EPOLL_CTL_ADD(注册新的 fd 到epfd) ··· EPOLL_CTL_DEL(从 epfd 中删除一个 fd) ··· EPOLL_CTL_MOD(修改已经注册的 fd 监听事件)

event:告诉内核需要监听的事件

typedef union epoll_data
{
    void* ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
} epoll_data_t;  /* 保存触发事件的某个文件描述符相关的数据 */
 
struct epoll_event
{
    __uint32_t events;  /* epoll event */
    epoll_data_t data;  /* User data variable */
};
/* epoll_event.events:
  EPOLLIN  表示对应的文件描述符可以读
  EPOLLOUT 表示对应的文件描述符可以写
  EPOLLPRI 表示对应的文件描述符有紧急的数据可读
  EPOLLERR 表示对应的文件描述符发生错误
  EPOLLHUP 表示对应的文件描述符被挂断
  EPOLLET  设置ET模式
*/

epoll消息读取

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

等待所监控文件描述符上有事件的产生

参数释义: events:用来从内核得到事件的集合 maxevent:用于告诉内核这个event有多大,这个maxevent不能大于创建句柄时的size timeout:超时时间 ··· -1:阻塞 ··· 0:立即返回 ···>0:指定微秒

成功返回有多少个文件描述符准备就绪,时间到返回0,出错返回-1.

代码示例

/* 实现功能:通过epoll, 处理多个socket
 * 监听一个端口,监听到有链接时,添加到epoll_event
 * xs
 */
 
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <poll.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <unistd.h> 

#define MYPORT 12345
 
//最多处理的connect
#define MAX_EVENTS 500
 
//当前的连接数
int currentClient = 0; 
 
//数据接受 buf
#define REVLEN 10
char recvBuf[REVLEN];
 
 
//epoll描述符
int epollfd;
//事件数组
struct epoll_event eventList[MAX_EVENTS];
 
void AcceptConn(int srvfd);
void RecvData(int fd);
 
int main()
{
    int i, ret, sinSize;
    int recvLen = 0;
    fd_set readfds, writefds;
    int sockListen, sockSvr, sockMax;
    int timeout;
    struct sockaddr_in server_addr;
    struct sockaddr_in client_addr;
    
    //socket
    if((sockListen=socket(AF_INET, SOCK_STREAM, 0)) < 0)
    {
        printf("socket error\n");
        return -1;
    }
    
    bzero(&server_addr, sizeof(server_addr));
    server_addr.sin_family  =  AF_INET;
    server_addr.sin_port = htons(MYPORT);
    server_addr.sin_addr.s_addr  =  htonl(INADDR_ANY); 
    
    //bind
    if(bind(sockListen, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0)
    {
        printf("bind error\n");
        return -1;
    }
    
    //listen
    if(listen(sockListen, 5) < 0)
    {
        printf("listen error\n");
        return -1;
    }
    
    // epoll 初始化
    epollfd = epoll_create(MAX_EVENTS);
    struct epoll_event event;
    event.events = EPOLLIN|EPOLLET;
    event.data.fd = sockListen;
    
    //add Event
    if(epoll_ctl(epollfd, EPOLL_CTL_ADD, sockListen, &event) < 0)
    {
        printf("epoll add fail : fd = %d\n", sockListen);
        return -1;
    }
    
    //epoll
    while(1)
    {
        timeout=3000;                
        //epoll_wait
        int ret = epoll_wait(epollfd, eventList, MAX_EVENTS, timeout);
        
        if(ret < 0)
        {
            printf("epoll error\n");
            break;
        }
        else if(ret == 0)
        {
            printf("timeout ...\n");
            continue;
        }
        
        //直接获取了事件数量,给出了活动的流,这里是和poll区别的关键
        int i = 0;
        for(i=0; i<ret; i++)
        {
            //错误退出
            if ((eventList[i].events & EPOLLERR) ||
                (eventList[i].events & EPOLLHUP) ||
                !(eventList[i].events & EPOLLIN))
            {
              printf ( "epoll error\n");
              close (eventList[i].data.fd);
              return -1;
            }
            
            if (eventList[i].data.fd == sockListen)
            {
                AcceptConn(sockListen);
        
            }else{
                RecvData(eventList[i].data.fd);
            }
        }
    }
    
    close(epollfd);
    close(sockListen);
    
 
    return 0;
}
 
/**************************************************
函数名:AcceptConn
功能:接受客户端的链接
参数:srvfd:监听SOCKET
***************************************************/
void AcceptConn(int srvfd)
{
    struct sockaddr_in sin;
    socklen_t len = sizeof(struct sockaddr_in);
    bzero(&sin, len);
 
    int confd = accept(srvfd, (struct sockaddr*)&sin, &len);
 
    if (confd < 0)
    {
       printf("bad accept\n");
       return;
    }else
    {
        printf("Accept Connection: %d", confd);
    }
    //将新建立的连接添加到EPOLL的监听中
    struct epoll_event event;
    event.data.fd = confd;
    event.events =  EPOLLIN|EPOLLET;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, confd, &event);
}
 
//读取数据
void RecvData(int fd)
{
    int ret;
    int recvLen = 0;
    
    memset(recvBuf, 0, REVLEN);
    printf("RecvData function\n");
    
    if(recvLen != REVLEN)
    {
        while(1)
        {
            //recv数据
            ret = recv(fd, (char *)recvBuf+recvLen, REVLEN-recvLen, 0);
            if(ret == 0)
            {
                recvLen = 0;
                break;
            }
            else if(ret < 0)
            {
                recvLen = 0;
                break;
            }
            //数据接受正常
            recvLen = recvLen+ret;
            if(recvLen<REVLEN)
            {
                continue;
            }
            else
            {
                //数据接受完毕
                printf("buf = %s\n",  recvBuf);
                recvLen = 0;
                break;
            }
        }
    }
 
    printf("data is %s", recvBuf);
}

番外:高效的并发方式

并发编程的目的是让程序”同时”执行多个任务。如果程序是计算密集型的,并发编程并没有什么优势,反而由于任务的切换使效率降低。但如果程序是I/O密集型的,那就不同了。

并发模式是指I/O处理单元和多个逻辑单元之间协调完成任务的方法,服务器主要有两种并发编程模式:半同步/半异步(half-sync/half-async)模式和领导者/追随者(Leader/Followers)模式。

这里讲一个“半同步/半异步”。

下面的内容需要有一定的基础了,小白可以收藏一下以后变强了再看。

半同步/半异步模式

在半同步/半异步模式中,同步线程用于处理客户逻辑,异步线程用于处理I/O事件。异步线程监听到客户请求之后就将其封装成请求对象并插入到请求队列中。请求队列将通知某个工作在同步模式的工作线程来读取并处理该请求对象。

半同步/半反应堆模式(half-sync/half-reactive模式)

半同步/半反应堆模式是半同步/半异步模式的一种变体。 其结构如下图:

在上图中,异步线程只有一个,由主线程充当,负责监听socket上的事件。如果监听socket上有新的连接请求到来,主线程就接受新的连接socket,然后往epoll内核事件表中注册该socket上的读写事件。如果连接socket上有读写事件发生,即有新的客户请求到来或有数据要发送至客户端,主线程就将该连接socket插入到请求队列中,所有工作线程都睡眠在请求队列上,当有任务到来时,他们通过竞争来获取任务的接管权。 由于主线程插入请求队列中的任务是就绪的连接socket,所以该半同步/半反应堆模式所采用的事件处理模式是Reactor模式,即工作线程要自己从socket上读写数据。当然,半同步/半反应堆模式也可以用模拟的Proactor事件处理模式,即由主线程来完成数据的读写操作,此时主线程将应用程序数据、任务类型等信息封装为一个任务对象,然后将其插入到请求队列。

半同步/半反应堆模式的缺点: 主线程和工作线程共享请求队列,因而请求队列是临界资源,所以对请求队列操作的时候需要加锁保护。 每个工作线程在同一时间只能处理一个客户请求。如果客户数量增多,则请求队列中堆积任务太多,客户端的响应会越来越慢。如果增多工作线程的话,则线程的切花也将消耗大量的CPU时间。

高效的半同步/半异步模式

在半同步/半反应堆模式中,每个工作线程同时只能处理一个客户请求,如果并发量大的话,客户端响应会很慢。如果每个工作线程都能同时处理多个客户链接,则就能改善这种情况,所以就有了高效的半同步/半异步模式。 其结构如图:

主线程只管监听socket,当有新的连接socket到来时,主线程就接受连接并返回新的连接socket给某个工作线程。此后该新连接socket上的任何I/O操作都由被选中的工作线程来处理,直到客户端关闭连接。当工作线程检测到有新的连接socket到来时,就把该新的连接socket的读写事件注册到自己的epoll内核事件表中。 主线程和工作线程都维持自己的事件循环,他们各自独立的监听不同事件。因此在这种高效的半同步/半异步模式中,每个线程都工作在异步模式中,所以它并非严格意义上的半同步/半异步模式。

epoll源码学习

数据结构

eventpoll

// epoll的核心实现对应于一个epoll描述符  
struct eventpoll {  
    spinlock_t lock;  
    struct mutex mtx;  
    wait_queue_head_t wq; // sys_epoll_wait() 等待在这里  
    // f_op->poll()  使用的, 被其他事件通知机制利用的wait_address  
    wait_queue_head_t poll_wait;  
    //已就绪的需要检查的epitem 列表 
    struct list_head rdllist;  
    //保存所有加入到当前epoll的文件对应的epitem  
    struct rb_root rbr;  
    // 当正在向用户空间复制数据时, 产生的可用文件  
    struct epitem *ovflist;  
    /* The user that created the eventpoll descriptor */  
    struct user_struct *user;  
    struct file *file;  
    //优化循环检查,避免循环检查中重复的遍历
    int visited;  
    struct list_head visited_list_link;  
}  

epitem

// 对应于一个加入到epoll的文件  
struct epitem {  
    // 挂载到eventpoll 的红黑树节点  
    struct rb_node rbn;  
    // 挂载到eventpoll.rdllist 的节点  
    struct list_head rdllink;  
    // 连接到ovflist 的指针  
    struct epitem *next;  
    /* 文件描述符信息fd + file, 红黑树的key */  
    struct epoll_filefd ffd;  
    /* Number of active wait queue attached to poll operations */  
    int nwait;  
    // 当前文件的等待队列(eppoll_entry)列表  
    // 同一个文件上可能会监视多种事件,  
    // 这些事件可能属于不同的wait_queue中  
    // (取决于对应文件类型的实现),  
    // 所以需要使用链表  
    struct list_head pwqlist;  
    // 当前epitem 的所有者  
    struct eventpoll *ep;  
    /* List header used to link this item to the &quot;struct file&quot; items list */  
    struct list_head fllink;  
    /* epoll_ctl 传入的用户数据 */  
    struct epoll_event event;  
};  

eppoll_entry

// 与一个文件上的一个wait_queue_head 相关联,因为同一文件可能有多个等待的事件,
//这些事件可能使用不同的等待队列  
struct eppoll_entry {  
    // List struct epitem.pwqlist  
    struct list_head llink;  
    // 所有者  
    struct epitem *base;  
    // 添加到wait_queue 中的节点  
    wait_queue_t wait;  
    // 文件wait_queue 头  
    wait_queue_head_t *whead;  
}; 

函数接口

epoll_create()

//先进行判断size是否>=0,若是则直接调用epoll_create1
SYSCALL_DEFINE1(epoll_create, int, size)
{
	if (size <= 0)
    	return -EINVAL;
    return sys_epoll_create1(0);
}

SYSCALL_DEFINE1是一个宏,用于定义有一个参数的系统调用函数,上述宏展开后即成为: int sys_epoll_create(int size),这就是epoll_create系统调用的入口。至于为何要用宏而不是直接声明,主要是因为系统调用的参数个数、传参方式都有严格限制,最多六个参数。

/* 这才是真正的epoll_create啊~~ */
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    int error;
    struct eventpoll *ep = NULL;//主描述符
    /* Check the EPOLL_* constant for consistency.  */

    BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
    /* 对于epoll来讲, 目前唯一有效的flag就是CLOEXEC */
    if (flags & ~EPOLL_CLOEXEC)
        return -EINVAL;
    /*
     * Create the internal data structure ("struct eventpoll").
     */
    /* 分配一个struct eventpoll */
    error = ep_alloc(&ep);
    if (error < 0)
        return error;
    /*
     * Creates all the items needed to setup an eventpoll file. That is,
     * a file structure and a free file descriptor.
     */
    /* 这里是创建一个匿名fd。
      epollfd本身并不存在一个真正的文件与之对应, 所以内核需要创建一个
      "虚拟"的文件, 并为之分配真正的struct file结构, 而且有真正的fd.
      这里2个参数比较关键:
      eventpoll_fops, fops就是file operations, 就是当你对这个文件(这里是虚拟的)进行操作(比如读)时,
      fops里面的函数指针指向真正的操作实现, 类似C++里面虚函数和子类的概念.
      epoll只实现了poll和release(就是close)操作, 其它文件系统操作都有VFS全权处理了.
      ep, ep就是struct epollevent, 它会作为一个私有数据保存在struct file的private指针里面.
    */
    
    error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
                 O_RDWR | (flags & O_CLOEXEC));
    if (error < 0)
        ep_free(ep);
    return error;
}
// epoll 文件系统的相关实现  
// epoll 文件系统初始化, 在系统启动时会调用  

static int __init eventpoll_init(void)  
{  
    struct sysinfo si;  

    si_meminfo(&si);  
    // 限制可添加到epoll的最多的描述符数量  

    max_user_watches = (((si.totalram - si.totalhigh) / 25) << PAGE_SHIFT) /  
                       EP_ITEM_COST;  
    BUG_ON(max_user_watches < 0);  

    // 初始化递归检查队列  
    ep_nested_calls_init(&poll_loop_ncalls);  
    ep_nested_calls_init(&poll_safewake_ncalls);  
    ep_nested_calls_init(&poll_readywalk_ncalls);  
    // epoll 使用的slab分配器分别用来分配epitem和eppoll_entry  
    epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),  
                                  0, SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);  
    pwq_cache = kmem_cache_create("eventpoll_pwq",  
                                  sizeof(struct eppoll_entry), 0, SLAB_PANIC, NULL);  

    return 0;  
}  

epoll_ctl()

//创建好epollfd后, 接下来添加fd
//epoll_ctl的参数:epfd 表示epollfd;op 有ADD,MOD,DEL,
//fd 是需要监听的描述符,event 我们感兴趣的events
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,  
                struct epoll_event __user *, event)  
{  
    int error;  
    int did_lock_epmutex = 0;  
    struct file *file, *tfile;  
    struct eventpoll *ep;  
    struct epitem *epi;  
    struct epoll_event epds;  

    error = -EFAULT;  
    //错误处理以及从用户空间将epoll_event结构copy到内核空间.
    if (ep_op_has_event(op) &&  
            // 复制用户空间数据到内核  
            copy_from_user(&epds, event, sizeof(struct epoll_event))) {  
        goto error_return;  
    }  

    // 取得 epfd 对应的文件  
    error = -EBADF;  
    file = fget(epfd);  
    if (!file) {  
        goto error_return;  
    }  

    // 取得目标文件  
    tfile = fget(fd);  
    if (!tfile) {  
        goto error_fput;  
    }  

    // 目标文件必须提供 poll 操作  
    error = -EPERM;  
    if (!tfile->f_op || !tfile->f_op->poll) {  
        goto error_tgt_fput;  
    }  

    // 添加自身或epfd 不是epoll 句柄  
    error = -EINVAL;  
    if (file == tfile || !is_file_epoll(file)) {  
        goto error_tgt_fput;  
    }  

    // 取得内部结构eventpoll  
    ep = file->private_data;  

    // EPOLL_CTL_MOD 不需要加全局锁 epmutex  
    if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_DEL) {  
        mutex_lock(&epmutex);  
        did_lock_epmutex = 1;  
    }  
    if (op == EPOLL_CTL_ADD) {  
        if (is_file_epoll(tfile)) {  
            error = -ELOOP;  
            // 目标文件也是epoll 检测是否有循环包含的问题  
            if (ep_loop_check(ep, tfile) != 0) {  
                goto error_tgt_fput;  
            }  
        } else  
        {  
            // 将目标文件添加到 epoll 全局的tfile_check_list 中  
            list_add(&tfile->f_tfile_llink, &tfile_check_list);  
        }  
    }  

    mutex_lock_nested(&ep->mtx, 0);  

    // 以tfile 和fd 为key 在rbtree 中查找文件对应的epitem  
    epi = ep_find(ep, tfile, fd);  

    error = -EINVAL;  
    switch (op) {  
    case EPOLL_CTL_ADD:  
        if (!epi) {  
            // 没找到, 添加额外添加ERR HUP 事件  
            epds.events |= POLLERR | POLLHUP;  
            error = ep_insert(ep, &epds, tfile, fd);  
        } else {  
            error = -EEXIST;  
        }  
        // 清空文件检查列表  
        clear_tfile_check_list();  
        break;  
    case EPOLL_CTL_DEL:  
        if (epi) {  
            error = ep_remove(ep, epi);  
        } else {  
            error = -ENOENT;  
        }  
        break;  
    case EPOLL_CTL_MOD:  
        if (epi) {  
            epds.events |= POLLERR | POLLHUP;  
            error = ep_modify(ep, epi, &epds);  
        } else {  
            error = -ENOENT;  
        }  
        break;  
    }  
    mutex_unlock(&ep->mtx);  

error_tgt_fput:  
    if (did_lock_epmutex) {  
        mutex_unlock(&epmutex);  
    }  

    fput(tfile);  
error_fput:  
    fput(file);  
error_return:  

    return error;  
}  
//ep_insert()在epoll_ctl()中被调用, 完成往epollfd里面添加一个监听fd的工作
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,  
                     struct file *tfile, int fd)  
{  
    int error, revents, pwake = 0;  
    unsigned long flags;  
    long user_watches;  
    struct epitem *epi;  
    struct ep_pqueue epq;  
    /* 
    struct ep_pqueue { 
        poll_table pt; 
        struct epitem *epi; 
    }; 
    */  

    // 增加监视文件数  
    user_watches = atomic_long_read(&ep->user->epoll_watches);  
    if (unlikely(user_watches >= max_user_watches)) {  
        return -ENOSPC;  
    }  

    // 分配初始化 epi  
    if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) {  
        return -ENOMEM;  
    }  

    INIT_LIST_HEAD(&epi->rdllink);  
    INIT_LIST_HEAD(&epi->fllink);  
    INIT_LIST_HEAD(&epi->pwqlist);  
    epi->ep = ep;  
    // 初始化红黑树中的key  
    ep_set_ffd(&epi->ffd, tfile, fd);  
    // 直接复制用户结构  
    epi->event = *event;  
    epi->nwait = 0;  
    epi->next = EP_UNACTIVE_PTR;  

    // 初始化临时的 epq  
    epq.epi = epi;  
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);  
    // 设置事件掩码  
    epq.pt._key = event->events;  
    //  内部会调用ep_ptable_queue_proc, 在文件对应的wait queue head 上  
    // 注册回调函数, 并返回当前文件的状态  
    revents = tfile->f_op->poll(tfile, &epq.pt);  

    // 检查错误  
    error = -ENOMEM;  
    if (epi->nwait < 0) { // f_op->poll 过程出错  
        goto error_unregister;  
    }  
    // 添加当前的epitem 到文件的f_ep_links 链表  
    spin_lock(&tfile->f_lock);  
    list_add_tail(&epi->fllink, &tfile->f_ep_links);  
    spin_unlock(&tfile->f_lock);  

    // 插入epi 到rbtree  
    ep_rbtree_insert(ep, epi);  

    /* now check if we've created too many backpaths */  
    error = -EINVAL;  
    if (reverse_path_check()) {  
        goto error_remove_epi;  
    }  

    spin_lock_irqsave(&ep->lock, flags);  

    /* 文件已经就绪插入到就绪链表rdllist */  
    if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {  
        list_add_tail(&epi->rdllink, &ep->rdllist);  


        if (waitqueue_active(&ep->wq))  
            // 通知sys_epoll_wait , 调用回调函数唤醒sys_epoll_wait 进程  
        {  
            wake_up_locked(&ep->wq);  
        }  
        // 先不通知调用eventpoll_poll 的进程  
        if (waitqueue_active(&ep->poll_wait)) {  
            pwake++;  
        }  
    }  

    spin_unlock_irqrestore(&ep->lock, flags);  

    atomic_long_inc(&ep->user->epoll_watches);  

    if (pwake)  
        // 安全通知调用eventpoll_poll 的进程  
    {  
        ep_poll_safewake(&ep->poll_wait);  
    }  

    return 0;  

    error_remove_epi:  
    spin_lock(&tfile->f_lock);  
    // 删除文件上的 epi  
    if (ep_is_linked(&epi->fllink)) {  
        list_del_init(&epi->fllink);  
    }  
    spin_unlock(&tfile->f_lock);  

    // 从红黑树中删除  
    rb_erase(&epi->rbn, &ep->rbr);  

    error_unregister:  
    // 从文件的wait_queue 中删除, 释放epitem 关联的所有eppoll_entry  
    ep_unregister_pollwait(ep, epi);  

    spin_lock_irqsave(&ep->lock, flags);  
    if (ep_is_linked(&epi->rdllink)) {  
        list_del_init(&epi->rdllink);  
    }  
    spin_unlock_irqrestore(&ep->lock, flags);  

    // 释放epi  
    kmem_cache_free(epi_cache, epi);  

    return error;  
}
static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait)  
{  
    int pollflags;  
    struct eventpoll *ep = file->private_data;  
    // 插入到wait_queue  
    poll_wait(file, &ep->poll_wait, wait);  
    // 扫描就绪的文件列表, 调用每个文件上的poll 检测是否真的就绪,  
    // 然后复制到用户空间  
    // 文件列表中有可能有epoll文件, 调用poll的时候有可能会产生递归,  
    // 调用所以用ep_call_nested 包装一下, 防止死循环和过深的调用  
    pollflags = ep_call_nested(&poll_readywalk_ncalls, EP_MAX_NESTS,  
                               ep_poll_readyevents_proc, ep, ep, current);  
    // static struct nested_calls poll_readywalk_ncalls;  
    return pollflags != -1 ? pollflags : 0;  
}  
// 通用的poll_wait 函数, 文件的f_ops->poll 通常会调用此函数  
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)  
{  
    if (p && p->_qproc && wait_address) {  
        // 调用_qproc 在wait_address 上添加节点和回调函数  
        // 调用 poll_table_struct 上的函数指针向wait_address添加节点, 并设置节点的func  
        // (如果是select或poll 则是 __pollwait, 如果是 epoll 则是 ep_ptable_queue_proc),  
        p->_qproc(filp, wait_address, p);  
    }  
}
/* 
 * 该函数在调用f_op->poll()时会被调用.
 * 也就是epoll主动poll某个fd时, 用来将epitem与指定的fd关联起来的.
 * 关联的办法就是使用等待队列(waitqueue)
 */
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                 poll_table *pt)
{
    struct epitem *epi = ep_item_from_epqueue(pt);
    struct eppoll_entry *pwq;
    if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
        /* 初始化等待队列, 指定ep_poll_callback为唤醒时的回调函数,
         * 当我们监听的fd发生状态改变时, 也就是队列头被唤醒时,
         * 指定的回调函数将会被调用. */
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
        pwq->whead = whead;
        pwq->base = epi;
        /* 将刚分配的等待队列成员加入到头中, 头是由fd持有的 */
        add_wait_queue(whead, &pwq->wait);
        list_add_tail(&pwq->llink, &epi->pwqlist);
        /* nwait记录了当前epitem加入到了多少个等待队列中,
         * 我认为这个值最大也只会是1... */
        epi->nwait++;
    } else {
        /* We have to signal that an error occurred */
        epi->nwait = -1;
    }
}
//回调函数, 当我们监听的fd发生状态改变时, 它会被调用.
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
    int pwake = 0;
    unsigned long flags;
    //从等待队列获取epitem.需要知道哪个进程挂载到这个设备
    struct epitem *epi = ep_item_from_wait(wait);
    struct eventpoll *ep = epi->ep;//获取
    spin_lock_irqsave(&ep->lock, flags);

    if (!(epi->event.events & ~EP_PRIVATE_BITS))
        goto out_unlock;

    /* 没有我们关心的event... */
    if (key && !((unsigned long) key & epi->event.events))
        goto out_unlock;

    /* 
     * 这里看起来可能有点费解, 其实干的事情比较简单:
     * 如果该callback被调用的同时, epoll_wait()已经返回了,
     * 也就是说, 此刻应用程序有可能已经在循环获取events,
     * 这种情况下, 内核将此刻发生event的epitem用一个单独的链表
     * 链起来, 不发给应用程序, 也不丢弃, 而是在下一次epoll_wait
     * 时返回给用户.
     */
    if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
        if (epi->next == EP_UNACTIVE_PTR) {
            epi->next = ep->ovflist;
            ep->ovflist = epi;
        }
        goto out_unlock;
    }

    /* 将当前的epitem放入ready list */
    if (!ep_is_linked(&epi->rdllink))
        list_add_tail(&epi->rdllink, &ep->rdllist);

    /* 唤醒epoll_wait... */
    if (waitqueue_active(&ep->wq))
        wake_up_locked(&ep->wq);
    /* 如果epollfd也在被poll, 那就唤醒队列里面的所有成员. */
    if (waitqueue_active(&ep->poll_wait))
        pwake++;
        out_unlock:
    spin_unlock_irqrestore(&ep->lock, flags);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);
    return 1;
}

epoll_wait()

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
        int, maxevents, int, timeout)
{
    int error;
    struct file *file;
    struct eventpoll *ep;
    /* The maximum number of event must be greater than zero */
    if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
        return -EINVAL;
    /* Verify that the area passed by the user is writeable */
    /* 这个地方有必要说明一下:
     * 内核对应用程序采取的策略是"绝对不信任",
     * 所以内核跟应用程序之间的数据交互大都是copy, 不允许(也时候也是不能...)指针引用.
     * epoll_wait()需要内核返回数据给用户空间, 内存由用户程序提供,
     * 所以内核会用一些手段来验证这一段内存空间是不是有效的.
     */
    if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
        error = -EFAULT;
        goto error_return;
    }
    /* Get the "struct file *" for the eventpoll file */
    error = -EBADF;
    /* 获取epollfd的struct file, epollfd也是文件嘛 */
    file = fget(epfd);
    if (!file)
        goto error_return;

    error = -EINVAL;
    /* 检查一下它是不是一个真正的epollfd... */
    if (!is_file_epoll(file))
        goto error_fput;

    /* 获取eventpoll结构 */
    ep = file->private_data;

    /* 等待事件到来~~ */
    error = ep_poll(ep, events, maxevents, timeout);
    error_fput:
    fput(file);
    error_return:
    return error;
}
/* 这个函数真正将执行epoll_wait的进程带入睡眠状态... */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, long timeout)
{
    int res, eavail;
    unsigned long flags;
    long jtimeout;
    wait_queue_t wait;//等待队列

    /* 计算睡觉时间, 毫秒要转换为HZ */
    jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
        MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
retry:
    spin_lock_irqsave(&ep->lock, flags);
    res = 0;
    /* 如果ready list不为空, 就不睡了, 直接干活... */
    if (list_empty(&ep->rdllist)) {

        /* OK, 初始化一个等待队列, 准备直接把自己挂起,
         * 注意current是一个宏, 代表当前进程 */
        init_waitqueue_entry(&wait, current);//初始化等待队列,wait表示当前进程
        __add_wait_queue_exclusive(&ep->wq, &wait);//挂载到ep结构的等待队列
        for (;;) {
            /* 将当前进程设置位睡眠, 但是可以被信号唤醒的状态,
             * 注意这个设置是"将来时", 我们此刻还没睡! */
            set_current_state(TASK_INTERRUPTIBLE);
            /* 如果这个时候, ready list里面有成员了,
             * 或者睡眠时间已经过了, 就直接不睡了... */
            if (!list_empty(&ep->rdllist) || !jtimeout)
                break;
            /* 如果有信号产生, 也起床... */
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }
            /* 啥事都没有,解锁, 睡觉... */
            spin_unlock_irqrestore(&ep->lock, flags);
            /* jtimeout这个时间后, 会被唤醒,
             * ep_poll_callback()如果此时被调用,
             * 那么我们就会直接被唤醒, 不用等时间了... 
             * 再次强调一下ep_poll_callback()的调用时机是由被监听的fd
             * 的具体实现, 比如socket或者某个设备驱动来决定的,
             * 因为等待队列头是他们持有的, epoll和当前进程
             * 只是单纯的等待...
             **/
            jtimeout = schedule_timeout(jtimeout);//睡觉
            spin_lock_irqsave(&ep->lock, flags);
        }
        __remove_wait_queue(&ep->wq, &wait);
        /* OK 我们醒来了... */
        set_current_state(TASK_RUNNING);
    }
    /* Is it worth to try to dig for events ? */
    eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
    spin_unlock_irqrestore(&ep->lock, flags);

    /* 如果一切正常, 有event发生, 就开始准备数据copy给用户空间了... */
    if (!res && eavail &&
        !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
        goto retry;
    return res;
}
//调用p_scan_ready_list()
static int ep_send_events(struct eventpoll *ep,
              struct epoll_event __user *events, int maxevents)
{
    struct ep_send_events_data esed;
    esed.maxevents = maxevents;
    esed.events = events;
    return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}
//由ep_send_events()调用本函数
static int ep_scan_ready_list(struct eventpoll *ep,
                  int (*sproc)(struct eventpoll *,
                       struct list_head *, void *),
                  void *priv)
{
    int error, pwake = 0;
    unsigned long flags;
    struct epitem *epi, *nepi;
    LIST_HEAD(txlist);

    mutex_lock(&ep->mtx);

    spin_lock_irqsave(&ep->lock, flags);
    /* 这一步要注意, 首先, 所有监听到events的epitem都链到rdllist上了,
     * 但是这一步之后, 所有的epitem都转移到了txlist上, 而rdllist被清空了,
     * 要注意哦, rdllist已经被清空了! */
    list_splice_init(&ep->rdllist, &txlist);
    /* ovflist, 在ep_poll_callback()里面我解释过, 此时此刻我们不希望
     * 有新的event加入到ready list中了, 保存后下次再处理... */
    ep->ovflist = NULL;
    spin_unlock_irqrestore(&ep->lock, flags);

    /* 在这个回调函数里面处理每个epitem
     * sproc 就是 ep_send_events_proc, 下面会注释到. */
    error = (*sproc)(ep, &txlist, priv);
    spin_lock_irqsave(&ep->lock, flags);

    /* 现在我们来处理ovflist, 这些epitem都是我们在传递数据给用户空间时
     * 监听到了事件. */
    for (nepi = ep->ovflist; (epi = nepi) != NULL;
         nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {

        /* 将这些直接放入readylist */
        if (!ep_is_linked(&epi->rdllink))
            list_add_tail(&epi->rdllink, &ep->rdllist);
    }

    ep->ovflist = EP_UNACTIVE_PTR;

    /* 上一次没有处理完的epitem, 重新插入到ready list */
    list_splice(&txlist, &ep->rdllist);
    /* ready list不为空, 直接唤醒... */
    if (!list_empty(&ep->rdllist)) {

        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }
    spin_unlock_irqrestore(&ep->lock, flags);
    mutex_unlock(&ep->mtx);
    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);
    return error;
}

与[转帖]深入了解epoll模型 -- 开卷有益相似的内容:

[转帖]深入了解epoll模型 -- 开卷有益

https://cloud.tencent.com/developer/article/1992927?areaSource=&traceId= 希望打开此篇对你有所帮助。 文章目录 什么是epoll?或者说,它和select有什么判别? 什么是select 为什么select最大只允许1024?

[转帖]深入了解浏览器存储

https://cloud.tencent.com/developer/article/1954085?areaSource=104001.46&traceId=7WZNP412yK3vh7ebw4th0 前言 随着移动网络的发展与演化,我们手机上现在除了有原生 App,还能跑“WebApp”——它

[转帖]深入了解 gRPC:协议

https://cn.pingcap.com/blog/grpc 经过很长一段时间的开发,TiDB 终于发了 RC3。RC3 版本对于 TiKV 来说最重要的功能就是支持了 gRPC,也就意味着后面大家可以非常方便的使用自己喜欢的语言对接 TiKV 了。 gRPC 是基于 HTTP/2 协议的,要深

[转帖]从Linux零拷贝深入了解Linux-I/O

https://aijishu.com/a/1060000000375591 作者:kevineluo,腾讯 CSIG 后台开发工程师 本文将从文件传输场景以及零拷贝技术深究 Linux I/O 的发展过程、优化手段以及实际应用。 前言 存储器是计算机的核心部件之一,在完全理想的状态下,存储器应该要

[转帖]学习下Redis内存模型

https://baijiahao.baidu.com/s?id=1753357002197139126&wfr=spider&for=pc 前言 redis,对于一个java开发工程师来讲,其实算不得什么复杂新奇的技术,但可能也很少人去深入了解学习它的底层的一些东西。下面将通过对内存统计、内存划分

[转帖]学习下 Redis 内存模型

https://my.oschina.net/u/4090830/blog/5747217 前言 redis,对于一个 java 开发工程师来讲,其实算不得什么复杂新奇的技术,但可能也很少人去深入了解学习它的底层的一些东西。下面将通过对内存统计、内存划分、存储细节、对象类型 & 内部编码这四个模块来

[转帖]基本系统调用性能lmbench测试方法和下载

简介 Lmbench是一套简易,可移植的,符合ANSI/C标准为UNIX/POSIX而制定的微型测评工具。一般来说,它衡量两个关键特征:反应时间和带宽。 Lmbench旨在使系统开发者深入了解关键操作的基础成本。 本文以银河麒麟/Ubuntu为例。 文末获取【lmbench】工具下载地址 测试方法

【转帖】Linux查看二进制文件:一招制敌(linux二进制查看文件)

https://www.dbs724.com/146055.html 一招制敌:学会Linux查看二进制文件 在Linux操作系统中,二进制文件是一种常见的文件类型。如果你想深入了解一个二进制文件,可能需要查看其内容。但是,二进制文件是一种机器可读的文件,可能很难直接理解,因此需要特殊的工具来解析和

[转帖]QPS、TPS、RT、并发数、吞吐量理解和性能优化深入思考

https://baijiahao.baidu.com/s?id=1675704570461446033&wfr=spider&for=pc 吞吐量 在了解qps、tps、rt、并发数之前,首先我们应该明确一个系统的吞吐量到底代表什么含义,一般来说,系统吞吐量指的是系统的抗压、负载能力,代表一个系统

[转帖]Redis 运维实战 第01期:Redis 复制

https://cloud.tencent.com/developer/article/1986816 作者简介 马听,多年 DBA 实战经验,对 MySQL、 Redis、ClickHouse 等数据库有一定了解,专栏《一线数据库工程师带你深入理解 MySQL》作者。 从这篇文章开始,将出几期 R