public class OverWindowTableApiTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStreamSource streamSource = env.socketTextStream("hadoop102", 7777); SingleOutputStreamOperator map = streamSource.map(new MapFunction() { @Override public WaterSensor map(String value) throws Exception { String[] splits = value.split(","); WaterSensor waterSensor = new WaterSensor(splits[0], Long.valueOf(splits[1]), Integer.valueOf(splits[2])); return waterSensor; } }); // 流转表(指定表结构) Schema schema = Schema.newBuilder() .column("id", "STRING") .column("ts", "BIGINT") .column("vc", "INT") .columnByExpression("pt" , "PROCTIME()") .columnByExpression("et", "TO_TIMESTAMP_LTZ(ts, 3)") //.watermark("et" , "source_watermark()") // 沿用流中的水位线 .watermark("et" , "et - INTERVAL '1' SECOND ") // 重新定义水位线 .build(); Table table = tableEnv.fromDataStream(map, schema); // API式开窗 OverWindow w1 = Over.partitionBy($("id")).orderBy($("pt")).preceding(UNBOUNDED_ROW).following(CURRENT_ROW).as("w"); table.window(w1) .select( $("id"), $("ts"), $("vc"), $("et"), $("vc").sum().over($("w")).as("sum_vc") ).execute().print(); env.execute(); } }
再次运行
成功!
上一篇:第十课:telnet(远程登入)