您现在的位置是:主页 > news > 网站后台用esayui做/哪个网站学seo是免费的

网站后台用esayui做/哪个网站学seo是免费的

admin2025/5/7 23:35:26news

简介网站后台用esayui做,哪个网站学seo是免费的,幼儿园主题网络图设计动物世界,时代定制Flink双流join 目录Flink双流join1. Window Join滚动窗口Join滑动窗口Join会话窗口Join2. Interval Join在Flink中, 支持两种方式的流的Join: Window Join和Interval Join 1. Window Join 窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素. 注意: 所有的窗口…

网站后台用esayui做,哪个网站学seo是免费的,幼儿园主题网络图设计动物世界,时代定制Flink双流join 目录Flink双流join1. Window Join滚动窗口Join滑动窗口Join会话窗口Join2. Interval Join在Flink中, 支持两种方式的流的Join: Window Join和Interval Join 1. Window Join 窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素. 注意: 所有的窗口…

Flink双流join

目录

  • Flink双流join
    • 1. Window Join
      • 滚动窗口Join
      • 滑动窗口Join
      • 会话窗口Join
    • 2. Interval Join

在Flink中, 支持两种方式的流的Join: Window Join和Interval Join

1. Window Join

窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素.

注意:

  1. 所有的窗口join都是inner join, 意味着a流中的元素如果在b流中没有对应的, 则a流中这个元素就不会处理(就是忽略掉了)
  2. join成功后的元素的会以所在窗口的最大时间作为其时间戳. 例如窗口[5,10), 则元素会以9作为自己的时间戳

滚动窗口Join

在这里插入图片描述

import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Flink01_Join_Window_Tumbling {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> s1 = env.socketTextStream("hadoop162", 8888)  // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));SingleOutputStreamOperator<WaterSensor> s2 = env.socketTextStream("hadoop162", 9999)  // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));s1.join(s2).where(WaterSensor::getId).equalTo(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(5))) // 必须使用窗口.apply(new JoinFunction<WaterSensor, WaterSensor, String>() {@Overridepublic String join(WaterSensor first, WaterSensor second) throws Exception {return "first: " + first + ", second: " + second;}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

滑动窗口Join

在这里插入图片描述

会话窗口Join

在这里插入图片描述

2. Interval Join

间隔流join(Interval Join), 是指使用一个流的数据按照key去join另外一条流的指定范围的数据.

如下图: 橙色的流去join绿色的流. 范围是由橙色流的event-time + lower bount和event-time + upper bount来决定的.

orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

在这里插入图片描述

注意:

  1. Interval Join只支持event-time
  2. 必须是keyBy之后的流才可以interval join
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class Flink01_Join_Interval {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> s1 = env.socketTextStream("hadoop162", 8888)  // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));SingleOutputStreamOperator<WaterSensor> s2 = env.socketTextStream("hadoop162", 9999)  // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));s1.keyBy(WaterSensor::getId).intervalJoin(s2.keyBy(WaterSensor::getId))// 指定上下界.between(Time.seconds(-2), Time.seconds(3)).process(new ProcessJoinFunction<WaterSensor, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor left, WaterSensor right, Context ctx, Collector<String> out) throws Exception {out.collect(left + "," + right);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}