在Flink中,处理时间序列数据时,通常需要考虑事件时间和水印(watermarks)的处理。以下是修改前后的代码对比分析:
val systemDS = unitDS.map(dp => { dp.setDeviceCode(DeviceCodeEnum.fromPidToSystem(dp.getDeviceCode)) dp }).keyBy(_.getDeviceCode) .window(TumblingEventTimeWindows.of(Time.seconds(60))) .process(new MySystemWinF) unitDS 经过一个 map 操作,将每个元素的 deviceCode 转换为系统设备码。keyBy(_.getDeviceCode) 对转换后的设备码进行分组。process 操作应用自定义的窗口函数 HPageSystemWinF 来处理每个窗口中的数据。注意:修改前的代码没有显示地处理水印(watermarks),这可能导致在处理乱序数据或延迟数据时出现问题。
val systemDS = unitDS.map(dp => { dp.setDeviceCode(DeviceCodeEnum.fromPidToSystem(dp.getDeviceCode)) dp }).keyBy(_.getDeviceCode) .assignTimestampsAndWatermarks( WatermarkStrategy .forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 假设这里应该是.forBoundedOutOfOrderness而不是.forBoundedOutOfOrdernessDaysPower .withIdleness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner[DaysPower] { override def extractTimestamp(element: DaysPower, recordTimestamp: Long): Long = { Math.max(element.getEventTime, recordTimestamp) } }) ).keyBy(_.getDeviceCode) .window(TumblingEventTimeWindows.of(Time.seconds(60))) .process(new MySystemWinF) map, keyBy, 和 window 操作。assignTimestampsAndWatermarks 方法来处理事件时间和水印: WatermarkStrategy.forBoundedOutOfOrderness 允许一定程度的乱序数据(这里是5秒)。.withIdleness(Duration.ofSeconds(5)) 设置了空闲超时时间为5秒,用于处理不活跃的键。withTimestampAssigner 自定义了时间戳分配器,确保使用的事件时间是元素中的 eventTime 和记录的 recordTimestamp 中的较大值。
上一篇:黑马Stream教程-练习题