redis7源码分析:redis 多线程模型解析

redis7,源码,分析,redis,多线程,模型,解析 · 浏览次数 : 4


在 eventloop 的 beforeSleep 中,当 io_threads_op 为 IO_THREADS_OP_READ 时,会将 io_threads_op 设置为 IO_THREADS_OP_READ,并给所有 IO线程设置 start condition,使其等待所有客户端请求完成。 `listRewind(io_threads_list[0],&li)` 首先将 `io_threads_list[0]` 的元素添加到 `li` 中。 `for (int j = 1; j < server.io_threads_num; j++) setIOPendingCount(j, count);` 将所有其他线程的等待队列长度设为 `server.io_threads_num`。 在循环结束后,如果 `pending` 为 0,则表示所有客户端请求都完成,将 `io_threads_op` 设置为 `IO_THREADS_OP_IDLE`,关闭所有 IO 线程。



void InitServerLast() {
    // 关键一步, 这里启动了多条线程,用于执行命令,redis起名为IO 线程
    server.initial_memory_usage = zmalloc_used_memory();

1. 看initThreadedIO


void initThreadedIO(void) {
    server.io_threads_active = 0; /* We start with threads not active. */

    /* Indicate that io-threads are currently idle */
    io_threads_op = IO_THREADS_OP_IDLE;

    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);

    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        // 为线程创建一个 list,并将该list 的指针加入到io_threads_list中
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        setIOPendingCount(i, 0);
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        // 这里创建线程,并将线程id 假如到io_threads数组中
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
        io_threads[i] = tid;
这样在该函数中,创建了线程 并 为该线程分配了一个 list

2. 当client 发命令过来时,会调用到readQueryFromClient

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, big_arg = 0;
    size_t qblen, readlen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    // 多线程模式时,在这里会return
    if (postponeClientRead(c)) return;

3. 看postponeClientRead

int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        io_threads_op == IO_THREADS_OP_IDLE)
        // 把该client 假如到server的clients_pending_read中,放到list头
        // 互相引用一下
        c->pending_read_list_node = listFirst(server.clients_pending_read);
        return 1;
    } else {
        return 0;

4. IO线程创建后 持续干活,会调用IOThreadMain

void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    // myid 也是对应的list数组的id
    long id = (unsigned long)myid;
    char thdname[16];


    while(1) {

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        // 拿到对应的 list
        // 遍历list
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                // 执行读取操作
            } else {
                serverPanic("io_threads_op value is unknown");
        setIOPendingCount(id, 0);

那么什么时候去将io_threads_op 设置为IO_THREADS_OP_READ
在eventloop 的 beforeSleep中, beforeSleep 会调用handleClientsWithPendingReadsUsingThreads
int handleClientsWithPendingReadsUsingThreads(void) {
    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    // 获取之前postponeClientRead中添加到list中 的所有元素list
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        // 将元素即对应的client,添加到对应的线程的list中去

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    // 将io线程标志 置为 IO_THREADS_OP_READ
    io_threads_op = IO_THREADS_OP_READ;
    // 设置所有 IO线程 待处理的请求的个数,求和
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);

    /* Also use the main thread to process a slice of clients. */
    // 主线程也有自己的 list,即index == 0 的位置的list, 在这里也参与IO操作,不浪费一点性能
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

    /* Wait for all the other threads to end their work. */
    // 这里是再次求和,知道发现所有的客户端请求完成才退出循环
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;

    // 这里都执行完成了,再讲IO线程标志位职位IDLE
    io_threads_op = IO_THREADS_OP_IDLE;
会影响 beforeSleep函数一直卡住,导致后续的命令一直pending,不能加入到io线程的list中


5. 最后每个线程单独调用 readQueryFromClient, 由上面分析,当前io_threads_op 为 read 状态
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, big_arg = 0;
    size_t qblen, readlen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
	// 此时不会return, 会往下执行命令操作了
    if (postponeClientRead(c)) return;

int postponeClientRead(client *c) {
    // io线程来的, io_threads_op 为 read,返回0
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        io_threads_op == IO_THREADS_OP_IDLE)
        c->pending_read_list_node = listFirst(server.clients_pending_read);
        return 1;
    } else {
        return 0;


多线程模型为 主线程接收 accept 和 read, write, 但不操作,将对应的client 假如到对应iothread的list中,然后由子线程处理。

与 redis7源码分析:redis 多线程模型解析相似的内容:

redis7源码分析:redis 多线程模型解析

多线程模式中,在main函数中会执行InitServerLast void InitServerLast() { bioInit(); // 关键一步, 这里启动了多条线程,用于执行命令,redis起名为IO 线程 initThreadedIO(); set_jemalloc_bg_thread(s

redis7源码分析:redis 单线程模型解析,一条get命令执行流程

有了下文的梳理后 redis 启动流程 再来解析redis 在单线程模式下解析并处理客户端发来的命令 1. 当 client fd 可读时,会回调readQueryFromClient函数 void readQueryFromClient(connection *conn) { client *c

redis7源码分析:redis 启动流程

1. redis 由 server.c 的main函数启动 int main(int argc, char **argv) { ... // 上面的部分为读取配置和启动命令参数解析,看到这一行下面为启动流程 serverLog(LL_WARNING, "oO0OoO0OoO0Oo Redis is

redis 源码分析:Jedis 哨兵模式连接原理

1. 可以从单元测试开始入手 查看类JedisSentinelPool private static final String MASTER_NAME = "mymaster"; protected static final HostAndPort sentinel1 = HostAndPorts.

[转帖]Redis 4.0 自动内存碎片整理(Active Defrag)源码分析

阅读本文前建议先阅读此篇博客: Redis源码从哪里读起 Redis 4.0 版本增加了许多不错的新功能,其中自动内存碎片整理功能 activedefrag 肯定是非常诱人的一个,这让 Redis 集群回收内存碎片相比 Redis 3.0 更加优雅,便利。我们升级 Redis 4.0 后直接开启了a



记一次Redis Cluster Pipeline导致的死锁问题

本文介绍了一次排查Dubbo线程池耗尽问题的过程。通过查看Dubbo线程状态、分析Jedis连接池获取连接的源码、排查死锁条件等方面,最终确认是因为使用了cluster pipeline模式且没有设置超时时间导致死锁问题。

[转帖]redis集群报错CROSSSLOT Keys in request don‘t hash to the same slot

先上结果: $redis->sDiffStore('live_room:robots:data:' . $info['id'], 'user_info:robots_list', ''); 上述代码执行后redis抛出一个异常。来看redis源码是如何抛出这个异常的(附redis源码地址:redis

[转帖]Redis配置文件介绍 Redis在源码包中存放了一个Redis配置实例文件,文件中对各个配置点进行了简单的介绍,我也通过这个文件来对Redis的一些配置进行一些简单介绍。 一.UNITS(单位)【了解】 1.Redis服务

[转帖]一文读懂Redis6的--bigkeys选项源码以及redis-bigkey-online项目介绍 本文分为两个部分,第一是详细讲解Redis6的--bigkeys选项相关源码是怎样实现的,第二部分为自己对--bigkeys源码的优化项目redis-bigkey-online的介绍。redis-bigkey-online