【Spring连载】使用Spring访问 Apache Kafka(十五)----消息头

发布时间:2024年01月23日

【Spring连载】使用Spring访问 Apache Kafka(十五)----消息头Message Headers


Apache Kafka client中提供了对消息中headers的支持。Spring for Apache Kafka现在支持将这些headers映射到spring-messaging MessageHeaders和从MessageHeaders中映射这些headers。
以前的版本将ConsumerRecord和ProducerRecord映射到spring-messaging Message<?>,其中value属性被映射到payload,其他属性(topic, partition等)被映射到headers。当前版本这种情况仍然存在,但现在可以映射其他(任意)headers。
Apache Kafka headers 有一个简单的API,如下接口定义所示:

public interface Header {

    String key();

    byte[] value();

}

KafkaHeaderMapper策略用于在Kafka Headers和MessageHeaders之间映射header条目。其接口定义如下:

public interface KafkaHeaderMapper {

    void fromHeaders(MessageHeaders headers, Headers target);

    void toHeaders(Headers source, Map<String, Object> target);

}

SimpleKafkaHeaderMapper将原始headers映射为byte[],并提供用于转换为String值的配置选项。
DefaultKafkaHeaderMapper将键(key)映射为MessageHeaders header名称,并且为了支持出站(outbound)消息的丰富header类型,执行JSON转换。一个“特殊”头(key为spring_json_header_types)包含一个:的json映射。此header用于入站(inbound)端,以提供每个header值到原始类型的适当转换。
在入站(inbound)端,所有Kafka Header实例都映射到MessageHeaders。在出站端,默认情况下,除了id、timestamp和映射到ConsumerRecord属性的头之外,所有MessageHeader都被映射。
通过向映射器提供patterns,可以指定要为出站(outbound)消息映射哪些headers。以下列表显示了一些映射示例:

public DefaultKafkaHeaderMapper() { 
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { 
    ...
}

public DefaultKafkaHeaderMapper(String... patterns) { 
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { 
    ...
}
第一个方法,使用默认的Jackson ObjectMapper并映射大多数标头。

第二个方法,使用提供的Jackson ObjectMapper并映射大多数标头。

第三个方法,使用默认的Jackson ObjectMapper并根据提供的patterns映射headers。

第四个方法,使用提供的Jackson ObjectMapper并根据提供的patterns映射headers。

Patterns相当简单,可以包含前导通配符、尾随通配符,也可以同时包含这两个通配符(例如*.cat.)。你可以用前导!来否定patterns。匹配header 名称(无论是正还是负)的第一个pattern 获得通过。
当你提供自己的patterns时,建议你包括!id和!timestamp,因为这些头在入站(inbound )端是只读的。
默认情况下,mapper只反序列化java.lang和java.util中的类。你可以通过使用addTrustedPackages 方法来添加其他(或所有)受信任的包。如果你会收到来自不受信任来源的消息,你可能希望只添加你信任的程序包。若要信任所有包,可以使用mapper.addTrustedPackages("
")。
在与不知道mapper JSON格式的系统通信时,以原始形式映射String header非常有用。

你可以指定某些字符串值headers不使用JSON进行映射,而是应映射到byte[]。AbstractKafkaHeaderMapper有属性mapAllStringsOut,当这个属性设置为true时,所有字符串值的headers都将使用charset属性(默认UTF-8)转换为byte[]。此外,还有一个属性rawMappedHeaders,它是header name : boolean的map;如果这个map包含一个header名称,而header包含一个String值,则它将使用字符集映射为原始byte[]。此映射还用于使用字符集将传入的原始byte[] headers映射到String,仅当map value中的布尔值为true时发生。如果布尔值为false,或者header名称不在具有true值的map中,则传入header将简单地映射为原始unmapped header。
下面的测试用例说明了这种机制。

@Test
public void testSpecificStringConvert() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    Map<String, Boolean> rawMappedHeaders = new HashMap<>();
    rawMappedHeaders.put("thisOnesAString", true);
    rawMappedHeaders.put("thisOnesBytes", false);
    mapper.setRawMappedHeaders(rawMappedHeaders);
    Map<String, Object> headersMap = new HashMap<>();
    headersMap.put("thisOnesAString", "thing1");
    headersMap.put("thisOnesBytes", "thing2");
    headersMap.put("alwaysRaw", "thing3".getBytes());
    MessageHeaders headers = new MessageHeaders(headersMap);
    Headers target = new RecordHeaders();
    mapper.fromHeaders(headers, target);
    assertThat(target).containsExactlyInAnyOrder(
            new RecordHeader("thisOnesAString", "thing1".getBytes()),
            new RecordHeader("thisOnesBytes", "thing2".getBytes()),
            new RecordHeader("alwaysRaw", "thing3".getBytes()));
    headersMap.clear();
    mapper.toHeaders(target, headersMap);
    assertThat(headersMap).contains(
            entry("thisOnesAString", "thing1"),
            entry("thisOnesBytes", "thing2".getBytes()),
            entry("alwaysRaw", "thing3".getBytes()));
}

默认情况下,SimpleKafkaHeaderMapper和DefaultKafkaHeaderMapper都映射所有入站报头。现在,patterns也可以应用于入站(inbound)映射。要为入站映射创建映射器,请在相应的映射器上使用以下静态方法之一:

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

例如:

DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");

这将排除以abc开头的所有标头,并包括其他所有标头。
默认情况下,只要Jackson在类路径上,就会在MessagingMessageConverter和BatchMessagingMessageConverter中使用DefaultKafkaHeaderMapper。
使用batch converter,转换后的headers可在“KafkaHeaders.BATCH_CONVERTED_HEADERS”中作为List<Map<String, Object>>获得,其中列表中某个位置的map对应于payload中的数据位置。
如果没有转换器(可能是因为Jackson不存在,也可能是因为它被明确设置为null),consumer记录中的头将在“KafkaHeaders.NATIVE_HEADERS”提供,并且是未转换的。此header是Headers对象(在batch转换器的情况下是List),其中列表中的位置对应于payload中的数据位置)。
某些类型不适合JSON序列化,对于这些类型,可能首选简单的toString()序列化。DefaultKafkaHeaderMapper有一个名为addToStringClasses()的方法,它允许你提供应该以这种方式处理出站(outbound )映射的类的名称。在入站(inbound)映射过程中,它们被映射为String。默认情况下,只有org.springframework.util.MimeType和org.springframework.http.MediaType以这种方式映射。
2.3版本以后,字符串值headers的处理被简化了。默认情况下,此类标头不再是JSON编码的(即,它们没有添加“…?“)。该类型仍然添加到JSON_TYPES标头中,因此接收系统可以转换回String(从byte[])。mapper可以处理(解码)旧版本生成的标头(它检查开始位置的");通过这种方式,使用2.3的应用程序可以消费旧版本的记录。
为了与早期版本兼容,如果使用2.3的版本生成的记录可能被使用早期版本的应用程序使用,请将encodeStrings设置为true。当所有应用程序都使用2.3或更高版本时,可以将属性保留为默认值false。

@Bean
MessagingMessageConverter converter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.setEncodeStrings(true);
    converter.setHeaderMapper(mapper);
    return converter;
}

如果使用Spring Boot,它会自动配置这个converter bean到KafkaTemplate;否则,你要手动将此转换器添加到template中。

文章来源:https://blog.csdn.net/gabriel_wang_sh/article/details/135739655
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。