您现在的位置是:主页 > news > vue怎么做网站/怎么推广自己的网站?
vue怎么做网站/怎么推广自己的网站?
admin2025/5/5 0:29:08【news】
简介vue怎么做网站,怎么推广自己的网站?,沈阳祥云男科,武汉网站搭建1、概述 网络轮询机制就是Go语言在运行的时候用来处理I/O操作的关键组件,它使用了操作系统提供的 I/O 多路复用机制增强程序的并发处理能力,他不仅仅只是用于监控网络I/O,还能用于监控文件的I/O。 关于epoll:https://blog.csdn.…
1、概述
网络轮询机制就是Go语言在运行的时候用来处理I/O操作的关键组件,它使用了操作系统提供的 I/O 多路复用机制增强程序的并发处理能力,他不仅仅只是用于监控网络I/O,还能用于监控文件的I/O。
关于epoll:https://blog.csdn.net/raoxiaoya/article/details/106185479
相关文件
net/sock_posix.go
net/fd_unix.go
net/fd_posix.go
internal/poll/fd_unix.go
internal/poll/fd_poll_runtime.go
runtime/netpoll.go
runtime/netpoll_epoll.go
runtime/proc.go
2、epoll初始化的流程
// net/sock_posix.go
netFD.init()// net/fd_unix.go
netfd.pfd.Init(netfd.net, true)// internal/poll/fd_unix.go
pfd.pd.init(pfd)// internal/poll/fd_poll_runtime.go
// 连接到 runtime/netpoll.go
func (pd *pollDesc) init(fd *FD) error {// 调用runtime// 如果epoll没有初始化,那就执行初始化操作,使用sync.Once确保只执行一次// 如果你在程序中启动了多个监听服务,那么只需要将listen_fd加入此epoll即可serverInit.Do(runtime_pollServerInit)// Sysfd为系统的fd// 构建pd,并返回// 将fd加入到epollctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))if errno != 0 {return errnoErr(syscall.Errno(errno))}// 指向 runtime 层的 pollDesc// pollDesc 是 netpoll 中最重要的结构,net.netFD 和 poll.FD 都是对 pollDesc 的包装pd.runtimeCtx = ctxreturn nil
}事件的结构
type epollevent struct {events uint32 // 事件类型 _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET 等data [8]byte // 额外数据
}一个不常见的新事件 EPOLLRDHUP,这个事件是在较新的内核版本添加的,目的是解决对端socket关闭,epoll本身并不能直接
感知到这个关闭动作的问题。
epoll、kqueue、solaries
等I/O多路复用模块都要实现以下五个函数,这五个函数构成一个虚拟的接口, runtime/netpoll.go
是一个代理,针对不同的平台会去加载不同的实现,比如以Linux下的epoll为例,会去调用runtime/netpoll_epoll.go
下的方法。
func netpollinit()
初始化网络轮询器,通过 sync.Once 和 netpollInited 变量保证函数只会调用一次;func netpollopen(fd uintptr, pd *pollDesc) int32
将 fd 加入到 epfd,pd 放入 event.data。
使用边缘触发(ET)模式func netpoll(delta int64) gList
启动网络轮询器
delta 参数为设置超时时间,delta < 0 ,则无限阻塞;delta == 0,则不会阻塞,delta > 0 ,则阻塞指定的纳秒。返回值
为已经就绪的goroutine列表。func netpollBreak()
可以中断epoll_wait调用,如果它处于阻塞状态的话。
计时器向前修改时间后会通过该函数中断网络轮询器。func netpollIsPollDescriptor(fd uintptr) bool
判断文件描述符是否可以被轮询器使用。func netpollIsPollDescriptor(fd uintptr) bool {return fd == uintptr(epfd) || fd == netpollBreakRd || fd == netpollBreakWr
}
3、关于 pollDesc
// internal/poll/fd_poll_runtime.go
// poll.pollDesc
type pollDesc struct {runtimeCtx uintptr
}// poll.pollDesc 指向一个 runtime.pollDesc// runtime/netpoll.go
// runtime.pollDesc
type pollDesc struct {link *pollDesc // in pollcache, protected by pollcache.lockfd uintptr // constant for pollDesc usage lifetimeatomicInfo atomic.Uint32 // atomic pollInforg atomic.Uintptr // pdReady, pdWait, G waiting for read or nilwg atomic.Uintptr // pdReady, pdWait, G waiting for write or nillock mutex // protects the following fieldsclosing booluser uint32 // user settable cookierseq uintptr // protects from stale read timersrt timer // read deadline timer (set if rt.f != nil)rd int64 // read deadline (a nanotime in the future, -1 when expired)wseq uintptr // protects from stale write timerswt timer // write deadline timerwd int64 // write deadline (a nanotime in the future, -1 when expired)self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}
pollDesc由pollcache.alloc()产生pollDesc中有两个二进制的信号量,rg, wg,分别用来阻塞读G和写G,如果fd未就绪的话。
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or nil
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or nil
rg和wg为原子的操作,可能的值有
4、零值 nil, 02、pdWait uintptr = 2
这是一个临时状态,一个G来读或写这个未就绪的fd时,那么rg/wg先被置为pdWait,之后有三种可能:
a、此G成功的commit,将rg/wg设置为G的指针,此时可以说,此G已经成功的park在了这个fd上。
b、并行的其他线程,在commit之前,有可能将rg/wg置为了pdReady,G的park失败。
c、并行的其他线程,在commit之前,有超时/关闭的操作,于是rg/wg被置为0,G的park失败。3、G pointer
此G已经成功的park在了这个fd上。
之后,如果有就绪通知,那么rg/wg会被置为pdReady,如果有timeout/close,那么rg/wg会被置为0,此时这个G就会被唤醒。1、pdReady uintptr = 1
表示此pd的 i/o 已经就绪,或者 timeout/closed。
于是对应的G被唤醒,来进行读或写操作,完成之后,rg/wg的状态从pdReady切换到0,表示消费了这个通知,将状态Reset为0。顺序为:0 --> pdWait --> G pointer --> pdReady --> 0
EINTR
在执行系统调用的过程中,可能会发生了信号中断,中断了的系统调用是没有完成的调用,它的失败是临时性的,如果再次调用则
可能成功,这并不是真正的失败,所以要对这种情况进行处理。EAGAIN
在设置了nonblock以非阻塞的模式进行读写的时候,此时一般使用忙轮训的方式不停的读或写,在第一次调用未就绪的fd时,或
者在发送数据的时候发送了一部分而缓冲区就满了,那么内核就会返回 EAGAIN 错误,此时需要做一定的处理,比如sleep一段
时间再来操作。但是对于读和写,后续的流程有点差异,对于读操作,只需要接着读就行;对于写操作,需要记录写了多少还剩多
少,将剩下的接着写。
4、以读操作为例
// net/fd_posix.go
func (fd *netFD) Read(p []byte) (n int, err error) {n, err = fd.pfd.Read(p)runtime.KeepAlive(fd)return n, wrapSyscallError(readSyscallName, err)
}
// internal/poll/fd_unix.go
func (fd *FD) Read(p []byte) (int, error) {if err := fd.readLock(); err != nil {return 0, err}defer fd.readUnlock()if len(p) == 0 {return 0, nil}// 调用 runtime_pollReset,先检查pd的状态,再将 rg 置0if err := fd.pd.prepareRead(fd.isFile); err != nil {return 0, err}if fd.IsStream && len(p) > maxRW {p = p[:maxRW]}for {// 执行读操作,如果错误码为EINTR,则忽略,继续读;如果非EINTR则返回。n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)if err != nil {n = 0// 如果错误码为 EAGAIN,则准备阻塞if err == syscall.EAGAIN && fd.pd.pollable() {// 调用runtime_pollWait实现阻塞以及rg的状态变化if err = fd.pd.waitRead(fd.isFile); err == nil {continue}}}err = fd.eofError(n, err)return n, err}
}
func ignoringEINTRIO(fn func(fd int, p []byte) (int, error), fd int, p []byte) (int, error) {// 忙轮训for {n, err := fn(fd, p)if err != syscall.EINTR {return n, err}}
}// runtime/netpoll.go// poll_runtime_pollWait, which is internal/poll.runtime_pollWait,
// waits for a descriptor to be ready for reading or writing,
// according to mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {errcode := netpollcheckerr(pd, int32(mode))if errcode != pollNoError {return errcode}// As for now only Solaris, illumos, and AIX use level-triggered IO.if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {netpollarm(pd, mode)}for !netpollblock(pd, int32(mode), false) {errcode = netpollcheckerr(pd, int32(mode))if errcode != pollNoError {return errcode}// Can happen if timeout has fired and unblocked us,// but before we had a chance to run, timeout has been reset.// Pretend it has not happened and retry.}return pollNoError
}// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {// gpp = rg/wggpp := &pd.rgif mode == 'w' {gpp = &pd.wg}// set the gpp semaphore to pdWaitfor {// Consume notification if already ready.// 如果 gpp 已经是 pdReady,表明已经就绪,将值设置为0,同时结束阻塞,正常消费if gpp.CompareAndSwap(pdReady, 0) {return true}// 如果成功的从0 设置为pdWait,说明处于未就绪状态if gpp.CompareAndSwap(0, pdWait) {break}// Double check that this isn't corrupt; otherwise we'd loop// forever.if v := gpp.Load(); v != pdReady && v != 0 {throw("runtime: double wait")}}// 以下是 gpp = pdWait// pdWait 这个状态不太稳定,会存在并行的其他程序来修改,比如netpoll和deadline定时器// 因此需要再netpollcheckerr一次,以确保效率,避免不必要的挂起// need to recheck error states after setting gpp to pdWait// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl// do the opposite: store to closing/rd/wd, publishInfo, load of rg/wgif waitio || netpollcheckerr(pd, mode) == pollNoError {// 将当前G置为waiting状态,然后调用netpollblockcommit// 如果 netpollblockcommit 返回 false,那么G的阻塞失败,也就是说还是处于激活状态。gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)}// G 被唤醒了,并且在别处有程序将 gpp 设置为了 pdReady// be careful to not lose concurrent pdReady notification// 消费掉old := gpp.Swap(0)if old > pdWait {throw("runtime: corrupted polldesc")}return old == pdReady
}func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {// 将 gpp 的值由 pdWait 换成 G pointerr := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))if r {// Bump the count of goroutines waiting for the poller.// The scheduler uses this to decide whether to block// waiting for the poller if there is nothing else to do.atomic.Xadd(&netpollWaiters, 1)}return r
}// 检查pd的异常
func netpollcheckerr(pd *pollDesc, mode int32) int {info := pd.info()// 是否已关闭if info.closing() {return pollErrClosing}// 是否已超时if (mode == 'r' && info.expiredReadDeadline()) || (mode == 'w' && info.expiredWriteDeadline()) {return pollErrTimeout}// Report an event scanning error only on a read event.// An error on a write event will be captured in a subsequent// write call that is able to report a more specific error.if mode == 'r' && info.eventErr() {return pollErrNotPollable}return pollNoError
}
5、accept过程
// net/fd_unix.go
func (fd *netFD) accept() (netfd *netFD, err error)
获取到连接之后调用 netFD.init() 来初始化pd,也就是重复上面的流程。// internal/poll/fd_unix.go
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {if err := fd.readLock(); err != nil {return -1, nil, "", err}defer fd.readUnlock()if err := fd.pd.prepareRead(fd.isFile); err != nil {return -1, nil, "", err}for {// 发起 accept 系统调用// 将得到的fd设置 closeOnExec 以及 nonblocks, rsa, errcall, err := accept(fd.Sysfd)if err == nil {return s, rsa, "", err}switch err {case syscall.EINTR:continuecase syscall.EAGAIN:if fd.pd.pollable() {// 挂起,直到可读就会被唤醒if err = fd.pd.waitRead(fd.isFile); err == nil {continue}}case syscall.ECONNABORTED:// This means that a socket on the listen// queue was closed before we Accept()ed it;// it's a silly error, so try again.continue}return -1, nil, errcall, err}
}
通过上面的分析,我们发现 read, write, accept 都是发生在设置了 nonblock 的fd上,也就是都是非阻塞的操作,如果未就绪,那就由运行时调度器来将G挂起,并且让此G脱离当前的M,M就可以去执行其他任务,总结一下就是,nonblock 使得M不会因系统调用而阻塞,同时gopack又使G挂起在运行时看起来像阻塞操作,这是精髓所在。
6、启动轮训 netpoll
前面的代码中并没有看到调用netpoll_epoll.go
中的netpoll
方法,这是执行epoll_wait
的方法,那是因为,这是由运行时来调用的,在runtime/proc.go
文件中,runtime在执行findrunnable, startTheWorldWithSema, sysmon, pollWork
函数时,都会调用netpoll
方法,epoll_wait
的返回值为整型,并且以引用的方式返回了已经就绪的epollevent
数组,epollevent结构中包含了事件类型以及data,此data指向了pollDesc。而再前面的过程,我们已经将rg/wg
指向了G pointer
,此后我们就知道该唤醒哪些G啦。被唤醒的G的状态会被修改为Grunnable,接下来将就绪的G加入到调度队列中,等待调度运行。
func netpoll(delay int64) gList
func netpollready(toRun *gList, pd *pollDesc, mode int32)
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g
7、关于 netpollBreak
在netpoll初始化的时候
// runtime/netpoll_epoll.go
func netpollinit() {epfd = epollcreate1(_EPOLL_CLOEXEC)if epfd < 0 {epfd = epollcreate(1024)if epfd < 0 {println("runtime: epollcreate failed with", -epfd)throw("runtime: netpollinit failed")}closeonexec(epfd)}// 创建一个管道r, w, errno := nonblockingPipe()if errno != 0 {println("runtime: pipe failed with", -errno)throw("runtime: pipe failed")}ev := epollevent{events: _EPOLLIN,}*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd// 将读端fd加入了epoll,关注 EPOLLINerrno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)if errno != 0 {println("runtime: epollctl failed with", -errno)throw("runtime: epollctl failed")}netpollBreakRd = uintptr(r) // 读端netpollBreakWr = uintptr(w) // 写端
}
// runtime/nbpipe_pipe2.go
func nonblockingPipe() (r, w int32, errno int32) {r, w, errno = pipe2(_O_NONBLOCK | _O_CLOEXEC)if errno == -_ENOSYS {r, w, errno = pipe()if errno != 0 {return -1, -1, errno}// 设置 closeonexec 以及 nonblockcloseonexec(r)setNonblock(r)closeonexec(w)setNonblock(w)}return r, w, errno
}
pipe系统调用需要打开两个文件,filedes[0]
用来读数据,filedes[1]
用来写数据。
将数据写入管道使用write()函数,管道的长度受到限制,管道满时写入操作会被阻塞,如果设置了非阻塞模式,那么管道满时write()函数返回0。
读取数据使用read()函数,读取的顺序与写入顺序相同。当数据被读取后,这些数据将自动被管道清除。如果读取的管道为空,并且管道写入端口是打开的,read()函数将被阻塞。如果设置了非阻塞模式,那么管道为空read()函数返回0。
管道虽然有2个端口,但同时只有一个端口能被打开,这样避免了同时对管道进行读和写的操作。关闭端口使用的是close()函数,关闭读端口时,在管道上进行写操作的进程将收到SIGPIPE信号。关闭写端口时,进行读操作的read()函数将返回0。
此处pipe的作用是实现netpollBreak
,通过向netpollBreakWr
写入数据,使epollwait
返回,也可以理解为中断epollwait
调用,使其提前于 delay 时间返回。
8、关于读/写超时的设置与触发
超时设置在 I/O 操作中,尤其是网络调用中很关键,网络请求存在很高的不确定因素,我们需要设置一个截止日期保证程序的正常运行。
在 http.Server 中,会为每一个 Accept 得到的 conn 设置 ReadDeadline 和 WriteDeadline。也就是说,此 conn 后续的所有操作加起来不能超过这个时间(包括IO的时间,阻塞的时间,以及调度的时间),否则,读操作会返回eof,写操作会返回对应的err,当然在过程中可以再次设置 Deadline,那么计时器就会重新计时。
func (c *conn) serve(ctx context.Context)
然后调用
// net/fd_posix.go
func (fd *netFD) SetReadDeadline(t time.Time) error {return fd.pfd.SetReadDeadline(t)
}// internal/poll/fd_poll_runtime.go
func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)// runtime/netpoll.go
//go:linkname poll_runtime_pollSetDeadline internal/poll.runtime_pollSetDeadline
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int)
设置
pd.rt timer // read deadline timer (set if rt.f != nil)
pd.rd int64 // read deadline (a nanotime in the future, -1 when expired)
pd.wt timer // write deadline timer
pd.wd int64 // write deadline (a nanotime in the future, -1 when expired)func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool)原理就是,通过为 pd 设置 timer 和 timer.f ,timer由运行时来管理,当时间到了就会执行 timer.f,这个函数的功能是设置fd为
就绪状态pdReady,并直接唤醒对应的G,同时设置 pd 的超时状态 pd.rd = -1, pd.wd = -1,然后调用 pd.publishInfo 保存在
pd.atomicInfo,这个在 netpollcheckerr 中会用到,用来判断pd的错误。
9、关于gopark
网络IO阻塞的时候,G会被当前P的本地队列踢出,集中在netpoll的队列中,
// runtime/proc.go// Puts the current goroutine into a waiting state and calls unlockf on the
// system stack.
//
// If unlockf returns false, the goroutine is resumed.
//
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
//
// Note that because unlockf is called after putting the G into a waiting
// state, the G may have already been readied by the time unlockf is called
// unless there is external synchronization preventing the G from being
// readied. If unlockf returns false, it must guarantee that the G cannot be
// externally readied.
//
// Reason explains why the goroutine has been parked. It is displayed in stack
// traces and heap dumps. Reasons should be unique and descriptive. Do not
// re-use reasons, add new ones.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {if reason != waitReasonSleep {checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy}// 获取当前Mmp := acquirem()// 获取当前Ggp := mp.curgstatus := readgstatus(gp)if status != _Grunning && status != _Gscanrunning {throw("gopark: bad g status")}// 存储在 M 上mp.waitlock = lockmp.waitunlockf = unlockfgp.waitreason = reasonmp.waittraceev = traceEvmp.waittraceskip = traceskipreleasem(mp)// can't do anything that might move the G between Ms here.// 在 M.G0 这个栈上调用 park_m ,而不是在当前G的栈mcall(park_m)
}// park continuation on g0.
// gp 就是G
func park_m(gp *g) {// 获取 G0,_g_ := getg()if trace.enabled {traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)}// 将 G 的状态从 _Grunning 切换到 _Gwaitingcasgstatus(gp, _Grunning, _Gwaiting)// 让G和M脱离关系dropg()// 执行unlockfif fn := _g_.m.waitunlockf; fn != nil {ok := fn(gp, _g_.m.waitlock)_g_.m.waitunlockf = nil_g_.m.waitlock = nilif !ok {if trace.enabled {traceGoUnpark(gp, 2)}// 将 G 的状态从 _Gwaiting 切换到 _Grunnablecasgstatus(gp, _Gwaiting, _Grunnable)// 在当前 M 上继续执行 Gexecute(gp, true) // Schedule it back, never returns.}}// 这里是调度当前M继续执行其他G// 而不是上面执行executeschedule()
}