将locust测试的数据保存为类似jmeter一样的csv文件。
安装locust插件库:`pip install locust-plugins`
引入插件库,使用其提供的jmeter监听器(JmeterListener),实现csv文件保存。
# -*- coding:UTF-8 -*-
"""
@ProjectName : pyExamples
@FileName : locust_demo
@Description : 引入插件保存jmeter csv文件的功能。
启动locust时会创建保存的文件,终止locust时才会把数据写入文件。
如果在一次启动和终止locust期间进行多次测试,会把数据存到一个文件中。
@Time : 2024/1/22 下午11:30
@Author : Qredsun
"""
@events.init.add_listener
def on_locust_init(environment, **kwargs):
    """Attach a JMeter-style CSV listener to the Locust environment.

    Runs on Locust's ``init`` event. The listener is stored on the
    environment as ``environment.listener``.

    Args:
        environment: The Locust environment passed to the ``init`` event.
        **kwargs: Extra event arguments; unused.
    """
    csv_listener = jmeter.JmeterListener(env=environment, testplan='test_plan')
    environment.listener = csv_listener
class UserRun(HttpUser):
    """Simulated user that repeatedly requests one page and reports the result."""

    # wait_time = between(min_wait=0.1, max_wait=0.2)  # random interval between task runs
    wait_time = constant(1)

    @task  # registers the method below as a Locust task
    def getuser_(self):
        """Send one GET request and mark it as success or failure.

        A 200 status code is reported as success; anything else is reported
        as a failure whose message carries the response body.
        """
        url = 'https://blog.csdn.net/qq_17328759/article/details/135582079'  # URL to request
        payload = {}
        with self.client.get(url, json=payload, catch_response=True) as rsp:
            if rsp.status_code == 200:
                rsp.success()
            else:
                # The body of an error response is not guaranteed to be JSON.
                # Decoding it unconditionally would raise inside the task, so
                # fall back to a short excerpt of the raw text instead.
                try:
                    detail = rsp.json()
                except ValueError:
                    detail = rsp.text[:200]
                rsp.failure(f'接口调用失败:{detail}')
改造为每次测试结束即保存数据。
# -*- coding:UTF-8 -*-
"""
@ProjectName : pyExamples
@FileName : locust_demo
@Description :
@Time : 2024/1/22 下午11:30
@Author : Qredsun
"""
import os
import socket
import psutil
import time
from multiprocessing import Process
from locust import HttpUser, events, task, between
from locust import constant
from locust.env import Environment
from locust.log import setup_logging
from logging import getLogger
from locust.runners import MasterRunner
from locust.runners import LocalRunner
from locust.runners import WorkerRunner
from locust_plugins.listeners import jmeter
# Configure Locust's logging with the "DEBUG" level.
setup_logging("DEBUG")
# Module-level logger, named after this file's path (__file__).
logger = getLogger(__file__)
@events.init.add_listener
def on_locust_init(environment, **kwargs):
    """Create the JMeter listener and detach the hooks it is removed from here.

    The listener is stored as ``environment.listener``. On a master or local
    runner the results file named by ``results_filename`` is created, closed
    and deleted, and the ``quitting`` and ``worker_report`` listeners are
    removed. On any other runner the ``report_to_master`` listener is
    removed. The ``request`` listener is removed in every case.

    NOTE(review): presumably the JmeterListener constructor registers these
    hooks and opens a results file itself -- confirm against the installed
    locust-plugins version.

    Args:
        environment: The Locust environment passed to the ``init`` event.
        **kwargs: Extra event arguments; unused.
    """
    listener = jmeter.JmeterListener(env=environment, testplan='test_plan')
    environment.listener = listener
    hooks = environment.events

    if isinstance(environment.runner, (MasterRunner, LocalRunner)):
        # _create_results_log() returns an open file object. Close it right
        # away so no handle is left open on the file that is deleted below
        # (an open handle blocks os.remove on Windows).
        listener._create_results_log().close()
        listener._write_final_log()
        os.remove(listener.results_filename)
        hooks.quitting.remove_listener(listener._write_final_log)
        hooks.worker_report.remove_listener(listener._worker_report)
    else:
        hooks.report_to_master.remove_listener(listener._report_to_master)
    hooks.request.remove_listener(listener._request)
def _attach_once(hook, handler):
    """Register *handler* on *hook*, replacing any earlier registration of it."""
    try:
        hook.remove_listener(handler)
    except ValueError:
        # The handler was not registered yet, so there is nothing to replace.
        pass
    hook.add_listener(handler)


@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    """Prepare the JMeter listener for a new test run.

    On a master or local runner a results file named after the current time
    is created and the ``worker_report`` listener is attached. On any other
    runner the ``report_to_master`` listener is attached. The ``request``
    listener is attached in every case.

    Listeners are attached through ``_attach_once`` because this handler
    runs once per test: calling ``add_listener`` directly would register
    the same handler again on every further test started in the same
    Locust session, so each sample would be processed more than once.

    Args:
        environment: The Locust environment passed to the ``test_start`` event.
        **kwargs: Extra event arguments; unused.
    """
    msg = '开始测试'
    listener = environment.listener
    hooks = environment.events

    if isinstance(environment.runner, (MasterRunner, LocalRunner)):
        logger.info(msg)
        # Create a fresh results file for every test run.
        listener.results_filename = time.strftime("test_data_%m_%d_%H_%M_%S.csv")
        logger.warning(listener.results_filename)
        listener.results_file = listener._create_results_log()
        _attach_once(hooks.worker_report, listener._worker_report)
    else:
        msg += ' -- work node -- '
        logger.info(msg)
        _attach_once(hooks.report_to_master, listener._report_to_master)
    _attach_once(hooks.request, listener._request)
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    """Write the collected samples to the CSV file when a test stops.

    A worker runner only logs a message and returns. A local runner saves
    when its state is "stopped". Any other runner saves when its own state
    is "stopped" and every client in ``runner.clients.all`` is in the
    "stopped" or "ready" state.

    Args:
        environment: The Locust environment passed to the ``test_stop`` event.
        **kwargs: Extra event arguments; unused.
    """
    runner = environment.runner
    msg = '测试'

    # Worker runners do not write the file themselves.
    if isinstance(runner, WorkerRunner):
        logger.info(msg + ' -- work node -- ')
        return

    should_save = runner.state == "stopped"
    if should_save and not isinstance(runner, LocalRunner):
        # Non-local runner: additionally require every client to be idle.
        should_save = all(
            client.state in ("stopped", "ready") for client in runner.clients.all
        )

    if should_save:
        environment.listener._write_final_log()
        logger.info('保存数据至csv文件')
    logger.info(msg)
class UserRun(HttpUser):
    """Simulated user that repeatedly requests one page and reports the result."""

    # wait_time = between(min_wait=0.1, max_wait=0.2)  # random interval between task runs
    wait_time = constant(1)  # fixed 1 s wait between task executions

    @task  # registers the method below as a Locust task
    def getuser_(self):
        """Send one GET request and mark it as success or failure.

        A 200 status code is reported as success; anything else is reported
        as a failure whose message carries the response body.
        """
        url = 'https://blog.csdn.net/qq_17328759/article/details/135582079'  # URL to request
        payload = {}
        with self.client.get(url, json=payload, catch_response=True) as rsp:
            if rsp.status_code == 200:
                rsp.success()
            else:
                # The body of an error response is not guaranteed to be JSON.
                # Decoding it unconditionally would raise inside the task, so
                # fall back to a short excerpt of the raw text instead.
                try:
                    detail = rsp.json()
                except ValueError:
                    detail = rsp.text[:200]
                rsp.failure(f'接口调用失败:{detail}')