您现在的位置是:主页 > news > 合肥网站建设方案/成人用品推广网页
合肥网站建设方案/成人用品推广网页
admin2025/5/9 3:21:53【news】
简介合肥网站建设方案,成人用品推广网页,武汉百度竞价,个人推广网站1- Keyed State案例 以WordCount 的 sum 所使用的StreamGroupedReduce类为例,讲解了如何在代码中使用 需求:使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可) 用户自己管理KeyedState,存储Key的状态值,步…
合肥网站建设方案,成人用品推广网页,武汉百度竞价,个人推广网站1- Keyed State案例 以WordCount 的 sum 所使用的StreamGroupedReduce类为例,讲解了如何在代码中使用 需求:使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可)
用户自己管理KeyedState,存储Key的状态值,步…
1- Keyed State案例
以WordCount 的 sum
所使用的StreamGroupedReduce
类为例,讲解了如何在代码中使用
需求:使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可)
用户自己管理KeyedState,存储Key的状态值,步骤如下:
//-1.定义一个状态用来存放最大值
private transient ValueState<Long> maxValueState;
//-2.创建一个状态描述符对象
ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);
//-3.根据状态描述符获取State
maxValueState = getRuntimeContext().getState(maxValueStateDescriptor);
//-4.使用State
Long historyValue = maxValueState.value();
//判断当前值和历史值谁大
if (historyValue == null || currentValue > historyValue)
//-5.更新状态
maxValueState.update(currentValue);
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author liu a fu* @version 1.0* @date 2021/3/9 0009* @DESC Flink State 中KeyedState,默认情况下框架自己维护,此外可以手动维护*/
public class StreamKeyedStateDemo {public static void main(String[] args) throws Exception {//1-环境准备StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); //全局并行度设置//2-数据源sourceDataStreamSource<Tuple3<String, String, Long>> tupleStream = env.fromElements(Tuple3.of("上海", "普陀区", 488L), Tuple3.of("上海", "徐汇区", 212L),Tuple3.of("北京", "西城区", 823L), Tuple3.of("北京", "海淀区", 234L),Tuple3.of("上海", "杨浦区", 888L), Tuple3.of("上海", "浦东新区", 666L),Tuple3.of("北京", "东城区", 323L), Tuple3.of("上海", "黄浦区", 111L));//3-数据的transformation// TODO:使用DataStream转换函数maxBy获取每个市最大值SingleOutputStreamOperator<Tuple3<String, String, Long>> maxDataStream = tupleStream.keyBy(0).maxBy(2);//4-数据的sink// maxDataStream.printToErr();/*** (上海,普陀区,488)* (上海,普陀区,488)* (北京,西城区,823)* (北京,西城区,823)* (上海,杨浦区,888)* (上海,杨浦区,888)* (北京,西城区,823)* (上海,杨浦区,888)*/// TODO: 自己管理KededState存储每个Key状态StateSingleOutputStreamOperator<String> stateDataStream = tupleStream.keyBy(0) // 城市city分组.map(new RichMapFunction<Tuple3<String, String, Long>, String>() {// step1. 定义存储转态数据结构// transient是类型修饰符,只能用来修饰字段。在对象序列化的过程中,标记为transient的变量不会被序列化private transient ValueState<Long> valueState = null ;@Overridepublic void open(Configuration parameters) throws Exception {// step2. 初始化状态的值valueState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("maxState", Long.class));}@Overridepublic String map(Tuple3<String, String, Long> tuple) throws Exception {/*Key: 上海, tuple: 上海,普陀区,488*/// step3. 从以前状态中获取值Long historyValue = valueState.value();// 获取当前key中的值Long currentValue = tuple.f2 ;// 判断历史值是否为null,如果key的数据第1次出现,以前没有状态if(null == historyValue || currentValue > historyValue){// step4. 更新状态值valueState.update(currentValue);}// 返回处理结果return tuple.f0 + ", " + valueState.value();}});/*上海, 488上海, 488北京, 823北京, 823上海, 888上海, 888北京, 823上海, 888*/stateDataStream.printToErr();//5-获取执行的executeenv.execute(StreamKeyedStateDemo.class.getSimpleName());}
}
2- Operator State案例
对WordCoun示例中的FromElementsFunction类进行详解并分享,如何在代码中使用:
需求:使用ListState存储offset,模拟Kafka的offset维护。
/*** @author liu a fu* @version 1.0* @date 2021/3/9 0009* @DESC Flink State 中OperatorState,自定义数据源Kafka消费数据,保存消费偏移量数据并进行Checkpoint*/
public class StreamOperatorStateDemo {public static void main(String[] args) throws Exception {//1-环境准备StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODO: 设置检查点Checkpoint相关属性,保存状态env.enableCheckpointing(1000) ; // 每隔1s执行一次Checkpointenv.setStateBackend(new FsStateBackend("file:///D:/ckpt/")) ; // 状态数据保存本地文件系统env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 当应用取消时,Checkpoint数据保存,不删除env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 固定延迟重启策略: 程序出现异常的时候,重启3次,每次延迟3秒钟重启,超过2次,程序退出env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));//2-数据源sourceDataStreamSource<String> kafkaSource = env.addSource(new KafkaSource());//3-数据的transformation//4-数据的sinkkafkaSource.printToErr();//5-获取executeenv.execute(StreamOperatorStateDemo.class.getSimpleName());}/*** 自定义数据源,模拟从Kafka消费数据,管理消费偏移量,进行状态存储*/private static class KafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {// step1. 定义存储偏移量状态private transient ListState<Long> offsetState = null;// 标识程序是否运行private boolean isRunning = true ;// 定义偏移量private Long offset = 0L ;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (isRunning){// 模拟从Kafka消费数据int partitionId = getRuntimeContext().getIndexOfThisSubtask();offset += 1;//输出数据ctx.collect("p-" + partitionId + ": " + offset);// step 3. 更新偏移量到状态中offsetState.update(Arrays.asList(offset));// 每隔1秒消费数据TimeUnit.SECONDS.sleep(1);// 当偏移量被5整除时,抛出异常if(offset % 5 == 0){throw new RuntimeException("程序处理异常啦啦啦啦.................") ;}}
}@Overridepublic void cancel() {isRunning = false;}// ==================================================================@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {// 状态描述符ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("offsetState", Long.class);// step2. 状态初始化offsetState = context.getOperatorStateStore().getListState(descriptor);//TODO: 是否从检查点恢复if (context.isRestored()){// 更新偏移量值offset = offsetState.get().iterator().next();}}// 保存状态到检查点@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {//保存数据offsetState.clear();//更新状态offsetState.update(Arrays.asList(offset));}}}
- 程序异常以后,自动从检查点目录恢复
// TODO: 设置检查点Checkpoint相关属性,保存状态
env.enableCheckpointing(1000) ; // 每隔1s执行一次Checkpoint
env.setStateBackend(new FsStateBackend("file:///D:/ckpt/")) ; // 状态数据保存本地文件系统
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);// 当应用取消时,Checkpoint数据保存,不删除env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 固定延迟重启策略: 程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));