大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器
创始人
2024-11-11 13:38:11
0

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 消费者的基本流程
  • 消费者的参数、参数补充

在这里插入图片描述

序列化器

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要将数据序列化成为字节数组。
序列化器作用就是用于序列化要发送的消息的。

在这里插入图片描述
Kafka通过 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数据。

public interface Serializer extends Closeable {      /**      * Configure this class.      * @param configs configs in key/value pairs      * @param isKey whether is for key or value      */     default void configure(Map configs, boolean isKey) {         // intentionally left blank     }      /**      * Convert {@code data} into a byte array.      *      * @param topic topic associated with data      * @param data typed data      * @return serialized bytes      */     byte[] serialize(String topic, T data);      /**      * Convert {@code data} into a byte array.      *      * @param topic topic associated with data      * @param headers headers associated with the record      * @param data typed data      * @return serialized bytes      */     default byte[] serialize(String topic, Headers headers, T data) {         return serialize(topic, data);     }      /**      * Close this serializer.      * 

* This method must be idempotent as it may be called multiple times. */ @Override default void close() { // intentionally left blank } }

其中Kafka也内置了一些实现好的序列化器:

  • ByteArraySerializer
  • StringSerializer
  • DoubleSerializer
  • 等等… 具体可以自行查看

自定义序列化器

自定义实体类

实现一个简单的类:

public class User {      private String username;      private String password;      private Integer age;      public String getUsername() {         return username;     }      public void setUsername(String username) {         this.username = username;     }      public String getPassword() {         return password;     }      public void setPassword(String password) {         this.password = password;     }      public Integer getAge() {         return age;     }      public void setAge(Integer age) {         this.age = age;     } }  

实现序列化

注意对象中的内容转换为字节数组的过程,要计算好开启的空间!!!

public class UserSerilazer implements Serializer {      @Override     public void configure(Map configs, boolean isKey) {         Serializer.super.configure(configs, isKey);     }      @Override     public byte[] serialize(String topic, User data) {         if (null == data) {             return null;         }         int userId = data.getUserId();         String username = data.getUsername();         String password = data.getPassword();         int age = data.getAge();          int usernameLen = 0;         byte[] usernameBytes;         if (null != username) {             usernameBytes = username.getBytes(StandardCharsets.UTF_8);             usernameLen = usernameBytes.length;         } else {             usernameBytes = new byte[0];          }          int passwordLen = 0;         byte[] passwordBytes;         if (null != password) {             passwordBytes = password.getBytes(StandardCharsets.UTF_8);             passwordLen = passwordBytes.length;         } else {             passwordBytes = new byte[0];         }          ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + usernameLen + 4 + passwordLen + 4);         byteBuffer.putInt(userId);         byteBuffer.putInt(usernameLen);         byteBuffer.put(usernameBytes);         byteBuffer.putInt(passwordLen);         byteBuffer.put(passwordBytes);         byteBuffer.putInt(age);         return byteBuffer.array();     }      @Override     public byte[] serialize(String topic, Headers headers, User data) {         return Serializer.super.serialize(topic, headers, data);     }      @Override     public void close() {         Serializer.super.close();     } }  

分区器

在这里插入图片描述

默认情况下的分区计算:

  • 如果Record提供了分区号,则使用Record提供的分区号
  • 如果Record没有提供分区号,则使用Key序列化后值的Hash值对分区数取模
  • 如果Record没有提供分区号,也没有提供Key,则使用轮询的方式分配分区号

我们在这里可以看到对应的内容:

org.apache.kafka.clients.producer 

在这里插入图片描述
可以看到,如果 Partition 是 null的话,会有函数来进行分区,跟进去,可以看到如下方法:
在这里插入图片描述

自定义分区器

如果要自定义分区器, 需要:

  • 首先开发Partitioner接口中的实现类
  • 在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xxx.xxx.class”)
public class MyPartitioner implements Partitioner {      @Override     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {         return 0;     }      @Override     public void close() {      }      @Override     public void configure(Map configs) {      } } 

相关内容

热门资讯

黑科技辅助!wpk辅助神器(透... 黑科技辅助!wpk辅助神器(透视)软件透明辅助挂(本来是真的有挂)-哔哩哔哩是一款可以让一直输的玩家...
5分钟了解“创思维正版辅助器下... 5分钟了解“创思维正版辅助器下载”详细透视开挂辅助安装-哔哩哔哩;一、创思维正版辅助器下载有挂的是的...
两分钟科普!wpk真吗,哈糖大... 两分钟科普!wpk真吗,哈糖大菠萝可以开挂吗,曝光教程(发现有挂)-哔哩哔哩哈糖大菠萝可以开挂吗辅助...
第一分钟了解(昆仑大厅)外挂辅... 第一分钟了解(昆仑大厅)外挂辅助插件(透视)详细教程(2022已更新)(哔哩哔哩);亲真的是有正版授...
黑科技辅助!wpk俱乐部长期盈... 黑科技辅助!wpk俱乐部长期盈利打法(透视)软件透明挂黑科技(切实存在有挂)-哔哩哔哩;1、让任何用...
第6分钟了解“功夫川嘛辅助器”... 第6分钟了解“功夫川嘛辅助器”详细透视开挂辅助器-哔哩哔哩;人气非常高,ai更新快且高清可以动的一个...
第五分钟辅助!xpoker辅助... 第五分钟辅助!xpoker辅助,德州透视插件,攻略教程(有挂方法)-哔哩哔哩德州透视插件辅助器中分为...
两分钟了解(皮皮跑胡子)外挂透... 两分钟了解(皮皮跑胡子)外挂透明挂辅助工具(辅助挂)透明挂教程(2020已更新)(哔哩哔哩);皮皮跑...
黑科技辅助!微扑克可以加入俱乐... 您好,微扑克可以加入俱乐部这款游戏可以开挂的,确实是有挂的,需要了解加微【136704302】很多玩...
8分钟了解“掌中乐游戏中心辅助... 8分钟了解“掌中乐游戏中心辅助器”详细透视开挂辅助脚本-哔哩哔哩;1、这是跨平台的掌中乐游戏中心辅助...