您现在的位置是:主页 > news > 广州网站开发定制公司/seo网络培训

广州网站开发定制公司/seo网络培训

admin2025/6/6 0:48:48news

简介广州网站开发定制公司,seo网络培训,潍坊网站建设优化排名,佛系汉化组.wordpress com在工作中遇到要对开发的接口做压力测试,以前没有做过开清楚什么压测工具好用,正好接口不是什么复杂的接口,curl -X post "接口地址" --data-binary 二进制认证文件 OK!(表示验证数据是文件类型)既然这样那我就写个脚本好…

广州网站开发定制公司,seo网络培训,潍坊网站建设优化排名,佛系汉化组.wordpress com在工作中遇到要对开发的接口做压力测试,以前没有做过开清楚什么压测工具好用,正好接口不是什么复杂的接口,curl -X post "接口地址" --data-binary 二进制认证文件 OK!(表示验证数据是文件类型)既然这样那我就写个脚本好…

在工作中遇到要对开发的接口做压力测试,以前没有做过开清楚什么压测工具好用,正好接口不是什么复杂的接口,curl -X post "接口地址" --data-binary @二进制认证文件 OK!(@表示验证数据是文件类型)

既然这样那我就写个脚本好了,脚本内容如下:

======================================================================

#!/usr/bin/evn python

#_coding:utf8_

from multiprocessing import Pool,Queue

import time,subprocess,os

class YaCe(object):

def init(self,api,binfile,maxpool,qu,maxrequest=100000,status="success"):

self.api = api

self.binfile = binfile

self.status = status

self.maxpool = maxpool

self.maxrequest = maxrequest

self.qu = qu

def prorequest(self):

for i in range(self.maxrequest):

self.qu.put(i)

print(i)

for i in range(int(self.maxpool)):

self.qu.put(None)

print("None")

def conumers(self,i):

while True:

data = self.qu.get(True)

if data == None:

print("进程%s任务完成..."%i)

break

else:

command = subprocess.getoutput("time curl -X POST --connect-timeout 10 '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))

if self.status == "success":

logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time

if "CgoyMDAwMDAwMDAw" in command:

print("进程%s__%s..."%(str(i),str(data)))

with open(logfile,"a") as f:

f.write(command+"\n")

f.close()

else:

print("进程%s__%s..."%(str(i),str(data)))

with open(logfile,"a") as f:

f.write("Faild\n")

f.write(command+"\n")

f.close()

else:

logfile = os.getcwd()+"/"+"roomlist.log"+"_%s"%date_time

#print("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))

command = subprocess.getoutput("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))

if "CAES+" in command:

print("进程%s__%s..."%(str(i),str(data)))

info = command.split('\n')[-3:]

info1 = "\n".join(info)

with open(logfile,"a") as f:

f.write(info1+"\n")

f.close()

else:

print("进程%s__%s..."%(str(i),str(data)))

with open(logfile,"a") as f:

f.write("Faild\n")

f.write(command+"\n")

f.close()

def multirun(self):

ps = int(int(self.maxpool) - 1)

p = Pool(ps)

for i in range(self.maxpool):

print("开启子进程%s"%i)

p.apply_async(self.conumers,args=(self,i))

print('等待所有添加的进程运行完毕。。。')

p.close()

p.join()

endtime = time.strftime("%Y%m%d_%X",time.localtime())

if self.status == "success":

logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time

else:

logfile = os.getcwd() + "/" + "roomlist.log"+"_%s"%date_time

with open(logfile,"a") as f:

f.write("============[%s]============\n"%endtime)

f.close()

print('End!!,PID:%s'% os.getpid())

if name == "main":

q = Queue()

Yc = YaCe('压测接口','二进制证认文件',开多少个进程,queue(队列),maxrequest=100(模拟测试多少次访问),status="faild"(这里因为测试的两个接口,返回不一样用status参数区分测试的接口的返回值处理))

Yc.prorequest()

print("++++++")

global date_time

datetime = time.strftime("%Y%m%d%X",time.localtime())

Yc.multirun()

====================================================================================

问题

到这里写完了,测试的问题来了,从脚本来看如果运行成功,会有多进程在处理队列的输出,可是结果的输出确是如下:

01

2

3

4

5

6

7

8

9

None

None

++++++

开启子进程0

开启子进程1

等待所有添加的进程运行完毕。。。

End!!,PID:4819

原因

子进程conumers方法完全没有运行,也没有报错这就尴尬了;查了大量的文档资料;发现这个pool方法都使用了queue.Queue将task传递给工作进程。multiprocessing必须将数据序列化以在进程间传递。方法只有在模块的顶层时才能被序列化,跟类绑定的方法不能被序列化,就会出现上面的异常 ; 那肿么办,我不是一个轻易放弃的人,终于被我找到了方法;

注意

解决方作者是在python3下测试了,python2下用脚本的subprocess要换成value,command = commands.getstatusoutput

解决方法1(亲测)

1.首先要看报错,需要对脚本修改如下:

YaCe类下的multirun方法下修改

for i in range(self.maxpool):

print("开启子进程%s"%i)

p.apply_async(self.conumers,args=(self,i))

for i in range(self.maxpool):

print("开启子进程%s"%i)

res = p.apply_async(self.conumers,args=(self,i))

print(res.get)

这就可以看到报错:

cPickle.PicklingError: Can't pickle : attribute lookup builtin.instancemethod failed

2.解决方法如下在脚本中加一个新的函数

(1).def conumers_wrapper(cls_instance,i):

return cls_instance.conumers(i)

(2).修改YaCe下multirun方法

for i in range(self.maxpool):

print("开启子进程%s"%i)

res = p.apply_async(self.conumers,args=(self,i))

print(res.get())

for i in range(self.maxpool):

print("开启子进程%s"%i)

res = p.apply_async(conumers_wrapper,args=(self,i))

print(res.get)

问题解决了,运行一下脚本结果还有报错:

RuntimeError: Queue objects should only be shared between processes through inheritance

原因

这里不可以用Queue,要改用Manager.Queue;因为进程之前的同共离用Queue会用问题;

完结

最终代码如下:

==================================================================================

#!/usr/bin/evn python

#_coding:utf8_

from multiprocessing import Pool,Queue,Manager

import time,subprocess,os

class YaCe(object):

def init(self,api,binfile,maxpool,qu,maxrequest=100000,status="success"):

self.api = api

self.binfile = binfile

self.status = status

self.maxpool = maxpool

self.maxrequest = maxrequest

self.qu = qu

def prorequest(self):

for i in range(self.maxrequest):

self.qu.put(i)

print(i)

for i in range(int(self.maxpool)):

self.qu.put(None)

print("None")

def conumers(self,i):

while True:

data = self.qu.get(True)

if data == None:

print("进程%s任务完成..."%i)

break

else:

#print("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))

command = subprocess.getoutput("time curl -X POST --connect-timeout 10 '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))

#command = subprocess.getoutput("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))

if self.status == "success":

logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time

if "CgoyMDAwMDAwMDAw" in command:

print("进程%s__%s..."%(str(i),str(data)))

with open(logfile,"a") as f:

f.write(command+"\n")

f.close()

else:

with open(logfile,"a") as f:

f.write("Faild\n")

f.write(command+"\n")

f.close()

else:

logfile = os.getcwd()+"/"+"roomlist.log"+"_%s"%date_time

#print("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))

command = subprocess.getoutput("time curl -X POST --connect-timeout 10 '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))

#command = subprocess.getoutput("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))

if "CAES+" in command:

print("进程%s__%s..."%(str(i),str(data)))

info = command.split('\n')[-3:]

info1 = "\n".join(info)

with open(logfile,"a") as f:

f.write(info1+"\n")

f.close()

else:

print("进程%s__%s..."%(str(i),str(data)))

with open(logfile,"a") as f:

f.write("Faild\n")

f.write(command+"\n")

f.close()

def multirun(self):

ps = int(int(self.maxpool) - 1)

p = Pool(ps)

for i in range(self.maxpool):

print("开启子进程%s"%i)

p.apply_async(conumers_wrapper,args=(self,i))

#print(res.get)

print('等待所有添加的进程运行完毕。。。')

p.close()

p.join()

endtime = time.strftime("%Y%m%d_%X",time.localtime())

if self.status == "success":

logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time

else:

logfile = os.getcwd() + "/" + "roomlist.log"+"_%s"%date_time

with open(logfile,"a") as f:

f.write("============[%s]============\n"%endtime)

f.close()

print('End!!,PID:%s'% os.getpid())

def conumers_wrapper(cls_instance,i):

return cls_instance.conumers(i)

if name == "main":

q = Manager().Queue()

Yc = YaCe('压测接口','二进制证认文件',开多少个进程,queue(队列),maxrequest=100(模拟测试多少次访问),status="faild"(这里因为测试的两个接口,返回不一样用status参数区分测试的接口的返回值处理))

Yc.prorequest()

print("++++++")

global date_time

datetime = time.strftime("%Y%m%d%X",time.localtime())

Yc.multirun()