欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 旅游 > Flink在指定时间窗口内统计均值,超过阈值后报警

Flink在指定时间窗口内统计均值,超过阈值后报警

2025/2/25 0:58:49 来源:https://blog.csdn.net/k7gxn56/article/details/145616666  浏览:    关键词:Flink在指定时间窗口内统计均值,超过阈值后报警

1、需求

统计物联网设备收集上来的温湿度数据,如果5分钟内的均值超过阈值(30摄氏度)则发出告警消息,要求时间窗口和阈值可在管理后台随时修改,实时生效(完成当前窗口后下一个窗口使用最新配置)。

物联网设备的数据从Kafka中读取,配置数据从MySQL中读取,有个管理后台可以调整窗口和阈值大小。

2、思路

使用Flink的广播状态(Broadcast State)模式把两条流连接(connect)起来:配置数据使用广播流,设备数据使用普通流,设备数据在处理时读取最近一次广播的配置。

3、实现代码

package cu.iot.flink;import com.alibaba.fastjson2.JSON;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import window.alert.AlertConfig;
import window.alert.EnrichedSensorData;
import window.alert.SensorData;import java.time.Duration;
import java.util.Properties;public class BroadcastDemo {private static String KAFKA_SERVERS = "192.168.213.1:9092,192.168.213.2:9092,192.168.213.3:9092";private static String KAFKA_GROUP_ID = "public-system-group-dev";private static String KAFKA_CONSUMER_TOPIC = "public-system-collect-data-dev";private static String KAFKA_PRODUCER_TOPIC = "public-system-collect-data-dev-output";private static String KAFKA_PRODUCER_SLIDE_TOPIC = "public-system-collect-data-dev-slide-output";private static String MYSQL_URL = "jdbc:mysql://10.20.72.1:8190/alerting?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8";private static String MYSQL_USERNAME = "root";private static String MYSQL_PASSWORD = "xxxxx";public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//env.enableCheckpointing(1000);env.setParallelism(1);Properties kafkaProps = new Properties();kafkaProps.setProperty("bootstrap.servers", KAFKA_SERVERS);kafkaProps.setProperty("group.id", KAFKA_GROUP_ID);FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(KAFKA_CONSUMER_TOPIC, new SimpleStringSchema(), kafkaProps);kafkaConsumer.setStartFromLatest();// 侧输出OutputTag<EnrichedSensorData> lateData = new OutputTag<>("lateData", Types.GENERIC(EnrichedSensorData.class));DataStream<SensorData> sensorStream = env.addSource(kafkaConsumer).map((MapFunction<String, SensorData>) value -> JSON.parseObject(value,SensorData.class));Properties dbProps = new Properties();dbProps.setProperty("url", MYSQL_URL);dbProps.setProperty("username", MYSQL_USERNAME);dbProps.setProperty("password", MYSQL_PASSWORD);DataStreamSource<window.alert.AlertConfig> streamSource = env.addSource(new MySQLSourceFunction(dbProps));SingleOutputStreamOperator<window.alert.AlertConfig> streamOperator = 
streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<window.alert.AlertConfig>forMonotonousTimestamps().withTimestampAssigner((o, timestamp) -> o.getTimestamp()));MapStateDescriptor<String, window.alert.AlertConfig> broadcastStateDescriptor = new MapStateDescriptor<>("alertConfig",TypeInformation.of(new TypeHint<String>() {}),TypeInformation.of(new TypeHint<window.alert.AlertConfig>() {}));// 设置事件时间和水印SingleOutputStreamOperator<String> alertStream = sensorStream.assignTimestampsAndWatermarks(WatermarkStrategy.<SensorData>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((o, timestamp) -> o.getTimestamp())).connect(streamOperator.broadcast(broadcastStateDescriptor)).process(new BroadcastProcessFunction<SensorData, AlertConfig, EnrichedSensorData>() {@Overridepublic void processElement(SensorData value, ReadOnlyContext ctx, Collector<EnrichedSensorData> out) throws Exception {ReadOnlyBroadcastState<String, AlertConfig> broadcastState = ctx.getBroadcastState(broadcastStateDescriptor);AlertConfig alertConfig = broadcastState.get("alertConfig");if (alertConfig != null) {EnrichedSensorData enrichedSensorData = new EnrichedSensorData(value, alertConfig);//System.out.println("out.collect = "+enrichedSensorData);out.collect(enrichedSensorData);}}@Overridepublic void processBroadcastElement(AlertConfig value, BroadcastProcessFunction<SensorData, AlertConfig, EnrichedSensorData>.Context ctx, Collector<EnrichedSensorData> collector) throws Exception {BroadcastState<String, AlertConfig> broadcastState = ctx.getBroadcastState(broadcastStateDescriptor);//System.out.println("broadcastState.put = "+value);broadcastState.put("alertConfig", value);}}).keyBy(EnrichedSensorData::getSensorId).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)).sideOutputLateData(lateData)
//                .aggregate(
//                        new AggregateFunction<EnrichedSensorData, Tuple2<Double, Integer>, Double>() {
//                    @Override
//                    public Tuple2<Double, Integer> createAccumulator() {
//                        return new Tuple2<>(0.0D, 0);
//                    }
//
//                    @Override
//                    public Tuple2<Double, Integer> add(EnrichedSensorData val, Tuple2<Double, Integer> accumulator) {
//                        return new Tuple2<>(accumulator.f0 + val.getTemperature() , accumulator.f1 + 1);
//                    }
//
//                    @Override
//                    public Double getResult(Tuple2<Double, Integer> accumulator) {
//                        Double rs = accumulator.f0 / accumulator.f1;
//                        System.out.println("getResult...accumulator.f0 ="+accumulator.f0+", accumulator.f1 = "+accumulator.f1+","+rs);
//                        return rs;
//                    }
//
//                    @Override
//                    public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
//                        System.out.println("merge...  a.f0 + b.f0="+(a.f0 + b.f0)+", a.f1 + b.f1="+(a.f1 + b.f1));
//                        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
//                    }
//                }, new ProcessWindowFunction<Double, String, String, TimeWindow>() {
//                    @Override
//                    public void process(String s, ProcessWindowFunction<Double, String, String, TimeWindow>.Context ctx, Iterable<Double> list, Collector<String> out) {
//                        TimeWindow window = ctx.window();
//                        int count = 0;
//                        double sum = 0.0;
//                        for (Double v : list) {
//                            sum += v;
//                            count++;
//                        }
//                        out.collect("<Alert> ID:"+s+",window:["+window.getStart()+","+window.getEnd()+") avg="+(count > 0 ? sum / count : 0));
//                    }
//                }
//                ).apply(new WindowFunction<EnrichedSensorData, String, String, TimeWindow>() {@Overridepublic void apply(String sensorId, TimeWindow window, Iterable<EnrichedSensorData> input, Collector<String> out) {double sumTemp = 0;int count = 0;AlertConfig alertConfig = null;for (EnrichedSensorData data : input) {//System.out.println("Processing event: " + data);sumTemp += data.getTemperature();count++;alertConfig = data.getAlertConfig();}double avgTemp = count > 0 ? sumTemp / count : 0;if (alertConfig != null && avgTemp > alertConfig.getThreshold()) {out.collect("<ALERT> window: [" + window.getStart() + "," + window.getEnd() + ") sensorId:"+sensorId+",均值:"+avgTemp+",阈值:"+alertConfig.getThreshold());}}});alertStream.print("正常输出>>>");SingleOutputStreamOperator<String> out = alertStream;SideOutputDataStream<EnrichedSensorData> output = out.getSideOutput(lateData);output.printToErr("侧输出>>>");// Kafka sink for alertsFlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(KAFKA_SERVERS, KAFKA_PRODUCER_TOPIC, new SimpleStringSchema());FlinkKafkaProducer<EnrichedSensorData> kafkaProducer2 = new FlinkKafkaProducer<>(KAFKA_SERVERS, KAFKA_PRODUCER_SLIDE_TOPIC, new JsonSerializationSchema<>());alertStream.addSink(kafkaProducer);output.addSink(kafkaProducer2);env.execute("Sensor Alerts");}
}
package cu.iot.flink;import cn.hutool.core.util.RandomUtil;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import window.alert.AlertConfig;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;public class MySQLSourceFunction implements SourceFunction<AlertConfig> {private Properties p;private volatile boolean running = true;public MySQLSourceFunction(Properties p) {this.p = p;}@Overridepublic void run(SourceContext<AlertConfig> ctx) throws Exception {while (running) {AlertConfig config = fetchAlertConfig();ctx.collect(config);Thread.sleep(10000);}}@Overridepublic void cancel() {running = false;}private AlertConfig fetchAlertConfig() {AlertConfig config = new AlertConfig();try (Connection conn = DriverManager.getConnection(p.getProperty("url"), p.getProperty("username"), p.getProperty("password"));PreparedStatement stmt = conn.prepareStatement("SELECT time_window,upper_limit FROM t_alert_rule WHERE id = 3 AND rule_type = 'timeWindow'");ResultSet rs = stmt.executeQuery()) {if (rs.next()) {config.setWindowSizeMillis(rs.getLong("time_window"));config.setThreshold(rs.getDouble("upper_limit"));config.setTimestamp(System.currentTimeMillis()- RandomUtil.randomInt(1000, 10000));}} catch (Exception e) {e.printStackTrace();}return config;}
}
package window.alert;public class AlertConfig {private Long windowSizeMillis;private Double threshold;private Long timestamp;public Long getTimestamp() {return timestamp;}public void setTimestamp(Long timestamp) {this.timestamp = timestamp;}public Long getWindowSizeMillis() {return windowSizeMillis;}public void setWindowSizeMillis(Long windowSizeMillis) {this.windowSizeMillis = windowSizeMillis;}public Double getThreshold() {return threshold;}public void setThreshold(Double threshold) {this.threshold = threshold;}public AlertConfig() {}public AlertConfig(long windowSize, double threshold) {this.windowSizeMillis = windowSize;this.threshold = threshold;}@Overridepublic String toString() {return "AlertConfig{" +"windowSizeMillis=" + windowSizeMillis +", threshold=" + threshold +", timestamp=" + timestamp +'}';}
}
package window.alert;public class SensorData {private String sensorId;private double temperature;private double humidity;private long timestamp;public SensorData() {}public String getSensorId() {return sensorId;}public void setSensorId(String sensorId) {this.sensorId = sensorId;}public double getTemperature() {return temperature;}public void setTemperature(double temperature) {this.temperature = temperature;}public double getHumidity() {return humidity;}public void setHumidity(double humidity) {this.humidity = humidity;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public SensorData(String sensorId, double temperature, double humidity, long timestamp) {this.sensorId = sensorId;this.temperature = temperature;this.humidity = humidity;this.timestamp = timestamp;}@Overridepublic String toString() {return "SensorData{" +"sensorId='" + sensorId + '\'' +", temperature=" + temperature +", humidity=" + humidity +", timestamp=" + timestamp +'}';}
}
package window.alert;

/**
 * A sensor reading paired with the alert configuration that applies to it.
 */
public class EnrichedSensorData {

    private SensorData sensorData;
    private AlertConfig alertConfig;

    /**
     * @param sensorData  the sensor reading
     * @param alertConfig the configuration paired with the reading
     */
    public EnrichedSensorData(SensorData sensorData, AlertConfig alertConfig) {
        this.sensorData = sensorData;
        this.alertConfig = alertConfig;
    }

    /** @return the wrapped sensor reading */
    public SensorData getSensorData() {
        return sensorData;
    }

    /** @param sensorData the sensor reading to wrap */
    public void setSensorData(SensorData sensorData) {
        this.sensorData = sensorData;
    }

    /** @return the sensor id of the wrapped reading */
    public String getSensorId() {
        return sensorData.getSensorId();
    }

    /** @return the temperature of the wrapped reading */
    public double getTemperature() {
        return sensorData.getTemperature();
    }

    /** @return the configuration paired with the reading */
    public AlertConfig getAlertConfig() {
        return alertConfig;
    }

    @Override
    public String toString() {
        return "EnrichedSensorData{" +
                "sensorData=" + sensorData +
                ", alertConfig=" + alertConfig +
                '}';
    }
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词