您现在的位置是:主页 > news > 学前端什么网站好/百度店面定位怎么申请

学前端什么网站好/百度店面定位怎么申请

admin2025/6/1 5:32:25news

简介学前端什么网站好,百度店面定位怎么申请,杭州网页设计招聘,济南做门户网站开发公司文章目录前言1. Redis 多线程模型2. Redis 多线程源码分析2.1 IO 线程的初始化2.2 IO 线程的启动2.3 IO 线程处理读任务的流程前言 Redis 6.0 中一个重大的改变就是引入了多线程IO。我们都知道 Redis 基于内存操作,几乎不存在 CPU 成为瓶颈的情况, 它主…

学前端什么网站好,百度店面定位怎么申请,杭州网页设计招聘,济南做门户网站开发公司文章目录前言1. Redis 多线程模型2. Redis 多线程源码分析2.1 IO 线程的初始化2.2 IO 线程的启动2.3 IO 线程处理读任务的流程前言 Redis 6.0 中一个重大的改变就是引入了多线程IO。我们都知道 Redis 基于内存操作,几乎不存在 CPU 成为瓶颈的情况, 它主…

文章目录

  • 前言
  • 1. Redis 多线程模型
  • 2. Redis 多线程源码分析
      • 2.1 IO 线程的初始化
      • 2.2 IO 线程的启动
      • 2.3 IO 线程处理读任务的流程

前言

Redis 6.0 中一个重大的改变就是引入了多线程IO。我们都知道 Redis 基于内存操作,几乎不存在 CPU 成为瓶颈的情况, 它主要受限于内存和网络。从 Redis 自身角度来说,读写网络的 read/write 系统调用占用了 Redis 执行期间大部分 CPU 时间,瓶颈其实主要在于网络的 IO 消耗。基于这种情况,Redis 优化的方向在于提高网络 IO 性能,而一个简单有效的方法就是使用多线程任务分摊 Redis 同步 IO 读写的负荷

1. Redis 多线程模型

Redis 6.0 版本以前的线程模型为典型的 单 Reactor 单线程 ,但是其多线程模型却与标准的 单 Reactor 多线程 不太相同,区别在于 Redis 多线程模型不是把业务逻辑处理交给子线程,而是把对网络数据的读写交给子线程处理,业务逻辑仍然由主线程完成

在这里插入图片描述

2. Redis 多线程源码分析

在这里插入图片描述

上图为 Redis 多线程相关的流程图,该图根据 Redis 6.0 源码阅读笔记(1)-Redis 服务端启动及命令执行 示意图适当增减而来,读者如有不理解的地方可以前往查看原文。以下为 Redis 多线程的源码分析

2.1 IO 线程的初始化

  1. Redis 服务端启动的时候会有 IO 线程的初始化步骤,其触发函数为 server.c#InitServerLast()

    void InitServerLast() {bioInit();initThreadedIO();set_jemalloc_bg_thread(server.jemalloc_bg_thread);server.initial_memory_usage = zmalloc_used_memory();
    }
    
  2. networking.c#initThreadedIO() 函数负责初始化 IO 线程的数据结构,是整个 IO 线程初始化的核心,其涉及的重要属性及步骤如下

    重要属性

    1. io_threads_active 标志 IO 线程的激活状态,默认为 0 ,也就是非激活态
    2. server.io_threads_num 配置文件中配置的 IO 线程数量,为 1 则只有主线程处理 IO ,不需要再创建线程,大于 IO_THREADS_MAX_NUM(128)则认为配置异常,退出程序
    3. io_threads_list 数组,数组元素为 list 列表,每个 IO 线程需要处理的 client 都放在数组对应下标的 list 中
    4. io_threads_pending 数组,记录每个 IO 线程待处理的 client 数量

    处理步骤

    1. 调用函数 listCreate() 为每个 IO 线程创建任务列表
    2. 初始化 IO 线程互斥对象,初始化 IO 线程当前未处理的任务数量为 0
    3. pthread_create() 函数设置 IO 线程运行时处理的函数为 IOThreadMain()
    #define IO_THREADS_MAX_NUM 128
    list *io_threads_list[IO_THREADS_MAX_NUM];void initThreadedIO(void) {io_threads_active = 0; /* We start with threads not active. *//* Don't spawn any thread if the user selected a single thread:* we'll handle I/O directly from the main thread. */if (server.io_threads_num == 1) return;if (server.io_threads_num > IO_THREADS_MAX_NUM) {serverLog(LL_WARNING,"Fatal: too many I/O threads configured. ""The maximum number is %d.", IO_THREADS_MAX_NUM);exit(1);}/* Spawn and initialize the I/O threads. */for (int i = 0; i < server.io_threads_num; i++) {/* Things we do for all the threads including the main thread. */io_threads_list[i] = listCreate();if (i == 0) continue; /* Thread 0 is the main thread. *//* Things we do only for the additional threads. */pthread_t tid;pthread_mutex_init(&io_threads_mutex[i],NULL);io_threads_pending[i] = 0;pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");exit(1);}io_threads[i] = tid;}
    }
    

2.2 IO 线程的启动

  1. 即便用户配置了 IO 多线程,Redis 在实际的处理中也不一定就会启用多线程处理网络 IO,而是根据系统状态动态地调整。上文已经提到 io_threads_active 标志 IO 线程的激活状态,默认 IO 线程为未激活的状态,追踪该变量即可知道 IO 线程启动的位置。我们先看看 server.c#beforeSleep() 函数, 在 Redis 6.0 源码阅读笔记(1)-Redis 服务端启动及命令执行 一文中已经提到 beforeSleep() 函数实际是在每次事件处理之前调用的函数,其内部处理的逻辑比较多,本文关注的主要有以下几个

    1. handleClientsWithPendingReadsUsingThreads() 函数使用 IO 线程处理等待读取数据的客户端
    2. handleClientsWithPendingWritesUsingThreads() 函数使用 IO 线程处理等待响应的客户端
    void beforeSleep(struct aeEventLoop *eventLoop) {UNUSED(eventLoop);/* Just call a subset of vital functions in case we are re-entering* the event loop from processEventsWhileBlocked(). Note that in this* case we keep track of the number of events we are processing, since* processEventsWhileBlocked() wants to stop ASAP if there are no longer* events to handle. */if (ProcessingEventsWhileBlocked) {uint64_t processed = 0;processed += handleClientsWithPendingReadsUsingThreads();processed += tlsProcessPendingData();processed += handleClientsWithPendingWrites();processed += freeClientsInAsyncFreeQueue();server.events_processed_while_blocked += processed;return;}/* Handle precise timeouts of blocked clients. */handleBlockedClientsTimeout();/* We should handle pending reads clients ASAP after event loop. */handleClientsWithPendingReadsUsingThreads();/* Handle TLS pending data. (must be done before flushAppendOnlyFile) */tlsProcessPendingData();/* If tls still has pending unread data don't sleep at all. */aeSetDontWait(server.el, tlsHasPendingData());/* Call the Redis Cluster before sleep function. Note that this function* may change the state of Redis Cluster (from ok to fail or vice versa),* so it's a good idea to call it before serving the unblocked clients* later in this function. */if (server.cluster_enabled) clusterBeforeSleep();/* Run a fast expire cycle (the called function will return* ASAP if a fast cycle is not needed). */if (server.active_expire_enabled && server.masterhost == NULL)activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);/* Unblock all the clients blocked for synchronous replication* in WAIT. */if (listLength(server.clients_waiting_acks))processClientsWaitingReplicas();/* Check if there are clients unblocked by modules that implement* blocking commands. */if (moduleCount()) moduleHandleBlockedClients();/* Try to process pending commands for clients that were just unblocked. */if (listLength(server.unblocked_clients))processUnblockedClients();/* Send all the slaves an ACK request if at least one client blocked* during the previous event loop iteration. Note that we do this after* processUnblockedClients(), so if there are multiple pipelined WAITs* and the just unblocked WAIT gets blocked again, we don't have to wait* a server cron cycle in absence of other event loop events. See #6623. */if (server.get_ack_from_slaves) {robj *argv[3];argv[0] = createStringObject("REPLCONF",8);argv[1] = createStringObject("GETACK",6);argv[2] = createStringObject("*",1); /* Not used argument. */replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);decrRefCount(argv[0]);decrRefCount(argv[1]);decrRefCount(argv[2]);server.get_ack_from_slaves = 0;}/* Send the invalidation messages to clients participating to the* client side caching protocol in broadcasting (BCAST) mode. */trackingBroadcastInvalidationMessages();/* Write the AOF buffer on disk */flushAppendOnlyFile(0);/* Handle writes with pending output buffers. */handleClientsWithPendingWritesUsingThreads();/* Close clients that need to be closed asynchronous */freeClientsInAsyncFreeQueue();/* Before we are going to sleep, let the threads access the dataset by* releasing the GIL. Redis main thread will not touch anything at this* time. */if (moduleCount()) moduleReleaseGIL();
    }
    
  2. networking.c#handleClientsWithPendingWritesUsingThreads() 函数源码如下,其处理流程较为重要

    1. 首先查看数组 server.clients_pending_write 长度是否为 0,为 0 则没有在等待响应的客户端,不需要使用 IO 线程处理。 Redis 使用了server.clients_pending_write 全局数组来存放等待响应的客户端,另一个数组 server.clients_pending_read 则存放等待读取数据的客户端
    2. server.io_threads_num 配置的线程数为 1 或者 stopThreadedIOIfNeeded() 函数返回 true,说明用户没有配置多线程 IO 或者系统动态判断当前不需要使用多线程 IO,则直接调用 handleClientsWithPendingWrites() 函数完成客户端响应
    3. 接下来判断 io_threads_active 全局变量不为 1,说明 IO 线程还没有激活,则调用 startThreadedIO() 函数启动 IO 线程
    4. 将数组 server.clients_pending_write 中存放的待响应客户端按照server.io_threads_num 取余分配到各个 IO 线程的任务列表 io_threads_list[target_id] 中
    5. 设置 io_threads_op 全局 IO 操作标志为 IO_THREADS_OP_WRITE,则 IO 线程都处理写任务,并更新io_threads_pending 数组。这部分暂不展开,下文详细分析
    6. 主线程空循环,等待 IO 线程处理任务完毕。这部分逻辑主要靠io_threads_pending 数组记录每个 IO 线程待处理的 client 数量来判断,如果各个 IO 线程待处理的 client 数量相加为 0,则任务处理完毕,主线程跳出循环
    7. 最后如果还有待处理的客户端则继续处理,处理完毕清空 server.clients_pending_write 数组
    int handleClientsWithPendingWritesUsingThreads(void) {int processed = listLength(server.clients_pending_write);if (processed == 0) return 0; /* Return ASAP if there are no clients. *//* If I/O threads are disabled or we have few clients to serve, don't* use I/O threads, but thejboring synchronous code. */if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {return handleClientsWithPendingWrites();}/* Start threads if needed. */if (!io_threads_active) startThreadedIO();if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);/* Distribute the clients across N different lists. */listIter li;listNode *ln;listRewind(server.clients_pending_write,&li);int item_id = 0;while((ln = listNext(&li))) {client *c = listNodeValue(ln);c->flags &= ~CLIENT_PENDING_WRITE;int target_id = item_id % server.io_threads_num;listAddNodeTail(io_threads_list[target_id],c);item_id++;}/* Give the start condition to the waiting threads, by setting the* start condition atomic var. */io_threads_op = IO_THREADS_OP_WRITE;for (int j = 1; j < server.io_threads_num; j++) {int count = listLength(io_threads_list[j]);io_threads_pending[j] = count;}/* Also use the main thread to process a slice of clients. */listRewind(io_threads_list[0],&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);writeToClient(c,0);}listEmpty(io_threads_list[0]);/* Wait for all the other threads to end their work. */while(1) {unsigned long pending = 0;for (int j = 1; j < server.io_threads_num; j++)pending += io_threads_pending[j];if (pending == 0) break;}if (tio_debug) printf("I/O WRITE All threads finshed\n");/* Run the list of clients again to install the write handler where* needed. */listRewind(server.clients_pending_write,&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);/* Install the write handler if there are pending writes in some* of the clients. */if (clientHasPendingReplies(c) &&connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR){freeClientAsync(c);}}listEmpty(server.clients_pending_write);return processed;
    }
    
  3. networking.c#startThreadedIO() 函数较为简单,可以看到其逻辑主要为以下几步:

    1. 首先将创建 IO 线程时锁住的互斥对象解锁,也就是使 IO 线程得以运行
    2. 将全局变量io_threads_active 赋值为 1 ,标志 IO 线程已经激活
    void startThreadedIO(void) {if (tio_debug) { printf("S"); fflush(stdout); }if (tio_debug) printf("--- STARTING THREADED IO ---\n");serverAssert(io_threads_active == 0);for (int j = 1; j < server.io_threads_num; j++)pthread_mutex_unlock(&io_threads_mutex[j]);io_threads_active = 1;
    }
    

2.3 IO 线程处理读任务的流程

  1. IO 线程处理网络读取的主要流程在 networking.c#readQueryFromClient()函数中,该函数在 Redis 6.0 源码阅读笔记(1)-Redis 服务端启动及命令执行 一文中已经提到过,主要负责解析客户端传输过来的命令及参数,其和多线程 IO 相关的部分为调用 networking.c#postponeClientRead() 函数将客户端放入等待队列中。需注意如果客户端成功入队,则 readQueryFromClient() 函数不再继续执行,直接 return 了

    void readQueryFromClient(connection *conn) {client *c = connGetPrivateData(conn);int nread, readlen;size_t qblen;/* Check if we want to read from the client later when exiting from* the event loop. This is the case if threaded I/O is enabled. */if (postponeClientRead(c)) return;......
    }
  2. networking.c#postponeClientRead() 函数需要判断多种条件才能决定客户端是否能入队,具体如下:

    1. io_threads_active 必须为 1 ,也就是 IO 线程必须是激活状态
    2. server.io_threads_do_reads 用户配置必须是运行使用 IO 线程读取数据
    3. 客户端的标志位 flags 必须不是 CLIENT_MASTER、CLIENT_SLAVE、CLIENT_PENDING_READ
    4. 以上条件都满足,则将客户端的 flags 标志位设置为CLIENT_PENDING_READ,并将其入队到 server.clients_pending_read 数组
    int postponeClientRead(client *c) {if (io_threads_active &&server.io_threads_do_reads &&!ProcessingEventsWhileBlocked &&!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))){c->flags |= CLIENT_PENDING_READ;listAddNodeHead(server.clients_pending_read,c);return 1;} else {return 0;}
    }
    
  3. 至此客户端任务已经入队,其处理则由beforeSleep() 函数调用networking.c#handleClientsWithPendingReadsUsingThreads() 完成,可以看到这里的处理与多线程 IO 写数据响应客户端是类似的

    1. 先判断 IO 线程激活状态和用户配置是否允许使用 IO线程处理读数据操作
    2. 再判断server.clients_pending_read数组中待处理的客户端数量是否为 0
    3. 取出server.clients_pending_read数组中的客户端,取余分配到各个 IO 线程的任务列表 io_threads_list[target_id]
    4. 将全局的线程操作标志 io_threads_op 设置为IO_THREADS_OP_READ,也就是 IO 线程都处理读任务,并更新io_threads_pending 数组
    5. 主线程自己也处理 IO 读任务,完成后开启空循环等待 IO 线程处理任务完毕
    6. 最后再次处理server.clients_pending_read数组中的客户端,如果客户端 flags 标志位为 CLIENT_PENDING_COMMAND(也就是 IO 线程把客户端的命令及参数解析完成),则调用 processCommandAndResetClient() 函数直接执行命令,否则调用 processInputBuffer() 函数继续解析客户端的命令
    int handleClientsWithPendingReadsUsingThreads(void) {if (!io_threads_active || !server.io_threads_do_reads) return 0;int processed = listLength(server.clients_pending_read);if (processed == 0) return 0;if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);/* Distribute the clients across N different lists. */listIter li;listNode *ln;listRewind(server.clients_pending_read,&li);int item_id = 0;while((ln = listNext(&li))) {client *c = listNodeValue(ln);int target_id = item_id % server.io_threads_num;listAddNodeTail(io_threads_list[target_id],c);item_id++;}/* Give the start condition to the waiting threads, by setting the* start condition atomic var. */io_threads_op = IO_THREADS_OP_READ;for (int j = 1; j < server.io_threads_num; j++) {int count = listLength(io_threads_list[j]);io_threads_pending[j] = count;}/* Also use the main thread to process a slice of clients. */listRewind(io_threads_list[0],&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);readQueryFromClient(c->conn);}listEmpty(io_threads_list[0]);/* Wait for all the other threads to end their work. */while(1) {unsigned long pending = 0;for (int j = 1; j < server.io_threads_num; j++)pending += io_threads_pending[j];if (pending == 0) break;}if (tio_debug) printf("I/O READ All threads finshed\n");/* Run the list of clients again to process the new buffers. */while(listLength(server.clients_pending_read)) {ln = listFirst(server.clients_pending_read);client *c = listNodeValue(ln);c->flags &= ~CLIENT_PENDING_READ;listDelNode(server.clients_pending_read,ln);if (c->flags & CLIENT_PENDING_COMMAND) {c->flags &= ~CLIENT_PENDING_COMMAND;if (processCommandAndResetClient(c) == C_ERR) {/* If the client is no longer valid, we avoid* processing the client later. So we just go* to the next. */continue;}}processInputBuffer(c);}return processed;
    }
    
  4. IO 线程处理读写的核心在其初始化时设置的 IOThreadMain() 函数,其处理主要分为以下几个步骤

    1. 开启空循环扫描 io_threads_pending 数组,如果找到属于当前线程的那个下标在数组中的值不为 0 则跳出扫描
    2. 再次检查当前线程待处理客户端的数量,如果为 0 ,则当前线程停止运行
    3. io_threads_list 列表数组中取出当前线程待处理的 client 的列表,根据 io_threads_op 全局标志位决定对这些 client 做对应的处理,比如 IO_THREADS_OP_READ 读操作则调用 readQueryFromClient() 函数继续处理
    4. 处理完毕后,清空 io_threads_list 列表数组中当前线程待处理的 client 的列表,并将 io_threads_pending 对应下标值置为 0,主线程利用该数组即可知道 IO 线程是否执行完所有读写任务
    void *IOThreadMain(void *myid) {/* The ID is the thread number (from 0 to server.iothreads_num-1), and is* used by the thread to just manipulate a single sub-array of clients. */long id = (unsigned long)myid;char thdname[16];snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);redis_set_thread_title(thdname);redisSetCpuAffinity(server.server_cpulist);while(1) {/* Wait for start */for (int j = 0; j < 1000000; j++) {if (io_threads_pending[id] != 0) break;}/* Give the main thread a chance to stop this thread. */if (io_threads_pending[id] == 0) {pthread_mutex_lock(&io_threads_mutex[id]);pthread_mutex_unlock(&io_threads_mutex[id]);continue;}serverAssert(io_threads_pending[id] != 0);if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));/* Process: note that the main thread will never touch our list* before we drop the pending count to 0. */listIter li;listNode *ln;listRewind(io_threads_list[id],&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);if (io_threads_op == IO_THREADS_OP_WRITE) {writeToClient(c,0);} else if (io_threads_op == IO_THREADS_OP_READ) {readQueryFromClient(c->conn);} else {serverPanic("io_threads_op value is unknown");}}listEmpty(io_threads_list[id]);io_threads_pending[id] = 0;if (tio_debug) printf("[%ld] Done\n", id);}
    }
    
  5. 可以看到处理客户端命令的函数 networking.c#readQueryFromClient()再次被调用,不过这次客户端不会再被 networking.c#postponeClientRead() 函数入队,因为客户端在第一次入队的时候 flags 标志位就已经变为了 CLIENT_PENDING_READ,故 readQueryFromClient() 函数可继续向下执行,直到执行networking.c#processInputBuffer()函数

    这个函数在上一篇文章中已经分析过,此处不再赘述,只是需要注意当 IO 线程将客户端的命令解析完毕后不会立即执行,因为客户端的 flags 是 CLIENT_PENDING_READ,此处的处理只是将客户端的 flags 标志位更新为 CLIENT_PENDING_COMMAND 并返回,命令的执行由主线程完成,也就是本节步骤3的最后一个处理动作

    void processInputBuffer(client *c) {/* Keep processing while there is something in the input buffer */while(c->qb_pos < sdslen(c->querybuf)) {......if (c->reqtype == PROTO_REQ_INLINE) {if (processInlineBuffer(c) != C_OK) break;/* If the Gopher mode and we got zero or one argument, process* the request in Gopher mode. */if (server.gopher_enabled &&((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||c->argc == 0)){processGopherRequest(c);resetClient(c);c->flags |= CLIENT_CLOSE_AFTER_REPLY;break;}} else if (c->reqtype == PROTO_REQ_MULTIBULK) {if (processMultibulkBuffer(c) != C_OK) break;} else {serverPanic("Unknown request type");}/* Multibulk processing could see a <= 0 length. */if (c->argc == 0) {resetClient(c);} else {/* If we are in the context of an I/O thread, we can't really* execute the command here. All we can do is to flag the client* as one that needs to process the command. */if (c->flags & CLIENT_PENDING_READ) {c->flags |= CLIENT_PENDING_COMMAND;break;}/* We are finally ready to execute the command. */if (processCommandAndResetClient(c) == C_ERR) {/* If the client is no longer valid, we avoid exiting this* loop and trimming the client buffer later. So we return* ASAP in that case. */return;}}}/* Trim to pos */if (c->qb_pos) {sdsrange(c->querybuf,c->qb_pos,-1);c->qb_pos = 0;}
    }