Re: Re:flink消费kafka出现GC问题

2021-01-21 文章 nick
我没有写代码,我全是flink sql写的业务逻辑 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 不同程序间checkpoint迁移

2021-01-21 文章 Yun Tang
Hi, Flink state processor[1] 应该可以满足你的需求。 [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html 祝好 唐云 From: gimlee Sent: Friday, January 22, 2021 12:07 To: user-zh@flink.apache.org Subject: 不同程序间checkpoint迁移 程序A:jar sour

Re:flink消费kafka出现GC问题

2021-01-21 文章 Ye Chen
@nick 我们之前也遇到的过GC导致任务挂掉问题,后来排查发现flink代码写的有问题,在@close方法中关闭数据库连接,但是事实上@close方法未起作用,导致资源未释放OOM。 hope this can help you At 2021-01-22 14:18:51, "nick" wrote: >org.apache.flink.runtime.JobException: Recovery is suppressed by >NoRestartBackoffTimeStrategy > at >org.apache.flink.ru

flink消费kafka出现GC问题

2021-01-21 文章 nick
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.ru

flink sink kafka from checkpoint run failed

2021-01-21 文章 maokeluo
flink: 1.11.2 kafka:2.11 发送数据到kafka,任务运行一段时间后取消,从checkpoint重新启动,任务启动失败,报错:org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 1。 查看kafka-client未发现相关的异常message -- Sent from: http://apache-flink.147419.n8.nabble.com/

不同程序间checkpoint迁移

2021-01-21 文章 gimlee
程序A:jar source: kafka 程序B:Flink SQL source: kafka 使用同一个topic, group id 如果需要把A停掉,使用B替换A,需要把A的checkpoint中的数据、kafka的分区和offset信息改成程序B的checkpoint,请问有办法或者有工具实现嘛? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: FlinkUserCodeClassLoader在session模式的集群下是如何卸载类的

2021-01-21 文章 Xintong Song
standalone 集群 session 模式,作业的 main 方法应该是在 client 端执行的。 Thank you~ Xintong Song On Fri, Jan 22, 2021 at 9:52 AM Asahi Lee <978466...@qq.com> wrote: > 你好! >       >  我使用的是flink-1.12.0版本,启动的单机集群;在我的flink程序main方法中,我使用URLClassLoader加载了一个 > http://a.jar的jar包 ,通过该类加载器加载

Re: flink sql 执行limit 很少的语句依然会暴增

2021-01-21 文章 Shengkai Fang
hi, LIMIT PUSH DOWN 近期已经merge进了master分支了。 [1] https://github.com/apache/flink/pull/13800 Land 于2021年1月22日周五 上午11:28写道: > 可能是没有下推到MySQL执行。 > 问题和我遇到的类似: > http://apache-flink.147419.n8.nabble.com/Flink-MySQL-td10374.html > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >

Re: flink sql 执行limit 很少的语句依然会暴增

2021-01-21 文章 Land
可能是没有下推到MySQL执行。 问题和我遇到的类似:http://apache-flink.147419.n8.nabble.com/Flink-MySQL-td10374.html -- Sent from: http://apache-flink.147419.n8.nabble.com/

Flink sql去重问题

2021-01-21 文章 guaishushu1...@163.com
看到社区文档说是Blink的去重是一种特殊Top-N。经了解Top-N会保留一个最小堆,麻烦问下那其他数据是被机制清除了,还是会保存在内存中呀。用了这个Blink去重还需要用Idle State Retention Time来设置状态的TTL吗? guaishushu1...@163.com

Flink SQL csv格式分隔符设置失败

2021-01-21 文章 gimlee
Flink 版本:1.11.1 我的flink sql: create table stream_tmp.t1( log string) with ( 'connector' = 'kafka', 'topic' = 't1', 'properties.bootstrap.servers' = 'x:9092', 'properties.group.id' = 'flink_test_01', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv', 'csv.field-delimiter' =

Re: 根据业务需求选择合适的flink state

2021-01-21 文章 张锴
@赵一旦 我今天调整一下逻辑再试试 赵一旦 于2021年1月22日周五 上午10:10写道: > 我理解你要的最终mysql结果表是: > 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间); > > 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。 > > > 如上按照我的方案就可以实现哈。 > > xuhaiLong 于2021年1月22日周五 上午10:03写道: > > > 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和 > > userId,然后对用户的每

Flink的StreamFileSink和1.12提供的FileSink中,BucketsBuilder的createBucketWriter中仅支持recoverableWriter。

2021-01-21 文章 赵一旦
如题,为什么仅支持recoverableWriter,如果我使用的文件系统不支持怎么办呢,必须自定义sink吗? 我这边用的是一个公司自研的大型分布式文件系统,支持hadoop协议(但不清楚是否所有特性都支持),目前使用streamFileSink和FileSink貌似都无法正常写。 不清楚是否有其他问题,至少当前是卡住在这个recoverable上了。 报错是只有hdfs才支持recoverableWriter。 有人知道如何解吗?

Re: 根据业务需求选择合适的flink state

2021-01-21 文章 赵一旦
我理解你要的最终mysql结果表是: 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间); 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。 如上按照我的方案就可以实现哈。 xuhaiLong 于2021年1月22日周五 上午10:03写道: > 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和 > userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum 试试? > > >

回复: 根据业务需求选择合适的flink state

2021-01-21 文章 xuhaiLong
可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和 userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum 试试? 在2021年1月21日 18:24,张锴 写道: 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有

回复: 根据业务需求选择合适的flink state

2021-01-21 文章 xuhaiLong
Hi, 看了下你的代码,用session window 时长为1分钟,表示的是user1 的窗口在1分钟内没收到数据就进行一个触发计算,所以最终得到的结果应该是需要你把 user1 产生的每条记录的时长做一个sum,如果只看单条维度是不全的 在2021年1月21日 18:24,张锴 写道: 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个

FlinkUserCodeClassLoader??session??????????????????????????

2021-01-21 文章 Asahi Lee
??        ??flink-1.12.0,??flinkmain??URLClassLoader??http://a.jar??jarrest api jar/run??uber-jar, job??

[DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-01-21 文章 Leonard Xu
Hello, everyone I want to start the discussion of FLIP-162: Consistent Flink SQL time function behavior[1]. We’ve some initial discussion of several problematic functions in dev mail list[2], and I think it's the right time to resolve them by a FLIP. Currently some time function behaviors

Re: [DISCUSS] Correct time-related function behavior in Flink SQL

2021-01-21 文章 Leonard Xu
> Before the changes, as I am writing this reply, the local time here is > 2021-01-21 12:03:35 (Beijing time, UTC+8). > And I tried these 5 functions in sql client, and got: > > Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, > CURRENT_TIME; > +-+-

Re: Pyflink JVM Metaspace 内存泄漏定位

2021-01-21 文章 YueKun
非常抱歉,之前的信息有误,我是用 MAT 工具解析后,exclude all phantom/weak/soft etc. references 后,最后的信息是: org.apache.flink.util.ChildFirstClassLoader @ 0xb0c14308 class com.mysql.cj.jdbc.Driver @ 0xb0f95a20 com.mysql.cj.jdbc.Driver @ 0xb10183c8 java.sql.DriverInfo @ 0xb10183b0 java.lang.Object[12] @ 0xb1499308 java.uti

Re: 根据业务需求选择合适的flink state

2021-01-21 文章 赵一旦
我表达的方法是按照session window将数据分窗口,实际就是把连续1min没数据的那部分数据给你分割好,然后这部分数据中的最大时间戳和最小时间戳的差就是你要的结果理论上。 实现的话就是用2个状态,分别保存最大最小时间戳,没进来一个数据,对比更新最大最小时间戳即可。 然后在窗口被触发的时候将结果输出。使用reduce+procesWindowFunction配合。reduce的时候只计算最大最小,触发的时候才基于2个状态计算duration输出结果。 赵一旦 于2021年1月21日周四 下午8:28写道: > 我其实没看懂你逻辑。这个和窗口的最大最小时间戳啥关系。 > > 张

Re: 根据业务需求选择合适的flink state

2021-01-21 文章 赵一旦
我其实没看懂你逻辑。这个和窗口的最大最小时间戳啥关系。 张锴 于2021年1月21日周四 下午6:25写道: > 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 > > context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。 > 下面是我的部分代码逻辑: > > val ds = dataStream > .filter(_.liveType ==

Re:Re:flink sql是否有json_to_map函数,或者实现方法是怎么样的?

2021-01-21 文章 Jeff
但udtf需要指定结果返回个数及字段名,如:@FunctionHint(output = @DataTypeHint("ROW"),但我希望把这个udtf弄得通用一点,因为json结构是不确定的,不受字段名跟字段个数限制。 在 2021-01-21 18:02:06,"Michael Ran" 写道: >特定的的map也是需要类型的,如果你在乎类型建议里面统一以字符串的udtf实现,后续再进行转换 >在 2021-01-21 18:35:18,"Jeff" 写道: >>hi all, >> >> >>有没有什么办法可以将json转成map呢?类似于str_t

Re: Pyflink JVM Metaspace 内存泄漏定位

2021-01-21 文章 YueKun
hi,JDBC 的问题目前已经解决了,但是又出了个新的问题,是 Kafka 的,还是同样的这个任务,Pyflink的,从Mysql读数据然后写入Kafka,任务结束后,还是有内存不释放,目前从jmap看应该是如下导致的。 class com.sun.jmx.mbeanserver.StandardMBeanIntrospector @ 0xad5d01b0 com.sun.jmx.mbeanserver.MBeanIntrospector$MBeanInfoMap @ 0xad715a00 java.util.WeakHashMap$Entry[32] @ 0xafc583f0 java.

Re: Flink SQL kafka connector有办法获取到partition、offset信息嘛?

2021-01-21 文章 Leonard Xu
> 看了下,是1.12才开始支持么,1.11是不行的嘛? 是的,1.11不支持,文档也是有版本的,如果对应版本的文档里没有该功能介绍,那就是不支持的。

Re: Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 文章 张锴
@赵一旦 另外,上次我还提了一个问题请教你,我试了你说的那个想法,但是好像有点问题,你可以看一下 张锴 于2021年1月21日周四 下午7:13写道: > 我用的flink 1.10版,FlieSink就是BucketingSink,我是用这个写hdfs的 > > 赵一旦 于2021年1月21日周四 下午7:05写道: > >> @Michael Ran; 嗯嗯,没关系。 >> >> @张锴 你说的是flink哪个版本的connector,stream or sql。我搜了下我的没有。我是1.12,stream。 >> >> 目前看文档有streamFileSink,还有FileSi

Re: Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 文章 张锴
我用的flink 1.10版,FlieSink就是BucketingSink,我是用这个写hdfs的 赵一旦 于2021年1月21日周四 下午7:05写道: > @Michael Ran; 嗯嗯,没关系。 > > @张锴 你说的是flink哪个版本的connector,stream or sql。我搜了下我的没有。我是1.12,stream。 > > 目前看文档有streamFileSink,还有FileSink,从文档内容来看使用方式差不多。我计划试一下FileSink,但不清楚FileSink和StreamFileSink啥区别,是否都能写hadoop类文件系统,因为涉及是否原子写

Re:Re:flink sql是否有json_to_map函数,或者实现方法是怎么样的?

2021-01-21 文章 Jeff
但udtf需要指定结果返回个数及字段名,如:@FunctionHint(output = @DataTypeHint("ROW"),但我希望把这个udtf弄得通用一点,因为json结构是不确定的,不受字段名跟字段个数限制。 在 2021-01-21 18:02:06,"Michael Ran" 写道: >特定的的map也是需要类型的,如果你在乎类型建议里面统一以字符串的udtf实现,后续再进行转换 >在 2021-01-21 18:35:18,"Jeff" 写道: >>hi all, >> >> >>有没有什么办法可以将json转成map呢?类似于str

Re: Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 文章 赵一旦
@Michael Ran; 嗯嗯,没关系。 @张锴 你说的是flink哪个版本的connector,stream or sql。我搜了下我的没有。我是1.12,stream。 目前看文档有streamFileSink,还有FileSink,从文档内容来看使用方式差不多。我计划试一下FileSink,但不清楚FileSink和StreamFileSink啥区别,是否都能写hadoop类文件系统,因为涉及是否原子写,比较分布式文件系统不支持追加和编辑等。 Michael Ran 于2021年1月21日周四 下午7:01写道: > > 很抱歉,我已经很久没用过这个了。但是可以根据异常信息以

Re:flink sql是否有json_to_map函数,或者实现方法是怎么样的?

2021-01-21 文章 Michael Ran
特定的的map也是需要类型的,如果你在乎类型建议里面统一以字符串的udtf实现,后续再进行转换 在 2021-01-21 18:35:18,"Jeff" 写道: >hi all, > > >有没有什么办法可以将json转成map呢?类似于str_to_map函数。 > > >版本:flink 1.11 >planner: blink sql > > >需求背景: UDF函数通过http请求获得了json字符串,希望能够直接使用json内字段, >UDTF可以满足一部分要求,但不灵活,因为定义UDTF时需要指定输出字段及类型,限制很大。

flink sink到kafka,报错Failed to construct kafka producer

2021-01-21 文章 lp
flink1.11.2 自定义source循环产生数据然后sink到kafka 采用application Mode部署作业到yarn, jobmanager.log报错如下:(jobmanager和taskmanager的container都分配了,报错都是如下) 2021-01-21 10:52:17,742 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to re

Re:Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 文章 Michael Ran
很抱歉,我已经很久没用过这个了。但是可以根据异常信息以及API源码执行进行分析,确定是否能直接写入。如果你要写入自定义的文件系统,那么只能实现自己的sink方式。或者你的文件系统的写入方式兼容hdfs的上层API可以参考各个sink端的写法 在 2021-01-21 18:45:06,"张锴" 写道: >import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, >DateTimeBucketer} > >sink.setBucketer sink.setWriter用这种方式试试 > > > >赵一旦

Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 文章 张锴
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer} sink.setBucketer sink.setWriter用这种方式试试 赵一旦 于2021年1月21日周四 下午6:37写道: > @Michael Ran > 然后有什么解决方案吗,我这个是使用flink的streamFileSink方式写hdfs的时候出现的异常。 > > Michael Ran 于2021年1月21日周四 下午5:23写道: > > > 这里应该是用了hdfs 的特定A

Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 文章 赵一旦
@Michael Ran 然后有什么解决方案吗,我这个是使用flink的streamFileSink方式写hdfs的时候出现的异常。 Michael Ran 于2021年1月21日周四 下午5:23写道: > 这里应该是用了hdfs 的特定API吧,文件系统没兼容public > HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...} > 在 2021-01-21 17:18:23,"赵一旦" 写道: > >具体报错信息如下: > > > >java.lang.UnsupportedOperationExce

flink sql是否有json_to_map函数,或者实现方法是怎么样的?

2021-01-21 文章 Jeff
hi all, 有没有什么办法可以将json转成map呢?类似于str_to_map函数。 版本:flink 1.11 planner: blink sql 需求背景: UDF函数通过http请求获得了json字符串,希望能够直接使用json内字段, UDTF可以满足一部分要求,但不灵活,因为定义UDTF时需要指定输出字段及类型,限制很大。

flink sql是否有json_to_map函数,或者实现方法是怎么样的?

2021-01-21 文章 Jeff
hi all, 有没有什么办法可以将json转成map呢?类似于str_to_map函数。 版本:flink 1.11 planner: blink sql 需求背景: UDF函数通过http请求获得了json字符串,希望能够直接使用json内字段, UDTF可以满足一部分要求,但不灵活,因为定义UDTF时需要指定输出字段及类型,限制很大。

Re: 根据业务需求选择合适的flink state

2021-01-21 文章 张锴
你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。 下面是我的部分代码逻辑: val ds = dataStream .filter(_.liveType == 1) .keyBy(1, 2) .window(EventTimeSessionWindows.withGap(Time.minute

Re: Flink与MySQL对接相关的问题

2021-01-21 文章 Land
下推的问题也有一个bug在跟踪: https://issues.apache.org/jira/browse/FLINK-18778 -- Sent from: http://apache-flink.147419.n8.nabble.com/

使用了流应用中使用了mysql jdbc的source,Execution处于FINISHED状态无法生成检查点

2021-01-21 文章 刘海
HI! 这边做测试时遇到一个问题: 在流应用中使用了一个mysql jdbc的source作为维表,为了优化处理效率使用了Lookup Cache,下面是注册的表: bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT ENFORCED\n" + ") WITH (" + "'connector' = 'jdbc'," + "'url' = 'jdbc:mys

Re:Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 文章 Michael Ran
这里应该是用了hdfs 的特定API吧,文件系统没兼容public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...} 在 2021-01-21 17:18:23,"赵一旦" 写道: >具体报错信息如下: > >java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are >only supported for HDFS >at org.apache.flink.runtime.fs.hdfs.HadoopReco

Re: Re: flink监控

2021-01-21 文章 赵一旦
每个节点,即进程,直接监控进程的cpu,内存就可以。没有更小的粒度。 通信的话看进程的io读写,网络读写等吧。此外flink的rest api可以获取flink web ui能看到的全部信息,比如节点之间已发送records数量等。 penguin. 于2021年1月18日周一 上午10:55写道: > > 那请问对于每个节点的CPU、内存使用率以及节点之间的通信量如何进行实时监控获取数据呢? > > > > > > > > > > > > > > > > > > 在 2021-01-18 10:15:22,"赵一旦" 写道: > >slot好像只是逻辑概念,监控意义不大,没有资源

Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 文章 赵一旦
除此以外,FlinkSQL读现有的hive数据仓库也是失败。配置okhive的catalog,表信息都能出来,但select操作就是失败。 赵一旦 于2021年1月21日周四 下午5:18写道: > 具体报错信息如下: > > java.lang.UnsupportedOperationException: Recoverable writers on Hadoop > are only supported for HDFS > at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.( > HadoopRecov

Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 文章 赵一旦
具体报错信息如下: java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.( HadoopRecoverableWriter.java:61) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem .createRecoverableWriter(HadoopFi

Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-21 文章 赵一旦
Recoverable writers on Hadoop are only supported for HDFS 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。

Flink SQL建hive表为什么没有if not exists

2021-01-21 文章 gimlee
有个疑问,为什么在使用flink sql建hive表的时候没有 create table IF NOT EXISTS db.t1 (); 只能使用 create table db.t1 (); 而在创建动态表的时候,可以使用IF NOT EXISTS。 使用的版本是1.11.1 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL kafka connector有办法获取到partition、offset信息嘛?

2021-01-21 文章 gimlee
看了下,是1.12才开始支持么,1.11是不行的嘛? -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql升级到1.12 基于eventtime的window聚合没有watermark导致没有数据输出

2021-01-21 文章 Jie Wong
String ddl = "CREATE TABLE orders (\n" + " user_id INT,\n" + " product STRING,\n" + " amount INT,\n" + " `time` bigint,\n" + " `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(`time`)),\n" + " WATERMARK

Re: flink1.12.0 native k8s启动不了

2021-01-21 文章 yzxs
谢谢,问题已解决。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Application Mode部署作业到yarn,找不到.properties文件

2021-01-21 文章 lp
jobManager的完整报错日志如下: 2021-01-21 07:53:23,023 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - 2021-01-21 07:53:23,027 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Star

Application Mode部署作业到yarn,找不到.properties文件

2021-01-21 文章 lp
flink processFunction程序,main()中采用ParameterTool读取resources文件夹下的pro.properties配置文件(kafka地址等);IDEA本地执行完全OK,maven打成jar包后,采用yarn application的方式部署作业, bin/flink run-application -t yarn-application /opt/quickstart-0.1.jar ;作业失败,查看yarn的container日志发现如下错误: Caused by: org.apache.flink.client.program.ProgramIn