public interface Header {
String key();
byte[] value();
}
KafkaHeaderMapper策略用于在Kafka Headers和MessageHeaders之间映射header条目。其接口定义如下:
public interface KafkaHeaderMapper {
void fromHeaders(MessageHeaders headers, Headers target);
void toHeaders(Headers source, Map<String, Object> target);
}
SimpleKafkaHeaderMapper将原始headers映射为byte[],并提供用于转换为String值的配置选项。
DefaultKafkaHeaderMapper将键(key)映射为MessageHeaders header名称,并且为了支持出站(outbound)消息的丰富header类型,执行JSON转换。一个“特殊”头(key为spring_json_header_types)包含一个:的json映射。此header用于入站(inbound)端,以提供每个header值到原始类型的适当转换。
在入站(inbound)端,所有Kafka Header实例都映射到MessageHeaders。在出站端,默认情况下,除了id、timestamp和映射到ConsumerRecord属性的头之外,所有MessageHeader都被映射。
通过向映射器提供patterns,可以指定要为出站(outbound)消息映射哪些headers。以下列表显示了一些映射示例:
public DefaultKafkaHeaderMapper() {
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) {
...
}
public DefaultKafkaHeaderMapper(String... patterns) {
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) {
...
}
第一个方法,使用默认的Jackson ObjectMapper并映射大多数标头。
第二个方法,使用提供的Jackson ObjectMapper并映射大多数标头。
第三个方法,使用默认的Jackson ObjectMapper并根据提供的patterns映射headers。
第四个方法,使用提供的Jackson ObjectMapper并根据提供的patterns映射headers。
Patterns相当简单,可以包含前导通配符、尾随通配符,也可以同时包含这两个通配符(例如*.cat.)。你可以用前导!来否定patterns。匹配header 名称(无论是正还是负)的第一个pattern 获得通过。
当你提供自己的patterns时,建议你包括!id和!timestamp,因为这些头在入站(inbound )端是只读的。
默认情况下,mapper只反序列化java.lang和java.util中的类。你可以通过使用addTrustedPackages 方法来添加其他(或所有)受信任的包。如果你会收到来自不受信任来源的消息,你可能希望只添加你信任的程序包。若要信任所有包,可以使用mapper.addTrustedPackages("")。
在与不知道mapper JSON格式的系统通信时,以原始形式映射String header非常有用。
你可以指定某些字符串值headers不使用JSON进行映射,而是应映射到byte[]。AbstractKafkaHeaderMapper有属性mapAllStringsOut,当这个属性设置为true时,所有字符串值的headers都将使用charset属性(默认UTF-8)转换为byte[]。此外,还有一个属性rawMappedHeaders,它是header name : boolean的map;如果这个map包含一个header名称,而header包含一个String值,则它将使用字符集映射为原始byte[]。此映射还用于使用字符集将传入的原始byte[] headers映射到String,仅当map value中的布尔值为true时发生。如果布尔值为false,或者header名称不在具有true值的map中,则传入header将简单地映射为原始unmapped header。
下面的测试用例说明了这种机制。
@Test
public void testSpecificStringConvert() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
默认情况下,SimpleKafkaHeaderMapper和DefaultKafkaHeaderMapper都映射所有入站报头。现在,patterns也可以应用于入站(inbound)映射。要为入站映射创建映射器,请在相应的映射器上使用以下静态方法之一:
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
例如:
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
这将排除以abc开头的所有标头,并包括其他所有标头。
默认情况下,只要Jackson在类路径上,就会在MessagingMessageConverter和BatchMessagingMessageConverter中使用DefaultKafkaHeaderMapper。
使用batch converter,转换后的headers可在“KafkaHeaders.BATCH_CONVERTED_HEADERS”中作为List<Map<String, Object>>获得,其中列表中某个位置的map对应于payload中的数据位置。
如果没有转换器(可能是因为Jackson不存在,也可能是因为它被明确设置为null),consumer记录中的头将在“KafkaHeaders.NATIVE_HEADERS”提供,并且是未转换的。此header是Headers对象(在batch转换器的情况下是List),其中列表中的位置对应于payload中的数据位置)。
某些类型不适合JSON序列化,对于这些类型,可能首选简单的toString()序列化。DefaultKafkaHeaderMapper有一个名为addToStringClasses()的方法,它允许你提供应该以这种方式处理出站(outbound )映射的类的名称。在入站(inbound)映射过程中,它们被映射为String。默认情况下,只有org.springframework.util.MimeType和org.springframework.http.MediaType以这种方式映射。
2.3版本以后,字符串值headers的处理被简化了。默认情况下,此类标头不再是JSON编码的(即,它们没有添加“…?“)。该类型仍然添加到JSON_TYPES标头中,因此接收系统可以转换回String(从byte[])。mapper可以处理(解码)旧版本生成的标头(它检查开始位置的");通过这种方式,使用2.3的应用程序可以消费旧版本的记录。
为了与早期版本兼容,如果使用2.3的版本生成的记录可能被使用早期版本的应用程序使用,请将encodeStrings设置为true。当所有应用程序都使用2.3或更高版本时,可以将属性保留为默认值false。
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
如果使用Spring Boot,它会自动配置这个converter bean到KafkaTemplate;否则,你要手动将此转换器添加到template中。