redis7源码分析:redis 多线程模型解析

redis7,源码,分析,redis,多线程,模型,解析 · 浏览次数 : 4

小编点评

在 eventloop 的 beforeSleep 中,当 io_threads_op 为 IO_THREADS_OP_READ 时,会将 io_threads_op 设置为 IO_THREADS_OP_READ,并给所有 IO线程设置 start condition,使其等待所有客户端请求完成。 `listRewind(io_threads_list[0],&li)` 首先将 `io_threads_list[0]` 的元素添加到 `li` 中。 `for (int j = 1; j < server.io_threads_num; j++) setIOPendingCount(j, count);` 将所有其他线程的等待队列长度设为 `server.io_threads_num`。 在循环结束后,如果 `pending` 为 0,则表示所有客户端请求都完成,将 `io_threads_op` 设置为 `IO_THREADS_OP_IDLE`,关闭所有 IO 线程。

正文

多线程模式中,在main函数中会执行InitServerLast

void InitServerLast() {
    bioInit();
    // 关键一步, 这里启动了多条线程,用于执行命令,redis起名为IO 线程
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}

1. 看initThreadedIO

networking.c

void initThreadedIO(void) {
    server.io_threads_active = 0; /* We start with threads not active. */

    /* Indicate that io-threads are currently idle */
    io_threads_op = IO_THREADS_OP_IDLE;

    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        // 为线程创建一个 list,并将该list 的指针加入到io_threads_list中
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        setIOPendingCount(i, 0);
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        // 这里创建线程,并将线程id 假如到io_threads数组中
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
这样在该函数中,创建了线程 并 为该线程分配了一个 list

2. 当client 发命令过来时,会调用到readQueryFromClient

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, big_arg = 0;
    size_t qblen, readlen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    // 多线程模式时,在这里会return
    if (postponeClientRead(c)) return;
......
}

3. 看postponeClientRead

int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
        io_threads_op == IO_THREADS_OP_IDLE)
    {
        // 把该client 假如到server的clients_pending_read中,放到list头
        listAddNodeHead(server.clients_pending_read中,c);
        // 互相引用一下
        c->pending_read_list_node = listFirst(server.clients_pending_read);
        return 1;
    } else {
        return 0;
    }
}

4. IO线程创建后 持续干活,会调用IOThreadMain

void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    // myid 也是对应的list数组的id
    long id = (unsigned long)myid;
    char thdname[16];

......

    while(1) {
......

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        // 拿到对应的 list
        listRewind(io_threads_list[id],&li);
        // 遍历list
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                // 执行读取操作
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        setIOPendingCount(id, 0);
    }
}

那么什么时候去将io_threads_op 设置为IO_THREADS_OP_READ
在eventloop 的 beforeSleep中, beforeSleep 会调用handleClientsWithPendingReadsUsingThreads
int handleClientsWithPendingReadsUsingThreads(void) {
......
    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    // 获取之前postponeClientRead中添加到list中 的所有元素list
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        // 将元素即对应的client,添加到对应的线程的list中去
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    // 将io线程标志 置为 IO_THREADS_OP_READ
    io_threads_op = IO_THREADS_OP_READ;
    // 设置所有 IO线程 待处理的请求的个数,求和
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    // 主线程也有自己的 list,即index == 0 的位置的list, 在这里也参与IO操作,不浪费一点性能
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    // 这里是再次求和,知道发现所有的客户端请求完成才退出循环
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    // 这里都执行完成了,再讲IO线程标志位职位IDLE
    io_threads_op = IO_THREADS_OP_IDLE;
......
}
从源码中的解释也可以看出,虽然加入了多线程,加快了命令处理的速度,但是如果某个命令特别耗时,
会影响 beforeSleep函数一直卡住,导致后续的命令一直pending,不能加入到io线程的list中

所以还是要注意命令的执行效率

5. 最后每个线程单独调用 readQueryFromClient, 由上面分析,当前io_threads_op 为 read 状态
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, big_arg = 0;
    size_t qblen, readlen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
	// 此时不会return, 会往下执行命令操作了
    if (postponeClientRead(c)) return;
......
}

int postponeClientRead(client *c) {
    // io线程来的, io_threads_op 为 read,返回0
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
        io_threads_op == IO_THREADS_OP_IDLE)
    {
        listAddNodeHead(server.clients_pending_read,c);
        c->pending_read_list_node = listFirst(server.clients_pending_read);
        return 1;
    } else {
        return 0;
    }
}

到此分析结束

多线程模型为 主线程接收 accept 和 read, write, 但不操作,将对应的client 假如到对应iothread的list中,然后由子线程处理。

与 redis7源码分析:redis 多线程模型解析相似的内容:

redis7源码分析:redis 多线程模型解析

多线程模式中,在main函数中会执行InitServerLast void InitServerLast() { bioInit(); // 关键一步, 这里启动了多条线程,用于执行命令,redis起名为IO 线程 initThreadedIO(); set_jemalloc_bg_thread(s

redis7源码分析:redis 单线程模型解析,一条get命令执行流程

有了下文的梳理后 redis 启动流程 再来解析redis 在单线程模式下解析并处理客户端发来的命令 1. 当 client fd 可读时,会回调readQueryFromClient函数 void readQueryFromClient(connection *conn) { client *c

redis7源码分析:redis 启动流程

1. redis 由 server.c 的main函数启动 int main(int argc, char **argv) { ... // 上面的部分为读取配置和启动命令参数解析,看到这一行下面为启动流程 serverLog(LL_WARNING, "oO0OoO0OoO0Oo Redis is

redis 源码分析:Jedis 哨兵模式连接原理

1. 可以从单元测试开始入手 查看类JedisSentinelPool private static final String MASTER_NAME = "mymaster"; protected static final HostAndPort sentinel1 = HostAndPorts.

[转帖]Redis 4.0 自动内存碎片整理(Active Defrag)源码分析

阅读本文前建议先阅读此篇博客: Redis源码从哪里读起 Redis 4.0 版本增加了许多不错的新功能,其中自动内存碎片整理功能 activedefrag 肯定是非常诱人的一个,这让 Redis 集群回收内存碎片相比 Redis 3.0 更加优雅,便利。我们升级 Redis 4.0 后直接开启了a

跳跃表数据结构与算法分析

目前市面上充斥着大量关于跳跃表结构与Redis的源码解析,但是经过长期观察后发现大都只是在停留在代码的表面,而没有系统性地介绍跳跃表的由来以及各种常量的由来。作为一种概率数据结构,理解各种常量的由来可以更好地进行变化并应用到高性能功能开发中。本文没有重复地以对现有优秀实现进行代码分析,而是通过对跳跃表进行了系统性地介绍与形式化分析,并给出了在特定场景下的跳跃表扩展方式,方便读者更好地理解跳跃表数据

记一次Redis Cluster Pipeline导致的死锁问题

本文介绍了一次排查Dubbo线程池耗尽问题的过程。通过查看Dubbo线程状态、分析Jedis连接池获取连接的源码、排查死锁条件等方面,最终确认是因为使用了cluster pipeline模式且没有设置超时时间导致死锁问题。

[转帖]redis集群报错CROSSSLOT Keys in request don‘t hash to the same slot

先上结果: $redis->sDiffStore('live_room:robots:data:' . $info['id'], 'user_info:robots_list', ''); 上述代码执行后redis抛出一个异常。来看redis源码是如何抛出这个异常的(附redis源码地址:redis

[转帖]Redis配置文件介绍

https://www.cnblogs.com/fanqisoft/p/10422381.html Redis在源码包中存放了一个Redis配置实例文件,文件中对各个配置点进行了简单的介绍,我也通过这个文件来对Redis的一些配置进行一些简单介绍。 一.UNITS(单位)【了解】 1.Redis服务

[转帖]一文读懂Redis6的--bigkeys选项源码以及redis-bigkey-online项目介绍

https://www.jianshu.com/p/9e150d72ffc9 本文分为两个部分,第一是详细讲解Redis6的--bigkeys选项相关源码是怎样实现的,第二部分为自己对--bigkeys源码的优化项目redis-bigkey-online的介绍。redis-bigkey-online