配置props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
public Map producerConfigs(){ Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer); props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class); return props; } public ProducerFactory producerFactory(){ return new DefaultKafkaProducerFactory<>(producerConfigs()); } // 覆盖spring-kafka中的配置 @Bean public KafkaTemplate kafkaTemplate(){ return new KafkaTemplate(producerFactory()); }
public class CustomerProducerInterceptor implements ProducerInterceptor { // 发送消息时,对消息拦截。 @Override public ProducerRecord onSend(ProducerRecord producerRecord) { System.out.println("拦截消息" + producerRecord.toString()); return null; } // 服务器是否收到了当前这条消息 @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { if(recordMetadata != null){ System.out.println("服务器收到消息" + recordMetadata.offset()); }else{ // 没有收到消息发送失败 System.out.println("消息发送失败!!!"); } } @Override public void close() { } @Override public void configure(Map map) { } }