大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器
创始人
2024-11-11 13:38:11
0

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 消费者的基本流程
  • 消费者的参数、参数补充

在这里插入图片描述

序列化器

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要将数据序列化成为字节数组。
序列化器作用就是用于序列化要发送的消息的。

在这里插入图片描述
Kafka通过 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数据。

public interface Serializer extends Closeable {      /**      * Configure this class.      * @param configs configs in key/value pairs      * @param isKey whether is for key or value      */     default void configure(Map configs, boolean isKey) {         // intentionally left blank     }      /**      * Convert {@code data} into a byte array.      *      * @param topic topic associated with data      * @param data typed data      * @return serialized bytes      */     byte[] serialize(String topic, T data);      /**      * Convert {@code data} into a byte array.      *      * @param topic topic associated with data      * @param headers headers associated with the record      * @param data typed data      * @return serialized bytes      */     default byte[] serialize(String topic, Headers headers, T data) {         return serialize(topic, data);     }      /**      * Close this serializer.      * 

* This method must be idempotent as it may be called multiple times. */ @Override default void close() { // intentionally left blank } }

其中Kafka也内置了一些实现好的序列化器:

  • ByteArraySerializer
  • StringSerializer
  • DoubleSerializer
  • 等等… 具体可以自行查看

自定义序列化器

自定义实体类

实现一个简单的类:

public class User {      private String username;      private String password;      private Integer age;      public String getUsername() {         return username;     }      public void setUsername(String username) {         this.username = username;     }      public String getPassword() {         return password;     }      public void setPassword(String password) {         this.password = password;     }      public Integer getAge() {         return age;     }      public void setAge(Integer age) {         this.age = age;     } }  

实现序列化

注意对象中的内容转换为字节数组的过程,要计算好开启的空间!!!

public class UserSerilazer implements Serializer {      @Override     public void configure(Map configs, boolean isKey) {         Serializer.super.configure(configs, isKey);     }      @Override     public byte[] serialize(String topic, User data) {         if (null == data) {             return null;         }         int userId = data.getUserId();         String username = data.getUsername();         String password = data.getPassword();         int age = data.getAge();          int usernameLen = 0;         byte[] usernameBytes;         if (null != username) {             usernameBytes = username.getBytes(StandardCharsets.UTF_8);             usernameLen = usernameBytes.length;         } else {             usernameBytes = new byte[0];          }          int passwordLen = 0;         byte[] passwordBytes;         if (null != password) {             passwordBytes = password.getBytes(StandardCharsets.UTF_8);             passwordLen = passwordBytes.length;         } else {             passwordBytes = new byte[0];         }          ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + usernameLen + 4 + passwordLen + 4);         byteBuffer.putInt(userId);         byteBuffer.putInt(usernameLen);         byteBuffer.put(usernameBytes);         byteBuffer.putInt(passwordLen);         byteBuffer.put(passwordBytes);         byteBuffer.putInt(age);         return byteBuffer.array();     }      @Override     public byte[] serialize(String topic, Headers headers, User data) {         return Serializer.super.serialize(topic, headers, data);     }      @Override     public void close() {         Serializer.super.close();     } }  

分区器

在这里插入图片描述

默认情况下的分区计算:

  • 如果Record提供了分区号,则使用Record提供的分区号
  • 如果Record没有提供分区号,则使用Key序列化后值的Hash值对分区数取模
  • 如果Record没有提供分区号,也没有提供Key,则使用轮询的方式分配分区号

我们在这里可以看到对应的内容:

org.apache.kafka.clients.producer 

在这里插入图片描述
可以看到,如果 Partition 是 null的话,会有函数来进行分区,跟进去,可以看到如下方法:
在这里插入图片描述

自定义分区器

如果要自定义分区器, 需要:

  • 首先开发Partitioner接口中的实现类
  • 在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xxx.xxx.class”)
public class MyPartitioner implements Partitioner {      @Override     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {         return 0;     }      @Override     public void close() {      }      @Override     public void configure(Map configs) {      } } 

相关内容

热门资讯

透视科技!广东雀神智能插件安装... 透视科技!广东雀神智能插件安装可测试,德普之星透视辅助软件是真的吗(扑克教程辅助开挂工具)您好:德普...
一分钟了解!仙神互娱辅助,赣牌... 一分钟了解!仙神互娱辅助,赣牌圈修改器,详细辅助开挂插件(有挂教学)您好:赣牌圈修改器这款游戏可以开...
透视规律!神殿娱乐控制系统,微... 透视规律!神殿娱乐控制系统,微信小程序怎么开挂(揭秘教程辅助开挂平台);无需打开直接搜索薇:1367...
最新技巧!卡农免费辅助,边锋小... 最新技巧!卡农免费辅助,边锋小程序辅助免费,原先有开挂辅助神器(有挂技巧)这是一款可以让一直输的玩家...
三分钟了解!陕麻圈修改工具,哥... 您好:哥哥打大a游戏攻略这款游戏可以开挂的,确实是有挂的,很多玩家在这款游戏中打牌都会发现很多用户的...
透明总结!广东雀神祈福真的有用... 透明总结!广东雀神祈福真的有用吗,wepokerplus透视脚本免费(必胜教程辅助开挂插件);打开点...
揭秘一下!火神工作室辅助大全,... 火神工作室辅助大全开挂教程视频分享装挂详细步骤在当今的网络游戏中,火神工作室辅助大全作为一种经典的娱...
科技新动态!酷玩联盟辅助,胡易... 科技新动态!酷玩联盟辅助,胡易决胜麻架辅助,传授辅助工具(有挂技术);无需打开直接搜索加薇13670...
透视苹果版!广东雀神透视,微信... >>您好:微信小程序微乐陕西挖坑确实是有挂的,很多玩家在这款微信小程序微乐陕西挖坑游戏中打牌都会发现...
科普常识!多乐手游辅助,桂林字... 科普常识!多乐手游辅助,桂林字牌辅助,原本有开挂辅助平台(有挂详细);无需打开直接搜索打开薇:136...