大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器
创始人
2024-11-11 13:38:11
0

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 消费者的基本流程
  • 消费者的参数、参数补充

在这里插入图片描述

序列化器

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要将数据序列化成为字节数组。
序列化器作用就是用于序列化要发送的消息的。

在这里插入图片描述
Kafka通过 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数据。

public interface Serializer extends Closeable {      /**      * Configure this class.      * @param configs configs in key/value pairs      * @param isKey whether is for key or value      */     default void configure(Map configs, boolean isKey) {         // intentionally left blank     }      /**      * Convert {@code data} into a byte array.      *      * @param topic topic associated with data      * @param data typed data      * @return serialized bytes      */     byte[] serialize(String topic, T data);      /**      * Convert {@code data} into a byte array.      *      * @param topic topic associated with data      * @param headers headers associated with the record      * @param data typed data      * @return serialized bytes      */     default byte[] serialize(String topic, Headers headers, T data) {         return serialize(topic, data);     }      /**      * Close this serializer.      * 

* This method must be idempotent as it may be called multiple times. */ @Override default void close() { // intentionally left blank } }

其中Kafka也内置了一些实现好的序列化器:

  • ByteArraySerializer
  • StringSerializer
  • DoubleSerializer
  • 等等… 具体可以自行查看

自定义序列化器

自定义实体类

实现一个简单的类:

public class User {      private String username;      private String password;      private Integer age;      public String getUsername() {         return username;     }      public void setUsername(String username) {         this.username = username;     }      public String getPassword() {         return password;     }      public void setPassword(String password) {         this.password = password;     }      public Integer getAge() {         return age;     }      public void setAge(Integer age) {         this.age = age;     } }  

实现序列化

注意对象中的内容转换为字节数组的过程,要计算好开启的空间!!!

public class UserSerilazer implements Serializer {      @Override     public void configure(Map configs, boolean isKey) {         Serializer.super.configure(configs, isKey);     }      @Override     public byte[] serialize(String topic, User data) {         if (null == data) {             return null;         }         int userId = data.getUserId();         String username = data.getUsername();         String password = data.getPassword();         int age = data.getAge();          int usernameLen = 0;         byte[] usernameBytes;         if (null != username) {             usernameBytes = username.getBytes(StandardCharsets.UTF_8);             usernameLen = usernameBytes.length;         } else {             usernameBytes = new byte[0];          }          int passwordLen = 0;         byte[] passwordBytes;         if (null != password) {             passwordBytes = password.getBytes(StandardCharsets.UTF_8);             passwordLen = passwordBytes.length;         } else {             passwordBytes = new byte[0];         }          ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + usernameLen + 4 + passwordLen + 4);         byteBuffer.putInt(userId);         byteBuffer.putInt(usernameLen);         byteBuffer.put(usernameBytes);         byteBuffer.putInt(passwordLen);         byteBuffer.put(passwordBytes);         byteBuffer.putInt(age);         return byteBuffer.array();     }      @Override     public byte[] serialize(String topic, Headers headers, User data) {         return Serializer.super.serialize(topic, headers, data);     }      @Override     public void close() {         Serializer.super.close();     } }  

分区器

在这里插入图片描述

默认情况下的分区计算:

  • 如果Record提供了分区号,则使用Record提供的分区号
  • 如果Record没有提供分区号,则使用Key序列化后值的Hash值对分区数取模
  • 如果Record没有提供分区号,也没有提供Key,则使用轮询的方式分配分区号

我们在这里可以看到对应的内容:

org.apache.kafka.clients.producer 

在这里插入图片描述
可以看到,如果 Partition 是 null的话,会有函数来进行分区,跟进去,可以看到如下方法:
在这里插入图片描述

自定义分区器

如果要自定义分区器, 需要:

  • 首先开发Partitioner接口中的实现类
  • 在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xxx.xxx.class”)
public class MyPartitioner implements Partitioner {      @Override     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {         return 0;     }      @Override     public void close() {      }      @Override     public void configure(Map configs) {      } } 

相关内容

热门资讯

七分钟德州!(wePoKe)软... 您好,德州这款游戏可以开挂的,确实是有挂的,需要了解加微【841106723】很多玩家在这款游戏中打...
1分钟测试!(同仁娱乐)外挂透... 1分钟测试!(同仁娱乐)外挂透视辅助挂,Wepoke新更新软件透明挂,详细教程(有挂工具)-哔哩哔哩...
1分钟苹果版本!(Wepoke... 1分钟苹果版本!(Wepoke代打ai)是有挂辅助挂吗,最新版wpk2024已更新,详细教程(今日头...
3分钟德州!(智星菠萝)其实一... 您好,智星菠萝这款游戏可以开挂的,确实是有挂的,需要了解加微【841106723】很多玩家在这款游戏...
6分钟分钟!WEPOke真的是... 6分钟分钟!WEPOke真的是有挂吗,Wepoke ai代打其实是真的有挂的,详细教程(2024已更...
八分钟机器人!(pokerwo... 八分钟机器人!(pokerworld)确实真有挂辅助挂吗,微扑克透明挂其实真实是有挂,详细教程(果真...
八分钟游戏!(广西八一字牌)外... 八分钟游戏!(广西八一字牌)外挂透视辅助挂,竞技联盟确实是有挂猫腻详细教程(真的有挂)-哔哩哔哩;A...
七分钟检测!(Wepoke渠道... 七分钟检测!(Wepoke渠道)原来确实是有挂,WPK科技其实一直总是有挂,详细教程(有挂功能)-哔...
十分钟新版!(WPK外挂)是有... 您好,天天欢乐德州这款游戏可以开挂的,确实是有挂的,需要了解加微【757446909】很多玩家在这款...
九分钟苹果版!(k体育)确实是... 九分钟苹果版!(k体育)确实是有挂吗,k体育2020已更新,详细教程(有挂攻略)-哔哩哔哩;德扑锦标...