flink-会话模式部署
会话情况:
添加依赖
<properties><flink.version>1.17.2</flink.version> </properties> <dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency> <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version><scope>provided</scope></dependency> <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency> <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.13.6</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.2-1.17</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.29</version></dependency> </dependencies> <build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude><exclude>org.apache.hadoop:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins> </build>
java代码
package com.demo.day1; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import static org.apache.flink.connector.kafka.sink.KafkaSink.builder; public class Demo1_WordCount { public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("hadoop11:9092,hadoop12:9092,hadoop13:9092").setTopics("topic1").setValueOnlyDeserializer(new SimpleStringSchema()).setGroupId("g1").setStartingOffsets(OffsetsInitializer.latest()).build(); DataStream<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(),"source");ds.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] arr = value.split(",");for(String s1:arr){out.collect(Tuple2.of(s1,1));}}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}}).sum("f1").print(); env.execute();} }
测试:
查看(数据同步):
申请一个yarn会话:
打架包:
上传架包:
运行:
测试:
修改并行度为3进行测试:
打架包并上传:
查看:
生产者向topica(3个分区)发送数据:
向topica发送2000条数据
package com.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.HashMap; import java.util.Map; public class producer{public static void main(String[] args) throws Exception{Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092,hadoop12:9092,hadoop13:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //设置自定义分区configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.kafka.MyPartitioner"); KafkaProducer<String,String> producer = new KafkaProducer<>(configs);for (int i=0;i<1000;i++){ProducerRecord producerRecord=new ProducerRecord("topica","kafka");producer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null){System.out.println("发送成功:"+recordMetadata.partition());System.out.println("发送成功:"+recordMetadata.topic());System.out.println("发送成功:"+recordMetadata.offset());}}});} producer.close();} }
发送分区随机:
查看数据:
向topica发送1000000条数据:
package com.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.HashMap; import java.util.Map; public class producer{public static void main(String[] args) throws Exception{Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092,hadoop12:9092,hadoop13:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //设置自定义分区configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.kafka.MyPartitioner"); KafkaProducer<String,String> producer = new KafkaProducer<>(configs);for (int i=0;i<1000000;i++){ProducerRecord producerRecord=new ProducerRecord("topica",i%3,null,"kafka");producer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null){System.out.println("发送成功:"+recordMetadata.partition());System.out.println("发送成功:"+recordMetadata.topic());System.out.println("发送成功:"+recordMetadata.offset());}}});} producer.close();} }
查看数据:
flink-应用模式部署
1. 上传flink的lib和plugins到HDFS上 hdfs dfs -mkdir /flink-dist hdfs dfs -put /opt/installs/flink/lib /flink-dist hdfs dfs -put /opt/installs/flink/plugins/ /flink-dist 2. 上传自己的jar包到HDFS hdfs dfs -mkdir /my-flinkjars hdfs dfs -put /opt/flinkjob/flink-test-1.0-SNAPSHOT.jar /my-flinkjars 3. 提交作业 flink run-application \ -t yarn-application \ -Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist" \ -c com.demo.day1.Demo1_WordCount \ hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar
提交作业后查看yarn:
测试:
写数据:
查看数据(不在一个分区,具有随机性):