Getting Started with Flink from Scratch: Learning the Basics

2024/11/30 4:57:28  Source: https://blog.csdn.net/qq_46248151/article/details/143885601

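Flink Basic Concepts

First of all, Flink is an open-source distributed computing framework for both stream processing and batch processing.

So what kind of data does each of the two handle? That brings in two concepts: unbounded streams and bounded streams.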

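Unbounded Streams vs. Bounded Streams

Any kind of data can form a data stream, for example user interaction records, sensor readings, and event logs.

Apache Flink excels at processing both unbounded and bounded data sets. Precise control over time, combined with stateful computation, lets Flink run any kind of application that processes unbounded streams.

Stream data is divided into unbounded streams and bounded streams.

  • 1) Unbounded stream: has a defined start but no defined end, and keeps producing data indefinitely. Unbounded streams are handled with stream processing.
  • 2) Bounded stream: has both a defined start and a defined end, so the computation can wait until all of the data has been read. Bounded streams are handled with batch processing.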
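The difference can be illustrated without Flink at all. The sketch below is plain Java, not Flink API, and the names `BoundedVsUnbounded`, `batchSum`, `runningSums`, and `naturalNumbers` are made up for this illustration. A bounded input can be summed once every element is available, whereas an unbounded input has no end to wait for, so each arriving element updates a running result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class BoundedVsUnbounded {

    // Batch style: the input is bounded, so read all of it and emit one final result.
    public static int batchSum(List<Integer> boundedInput) {
        int sum = 0;
        for (int value : boundedInput) {
            sum += value;
        }
        return sum;
    }

    // Stream style: the input may never end, so emit an updated result per element.
    // 'limit' exists only so that this demo terminates.
    public static List<Integer> runningSums(Iterator<Integer> unboundedInput, int limit) {
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;
        for (int i = 0; i < limit && unboundedInput.hasNext(); i++) {
            sum += unboundedInput.next();
            emitted.add(sum);
        }
        return emitted;
    }

    // An endless source that yields 1, 2, 3, ...
    public static Iterator<Integer> naturalNumbers() {
        return new Iterator<Integer>() {
            private int next = 1;

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public Integer next() {
                return next++;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(batchSum(Arrays.asList(1, 2, 3, 4)));   // prints 10
        System.out.println(runningSums(naturalNumbers(), 4));      // prints [1, 3, 6, 10]
    }
}
```

The batch variant produces a single answer at the end, while the streaming variant produces a sequence of intermediate answers, which is why stream processing needs state (here, the running `sum`).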

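Component Structure

The DataSet API is generally used to process bounded streams.
The DataStream API is generally used to process unbounded streams.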

Flink Basic Demo

1. Setting Up the Basic Environment

Core pom.xml configuration


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.11.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
        <spring.boot.version>2.1.6.RELEASE</spring.boot.version>
        <mysql.jdbc.version>5.1.47</mysql.jdbc.version>
    </properties>

    <dependencies>
        <!-- Flink examples and common utility classes -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink core library for stream processing -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink client library, used for submitting jobs and similar tasks -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
        </dependency>
    </dependencies>

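2. Batch Processing Demo

This demo uses batch processing to count the entries in a log file by log level, which shows how many warnings and errors were logged.

Prepare the file order_info.log with the following content: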

2019-08-25 16:32:55,626 [main] INFO  [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$e1b53a9c] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:32:56,918 [main] INFO  [c.i.t.s.pending.startup.StockPendingApplication] SpringApplication.java:650 - The following profiles are active: dev
2019-08-25 16:32:57,829 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:32:57,834 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:32:57,847 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 5ms. Found 0 repository interfaces.
2019-08-25 16:32:57,858 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:32:57,859 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:32:57,870 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 2ms. Found 0 repository interfaces.
2019-08-25 16:32:57,908 [main] WARN  [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'nacos-discovery' contains invalid characters, please migrate to a valid format.
2019-08-25 16:32:57,928 [main] WARN  [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
2019-08-25 16:32:58,144 [main] INFO  [o.springframework.cloud.context.scope.GenericScope] GenericScope.java:295 - BeanFactory id=62078519-08a5-3bd1-9959-a8e32b7d3ccd
2019-08-25 16:32:58,155 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:193 - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2019-08-25 16:32:58,162 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:280 - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2019-08-25 16:32:58,176 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:431 - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2019-08-25 16:32:58,212 [main] INFO  [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'seataConfiguration' of type [com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:32:58,267 [configOperate_1_2] WARN  [io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type, try to use default value instead.
2019-08-25 16:32:58,269 [main] WARN  [o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext] AbstractApplicationContext.java:557 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
2019-08-25 16:32:58,280 [main] INFO  [o.s.b.a.l.ConditionEvaluationReportLoggingListener] ConditionEvaluationReportLoggingListener.java:135 -Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-08-25 16:32:58,289 [main] ERROR [org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
	at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627)
	at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1321)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1160)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515)
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:204)
	at org.springframework.context.support.PostProcessorRegistrationDelegate.registerBeanPostProcessors(PostProcessorRegistrationDelegate.java:228)
	at org.springframework.context.support.AbstractApplicationContext.registerBeanPostProcessors(AbstractApplicationContext.java:721)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:534)
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202)
	at com.itcast.trade.stock.pending.startup.StockPendingApplication.main(StockPendingApplication.java:23)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
	at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622)
	... 19 common frames omitted
Caused by: io.seata.common.exception.NotSupportYetException: not support register type: null
	at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:84)
	at io.seata.config.ConfigurationFactory.getInstance(ConfigurationFactory.java:68)
	at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:81)
	at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:124)
	at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:113)
	at com.itcast.trade.stock.pending.config.SeataConfiguration.globalTransactionScanner(SeataConfiguration.java:38)
	at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba.CGLIB$globalTransactionScanner$0(<generated>)
	at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba$$FastClassBySpringCGLIB$$ec5dcab5.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
	at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363)
	at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba.globalTransactionScanner(<generated>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
	... 20 common frames omitted
Caused by: java.lang.IllegalArgumentException: illegal type:null
	at io.seata.config.ConfigType.getType(ConfigType.java:62)
	at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:82)
	... 35 common frames omitted
2019-08-25 16:36:05,248 [main] INFO  [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$ed668c12] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:36:06,559 [main] INFO  [c.i.t.s.pending.startup.StockPendingApplication] SpringApplication.java:650 - The following profiles are active: dev
2019-08-25 16:36:07,554 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:36:07,555 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:36:07,568 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 6ms. Found 0 repository interfaces.
2019-08-25 16:36:07,581 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:36:07,583 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:36:07,595 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 2ms. Found 0 repository interfaces.
2019-08-25 16:36:07,639 [main] WARN  [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'nacos-discovery' contains invalid characters, please migrate to a valid format.
2019-08-25 16:36:07,661 [main] WARN  [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
2019-08-25 16:36:07,919 [main] INFO  [o.springframework.cloud.context.scope.GenericScope] GenericScope.java:295 - BeanFactory id=62078519-08a5-3bd1-9959-a8e32b7d3ccd
2019-08-25 16:36:07,930 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:193 - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2019-08-25 16:36:07,937 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:280 - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2019-08-25 16:36:07,944 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:431 - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2019-08-25 16:36:07,987 [main] INFO  [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'seataConfiguration' of type [com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:36:10,247 [configOperate_1_2] WARN  [io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type, try to use default value instead.
2019-08-25 16:36:14,137 [main] WARN  [o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext] AbstractApplicationContext.java:557 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
2019-08-25 16:36:14,150 [main] INFO  [o.s.b.a.l.ConditionEvaluationReportLoggingListener] ConditionEvaluationReportLoggingListener.java:135 -Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-08-25 16:36:14,161 [main] ERROR [org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
	at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627)
	at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1321)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1160)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515)
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:204)
	at org.springframework.context.support.PostProcessorRegistrationDelegate.registerBeanPostProcessors(PostProcessorRegistrationDelegate.java:228)
	at org.springframework.context.support.AbstractApplicationContext.registerBeanPostProcessors(AbstractApplicationContext.java:721)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:534)
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202)
	at com.itcast.trade.stock.pending.startup.StockPendingApplication.main(StockPendingApplication.java:23)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
	at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622)
	... 19 common frames omitted
Caused by: io.seata.common.exception.NotSupportYetException: not support register type: null
	at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:84)
	at io.seata.config.ConfigurationFactory.getInstance(ConfigurationFactory.java:68)
	at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:81)
	at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:124)
	at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:113)
	at com.itcast.trade.stock.pending.config.SeataConfiguration.globalTransactionScanner(SeataConfiguration.java:38)
	at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830.CGLIB$globalTransactionScanner$0(<generated>)
	at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830$$FastClassBySpringCGLIB$$691b278a.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
	at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363)
	at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830.globalTransactionScanner(<generated>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
	... 20 common frames omitted
Caused by: java.lang.IllegalArgumentException: illegal type:null
	at io.seata.config.ConfigType.getType(ConfigType.java:62)
	at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:82)
	... 35 common frames omitted
2019-08-25 16:36:21,421 [main] INFO  [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean

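Code implementation

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class BatchProcessorApplication {

        // Captures the text between "[main]" and the next "[", i.e. the log level.
        // Lines that do not contain "[main]" (stack traces, other threads) do not match.
        private static final Pattern LEVEL_PATTERN = Pattern.compile("\\[main\\](.*?)\\[");

        public static void main(String[] args) throws Exception {
            // 1. Define the Flink execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // 2. Read the data source (the log file)
            DataSource<String> logData = env.readTextFile("./data/order_info.log");
            // 3. Clean and transform the data
            logData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    // 1) Extract the log level of each line with the regular expression
                    Matcher matcher = LEVEL_PATTERN.matcher(value);
                    if (matcher.find()) {
                        // 2) If the line matches, emit a (level, 1) tuple
                        collector.collect(new Tuple2<>(matcher.group(1).trim(), 1));
                    }
                }
            })
            .groupBy(0)   // group the emitted tuples by field 0 (the log level)
            .sum(1)       // sum field 1 (the Integer count) within each group
            .print();     // in the DataSet API, print() also triggers execution
        }
    }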
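To see what the regular expression in the job actually extracts, the same logic can be tried in plain Java before running it on Flink. This is only a sketch: `LogLevelCount` and `countLevels` are hypothetical names, and a `TreeMap` stands in for `groupBy(0).sum(1)`. Only lines that contain `[main]` followed by another `[` produce a count, so stack-trace lines and lines logged by other threads (such as `[configOperate_1_2]`) are skipped.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogLevelCount {

    // Same pattern as in the batch job: the text between "[main]" and the next "[".
    private static final Pattern LEVEL = Pattern.compile("\\[main\\](.*?)\\[");

    // Counts matching lines per log level; a plain-Java stand-in for groupBy(0).sum(1).
    public static Map<String, Integer> countLevels(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher matcher = LEVEL.matcher(line);
            if (matcher.find()) {
                counts.merge(matcher.group(1).trim(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // A few lines taken from order_info.log
        List<String> lines = Arrays.asList(
            "2019-08-25 16:32:56,918 [main] INFO  [c.i.t.s.pending.startup.StockPendingApplication] SpringApplication.java:650 - The following profiles are active: dev",
            "2019-08-25 16:32:57,908 [main] WARN  [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'nacos-discovery' contains invalid characters, please migrate to a valid format.",
            "2019-08-25 16:32:58,267 [configOperate_1_2] WARN  [io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type, try to use default value instead.",
            "2019-08-25 16:32:58,289 [main] ERROR [org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed",
            "Caused by: java.lang.IllegalArgumentException: illegal type:null");

        System.out.println(countLevels(lines));   // prints {ERROR=1, INFO=1, WARN=1}
    }
}
```

Note that the WARN line from the `configOperate_1_2` thread is not counted. If every thread should be included, the pattern would have to be loosened, which is a change to the original job rather than part of it.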

3. Stream Processing Demo

Simulate socket requests locally
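Flink's `socketTextStream` connects to the given host and port as a client, so something has to be listening on 127.0.0.1:9999 before the job starts. The article does not show which tool was used. One possible approach, sketched here in plain Java, is a tiny server that forwards every line typed on standard input to the first client that connects. The class name `SocketLineServer` and the method `forward` are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class SocketLineServer {

    // Waits for one client, then sends it every line read from 'input', newline-terminated.
    public static void forward(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
            String line;
            while ((line = input.readLine()) != null) {
                out.write(line);
                out.write('\n');
                out.flush();   // push each line immediately so the consumer sees it right away
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 9999 matches the port used by the streaming job.
        try (ServerSocket server = new ServerSocket(9999);
             BufferedReader stdin = new BufferedReader(
                 new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            System.out.println("Listening on port 9999; start the Flink job, then type lines here.");
            forward(server, stdin);
        }
    }
}
```

Start this program first, then the Flink job, and type tab-separated lines into the console; the job treats the text before the first tab as the key.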

Code implementation

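    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class StreamProcessorApplication {
        public static void main(String[] args) throws Exception {
            // 1. Define the Flink execution environment
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            // 2. Read real-time data from the socket stream
            DataStreamSource<String> dataStreamSource = streamEnv.socketTextStream("127.0.0.1", 9999);
            dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    // Each input line is tab-separated; the first field is the key
                    String[] split = line.split("\t");
                    collector.collect(new Tuple2<>(split[0], 1));
                }
            })
            .keyBy(0)                      // partition by field 0 (the key)
            .timeWindow(Time.seconds(5))   // 5-second tumbling window
            .sum(1)                        // sum field 1 within each key and window
            .print()
            .setParallelism(1);            // run the print sink with a single parallel subtask
            // 3. Start the job; a streaming job does not run until execute() is called
            streamEnv.execute();
        }
    }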
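What `keyBy(0).timeWindow(Time.seconds(5)).sum(1)` computes can be modelled in a few lines of plain Java. The sketch below is not Flink code: `TumblingWindowCount`, `windowStart`, and `count` are hypothetical names, arrival times are passed in explicitly instead of being read from a clock, and it assumes tumbling windows aligned to the epoch with no offset. Each line is assigned to the 5-second window that contains its arrival time, and counts are kept per window and key.

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowCount {

    // Start of the tumbling window that contains the timestamp (windows aligned to the epoch).
    public static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Counts lines per (window start, key); lines[i] is assumed to arrive at arrivalMillis[i].
    public static Map<Long, Map<String, Integer>> count(
            String[] lines, long[] arrivalMillis, long windowSizeMillis) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (int i = 0; i < lines.length; i++) {
            String key = lines[i].split("\t")[0];   // same key extraction as in the job
            long start = windowStart(arrivalMillis[i], windowSizeMillis);
            result.computeIfAbsent(start, s -> new TreeMap<>())
                  .merge(key, 1, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] lines = {"apple\tx", "pear\ty", "apple\tz", "apple\tw"};
        long[] arrival = {1_000L, 2_000L, 4_999L, 5_000L};
        // prints {0={apple=2, pear=1}, 5000={apple=1}}
        System.out.println(count(lines, arrival, 5_000L));
    }
}
```

The line arriving at 5000 ms falls into the next window, which is why `apple` is reported once with a count of 2 and once with a count of 1 rather than as a single total of 3.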

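Flink Deployment, Installation, and Configuration

First make sure that a JDK is already installed on your Linux system.

Extract the Flink installation package: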

tar -zxvf flink-1.11.2-bin-scala_2.11.tgz

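Go to the Flink configuration directory (conf)

The masters file specifies the address of the master node (JobManager) of the Flink cluster. It tells the other nodes in the cluster where the master node is, so that they can connect to it and communicate with it correctly.

The flink-conf.yaml file is Flink's main configuration file and holds the parameters of the Flink cluster. It offers a wide range of options, from basic cluster settings to advanced performance tuning.

Common options: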

  • jobmanager.rpc.address:
    • Description: the hostname or IP address of the JobManager.
    • Example: jobmanager.rpc.address: localhost
  • jobmanager.rpc.port:
    • Description: the RPC port of the JobManager.
    • Example: jobmanager.rpc.port: 6123
  • jobmanager.memory.process.size:
    • Description: the total memory size of the JobManager process.
    • Example: jobmanager.memory.process.size: 1600m
  • taskmanager.memory.process.size:
    • Description: the total memory size of each TaskManager process.
    • Example: taskmanager.memory.process.size: 1600m
  • taskmanager.numberOfTaskSlots:
    • Description: the number of task slots per TaskManager.
    • Example: taskmanager.numberOfTaskSlots: 4
  • parallelism.default:
    • Description: the default parallelism.
    • Example: parallelism.default: 4
  • state.backend:
    • Description: the state backend. The choices are MemoryStateBackend, FsStateBackend, and RocksDBStateBackend (configured as jobmanager, filesystem, and rocksdb respectively).
    • Example: state.backend: rocksdb
  • rest.address:
    • Description: the hostname or IP address of the REST API.
    • Example: rest.address: localhost
  • rest.port:
    • Description: the port of the REST API.
    • Example: rest.port: 8081
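Two of these options interact: taskmanager.numberOfTaskSlots determines how many slots the cluster offers, and parallelism.default determines how many a job asks for. Assuming Flink's default slot sharing, where a job needs as many slots as its highest operator parallelism, the arithmetic can be sketched as follows. `SlotMath` and its methods are hypothetical helper names, not part of Flink.

```java
public class SlotMath {

    // Total task slots offered by the cluster.
    public static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // Smallest number of TaskManagers whose slots cover the requested parallelism
    // (ceiling division).
    public static int taskManagersNeeded(int parallelism, int slotsPerTaskManager) {
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // With the example values above: 4 slots per TaskManager, parallelism 4
        System.out.println(taskManagersNeeded(4, 4));    // prints 1
        // A job with parallelism 10 would need three such TaskManagers
        System.out.println(taskManagersNeeded(10, 4));   // prints 3
        System.out.println(totalSlots(3, 4));            // prints 12
    }
}
```

With the example values from the list, a single TaskManager is therefore enough to run a job at the default parallelism.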

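Go to the bin directory and start Flink (for a standalone cluster, the bundled start-cluster.sh script does this).

Open port 8081 (the rest.port configured above) in a browser to reach the Flink web UI.

Flink is now installed.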

Submitting Flink Jobs

Configure the packaging plugins in pom.xml


    <build>
        <plugins>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding} </encoding>-->
                </configuration>
            </plugin>
            <!-- Jar packaging plugin (bundles all dependencies into the jar) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Entry class of the jar (optional) -->
                                    <mainClass>com.demo.flink.usage.stream.StreamProcessorApplication</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

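Run Maven's package phase (for example by clicking package in the IDE) to build the jar.

Submitting through the web UI

Upload the jar, check that the information is correct, and click Submit.

The output can then be viewed in the web UI.

Submitting from the command line

Upload the jar to the Linux machine.

Go to Flink's bin directory.

Run the command: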

./flink run -c com.demo.flink.usage.stream.StreamProcessorApplication /usr/local/flink-usage-1.0-SNAPSHOT.jar

 
