Table of Contents
- Foreword
- 1. The Redis multi-threading model
- 2. Redis multi-threading source code analysis
- 2.1 IO thread initialization
- 2.2 IO thread startup
- 2.3 How IO threads handle read tasks
Foreword
One of the major changes in Redis 6.0 is the introduction of multi-threaded IO. Redis works almost entirely in memory, so the CPU is rarely the bottleneck; it is constrained mainly by memory and by the network. From Redis's own point of view, the read/write system calls for the network account for most of the CPU time spent during execution, so the real bottleneck is network IO. Given that, the natural direction for optimization is to improve network IO performance, and a simple, effective way to do so is to use multiple threads to share the load of Redis's synchronous network reads and writes.
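For context, multi-threaded IO is off by default and is switched on through redis.conf. The two directives below are the relevant ones in the 6.0 configuration file; the values shown are only an example:

# 1 (the default) means only the main thread performs network IO.
io-threads 4
# By default the extra threads are used only for writes (replies);
# reads and protocol parsing must be enabled explicitly.
io-threads-do-reads yes

The 6.0 redis.conf additionally suggests keeping the thread count below the machine's core count.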
1. The Redis multi-threading model
Before version 6.0, Redis used the classic single-Reactor, single-thread model. Its multi-threading model, however, is not quite the standard single-Reactor multi-thread design: rather than handing business logic to worker threads, Redis hands the reading and writing of network data to the IO threads, while the business logic (command execution) is still done by the main thread.
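Concretely, the pattern analyzed in the rest of this article is: the main thread fans clients out to per-thread lists, publishes a per-thread pending counter, handles one slice itself, and then spins until every counter returns to zero. The following is a minimal, self-contained sketch of that pattern using plain pthreads and C11 atomics. It is an illustration only, not Redis code; the names (io_thread_main, NUM_JOBS, and so on) are invented for the example.

/* Minimal sketch of the fan-out / busy-wait / fan-in pattern (not Redis code). */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define NUM_THREADS 4
#define NUM_JOBS    16

static atomic_ulong pending[NUM_THREADS];   /* jobs assigned to each thread slot */
static int jobs[NUM_THREADS][NUM_JOBS];     /* per-slot job lists */

static void *io_thread_main(void *arg) {
    long id = (long)arg;
    for (;;) {
        /* Busy-wait until the main thread hands this slot some work. */
        if (atomic_load(&pending[id]) == 0) continue;
        unsigned long n = atomic_load(&pending[id]);
        for (unsigned long i = 0; i < n; i++)
            printf("thread %ld handled job %d\n", id, jobs[id][i]);
        atomic_store(&pending[id], 0);       /* signal completion to the main thread */
    }
    return NULL;
}

int main(void) {
    pthread_t tids[NUM_THREADS];
    for (long i = 1; i < NUM_THREADS; i++)   /* slot 0 is the main thread */
        pthread_create(&tids[i], NULL, io_thread_main, (void *)i);

    /* Fan out: round-robin jobs across the slots, like item_id % io_threads_num. */
    unsigned long counts[NUM_THREADS] = {0};
    for (int job = 0; job < NUM_JOBS; job++) {
        int target = job % NUM_THREADS;
        jobs[target][counts[target]++] = job;
    }
    for (int i = 1; i < NUM_THREADS; i++)
        atomic_store(&pending[i], counts[i]);

    /* The main thread also handles its own slice (slot 0). */
    for (unsigned long i = 0; i < counts[0]; i++)
        printf("main thread handled job %d\n", jobs[0][i]);

    /* Fan in: spin until every worker slot has drained its list. */
    for (;;) {
        unsigned long left = 0;
        for (int i = 1; i < NUM_THREADS; i++) left += atomic_load(&pending[i]);
        if (left == 0) break;
    }
    printf("all jobs done\n");
    return 0;
}

Redis adds two refinements on top of this skeleton: a per-thread mutex so the main thread can park idle IO threads (startThreadedIO()/stopThreadedIO()), and a global io_threads_op flag telling the threads whether the current batch is a read pass or a write pass.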
2. Redis multi-threading source code analysis
The flow chart above covers the parts of Redis related to multi-threading; it was adapted, with some additions and removals, from the diagram in Redis 6.0 源码阅读笔记(1)-Redis 服务端启动及命令执行, and readers who find anything unclear can refer back to that article. The source-level analysis of Redis multi-threaded IO follows.
2.1 IO thread initialization
- When the Redis server starts up it initializes the IO threads; the triggering function is server.c#InitServerLast():
void InitServerLast() {
    bioInit();
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}
- The networking.c#initThreadedIO() function initializes the IO threads' data structures and is the core of the whole initialization. The important fields and processing steps are as follows (the companion global declarations are sketched after the listing below).
Important fields:
  - io_threads_active: flags whether the IO threads are active; it defaults to 0, i.e. inactive
  - server.io_threads_num: the number of IO threads set in the configuration file; if it is 1, only the main thread handles IO and no extra threads are created; if it is greater than IO_THREADS_MAX_NUM (128), the configuration is considered invalid and the process exits
  - io_threads_list: an array whose elements are lists; the clients each IO thread has to handle are placed in the list at that thread's index
  - io_threads_pending: an array recording how many clients each IO thread still has to process
Processing steps:
  - Call listCreate() to create a task list for each IO thread
  - Initialize each IO thread's mutex (it is locked right away, so the newly created thread stays parked) and set the thread's pending-task count to 0
  - Call pthread_create() with IOThreadMain() as the function each IO thread will run
#define IO_THREADS_MAX_NUM 128
list *io_threads_list[IO_THREADS_MAX_NUM];

void initThreadedIO(void) {
    io_threads_active = 0; /* We start with threads not active. */

    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
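For reference, the listing above only repeats the declaration of io_threads_list. The companion globals live in networking.c as well; sketched roughly from the 6.0 tree (exact types may differ slightly between point releases), they look like the following. The fact that io_threads_pending is declared atomic is what makes the busy-wait loops shown later safe:

/* Rough sketch of the other module-level globals in networking.c (6.0). */
pthread_t io_threads[IO_THREADS_MAX_NUM];               /* thread handles */
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];   /* used to park/unpark a thread */
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; /* clients left per thread */
int io_threads_active;   /* are the IO threads currently running? */
int io_threads_op;       /* IO_THREADS_OP_READ or IO_THREADS_OP_WRITE */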
2.2 IO thread startup
- Even if the user has configured multi-threaded IO, Redis will not necessarily use extra threads for network IO; it adjusts dynamically according to the system's state. As mentioned above, io_threads_active flags whether the IO threads are active and defaults to inactive, so tracing that variable reveals where the IO threads actually get started. Let's first look at server.c#beforeSleep(). As noted in Redis 6.0 源码阅读笔记(1)-Redis 服务端启动及命令执行, beforeSleep() is invoked before each round of event processing (how the hook is registered is shown right after the listing). It does quite a lot internally; the parts this article cares about are:
  - handleClientsWithPendingReadsUsingThreads(), which uses the IO threads to handle clients waiting to have their data read
  - handleClientsWithPendingWritesUsingThreads(), which uses the IO threads to handle clients waiting for a reply
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    /* Just call a subset of vital functions in case we are re-entering
     * the event loop from processEventsWhileBlocked(). Note that in this
     * case we keep track of the number of events we are processing, since
     * processEventsWhileBlocked() wants to stop ASAP if there are no longer
     * events to handle. */
    if (ProcessingEventsWhileBlocked) {
        uint64_t processed = 0;
        processed += handleClientsWithPendingReadsUsingThreads();
        processed += tlsProcessPendingData();
        processed += handleClientsWithPendingWrites();
        processed += freeClientsInAsyncFreeQueue();
        server.events_processed_while_blocked += processed;
        return;
    }

    /* Handle precise timeouts of blocked clients. */
    handleBlockedClientsTimeout();

    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();

    /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
    tlsProcessPendingData();

    /* If tls still has pending unread data don't sleep at all. */
    aeSetDontWait(server.el, tlsHasPendingData());

    /* Call the Redis Cluster before sleep function. Note that this function
     * may change the state of Redis Cluster (from ok to fail or vice versa),
     * so it's a good idea to call it before serving the unblocked clients
     * later in this function. */
    if (server.cluster_enabled) clusterBeforeSleep();

    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). */
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    /* Unblock all the clients blocked for synchronous replication
     * in WAIT. */
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();

    /* Check if there are clients unblocked by modules that implement
     * blocking commands. */
    if (moduleCount()) moduleHandleBlockedClients();

    /* Try to process pending commands for clients that were just unblocked. */
    if (listLength(server.unblocked_clients))
        processUnblockedClients();

    /* Send all the slaves an ACK request if at least one client blocked
     * during the previous event loop iteration. Note that we do this after
     * processUnblockedClients(), so if there are multiple pipelined WAITs
     * and the just unblocked WAIT gets blocked again, we don't have to wait
     * a server cron cycle in absence of other event loop events. See #6623. */
    if (server.get_ack_from_slaves) {
        robj *argv[3];
        argv[0] = createStringObject("REPLCONF",8);
        argv[1] = createStringObject("GETACK",6);
        argv[2] = createStringObject("*",1); /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        decrRefCount(argv[0]);
        decrRefCount(argv[1]);
        decrRefCount(argv[2]);
        server.get_ack_from_slaves = 0;
    }

    /* Send the invalidation messages to clients participating to the
     * client side caching protocol in broadcasting (BCAST) mode. */
    trackingBroadcastInvalidationMessages();

    /* Write the AOF buffer on disk */
    flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();

    /* Close clients that need to be closed asynchronous */
    freeClientsInAsyncFreeQueue();

    /* Before we are going to sleep, let the threads access the dataset by
     * releasing the GIL. Redis main thread will not touch anything at this
     * time. */
    if (moduleCount()) moduleReleaseGIL();
}
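As a reminder of where this hook comes from: beforeSleep() is installed on the event loop in server.c's main(), so the loop calls it once per iteration before it blocks waiting for events. Roughly:

/* In server.c's main() (6.0), simplified: install the hook, then run the loop. */
aeSetBeforeSleepProc(server.el, beforeSleep);
aeMain(server.el);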
- The source of networking.c#handleClientsWithPendingWritesUsingThreads() is shown below; its processing flow is worth walking through:
  - First check whether server.clients_pending_write is empty; if its length is 0 there are no clients waiting for a reply and the IO threads are not needed at all. Redis uses the global server.clients_pending_write list to hold clients waiting for a reply, and a second list, server.clients_pending_read, to hold clients waiting to have their data read
  - If server.io_threads_num is 1, or stopThreadedIOIfNeeded() returns true, then either the user did not configure multi-threaded IO or the system has dynamically decided it is not worth using right now, so handleClientsWithPendingWrites() is called directly to send the replies
  - Next, if the global io_threads_active is not 1 the IO threads have not been activated yet, so startThreadedIO() is called to start them
  - The pending clients stored in server.clients_pending_write are then distributed, by taking the running index modulo server.io_threads_num, into the IO threads' task lists io_threads_list[target_id]
  - The global IO operation flag io_threads_op is set to IO_THREADS_OP_WRITE, meaning the IO threads will all perform write tasks, and the io_threads_pending array is updated. This part is not expanded here; it is analyzed in detail below
  - The main thread then spins in an empty loop waiting for the IO threads to finish. This relies on io_threads_pending, which records how many clients each IO thread still has to handle: when the counts summed over all IO threads reach 0, the work is done and the main thread breaks out of the loop
  - Finally, clients that still have pending replies get a write handler installed, and the server.clients_pending_write list is emptied
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!io_threads_active) startThreadedIO();

    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    return processed;
}
- networking.c#startThreadedIO() is fairly simple; its logic boils down to two steps (its counterparts for stopping the threads are sketched after the listing):
  - First unlock the mutexes that were locked when the IO threads were created, which lets the IO threads run
  - Then set the global variable io_threads_active to 1 to mark the IO threads as active
void startThreadedIO(void) {
    if (tio_debug) { printf("S"); fflush(stdout); }
    if (tio_debug) printf("--- STARTING THREADED IO ---\n");
    serverAssert(io_threads_active == 0);
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_unlock(&io_threads_mutex[j]);
    io_threads_active = 1;
}
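startThreadedIO() has two counterparts that this article references but does not quote: stopThreadedIO(), which re-locks the per-thread mutexes so the IO threads park again, and stopThreadedIOIfNeeded(), the dynamic check used by handleClientsWithPendingWritesUsingThreads() above. As far as I recall, the 6.0 heuristic is to fall back to the synchronous path whenever the pending write clients number fewer than twice the configured thread count; the sketch below captures that idea and is not a verbatim quote of the source:

/* Simplified sketch (not verbatim 6.0 source) of the dynamic fallback. */
int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);

    /* Single-threaded mode: always take the synchronous path. */
    if (server.io_threads_num == 1) return 1;

    if (pending < (server.io_threads_num * 2)) {
        /* Too little work to justify the fan-out: park the IO threads
         * (re-acquire io_threads_mutex[j], clear io_threads_active) and
         * tell the caller to reply synchronously. */
        if (io_threads_active) stopThreadedIO();
        return 1;
    }
    return 0;
}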
2.3 How IO threads handle read tasks
- The main flow of IO-thread reads goes through networking.c#readQueryFromClient(). This function was already covered in Redis 6.0 源码阅读笔记(1)-Redis 服务端启动及命令执行; it is mainly responsible for parsing the command and arguments sent by the client. The part relevant to multi-threaded IO is the call to networking.c#postponeClientRead(), which puts the client into a waiting queue. Note that if the client is successfully queued, readQueryFromClient() goes no further; it simply returns:

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;
    ......
}
- networking.c#postponeClientRead() checks several conditions before deciding whether the client can be queued:
  - io_threads_active must be 1, i.e. the IO threads must be active
  - the user configuration server.io_threads_do_reads must allow the IO threads to read data (the code also requires that we are not currently inside processEventsWhileBlocked())
  - the client's flags must contain none of CLIENT_MASTER, CLIENT_SLAVE, or CLIENT_PENDING_READ
  - if all of the above hold, the client's flags are marked with CLIENT_PENDING_READ and the client is added to the server.clients_pending_read list
int postponeClientRead(client *c) {
    if (io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}
- At this point the client's task has been queued; the actual processing happens when beforeSleep() calls networking.c#handleClientsWithPendingReadsUsingThreads(). As you can see, the handling here closely mirrors the multi-threaded write path that replies to clients:
  - First check that the IO threads are active and that the user configuration allows IO threads to handle reads
  - Then check whether the number of pending clients in the server.clients_pending_read list is 0
  - Take the clients out of server.clients_pending_read and distribute them, modulo the thread count, into the IO threads' task lists io_threads_list[target_id]
  - Set the global operation flag io_threads_op to IO_THREADS_OP_READ, meaning the IO threads will all perform read tasks, and update the io_threads_pending array
  - The main thread also handles a slice of the read work itself, then spins in an empty loop waiting for the IO threads to finish
  - Finally, walk server.clients_pending_read again: if a client's flags contain CLIENT_PENDING_COMMAND (i.e. an IO thread has already parsed its command and arguments), call processCommandAndResetClient() to execute the command first, and then processInputBuffer() to keep parsing whatever is left in its buffer; clients without that flag go straight to processInputBuffer()
int handleClientsWithPendingReadsUsingThreads(void) {
    if (!io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");

    /* Run the list of clients again to process the new buffers. */
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);

        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~CLIENT_PENDING_COMMAND;
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid
                 * processing the client later. So we just go
                 * to the next. */
                continue;
            }
        }
        processInputBuffer(c);
    }
    return processed;
}
- The heart of the IO threads' read/write handling is the IOThreadMain() function registered at initialization time; its processing breaks down into the following steps:
  - Spin in an empty loop scanning the io_threads_pending array, and break out of the scan as soon as the slot belonging to the current thread becomes non-zero
  - Re-check the current thread's pending client count; if it is still 0, the thread briefly takes and releases its mutex (which gives the main thread a chance to park it) and goes back to waiting
  - Take the current thread's list of pending clients out of the io_threads_list array and handle them according to the global io_threads_op flag; for a read pass (IO_THREADS_OP_READ), for example, readQueryFromClient() is called to continue the processing
  - When done, empty the current thread's list in io_threads_list and set the corresponding slot of io_threads_pending back to 0; this array is how the main thread knows whether the IO threads have finished all their read/write work
void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];

    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);

    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        /* Give the main thread a chance to stop this thread. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(io_threads_pending[id] != 0);

        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}
- Notice that networking.c#readQueryFromClient(), the function that handles client commands, is called here once more, but this time the client will not be queued again by networking.c#postponeClientRead(): its flags were already set to CLIENT_PENDING_READ when it was queued the first time, so readQueryFromClient() keeps going until it reaches networking.c#processInputBuffer().
That function was analyzed in the previous article and will not be repeated here. Just note that when an IO thread finishes parsing a client's command it does not execute it right away: because the client's flags contain CLIENT_PENDING_READ, the handling here merely updates the flags to CLIENT_PENDING_COMMAND and returns. Executing the command is left to the main thread, which is exactly the final action described in the handleClientsWithPendingReadsUsingThreads() walkthrough earlier in this section.

void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        ......
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
            /* If the Gopher mode and we got zero or one argument, process
             * the request in Gopher mode. */
            if (server.gopher_enabled &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            /* We are finally ready to execute the command. */
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }

    /* Trim to pos */
    if (c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}