locust快速入门--使用locust-plugins保存类似jmeter的csv数据

发布时间:2024年01月22日

背景:

将locust测试的数据保存为类似jmeter一样的csv文件。

实现目标:

  1. 利用locust-plugins的功能,将数据保存为类似jmeter一样的csv文件
  2. 每次结束测试时不需要退出locust程序,就可以将本次测试的数据进行保存

实现方式:

  1. 安装locust插件库pip install locust-plugins

  2. 引入插件库,使用提供jmeter方法,实现csv文件保存。

    # -*- coding:UTF-8 -*-
    
    """
     @ProjectName  : pyExamples 
     @FileName     : locust_demo
     @Description  : 引入插件保存jmeter csv文件的功能。
     								启动locust时会创建保存的文件,终止locust时才会把数据写入文件。
     								如果在一次启动和终止locust期间进行多次测试,会把数据存到一个文件中。
     @Time         : 2024/1/22 下午11:30
     @Author       : Qredsun
     """
    @events.init.add_listener
    def on_locust_init(environment, **kwargs):
        environment.listener = jmeter.JmeterListener(env=environment, testplan='test_plan')
    
    
    class UserRun(HttpUser):
        # wait_time = between(min_wait=0.1, max_wait=0.2)  # 设置task运行间隔
        wait_time = constant(1)
    
        @task  # 装饰器,说明下面是一个任务
        def getuser_(self):
            url = 'https://blog.csdn.net/qq_17328759/article/details/135582079'  # 接口请求的URL地址
            payload = {}
            with  self.client.get(url, json=payload, catch_response=True) as rsp:
                if rsp.status_code == 200:
                    rsp.success()
                else:
                    rsp.failure(f'接口调用失败:{rsp.json()}')
    
  3. 改造为每次测试结束即保存数据。

    1. 利用locust的test_start,对csv文件实现多次创建
    2. 利用locust的test_stop,实现对csv文件数据的写入
    3. 增加了多工作节点的判断,保证对LocalRunner和MasterRunner、WorkerRunner模式的支持
    # -*- coding:UTF-8 -*-
    
    """
     @ProjectName  : pyExamples 
     @FileName     : locust_demo
     @Description  : 
     @Time         : 2024/1/22 下午11:30
     @Author       : Qredsun
     """
    import os
    import socket
    import psutil
    import time
    from multiprocessing import Process
    from locust import HttpUser, events, task, between
    from locust import constant
    from locust.env import Environment
    from locust.log import setup_logging
    from logging import getLogger
    from locust.runners import MasterRunner
    from locust.runners import LocalRunner
    from locust.runners import WorkerRunner
    from locust_plugins.listeners import jmeter
    
    setup_logging("DEBUG")
    logger = getLogger(__file__)
    
    
    @events.init.add_listener
    def on_locust_init(environment, **kwargs):
        environment.listener = jmeter.JmeterListener(env=environment, testplan='test_plan')
        if isinstance(environment.runner, MasterRunner) or isinstance(environment.runner, LocalRunner):
            environment.listener._create_results_log()
            environment.listener._write_final_log()
            os.remove(environment.listener.results_filename)
            environment.events.quitting.remove_listener(environment.listener._write_final_log)
            environment.events.worker_report.remove_listener(environment.listener._worker_report)
        else:
            environment.events.report_to_master.remove_listener(environment.listener._report_to_master)
    
        environment.events.request.remove_listener(environment.listener._request)
    
    
    @events.test_start.add_listener
    def on_test_start(environment, **kwargs):
        msg = f'开始测试'
    
        if isinstance(environment.runner, MasterRunner) or isinstance(environment.runner, LocalRunner):
            logger.info(msg)
            # 每次测试时创建一个测试数据存放文件
            environment.listener.results_filename = time.strftime("test_data_%m_%d_%H_%M_%S.csv")
            logger.warning(environment.listener.results_filename)
            environment.listener.results_file = environment.listener._create_results_log()
            environment.events.worker_report.add_listener(environment.listener._worker_report)
        else:
            msg += ' -- work node -- '
            logger.info(msg)
            environment.events.report_to_master.add_listener(environment.listener._report_to_master)
        environment.events.request.add_listener(environment.listener._request)
    
    
    @events.test_stop.add_listener
    def on_test_stop(environment, **kwargs):
        msg = f'测试'
        if isinstance(environment.runner, WorkerRunner):
            msg += ' -- work node -- '
            logger.info(msg)
            return
        if isinstance(environment.runner, LocalRunner):
            if environment.runner.state == "stopped":
                environment.listener._write_final_log()
                logger.info('保存数据至csv文件')
        else:
            if environment.runner.state == "stopped" and all(
                    x.state in ["stopped", "ready"] for x in environment.runner.clients.all):
                environment.listener._write_final_log()
                logger.info('保存数据至csv文件')
        logger.info(msg)
    
    
    class UserRun(HttpUser):
        # wait_time = between(min_wait=0.1, max_wait=0.2)  # 设置task运行间隔
        wait_time = constant(1)# 每个task运行间隔固定间隔1s
    
        @task  # 装饰器,说明下面是一个任务
        def getuser_(self):
            url = 'https://blog.csdn.net/qq_17328759/article/details/135582079'  # 接口请求的URL地址
            payload = {}
            with  self.client.get(url, json=payload, catch_response=True) as rsp:
                if rsp.status_code == 200:
                    rsp.success()
                else:
                    rsp.failure(f'接口调用失败:{rsp.json()}')
    
文章来源:https://blog.csdn.net/qq_17328759/article/details/135738154
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。