作者:森元
新业务上线前,我们通常需要对系统的不同中间件进行压测,找到当前配置下中间件承受流量的上限,从而确定上游链路的限流规则,保护系统不因突发流量而崩溃。阿里云 PTS 的 JMeter 压测可以支持用户上传自定义的 JMeter 脚本,按照自定义的逻辑,借助 PTS 强大的分布式压测能力,对系统的不同中间件进行压测。下面,将以 JMeter5.5 和 RocketMQ5.0 系列为例,详细介绍如何使用 PTS 的 JMeter 场景压测 RocketMQ。
JMeter 提供了扩展性极强的 JavaSampler,我们可以通过继承 AbstractJavaSamplerClient 类来自定义在 JavaSampler 中执行的逻辑,从而实现对 RocketMQ 进行压测。
<dependency>
<groupId>org.apache.jmeter</groupId>
<artifactId>ApacheJMeter_java</artifactId>
<version>5.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.5</version>
</dependency>
ApacheJMeter_java 是 JMeter JavaSampler 的依赖,rocketmq-client 是 RocketMQ 的客户端依赖(此处用 4.x 版本是因为 4.x 版本的客户端可以兼容 5.x 版本的服务端实例,但是 5.x 版本的客户端不能兼容 4.x 版本的服务端实例,可根据自己需求调整)。其中,要注意的是 ApacheJMeter_java 依赖的 scope 定义为? provided,JMeter 的 lib/ext 目录下已有该 JAR 包,因此不必将该依赖一起打包。
<build>
<finalName>jmeter-rocketmq4</finalName>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.4.2</version>
<configuration>
<!-- 打包方式 -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
AbstractJavaSamplerClient 类继承了 JavaSamplerClient 接口,该接口包含 setupTest、runTest、teardownTest 和 getDefaultParameters 四个方法:
setupTest
JMeter 将为测试中的每个线程创建一个 JavaSamplerClient 实现实例,测试开始时,将在每个线程的 JavaSamplerClient 实例上调用 setupTest 来初始化客户端,本例中即初始化 RocketMQ 的 producer。
runTest
每个线程每次迭代会调用一次 runTest 方法,本例中,需要在 runTest 方法里面定义消息发送的方法和采样结果的设置逻辑。
teardownTest
迭代完设置的次数或时间后,此方法将会被执行,本例中,需要在此方法关闭 producer。
getDefaultParameters
此方法定义了参数列表,这些参数通过会 JavaSamplerContext 传递给上述方法方法,在此方法内定义的参数,可以在 JMeter JavaRequest Sampler 的 GUI 界面设置值,本例中,需要定义 RocketMQ 的 broker 地址、topic 名称、消息 key、消息内容等参数。
新建子类参考如下:
import java.nio.charset.StandardCharsets;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class JavaSamplerForRocketMQ extends AbstractJavaSamplerClient {
private DefaultMQProducer producer;
private static final String NAME_SRV_ADDRESS = "nameSrvAddress";
private static final String TOPIC = "topic";
private static final String PRODUCER_GROUP = "producer group";
private static final String MSG_BODY = "messageBody";
private static final String MSG_KEY = "messageKey";
private static final String MSG_TAG = "messageTag";
private static final String ERROR_CODE = "500";
@Override
public void setupTest(JavaSamplerContext javaSamplerContext) {
try {
// 初始化producer
producer = new DefaultMQProducer(javaSamplerContext.getParameter(PRODUCER_GROUP));
producer.setNamesrvAddr(javaSamplerContext.getParameter(NAME_SRV_ADDRESS));
producer.start();
} catch (MQClientException e) {
throw new RuntimeException(e);
}
}
@Override
public SampleResult runTest(JavaSamplerContext javaSamplerContext) {
SampleResult sampleResult = new SampleResult();
sampleResult.setSampleLabel("rocketmq-producer");
// 请求开始
sampleResult.sampleStart();
// 普通消息发送
Message message = new Message(
javaSamplerContext.getParameter(TOPIC),
javaSamplerContext.getParameter(MSG_TAG),
javaSamplerContext.getParameter(MSG_BODY).getBytes()
);
try {
// 发送消息,需要关注发送结果,并捕获失败等异常。
SendResult sendResult = producer.send(message);
// 设置发送请求的字节数
sampleResult.setSentBytes(message.toString().getBytes(StandardCharsets.UTF_8).length);
sampleResult.setDataType(SampleResult.TEXT);
// 设置请求内容
sampleResult.setSamplerData(message.toString());
// 设置响应内容
sampleResult.setResponseData(String.format("Msg Id:%s", sendResult.getMsgId()).getBytes());
sampleResult.setSuccessful(true);
sampleResult.setResponseCodeOK();
} catch (MQBrokerException | InterruptedException | RemotingException | MQClientException e) {
sampleResult.setSuccessful(false);
sampleResult.setResponseCode(ERROR_CODE);
sampleResult.setResponseData(String.format("Error Msg:%s", e).getBytes());
return sampleResult;
} finally {
// 请求结束
sampleResult.sampleEnd();
}
return sampleResult;
}
@Override
public void teardownTest(JavaSamplerContext javaSamplerContext) {
producer.shutdown();
}
@Override
public Arguments getDefaultParameters() {
Arguments arguments = new Arguments();
arguments.addArgument(NAME_SRV_ADDRESS, "");
arguments.addArgument(PRODUCER_GROUP, "");
arguments.addArgument(TOPIC, "");
arguments.addArgument(MSG_KEY, "");
arguments.addArgument(MSG_TAG, "");
arguments.addArgument(MSG_BODY, "");
return arguments;
}
}
通过 mvn clean package 将项目打包,在 target 目录中可见 jmeter-rocketmq4.jar 和 jmeter-rocketmq4-jar-with-dependencies.jar 两个 JAR 包,其中 jmeter-rocketmq4-jar-with-dependencies.jar 包括了所需的依赖,在后续步骤中使用此 JAR 包。
.
├── pom.xml
├── src
│ ├── main
│ │ ├── java
│ │ │ └── JavaSamplerForRocketMQ4.java
│ │ └── resources
│ └── test
│ └── java
└── target
├── jmeter-rocketmq4-jar-with-dependencies.jar
├── jmeter-rocketmq4.jar
将打包好的 JAR 包和依赖的 JAR 包复制到 JMETER_HOME/lib/ext 目录下,然后执行命令 JMETER_HOME/bin/jmeter 打开 JMeter GUI。
新建线程组后添加 Java 请求取样器。
为线程组添加“查看结果树”和“汇总报告”监听器,然后启动测试计划,在结果树和汇总报告中验证测试的结果是否符合预期。
保存该测试计划为 JMX 文件。
a. 进入 PTS 控制台,选择“JMeter 环境”;
b. 输入自定义的环境名;
c. 点击上传文件,选择步骤三中打包的 JAR 包;
d. 点击保存。
a. 自定义场景名;
b. 点击上传文件,选择步骤四中保存的 JMX 文件;
c. 在“使用依赖环境?”下拉框中选择“是,使用依赖环境”;
d. 在“选择依赖环境”下拉框选择刚刚创建的 JMeter 环境。
小建议:由于我们是想通过压测找到 RocektMQ 能承受的最大并发请求数,因此建议选择 RPS 模式,这样可以直接衡量 RocektMQ 的承压能力。同时,考虑到公网带宽限制,应该选择阿里云 VPC 内网压测。
a. 选择压力来源为阿里云 VPC 内网,同时选择部署被压测 RocketMQ 的 ECS 所在区域;
b. 设置 ECS 的 VPC、安全组和交换机,注意 VPC 和安全组一定要和 ECS 相同,安全组中要打开响应的端口(在 ECS 控制台设置);
c. 设置压力模式为 RPS 模式;
d. 设置起始 RPS、最大 RPS 和压测时长,本文设置起始 RPS 为 90000,最大 RPS 为 110000,持续 2 分钟。
e. 指定循环一般设置为否,表示执行一次就结束,指定 IP 数会根据设置的 RPS 自动生成。
其余设置请根据需求参考 JMeter 压测_性能测试-阿里云帮助中心 [ 2] 。
保存配置并调试场景,确认和 RocketMQ 的连通,之后可以开始进行压测。
JMeter 的压测报告通用解读可以参考如何查看 JMeter 压测数据、采样日志及施压机性能_性能测试-阿里云帮助中心 [ 3] ,下一节将介绍如何使用 PTS 的压测报告来找到 RocketMQ 的承压能力。
a.?首先点击“成功率(时序)”,然后点击“Edit”,可进入成功率大盘的编辑界面,复制成功率的查询 PromQL:
sum(rate(pts_api_response_total{task_id="$task_id", code=~"200|302"}[5s]))/sum(rate(pts_api_response_total{task_id="$task_id"}[5s]))
b.?然后进入吞吐量大盘的编辑界面,使用成功率的 PromQL 替换虚拟用户数的 PromQL,并更改 Grafana 的相关配置(下图中红框),便可得到展示吞吐量和成功率的面板。
该面板展示的数据统计精度为 1 秒,可得到更精确的数据,在 18:54:05 秒时,成功率开始下降,此时 TPS 为 96561.9。
c. 为了更好的评估 RocketMQ 的性能,我们还可以统计出成功率保持 100% 的时间范围内的平均 TPS,首先找到成功率为 100% 的持续时间,下图中为 47 秒,然后将计算 TPS 的指标的时间范围改成 47s,这样每个点都代表前 47s 的平均 TPS,将鼠标移动到成功率为 100% 的最后一个时间,当前时间的 TPS 值即为成功率为 100% 时间范围内的平均 TPS,即 89357.5。
实验结果显示,对于当前 ECS 配置部署的 RocketMQ,适当调大堆内存可以有效提高 RocketMQ 的性能,当堆内存提高到 24g 时(此事 ECS 内存使用率达到 85.39%),性能没有显著提高;适当提高 sendMessageThreadPoolNums 的值可以提高 RocketMQ 的性能,当 sendMessageThreadPoolNums 超过 16 后,性能没有显著提高,甚至略有下降。用户可以根据实际情况,进行更详细的对比实验,来充分评估所部署的 RocketMQ 承压能力。
本文介绍了使用阿里云 PTS 的 JMeter 场景压测 RocketMQ 的详细步骤,对各环节逐一进行了说明,最后,通过对压测报告的自定义分析,展现了 PTS 强大的压测结果分析能力,借助 JMeter 和 PTS,用户可以对各类中间件进行灵活多维的分析,助力其构建起稳定健壮的系统。
最新活动&免费试用
相关链接:
[1]?JMeter 环境管理的查看、修改及创建_性能测试-阿里云帮助中心
https://help.aliyun.com/document_detail/170857.html?spm=a2c4g.103173.0.0.292c20f8wnWyCV
[2]?JMeter 压测_性能测试-阿里云帮助中心
https://help.aliyun.com/document_detail/97876.html?spm=a2c4g.91788.0.0.2fde6f338aHIDI
[3]?如何查看 JMeter 压测数据、采样日志及施压机性能_性能测试-阿里云帮助中心
https://help.aliyun.com/document_detail/127454.html?spm=a2c4g.94066.0.0.4a5164bepHmzWD