
Use Flink to consume data from the ProduceRecord topic in Kafka and, among the products that have already been inspected, count the total number of products produced by each device every 5 minutes. Store the result in the HBase table gyflinkresult:Produce5minAgg, with the rowkey "device id-system time" (e.g. 123-2023-01-01 12:06:06.001).
Note: the ProduceRecord topic receives one message per product produced; a change_handle_state value of 1 means the product has been inspected, 0 means it has not yet been inspected; use Processing Time semantics.
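The sink in the listing below assumes that the namespace gyflinkresult and the table Produce5minAgg (with column family info) already exist. If they do not, a minimal sketch like the following can create them with the HBase 2.x Admin API; the ZooKeeper quorum is assumed to be the same one the job uses, so adapt the addresses and client version to your cluster.

import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}
import org.apache.hadoop.hbase.{HBaseConfiguration, NamespaceDescriptor, TableName}

object CreateProduce5minAggTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "master,slave1,slave2")   // assumed ZooKeeper quorum, same as the job
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    try {
      // Create the gyflinkresult namespace if it does not exist yet
      if (!admin.listNamespaceDescriptors().exists(_.getName == "gyflinkresult")) {
        admin.createNamespace(NamespaceDescriptor.create("gyflinkresult").build())
      }
      // Create gyflinkresult:Produce5minAgg with a single column family "info"
      val tableName = TableName.valueOf("gyflinkresult", "Produce5minAgg")
      if (!admin.tableExists(tableName)) {
        admin.createTable(
          TableDescriptorBuilder.newBuilder(tableName)
            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
            .build()
        )
      }
    } finally {
      admin.close()
      connection.close()
    }
  }
}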

package flink.calculate.ProduceRecord

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

import java.text.SimpleDateFormat
import java.util.Date

// Data models
case class TotalProduceHbase(ProduceMachineID: String, ProduceInspect: Int)
case class Produce5minAggHbase(rowkey: String, machine_id: String, total_produce: String)

object Produce5minAggToHbase {

  // Kafka configuration
  private val topicName: String = "ProduceRecord"
  private val bootstrapServer: String = "master:9092,slave1:9092,slave2:9092"
  private val zookeeperQuorum: String = "master,slave1,slave2"

  def main(args: Array[String]): Unit = {
    // Create the streaming execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.setAutoWatermarkInterval(0L) // no watermarks needed with processing time
    env.setParallelism(1)                      // parallelism 1 so the output is easy to verify

    // Configure the Kafka source
    val kafkaSource: KafkaSource[String] = KafkaSource.builder()
      .setTopics(topicName)                                // topic to consume
      .setBootstrapServers(bootstrapServer)                // Kafka brokers
      .setStartingOffsets(OffsetsInitializer.latest())     // start from the latest offsets
      .setValueOnlyDeserializer(new SimpleStringSchema())  // deserialize values as strings
      .build()

    // Read the stream from Kafka
    val dataStream: DataStream[String] =
      env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), topicName)

    // Print the raw stream for debugging
    dataStream.print("========raw data============>")

    // Parse the records, keep inspected products, key by machine id and aggregate in 5-minute windows
    val result: DataStream[Produce5minAggHbase] = dataStream
      .map(parseProduceData)                                      // parse each record into TotalProduceHbase
      .filter(_.ProduceInspect == 1)                              // keep only inspected products
      .keyBy(_.ProduceMachineID)                                  // group by machine id
      .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))  // 5-minute tumbling processing-time window
      .process(new Produce5minAggFunction)                        // custom window function

    // Print the aggregated results
    result.print("=========aggregated result=========>>>>>>>")

    // Write the aggregated results to HBase
    result.addSink(new Produce5minAggHbaseSink(zookeeperQuorum, "gyflinkresult", "Produce5minAgg", "info"))

    // Launch the Flink job
    env.execute("Produce5minAggToHbase")
  }

  /**
   * Parse one production record: the fields are comma separated,
   * field 1 is the machine id and field 9 is change_handle_state.
   */
  def parseProduceData(input: String): TotalProduceHbase = {
    val array: Array[String] = input.split(",")
    TotalProduceHbase(array(1), array(9).toInt)
  }

  /**
   * Window function that counts the production records in each 5-minute window.
   */
  class Produce5minAggFunction extends ProcessWindowFunction[TotalProduceHbase, Produce5minAggHbase, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[TotalProduceHbase], out: Collector[Produce5minAggHbase]): Unit = {
      // Count the records that fell into this window
      var count: Int = 0
      for (_ <- elements) {
        count += 1
      }
      // Format the current processing time as yyyy-MM-dd HH:mm:ss.SSS
      val currentProcessingTime: String =
        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(context.currentProcessingTime))
      // Rowkey format: <machine id>-<system time>
      val rowkey: String = s"${key}-${currentProcessingTime}"
      val machine_id: String = key
      val total_produce: String = count.toString
      // Emit the aggregated result
      out.collect(Produce5minAggHbase(rowkey, machine_id, total_produce))
    }
  }

  /**
   * HBase sink that writes the aggregated results.
   */
  class Produce5minAggHbaseSink(zookeeper: String, namespace: String, table: String, family: String)
      extends RichSinkFunction[Produce5minAggHbase] {

    private var connection: Connection = _
    private var htable: Table = _

    // Open the HBase connection when the sink starts
    override def open(parameters: Configuration): Unit = {
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set("hbase.zookeeper.quorum", zookeeper)            // ZooKeeper quorum
      hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")  // ZooKeeper port
      hbaseConf.set("hbase.defaults.for.version.skip", "true")      // skip the HBase version check
      connection = ConnectionFactory.createConnection(hbaseConf)
      htable = connection.getTable(TableName.valueOf(namespace, table))
    }

    // Release the HBase connection when the sink closes
    override def close(): Unit = {
      if (htable != null) htable.close()
      if (connection != null) connection.close()
    }

    // Write one aggregated result to HBase
    override def invoke(value: Produce5minAggHbase, context: SinkFunction.Context): Unit = {
      val rowKey: Array[Byte] = Bytes.toBytes(value.rowkey)
      val put: Put = new Put(rowKey)
      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("machine_id"), Bytes.toBytes(value.machine_id))
      put.addColumn(Bytes.toBytes(family), Bytes.toBytes("total_produce"), Bytes.toBytes(value.total_produce))
      htable.put(put)
    }
  }
}
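For reference, parseProduceData assumes a comma-separated record in which the machine id is the second field (index 1) and change_handle_state is the tenth field (index 9). The sample record below is purely hypothetical and only meant to exercise the parser locally; real ProduceRecord messages will carry different values in the other positions.

import flink.calculate.ProduceRecord.{Produce5minAggToHbase, TotalProduceHbase}

object ParseProduceDataCheck {
  def main(args: Array[String]): Unit = {
    // Hypothetical record: index 1 holds the machine id "123", index 9 holds change_handle_state = 1
    val sample = "1,123,a,b,c,d,e,f,g,1,2023-01-01 12:03:00"
    val parsed: TotalProduceHbase = Produce5minAggToHbase.parseProduceData(sample)
    println(parsed) // expected: TotalProduceHbase(123,1)
  }
}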

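The listing relies on the Flink streaming Scala API, the Kafka connector and the HBase client being on the classpath. A possible build.sbt dependency block is sketched below; the version numbers are assumptions and should be aligned with the Flink, Scala and HBase versions actually deployed on the cluster.

// build.sbt (version numbers are assumptions; align them with your cluster)
scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % "1.14.6",
  "org.apache.flink" %% "flink-clients"         % "1.14.6",
  "org.apache.flink" %% "flink-connector-kafka" % "1.14.6",
  "org.apache.hbase"  % "hbase-client"          % "2.2.3"
)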
