<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client-java -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.5</version>
</dependency>
package com.lhy.demo.rocketMqGRPC;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
import org.apache.rocketmq.client.apis.producer.TransactionChecker;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that exposes the RocketMQ 5.x (gRPC) {@link Producer} used by the application.
 *
 * <p>The producer is published as a regular singleton bean, so the Spring container is the only
 * owner of its lifecycle: it is built once per application context and closed when that context
 * shuts down.
 *
 * @author ocean
 * @version 1.0.0
 * @since 2023/11/27 15:33
 */
@Slf4j
@Configuration
public class MyProducerConfig {
    /** Placeholder access key; only relevant when the credentials provider below is enabled. */
    private static final String ACCESS_KEY = "yourAccessKey";
    /** Placeholder secret key; only relevant when the credentials provider below is enabled. */
    private static final String SECRET_KEY = "yourSecretKey";
    /** Endpoint the client connects to ({@code host:port}). */
    private static final String ENDPOINTS = "127.0.0.1:9875";

    /**
     * Creates the producer for normal (non-transactional) messages, pre-bound to {@code normalTopic}.
     *
     * <p>Deliberately NOT cached in a static field: {@code @Bean} methods are singleton-scoped
     * already, and a static cache would outlive the application context. After the context closes
     * the producer, a second context in the same JVM (context refresh, test suites) would be handed
     * the already-closed instance.
     *
     * @return a started producer owned by the Spring container
     * @throws ClientException if the producer cannot be built or started
     */
    @Bean(name = "producer", destroyMethod = "close")
    public Producer producer() throws ClientException {
        return buildProducer(null, "normalTopic");
    }

    // To expose a transactional producer, add another @Bean method that delegates to
    // buildProducer(checker, topics) with a non-null TransactionChecker, e.g.:
    //
    // @Bean(name = "transactionalProducer", destroyMethod = "close")
    // public Producer transactionalProducer(TransactionChecker checker) throws ClientException {
    //     return buildProducer(checker, "transactionTopic");
    // }

    /**
     * Builds a new producer against {@link #ENDPOINTS}.
     *
     * @param checker transaction checker to install, or {@code null} for a normal producer
     * @param topics  topics to declare up front; optional but recommended, because it lets the
     *                producer prefetch the topic route before the first message is published
     * @return the newly built producer
     * @throws ClientException if the producer cannot be built or started
     */
    private Producer buildProducer(TransactionChecker checker, String... topics) throws ClientException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // The credentials provider is optional for the client configuration. It is only required
        // when ACL is enabled on the server; otherwise it does not need to be set.
        // SessionCredentialsProvider sessionCredentialsProvider =
        //         new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);
        final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(ENDPOINTS)
                // On some Windows platforms you may hit SSL compatibility issues. If SSL is not
                // essential, turning it off in the client configuration works around them.
                .enableSsl(false)
                // .setCredentialProvider(sessionCredentialsProvider)
                .build();
        final ProducerBuilder builder = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topics);
        if (checker != null) {
            // Only transactional producers need a checker.
            builder.setTransactionChecker(checker);
        }
        return builder.build();
    }
}
package com.lhy.demo.rocketMqGRPC;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
* <p>
* MyProducerController
* </p>
*
* @author ocean
* @version 1.0.0
* @since 2023/11/27 16:21
*/
@RestController
@Slf4j
public class MyProducerController {
@Resource
private Producer producer;
@GetMapping("sendNormalMsg/{messageBody}")
public String sendNormalMsg(@PathVariable("messageBody") String messageBody) {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "normalTopic";
byte[] body = messageBody.getBytes(StandardCharsets.UTF_8);
String key = "key1";
String tag = "tag1";
final Message message = provider.newMessageBuilder()
// 消息主题.
.setTopic(topic)
// 消息标签.
.setTag(tag)
// 消息的密钥,除了消息id之外的另一种标记消息的方式.
.setKeys(key)
.setBody(body)
.build();
try {
final SendReceipt sendReceipt = producer.send(message);
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (Throwable t) {
log.error("Failed to send message", t);
}
// 当你不再需要的时候,关闭制作人。您可以手动关闭它,也可以将其添加到JVM关闭挂钩中。
// producer.close();
return "OK";
}
}
?
package com.lhy.demo.rocketMqGRPC;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Collections;
/**
 * Spring configuration that registers the push consumer for {@code normalTopic}.
 *
 * @author ocean
 * @version 1.0.0
 * @since 2023/11/27 15:33
 */
@Slf4j
@Configuration
public class MyPushConsumerConfig {
    /** Placeholder access key; only relevant when the credentials provider below is enabled. */
    private static final String ACCESS_KEY = "accessKey";
    /** Placeholder secret key; only relevant when the credentials provider below is enabled. */
    private static final String SECRET_KEY = "secretKey";
    /** Endpoint the client connects to ({@code host:port}). */
    private static final String ENDPOINTS = "127.0.0.1:9875";

    /**
     * Builds the push consumer of group {@code consumerGroup1}, subscribed to {@code normalTopic}
     * with the tag filter {@code tag1}. Every received message is logged and acknowledged.
     *
     * @return the built push consumer
     * @throws ClientException if the consumer cannot be built
     */
    @Bean(name = "normalTopicPushConsumer")
    public PushConsumer normalTopicPushConsumer() throws ClientException {
        final ClientServiceProvider serviceProvider = ClientServiceProvider.loadService();
        final ClientConfiguration configuration = plainTextClientConfiguration();
        final FilterExpression tagFilter = new FilterExpression("tag1", FilterExpressionType.TAG);

        // A single consumer instance is normally enough; avoid creating many of them.
        return serviceProvider.newPushConsumerBuilder()
                .setClientConfiguration(configuration)
                .setConsumerGroup("consumerGroup1")
                // Subscription: topic -> filter expression.
                .setSubscriptionExpressions(Collections.singletonMap("normalTopic", tagFilter))
                .setMessageListener(received -> {
                    // Log the message and report it as consumed.
                    log.info("Consume message={}", received);
                    return ConsumeResult.SUCCESS;
                })
                .build();
    }

    /**
     * Creates the client configuration for {@link #ENDPOINTS} with SSL switched off.
     *
     * @return the client configuration used by the consumer
     */
    private ClientConfiguration plainTextClientConfiguration() {
        // The credentials provider is optional for the client configuration. It is only required
        // when ACL is enabled on the server; otherwise it does not need to be set.
        // SessionCredentialsProvider sessionCredentialsProvider =
        //         new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);
        return ClientConfiguration.newBuilder()
                .setEndpoints(ENDPOINTS)
                // On some Windows platforms you may hit SSL compatibility issues. If SSL is not
                // essential, turning it off in the client configuration works around them.
                .enableSsl(false)
                // .setCredentialProvider(sessionCredentialsProvider)
                .build();
    }
}
?
使用 gRPC 协议时,服务端至少需要升级到 5.0 版本,并启用 gRPC Proxy 才可兼容。
### 先启动broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
$ tail -f ~/logs/rocketmqlogs/proxy.log
The broker[broker-a,192.168.1.2:10911] boot success...
修改rocketmq5.1.4/conf/rmq-proxy.json文件
{
"grpcServerPort": 9875,
"rocketMQClusterName": "DefaultCluster"
}
?
?