大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器
创始人
2024-11-11 13:38:11
0

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 消费者的基本流程
  • 消费者的参数、参数补充

在这里插入图片描述

序列化器

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要将数据序列化成为字节数组。
序列化器作用就是用于序列化要发送的消息的。

在这里插入图片描述
Kafka通过 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数据。

public interface Serializer extends Closeable {      /**      * Configure this class.      * @param configs configs in key/value pairs      * @param isKey whether is for key or value      */     default void configure(Map configs, boolean isKey) {         // intentionally left blank     }      /**      * Convert {@code data} into a byte array.      *      * @param topic topic associated with data      * @param data typed data      * @return serialized bytes      */     byte[] serialize(String topic, T data);      /**      * Convert {@code data} into a byte array.      *      * @param topic topic associated with data      * @param headers headers associated with the record      * @param data typed data      * @return serialized bytes      */     default byte[] serialize(String topic, Headers headers, T data) {         return serialize(topic, data);     }      /**      * Close this serializer.      * 

* This method must be idempotent as it may be called multiple times. */ @Override default void close() { // intentionally left blank } }

其中Kafka也内置了一些实现好的序列化器:

  • ByteArraySerializer
  • StringSerializer
  • DoubleSerializer
  • 等等… 具体可以自行查看

自定义序列化器

自定义实体类

实现一个简单的类:

public class User {      private String username;      private String password;      private Integer age;      public String getUsername() {         return username;     }      public void setUsername(String username) {         this.username = username;     }      public String getPassword() {         return password;     }      public void setPassword(String password) {         this.password = password;     }      public Integer getAge() {         return age;     }      public void setAge(Integer age) {         this.age = age;     } }  

实现序列化

注意对象中的内容转换为字节数组的过程,要计算好开启的空间!!!

public class UserSerilazer implements Serializer {      @Override     public void configure(Map configs, boolean isKey) {         Serializer.super.configure(configs, isKey);     }      @Override     public byte[] serialize(String topic, User data) {         if (null == data) {             return null;         }         int userId = data.getUserId();         String username = data.getUsername();         String password = data.getPassword();         int age = data.getAge();          int usernameLen = 0;         byte[] usernameBytes;         if (null != username) {             usernameBytes = username.getBytes(StandardCharsets.UTF_8);             usernameLen = usernameBytes.length;         } else {             usernameBytes = new byte[0];          }          int passwordLen = 0;         byte[] passwordBytes;         if (null != password) {             passwordBytes = password.getBytes(StandardCharsets.UTF_8);             passwordLen = passwordBytes.length;         } else {             passwordBytes = new byte[0];         }          ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + usernameLen + 4 + passwordLen + 4);         byteBuffer.putInt(userId);         byteBuffer.putInt(usernameLen);         byteBuffer.put(usernameBytes);         byteBuffer.putInt(passwordLen);         byteBuffer.put(passwordBytes);         byteBuffer.putInt(age);         return byteBuffer.array();     }      @Override     public byte[] serialize(String topic, Headers headers, User data) {         return Serializer.super.serialize(topic, headers, data);     }      @Override     public void close() {         Serializer.super.close();     } }  

分区器

在这里插入图片描述

默认情况下的分区计算:

  • 如果Record提供了分区号,则使用Record提供的分区号
  • 如果Record没有提供分区号,则使用Key序列化后值的Hash值对分区数取模
  • 如果Record没有提供分区号,也没有提供Key,则使用轮询的方式分配分区号

我们在这里可以看到对应的内容:

org.apache.kafka.clients.producer 

在这里插入图片描述
可以看到,如果 Partition 是 null的话,会有函数来进行分区,跟进去,可以看到如下方法:
在这里插入图片描述

自定义分区器

如果要自定义分区器, 需要:

  • 首先开发Partitioner接口中的实现类
  • 在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xxx.xxx.class”)
public class MyPartitioner implements Partitioner {      @Override     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {         return 0;     }      @Override     public void close() {      }      @Override     public void configure(Map configs) {      } } 

相关内容

热门资讯

第9分钟妙计!掌中乐游戏中心破... 第9分钟妙计!掌中乐游戏中心破解版,财神十三章辅助(辅助)真是是有下载(哔哩哔哩)一、掌中乐游戏中心...
九分钟诀窍!雀姬手游辅助脚本,... 九分钟诀窍!雀姬手游辅助脚本,小程序财神十三张脚本小游戏(辅助)果然真的是有脚本(哔哩哔哩)1、小程...
4分钟秘籍!免费辅助神器app... 4分钟秘籍!免费辅助神器app,超凡辅助软件(辅助)好像真的有脚本(哔哩哔哩)1、很好的工具软件,可...
第3分钟指南!哈糖菠萝怎么挂,... 第3分钟指南!哈糖菠萝怎么挂,海贝之城辅助(辅助)其实真的有安装(哔哩哔哩)1、哈糖菠萝怎么挂脚本辅...
8分钟指南!新金龙辅助工具,葫... 8分钟指南!新金龙辅助工具,葫芦娃辅助软件(辅助)果然是有下载(哔哩哔哩)葫芦娃辅助软件透视方法中分...
三分钟攻略!微信小程序财神十三... 三分钟攻略!微信小程序财神十三脚本app,三哥玩辅助器免费下载(辅助)确实真的有辅助(哔哩哔哩)1、...
第8分钟绝活儿!免费雀神挂件怎... 第8分钟绝活儿!免费雀神挂件怎么安装,三哥玩辅助器免费下载(辅助)都是有挂辅助(哔哩哔哩)1、玩家可...
第5分钟经验!新道游正版辅助,... 第5分钟经验!新道游正版辅助,财神破解版全自动脚本(辅助)竟然有挂工具(哔哩哔哩)1、起透看视 财神...
第七分钟机巧!威信茶馆有挂的吗... 第七分钟机巧!威信茶馆有挂的吗,新518互游辅助(辅助)原来真的有辅助(哔哩哔哩)1、进入游戏-大厅...
4分钟大纲!决战十水三修改器,... 4分钟大纲!决战十水三修改器,新天道怎么看底牌(辅助)切实是有app(哔哩哔哩)1、实时新天道怎么看...