
Flink: Session Mode and Application Mode


Flink: Session Mode Deployment

Session mode overview:

Add the dependencies

<properties>
    <flink.version>1.17.2</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>1.13.6</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.2-1.17</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.29</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                                <exclude>org.apache.hadoop:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers combine.children="append">
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Java code

package com.demo.day1;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo1_WordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Kafka source: read string records from topic1, starting from the latest offsets
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop11:9092,hadoop12:9092,hadoop13:9092")
                .setTopics("topic1")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setGroupId("g1")
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();

        DataStream<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "source");

        // Split each comma-separated line into (word, 1) pairs, key by the word, and sum the counts
        ds.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] arr = value.split(",");
                for (String s1 : arr) {
                    out.collect(Tuple2.of(s1, 1));
                }
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).sum("f1").print();

        env.execute();
    }
}

Test:
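To feed the job some input, one option is Kafka's console producer (a sketch, assuming the Kafka CLI scripts are on the PATH and topic1 already exists; older Kafka versions use --broker-list instead of --bootstrap-server):

kafka-console-producer.sh --bootstrap-server hadoop11:9092 --topic topic1

Each line typed at the prompt is split on commas by the job, so an input like hello,flink,hello should print updated counts of (hello,2) and (flink,1).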

View (data synchronization):

Start a YARN session:
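The session cluster is started with the yarn-session.sh script that ships with Flink, for example (a sketch; -d runs it detached, and the -nm name is only illustrative):

/opt/installs/flink/bin/yarn-session.sh -d -nm flink-session

The command prints the YARN application id and the JobManager web UI address that later submissions attach to.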


Build the jar:
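The job jar is produced with Maven; because the POM above binds the shade plugin to the package phase, a plain package build is enough (a sketch, skipping tests):

mvn clean package -DskipTests

The resulting jar under target/ (flink-test-1.0-SNAPSHOT.jar, matching the name used in the application-mode section below) is what gets uploaded next.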

Upload the jar:

Run:
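With the session running, the jar can be submitted from the command line as well as from the web UI, for example (a sketch; the /opt/flinkjob path reuses the location from the application-mode section below):

/opt/installs/flink/bin/flink run -c com.demo.day1.Demo1_WordCount /opt/flinkjob/flink-test-1.0-SNAPSHOT.jar

flink run normally attaches to the running YARN session automatically, via the yarn-properties file that yarn-session.sh leaves behind.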

Test:


Change the parallelism to 3 and test:
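The change itself is a single line in Demo1_WordCount:

// raise the env-level parallelism from 1 to 3
env.setParallelism(3);

Note that a parallelism set in code takes precedence over the -p flag of flink run, so the jar has to be rebuilt and re-uploaded for the new value to take effect.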

Build the jar and upload it:

View:

The producer sends data to topica (3 partitions):
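If topica does not exist yet, a 3-partition topic can be created with the Kafka CLI (a sketch; the replication factor of 1 is an assumption):

kafka-topics.sh --bootstrap-server hadoop11:9092 --create --topic topica --partitions 3 --replication-factor 1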

Send 2,000 messages to topica:

package com.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class producer {
    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop11:9092,hadoop12:9092,hadoop13:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Register a custom partitioner
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kafka.MyPartitioner");

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
        for (int i = 0; i < 1000; i++) {
            // No key and no explicit partition: the partitioner decides where each record goes
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topica", "kafka");
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("Sent successfully, partition: " + recordMetadata.partition());
                        System.out.println("Sent successfully, topic: " + recordMetadata.topic());
                        System.out.println("Sent successfully, offset: " + recordMetadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}
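Both producer examples register a custom partitioner, com.kafka.MyPartitioner, whose implementation is not shown in the original post. A minimal sketch of what such a class could look like, assuming it simply picks a random partition (which would match the random distribution observed below):

package com.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch: picks a random partition for every record.
// The real MyPartitioner used by the post may implement a different strategy.
public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}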

The target partition is random:

View the data:
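The records can be inspected with the console consumer (a sketch, assuming the Kafka CLI scripts are on the PATH):

kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic topica --from-beginning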

Send 1,000,000 messages to topica:

package com.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class producer {
    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop11:9092,hadoop12:9092,hadoop13:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Register a custom partitioner (not used here, because the partition is set explicitly below)
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kafka.MyPartitioner");

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
        for (int i = 0; i < 1000000; i++) {
            // The partition is assigned explicitly as i % 3, cycling through the 3 partitions of topica
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topica", i % 3, null, "kafka");
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("Sent successfully, partition: " + recordMetadata.partition());
                        System.out.println("Sent successfully, topic: " + recordMetadata.topic());
                        System.out.println("Sent successfully, offset: " + recordMetadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}

View the data:


Flink: Application Mode Deployment

1. Upload Flink's lib and plugins directories to HDFS
hdfs dfs -mkdir /flink-dist
hdfs dfs -put /opt/installs/flink/lib   /flink-dist
hdfs dfs -put /opt/installs/flink/plugins/ /flink-dist
2. Upload your job jar to HDFS
hdfs dfs -mkdir /my-flinkjars
hdfs dfs -put /opt/flinkjob/flink-test-1.0-SNAPSHOT.jar /my-flinkjars
3. Submit the job
flink run-application \
-t yarn-application   \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist"     \
-c com.demo.day1.Demo1_WordCount  \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar

After submitting the job, check YARN:
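Besides the ResourceManager web UI, the submission can be checked from the command line:

yarn application -list

The job should appear as a running application of type Apache Flink.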

Test:

Write data:

View the data (records are not all in one partition; the distribution is random):

