大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器
创始人
2024-11-11 13:38:11
0

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 消费者的基本流程
  • 消费者的参数、参数补充

在这里插入图片描述

序列化器

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要将数据序列化成为字节数组。
序列化器作用就是用于序列化要发送的消息的。

在这里插入图片描述
Kafka通过 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数据。

public interface Serializer extends Closeable {      /**      * Configure this class.      * @param configs configs in key/value pairs      * @param isKey whether is for key or value      */     default void configure(Map configs, boolean isKey) {         // intentionally left blank     }      /**      * Convert {@code data} into a byte array.      *      * @param topic topic associated with data      * @param data typed data      * @return serialized bytes      */     byte[] serialize(String topic, T data);      /**      * Convert {@code data} into a byte array.      *      * @param topic topic associated with data      * @param headers headers associated with the record      * @param data typed data      * @return serialized bytes      */     default byte[] serialize(String topic, Headers headers, T data) {         return serialize(topic, data);     }      /**      * Close this serializer.      * 

* This method must be idempotent as it may be called multiple times. */ @Override default void close() { // intentionally left blank } }

其中Kafka也内置了一些实现好的序列化器:

  • ByteArraySerializer
  • StringSerializer
  • DoubleSerializer
  • 等等… 具体可以自行查看

自定义序列化器

自定义实体类

实现一个简单的类:

public class User {      private String username;      private String password;      private Integer age;      public String getUsername() {         return username;     }      public void setUsername(String username) {         this.username = username;     }      public String getPassword() {         return password;     }      public void setPassword(String password) {         this.password = password;     }      public Integer getAge() {         return age;     }      public void setAge(Integer age) {         this.age = age;     } }  

实现序列化

注意对象中的内容转换为字节数组的过程,要计算好开启的空间!!!

public class UserSerilazer implements Serializer {      @Override     public void configure(Map configs, boolean isKey) {         Serializer.super.configure(configs, isKey);     }      @Override     public byte[] serialize(String topic, User data) {         if (null == data) {             return null;         }         int userId = data.getUserId();         String username = data.getUsername();         String password = data.getPassword();         int age = data.getAge();          int usernameLen = 0;         byte[] usernameBytes;         if (null != username) {             usernameBytes = username.getBytes(StandardCharsets.UTF_8);             usernameLen = usernameBytes.length;         } else {             usernameBytes = new byte[0];          }          int passwordLen = 0;         byte[] passwordBytes;         if (null != password) {             passwordBytes = password.getBytes(StandardCharsets.UTF_8);             passwordLen = passwordBytes.length;         } else {             passwordBytes = new byte[0];         }          ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + usernameLen + 4 + passwordLen + 4);         byteBuffer.putInt(userId);         byteBuffer.putInt(usernameLen);         byteBuffer.put(usernameBytes);         byteBuffer.putInt(passwordLen);         byteBuffer.put(passwordBytes);         byteBuffer.putInt(age);         return byteBuffer.array();     }      @Override     public byte[] serialize(String topic, Headers headers, User data) {         return Serializer.super.serialize(topic, headers, data);     }      @Override     public void close() {         Serializer.super.close();     } }  

分区器

在这里插入图片描述

默认情况下的分区计算:

  • 如果Record提供了分区号,则使用Record提供的分区号
  • 如果Record没有提供分区号,则使用Key序列化后值的Hash值对分区数取模
  • 如果Record没有提供分区号,也没有提供Key,则使用轮询的方式分配分区号

我们在这里可以看到对应的内容:

org.apache.kafka.clients.producer 

在这里插入图片描述
可以看到,如果 Partition 是 null的话,会有函数来进行分区,跟进去,可以看到如下方法:
在这里插入图片描述

自定义分区器

如果要自定义分区器, 需要:

  • 首先开发Partitioner接口中的实现类
  • 在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xxx.xxx.class”)
public class MyPartitioner implements Partitioner {      @Override     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {         return 0;     }      @Override     public void close() {      }      @Override     public void configure(Map configs) {      } } 

相关内容

热门资讯

外挂秘籍!xpoker怎么作必... 外挂秘籍!xpoker怎么作必弊,德普之星透视辅助插件-本来一直总是有辅助app(哔哩哔哩)运xpo...
攻略讲解!智星菠萝有挂吗,青鸟... 攻略讲解!智星菠萝有挂吗,青鸟辅助平台,确实真的是有辅助开挂(有挂猫腻);1、实时智星菠萝有挂吗透视...
透视有挂!pokemmo辅助工... 透视有挂!pokemmo辅助工具(透视)其实有辅助辅助器(有挂方式)-哔哩哔哩一、pokemmo辅助...
外挂经验!xpoker辅助神器... 外挂经验!xpoker辅助神器,sohoo辅助-好像真的是有辅助神器(哔哩哔哩)1、xpoker辅助...
记者发布!云扑克有透视吗,新久... 记者发布!云扑克有透视吗,新久久辅助器,一直真的是有辅助教程(存在有挂)1、这是跨平台的云扑克有透视...
今日!德州透视是真的吗(透视)... 今日!德州透视是真的吗(透视)切实是有辅助下载(有挂讲解)-哔哩哔哩1)德州透视是真的吗有没有挂:进...
外挂手筋!德普之星app安卓版... 外挂手筋!德普之星app安卓版破解版,拱趴大菠萝作必弊方法-都是真的有辅助脚本(哔哩哔哩)1、金币登...
查到实测!wepoker透视器... 查到实测!wepoker透视器免费,指尖捕鱼bgm,竟然真的有辅助教程(真的有挂)指尖捕鱼bgm脚本...
今天下午!aapoker透视怎... 今天下午!aapoker透视怎么用(透视)竟然存在有辅助神器(有人有挂)-哔哩哔哩1、每一步都需要思...
外挂法子!聚星ai辅助工具激活... 外挂法子!聚星ai辅助工具激活码,poker world辅助器-其实一直总是有辅助攻略(哔哩哔哩)1...