1.在application.properties新增如下配置:
# Kafka broker address; the value after the first ':' is the default used when the KAFKA_URL placeholder is not set.
fusioninsight.kafka.bootstrap-servers= ${KAFKA_URL:10.7.212.111:9200}
# Security settings passed to every Kafka client (admin, consumer, producer).
fusioninsight.kafka.security.protocol= SASL_PLAINTEXT
fusioninsight.kafka.kerberos.domain.name= hadoop.hadoop.com
fusioninsight.kafka.sasl.kerberos.service.name= kafka
# Locations of the JAAS login file and krb5.conf; override with JAAS_PATH / KRB5_PATH.
# A backslash is an escape character in .properties files, so every Windows path
# separator must be doubled; a single "\d" or "\h" would silently lose the backslash.
kerberos.jaas=${JAAS_PATH:E:\\demo\\huawei\\jaas.conf}
kerberos.krb5=${KRB5_PATH:E:\\demo\\huawei\\krb5.conf}
2.新增kafka配置
/**
 * Spring configuration that wires Kafka admin, consumer and producer beans for a
 * SASL/Kerberos-secured cluster.
 *
 * <p>Connection and security settings are injected from the
 * {@code fusioninsight.kafka.*} and {@code kerberos.*} properties. The JAAS and
 * krb5 file locations are published as JVM system properties before any Kafka
 * client factory is built, so that the result does not depend on the order in
 * which Spring happens to instantiate the beans below.
 */
@Configuration
@Slf4j
public class PlatformConsumerConfig {

    // NOTE: these fields are public (and "boostrapServers" is misspelled) in the
    // original; both are kept as-is so existing references keep working.
    @Value("${fusioninsight.kafka.bootstrap-servers}")
    public String boostrapServers;

    @Value("${fusioninsight.kafka.security.protocol}")
    public String securityProtocol;

    @Value("${fusioninsight.kafka.kerberos.domain.name}")
    public String kerberosDomainName;

    @Value("${fusioninsight.kafka.sasl.kerberos.service.name}")
    public String kerberosServiceName;

    @Value("${kerberos.krb5}")
    private String kerberoskrb5;

    @Value("${kerberos.jaas}")
    private String kerberosJaas;

    /**
     * Listener container factory registered under the bean name
     * {@code authKafkaContainerFactory}.
     *
     * <p>Containers created by it run with concurrency 10, deliver records in
     * batches, do not fail on missing topics, poll with a 1500 ms timeout and
     * are NOT started automatically.
     *
     * @return the configured container factory
     */
    @Bean(name = "authKafkaContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        ContainerProperties properties = factory.getContainerProperties();
        properties.setMissingTopicsFatal(false);
        properties.setPollTimeout(1500);
        // Prevent the listener containers from starting on their own.
        factory.setAutoStartup(false);
        factory.setBatchListener(true);
        return factory;
    }

    /**
     * @return a new {@code PlatformListener} bean
     */
    @Bean
    public PlatformListener platformListener() {
        return new PlatformListener();
    }

    /**
     * Builds a producer factory from {@link #producerConfigs()}.
     *
     * <p>Not a {@code @Bean} (it was not one originally); each call returns a
     * new factory instance.
     *
     * @return a new String/String producer factory
     */
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * @return a converter bean backed by {@code StringJsonMessageConverter}
     */
    @Bean
    public RecordMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    /**
     * @return a {@code KafkaAdmin} configured with the shared connection and
     *         security settings
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        applyKerberosSystemProperties();
        return new KafkaAdmin(commonClientConfigs());
    }

    /**
     * @return a consumer factory using String deserializers for key and value
     */
    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        applyKerberosSystemProperties();
        Map<String, Object> configs = commonClientConfigs();
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    /**
     * @return a {@code KafkaTemplate} that sends String keys and values through
     *         a factory built by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        // Authentication starts here, using the file paths configured by the user.
        applyKerberosSystemProperties();
        log.info("---kerberos on kafka use default--jaas:{} krb5:{}",
                System.getProperty("java.security.auth.login.config"),
                System.getProperty("java.security.krb5.conf"));
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Producer settings: the shared connection/security entries plus String
     * serializers for key and value.
     */
    private Map<String, Object> producerConfigs() {
        Map<String, Object> configs = commonClientConfigs();
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return configs;
    }

    /**
     * Connection and security entries shared by the admin, consumer and
     * producer clients. Returns a fresh mutable map on every call.
     */
    private Map<String, Object> commonClientConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", boostrapServers);
        configs.put("security.protocol", securityProtocol);
        configs.put("sasl.kerberos.service.name", kerberosServiceName);
        // Not a stock Apache Kafka client key; presumably consumed by the
        // vendor's Kafka client build - TODO confirm.
        configs.put("kerberos.domain.name", kerberosDomainName);
        return configs;
    }

    /**
     * Publishes the JAAS and krb5 file locations as JVM system properties.
     * Setting the same values again is harmless, so every client-producing bean
     * method calls this first instead of relying on bean creation order.
     */
    private void applyKerberosSystemProperties() {
        System.setProperty("java.security.auth.login.config", kerberosJaas);
        System.setProperty("java.security.krb5.conf", kerberoskrb5);
    }
}
3.使用
// Field of the Spring-managed bean that sends the message.
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

// Statements inside the sending method:
dataMessageVO.setAccountList(accountList);
messageSend2KafkaVo.setData(dataMessageVO);
messageSend2KafkaVo.setType("account");
// Serialize once so the logged payload is exactly the payload that is sent.
String accountMessageJson = JSON.toJSONString(messageSend2KafkaVo);
log.info("--message to kafka for account:-->{}", accountMessageJson);
ProducerRecord<String, String> record = new ProducerRecord<>("asset_rep", accountMessageJson);
// get() blocks the calling thread until the send has completed (or failed).
Object o = kafkaTemplate.send(record).get();
log.info("--message to kafka for account result2:--->{}", o);