欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 会展 > flink学习(11)——state

flink学习(11)——state

2025/1/4 19:45:33 来源:https://blog.csdn.net/weixin_52642840/article/details/144122048  浏览:    关键词:flink学习(11)——state

state

————保存历史数据

有状态计算和无状态计算

- 无状态计算:- 不需要考虑历史数据, 相同的输入,得到相同的输出!- 如:map, 将每个单词记为1, 进来一个hello, 得到(hello,1),再进来一个hello,得到的还是(hello,1)
- 有状态计算:- 需要考虑历史数据, 相同的输入,可能会得到不同的输出!- 如:sum/reduce/maxBy, 对单词按照key分组聚合,进来一个(hello,1),得到(hello,1), 再进来一个(hello,1), 得到的结果为(hello,2)
Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)。具体区别:从状态管理的方式上来说
Managed State是由Flink管理的,Flink帮忙存储、恢复和优化
Raw State是开发者自己管理的,需要自己序列化从状态的数据结构上来说
Managed State支持了一系列常见的数据结构,如ValueState、ListState、MapState等。
Raw State只支持字节,任何上层数据结构需要序列化为字节数组。从具体使用场景来说
绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State
Raw State是在已有算子和Managed State不够用时,用户自定义算子时使用。对Managed State继续细分,它又有两种类型:Keyed State和Operator State。 以下的对Managed State的概述Flink状态 - 托管状态- KeyedState ( 在keyBy之后可以使用状态 )- ValueState  (存储一个值)- ListState   (存储多个值)- MapState    (存储key-value) - OperatorState ( 没有keyBy的情况下也可以使用 ) [不用]- 原生状态 (不用)

Keyed State(键控状态)

——用于分组后处理的数据,每个 key 都有自己的独立状态

- KeyedState ( 在keyBy之后可以使用状态 )- ValueState  (存储一个值)- ListState   (存储多个值)- MapState    (存储key-value) - AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。- ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。
1、Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中
2、当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key

案例1:求最大值

1、使用KeyedState中的ValueState获取数据中的最大值(获取每个key的最大值)(实际中直接使用maxBy即可)
2、我们自己使用KeyState中的ValueState来模拟实现maxBy
代码实现:
package com.bigdata.day05;/*** 1、键控状态现在只用于map flatmap 因为这两个是不能记录数据状态的,需要键控状态的帮助* 2、使用open进行初始化* 3、ValueState   只能存储一个值,用于存放最大值或者最小值* 4、keyBy+map 可以当作 maxBy**/
public class _04_stateDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据DataStream<Tuple2<String, Long>> tupleDS = env.fromElements(Tuple2.of("北京", 1L),Tuple2.of("上海", 2L),Tuple2.of("北京", 6L),Tuple2.of("上海", 8L),Tuple2.of("北京", 3L),Tuple2.of("上海", 4L),Tuple2.of("北京", 7L));//3. transformation-数据处理转换tupleDS.keyBy(new KeySelector<Tuple2<String, Long>, String>() {@Overridepublic String getKey(Tuple2<String, Long> value) throws Exception {return value.f0;}}).map(new RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {ValueState<Long> state = null;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<Long>("valueState",Long.class);state = getRuntimeContext().getState(stateDescriptor);}@Overridepublic Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {if (state.value() == null){state.update(value.f1);}else {if (state.value() < value.f1){state.update(value.f1);}}return Tuple2.of(value.f0,state.value());}}).print();//.maxBy(1).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

解决方式二:

package com.bigdata.day05;/*** 1、使用maxBy实现最大值* 2、和上面的区别在于由于map对于每一条数据都会有一个输出值,所以结果会有多个,只需看最后的即可* * **/
public class _04_stateDemo_maxby {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据DataStream<Tuple2<String, Long>> tupleDS = env.fromElements(Tuple2.of("北京", 1L),Tuple2.of("上海", 2L),Tuple2.of("北京", 6L),Tuple2.of("上海", 8L),Tuple2.of("北京", 3L),Tuple2.of("上海", 4L),Tuple2.of("北京", 7L));//3. transformation-数据处理转换tupleDS.keyBy(0).maxBy(1).print();//.maxBy(1).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

案例2

——如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]

package com.bigdata.day05;/*** 1、使用flatMap实现键控状态* 2、此时就没有一个合适的方法可以快速得到数据了* 3、reduce 只能相加,process 不能获取数量 app agg均不能使用 * **/
public class _05_stateDemo_2 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8889);dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String line) throws Exception {String[] s = line.split(" ");return Tuple2.of(s[0],Integer.valueOf(s[1]));}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}}).flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, ArrayList<Integer>>>() {ValueState<Integer> valueState = null;ListState<Integer> listState = null;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<Integer>("numState",Integer.class);valueState = getRuntimeContext().getState(stateDescriptor);ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<>("listState", Integer.class);listState = getRuntimeContext().getListState(listStateDescriptor);}@Overridepublic void flatMap(Tuple2<String, Integer> tuple2, Collector<Tuple2<String, ArrayList<Integer>>> out) throws Exception {if (tuple2.f1>38){valueState.update(valueState.value()==null?1:(valueState.value()+1));listState.add(tuple2.f1);}ArrayList<Integer> wendus = new ArrayList<>();if (valueState.value()!=null && valueState.value()>=3){for (Integer wendu : listState.get()) {wendus.add(wendu);}out.collect(Tuple2.of(tuple2.f0,wendus));}}}).print();env.execute();}
}

使用map实现

package com.bigdata.day05;/*** 1、map的最后返回值是不能为null的必须有值* 2、而flatMap可以* 3、map会返回很多空* **/
public class _05_stateDemo_2_reduce {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8889);dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String line) throws Exception {String[] s = line.split(" ");return Tuple2.of(s[0],Integer.valueOf(s[1]));}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}}).map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, ArrayList<Integer>>>() {ValueState<Integer> valueState = null;ListState<Integer> listState = null;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<Integer>("numState",Integer.class);valueState = getRuntimeContext().getState(stateDescriptor);ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<>("listState", Integer.class);listState = getRuntimeContext().getListState(listStateDescriptor);}@Overridepublic Tuple2<String, ArrayList<Integer>> map(Tuple2<String, Integer> tuple2) throws Exception {if (tuple2.f1>38){valueState.update(valueState.value()==null?1:(valueState.value()+1));listState.add(tuple2.f1);}ArrayList<Integer> wendus = new ArrayList<>();if (valueState.value()!=null && valueState.value()>=3){for (Integer wendu : listState.get()) {wendus.add(wendu);}return Tuple2.of(tuple2.f0,wendus);}return null;}}).print();env.execute();}
}

Operator State(操作符状态) 

- OperatorState ( 没有keyBy的情况下也可以使用 ) ————一般不使用

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com