Re: Webinar: Unlocking the Power of Apache Beam with Apache Flink

2020-05-28 Thread Maximilian Michels
Thanks to everyone who joined and asked questions. Really enjoyed this new format! -Max On 28.05.20 08:09, Marta Paes Moreira wrote: > Thanks for sharing, Aizhamal - it was a great webinar! > > Marta > > On Wed, 27 May 2020 at 23:17, Aizhamal Nurmamat kyzy > <aizha...@apache.org> wrote:

How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Guodong Wang
Hi ! I want to use Flink SQL to process some json events. It is quite challenging to define a schema for the Flink SQL table. My data source's format is some json like this { "top_level_key1": "some value", "nested_object": { "nested_key1": "abc", "nested_key2": 123, "nested_key3": ["element1", "

Re: Multiple Sinks for a Single Source

2020-05-28 Thread Alexander Fedulov
Hi Prasanna, if the set of all possible sinks is known in advance, side outputs will be generic enough to express your requirements. A side output produces a stream. Create all of the side output tags, associate each of them with one sink, add conditional logic around `ctx.output(outputTag, ...)`;
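
The routing idea in the reply above can be sketched outside Flink. This is plain Python, not the Flink API: `route`, the `ROUTES` table, the tag names, and the `type` field are all hypothetical and only illustrate "one tag per sink, conditional logic decides which tag an event goes to".

```python
from collections import defaultdict

# Hypothetical tags, one per sink; in Flink these would be side output tags.
ROUTES = {
    "orders": lambda e: e.get("type") == "order",
    "payments": lambda e: e.get("type") == "payment",
}


def route(events, routes, default_tag="main"):
    """Send each event to every tag whose predicate matches, else to the main output."""
    outputs = defaultdict(list)
    for event in events:
        matched = False
        for tag, predicate in routes.items():
            if predicate(event):
                outputs[tag].append(event)
                matched = True
        if not matched:
            outputs[default_tag].append(event)
    return dict(outputs)
```

Each key of the returned dictionary stands in for one side output stream, which would then be attached to its own sink.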

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Benchao Li
Hi Guodong, I think you almost have the answer. 1. Map type: it does not work with the current implementation. For example, with a map, if the value is a non-string JSON object, then `JsonNode.asText()` may not work as you wish. 2. List all the fields you care about. IMO, this can fit your scenario. And you can set f
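
As a rough illustration of approach 2, the sketch below declares only the fields of interest and keeps nested values as raw JSON text. It is plain Python rather than Flink SQL; `DECLARED_FIELDS` and `to_row` are hypothetical names, and the field names are taken from the sample event in the original question.

```python
import json

# Hypothetical list of declared columns; anything else in the event is ignored.
DECLARED_FIELDS = ["top_level_key1", "nested_object"]


def to_row(raw_event, fields=DECLARED_FIELDS):
    """Project a JSON event onto the declared fields.

    Missing fields become None; nested objects and arrays are kept as
    JSON strings so that their inner structure does not need a schema.
    """
    event = json.loads(raw_event)
    row = {}
    for name in fields:
        value = event.get(name)
        if isinstance(value, (dict, list)):
            value = json.dumps(value, sort_keys=True)
        row[name] = value
    return row
```

The trade-off is the one raised later in the thread: a new upstream field stays invisible until it is added to the declared list.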

New dates for Flink Forward Global Virtual 2020

2020-05-28 Thread Ana Vasiliuk
Hi everyone, Flink Forward Global Virtual 2020 is now a 4-day conference, featuring two training days on October 19 & 20! The organizers have decided to extend the training program for this event to ensure that you get the most out of your time with our team of Flink experts. *New dates:* Apache

Re: Flink TTL for MapStates and Sideoutputs implementations

2020-05-28 Thread Alexander Fedulov
Hi Jaswin, I would like to clarify something first - what do you key your streams by, when joining them? It seems that what you want to do is to match each CartMessage with a corresponding Payment that has the same orderId+mid. If this is the case, you probably do not need the MapState in the firs

Re: Flink TTL for MapStates and Sideoutputs implementations

2020-05-28 Thread Jaswin Shah
Thanks for responding, Alexander. We have now solved the problem with ValueState. Basically, we are implementing outer-join logic with custom KeyedCoProcessFunction implementations. From: Alexander Fedulov Sent: 28 May 2020 17:24 To: Jaswin Shah Cc: use

Re: Installing Ververica, unable to write to file system

2020-05-28 Thread Marta Paes Moreira
Hi, Charlie. This is not the best place for questions about Ververica Platform CE. Please use community-edit...@ververica.com instead — someone will be able to support you there! If you have any questions related to Flink itself, feel free to reach out to this mailing list again in the future. T

Re: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Chesnay Schepler
If it were a class-loading issue I would think that we'd see an exception of some kind. Maybe double-check that flink-shaded-hadoop is not in the lib directory. (usually I would ask for the full classpath that the HS is started with, but as it turns out this isn't getting logged :( (FLINK-18008

How do I make sure to place operator instances in specific Task Managers?

2020-05-28 Thread Felipe Gutierrez
For instance, if I have the following DAG with the respective parallelism in parentheses (I hope the DAG renders correctly after all):
source01 -> map01(4) -> flatmap01(4) \
                                      |-> keyBy -> reducer(8)
source02 -> map02(4) -> flatmap02(4) /
And I have 4 TMs in 4 machines with 4 cores each. I would like

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Guodong Wang
Benchao, Thank you for your quick reply. As you mentioned, for the current scenario, approach 2 should work for me. But it is a little bit annoying that I have to modify the schema to add new field types when the upstream app changes the JSON format or adds new fields. Otherwise, my users cannot refer to the fi

Re: Apache Flink - Question about application restart

2020-05-28 Thread M Singh
Hi Till/Zhu/Yang: Thanks for your replies. So just to clarify - the job ID remains the same if the job restarts have not been exhausted. Does Yarn also resubmit the job in case of failures, and if so, is the job ID different? Thanks On Wednesday, May 27, 2020, 10:05:40 AM EDT, Till Rohrmann

RE: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Hailu, Andreas
Just created a dump, here's what I see: "Flink-HistoryServer-ArchiveFetcher-thread-1" #19 daemon prio=5 os_prio=0 tid=0x7f93a5a2c000 nid=0x5692 runnable [0x7f934a0d3000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio

Re: ClusterClientFactory selection

2020-05-28 Thread M Singh
Hi Kostas/Yang/Lake: I am looking at AWS EMR and did not see execution.target in the flink-conf.yaml file under the flink/conf directory. Is it defined in another place? I also searched the current Flink source code and found mention of it in the md files but not in any property fil

Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-28 Thread Israel Ekpo
Guowei, What do we need to do to add support for it? How do I get started on that? On Wed, May 27, 2020 at 8:53 PM Guowei Ma wrote: > Hi, > I think the StreamingFileSink could not support Azure currently. > You could find more detailed info from here[1]. > > [1] https://issues.apache.org/jir

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Leonard Xu
Hi, Guodong > I am wondering if Flink SQL can/will support the flexible schema in the > future, It's an interesting topic; this feature is closer to the scope of schema inference. Schema inference should come in the next few releases. Best, Leonard Xu > for example, register the tab

Streaming multiple csv files

2020-05-28 Thread Nikola Hrusov
Hello, I have multiple files (file1, file2, file3) each being CSV and having different columns and data. The column headers are finite and we know their format. I would like to take them and parse them based on the column structure. I already have the parsers e.g.: file1 has columns (id, firstna

Re: Multiple Sinks for a Single Source

2020-05-28 Thread Prasanna kumar
Alexander, Thanks for the reply. Will implement and come back in case of any questions. Prasanna. On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov wrote: > Hi Prasanna, > > if the set of all possible sinks is known in advance, side outputs will be > generic enough to express your requirements

Re: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Chesnay Schepler
Looks like it is indeed stuck on downloading the archive. I searched a bit in the Hadoop JIRA and found several similar instances: https://issues.apache.org/jira/browse/HDFS-6999 https://issues.apache.org/jira/browse/HDFS-7005 https://issues.apache.org/jira/browse/HDFS-7145 It is supposed to be

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Benchao Li
Hi Guodong, Does the RAW type meet your requirements? For example, you can specify map type, and the value for the map is the raw JsonNode parsed from Jackson. This is not supported yet, however IMO this could be supported. Guodong Wang wrote on Thu, May 28, 2020 at 9:43 PM: > Benchao, > > Thank you for your

Re: Apache Flink - Question about application restart

2020-05-28 Thread Till Rohrmann
Hi, Yarn won't resubmit the job. In case of a process failure where Yarn restarts the Flink Master, the Master will recover the submitted jobs from a persistent storage system. Cheers, Till On Thu, May 28, 2020 at 4:05 PM M Singh wrote: > Hi Till/Zhu/Yang: Thanks for your replies. > > So just

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Guodong Wang
Yes. Setting the value type as raw is one possible approach. And I would like to vote for schema inference as well. Correct me if I am wrong, IMO schema inference means I can provide a method in the table source to infer the data schema based on the runtime computation. Just like some calcite adapt

Re: Tumbling windows - increasing checkpoint size over time

2020-05-28 Thread Till Rohrmann
Hi Matt, when using tumbling windows, the checkpoint size depends not only on the number of keys (which is equivalent to the number of open windows) but also on how many events arrive for each open window, because the windows store every window event in their state. Hence, it can be the cas

Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-28 Thread Till Rohrmann
Hi Israel, thanks for reaching out to the Flink community. As Guowei said, the StreamingFileSink can currently only recover from faults if it writes to HDFS or S3. Other file systems are currently not supported if you need fault tolerance. Maybe Klou can tell you more about the background and wha

Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-28 Thread Israel Ekpo
Hi Till, Thanks for your feedback and guidance. It seems similar work was done for S3 filesystem where relocations were removed for those file system plugins. https://issues.apache.org/jira/browse/FLINK-11956 It appears the same needs to be done for Azure File systems. I will attempt to connec

Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-28 Thread Till Rohrmann
I think what needs to be done is to implement an org.apache.flink.core.fs.RecoverableWriter for the respective file system, similar to HadoopRecoverableWriter and S3RecoverableWriter. Cheers, Till On Thu, May 28, 2020 at 6:00 PM Israel Ekpo wrote: > Hi Till, > > Thanks for your feedback and guid

RE: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Hailu, Andreas
Okay, I will look further to see if we're mistakenly using a version that's pre-2.6.0. However, I don't see flink-shaded-hadoop in my /lib directory for flink-1.9.1. flink-dist_2.11-1.9.1.jar flink-table-blink_2.11-1.9.1.jar flink-table_2.11-1.9.1.jar log4j-1.2.17.jar slf4j-log4j12-1.7.15.jar A

Best way to "emulate" a rich Partitioner with open() and close() methods ?

2020-05-28 Thread LINZ, Arnaud
Hello, I would like to upgrade the performance of my Apache Kudu Sink by using the new “KuduPartitioner” of Kudu API to match Flink stream partitions with Kudu partitions to lower the network shuffling. For that, I would like to implement something like stream.partitionCustom(new KuduFlinkPa

Custom trigger to trigger for late events

2020-05-28 Thread Poornapragna Ts
Hi, I have a simple requirement where I want to have a 10-second window that allows late events for up to 1 hour. The existing TumblingEventTimeWindows with EventTimeTrigger will work for this. But the EventTimeTrigger triggers for every incoming event after the watermark has passed the window's max time. I don't w

Flink Iterator Functions

2020-05-28 Thread Roderick Vincent
Hi, I am brand new to Apache Flink so please excuse any silly questions. I have an Iterator function defined as below and adding it as a source to a Flink stream. But when I try to pass configuration information to it (via a Spring env), what I notice is that one of the threads calls hasNext() a

Dropping messages based on timestamp.

2020-05-28 Thread Joe Malt
Hi, I'm working on a custom TimestampAssigner which will do different things depending on the value of the extracted timestamp. One of the actions I want to take is to drop messages entirely if their timestamp meets certain criteria. Of course there's no direct way to do this in the TimestampAssi

Re: Re: Flink Window with multiple trigger condition

2020-05-28 Thread aj
Hi, I have implemented the solution below and it's working fine, but the biggest problem is that if no event arrives for the user after 30 min, I am not able to trigger, because I am checking the time difference against upcoming events. So it triggers only when the next event comes, but I want it to t

Flink Elastic Sink

2020-05-28 Thread aj
Hello All, I am getting many events in Kafka and I have written a Flink job that sinks the Avro records from Kafka to S3 in Parquet format. Now I want to sink these records into Elasticsearch, but the only challenge is that I want to sink the records into time-based indices. Basically, in Elastic, I want to

Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-28 Thread Israel Ekpo
Thanks Till. I will take a look at that tomorrow and let you know if I hit any roadblocks. On Thu, May 28, 2020 at 12:11 PM Till Rohrmann wrote: > I think what needs to be done is to implement > a org.apache.flink.core.fs.RecoverableWriter for the respective file > system. Similar to HadoopReco

Re: Stateful functions Harness

2020-05-28 Thread Boris Lublinsky
Also, I have noticed that a few cludstate jars, including statefun-flink-core, statefun-flink-io, and statefun-flink-harness, are built for Scala 2.11. Is it possible to create versions of those for Scala 2.12? > On May 27, 2020, at 3:15 PM, Seth Wiesman wrote: > > Hi Boris, > > Example usage of flink

Re: Apache Flink - Question about application restart

2020-05-28 Thread M Singh
Thanks Till - in the case of restart of flink master - I believe the jobid will be different.  Thanks On Thursday, May 28, 2020, 11:33:38 AM EDT, Till Rohrmann wrote: Hi, Yarn won't resubmit the job. In case of a process failure where Yarn restarts the Flink Master, the Master will re

Question on stream joins

2020-05-28 Thread Sudan S
Hi, I have two use cases. 1. I have two streams, `leftSource` and `rightSource`, which I want to join without partitioning over a window, find the difference between the element counts of leftSource and rightSource, and emit that difference. Which is the appropriate join function I can use

RE: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Hailu, Andreas
May I also ask what version of flink-hadoop you're using and the number of jobs you're storing the history for? As of writing we have roughly 101,000 application history files. I'm curious to know if we're encountering some kind of resource problem. // ah From: Hailu, Andreas [Engineering] Sen

Re: ClusterClientFactory selection

2020-05-28 Thread Yang Wang
You could find more information about deployment target here[1]. As you mentioned, it is not defined in the flink-conf.yaml by default. For the code, it is defined in flink-core/DeploymentOptions. [1]. https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#deployment-targets Best, Y

Re: Cannot start native K8s

2020-05-28 Thread Yang Wang
A quick update on this issue. The root cause of this issue is a compatibility problem between kubernetes-client and Java 8u252[1]. We have bumped the fabric8 kubernetes-client version from 4.5.2 to 4.9.2 in the master and release-1.11 branches. Now users can deploy Flink on K8s natively with Java 8u252. If you r

Re: Apache Flink - Question about application restart

2020-05-28 Thread Zhu Zhu
Restarting the Flink master does not change the job ID within one YARN application. Put simply, in a YARN application that runs a Flink cluster, the job ID of a job does not change once the job is submitted. You can even submit a Flink application multiple times to that cluster (if it is session mode)

Re: Flink Elastic Sink

2020-05-28 Thread Yangze Guo
Hi, Anuj. From my understanding, you could send an IndexRequest to the indexer in `ElasticsearchSink`. It will create a document under the given index and type. So, it seems you only need to get the timestamp and concat the `date` to your index. Am I understanding that correctly? Or do you want to e

Re: How do I make sure to place operator instances in specific Task Managers?

2020-05-28 Thread Weihua Hu
Hi, Felipe Flink does not support running tasks on a specified TM. You can use slotSharingGroup to keep tasks out of the same slot, but you cannot specify which TM. Can you please give the reason for specifying the TM? Best Weihua Hu > On May 28, 2020, at 21:37, Felipe Gutierrez wrote: > > For instance, if I have t

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Benchao Li
Hi Guodong, After an offline discussion with Leonard, I think you have the right understanding of schema inference. But there are two problems here: 1. The schema of the data is fixed; schema inference can save you the effort of writing the schema explicitly. 2. The schema of the data is dynamic; in this case the sch

Re: Flink Elastic Sink

2020-05-28 Thread Leonard Xu
Hi, aj In the implementation of ElasticsearchSink, ElasticsearchSink won't create the index and only starts an Elastic client for sending requests to the Elastic cluster. You can simply extract the index (the date value in your case) from your timestamp field and then put it into an IndexRequest[2], Elasti
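
Deriving the index name from the record timestamp, as suggested in the replies above, might look like the following. This is a plain Python sketch, not Elasticsearch or Flink code; `index_for`, the `events` prefix, the daily granularity, and the use of UTC are assumptions made for illustration.

```python
from datetime import datetime, timezone


def index_for(prefix, epoch_millis):
    """Build a daily index name such as 'events-2020-05-28' from an epoch-millisecond timestamp."""
    day = datetime.fromtimestamp(epoch_millis // 1000, tz=timezone.utc)
    return f"{prefix}-{day:%Y-%m-%d}"
```

The resulting string would be used as the index of each request, so records from different days end up in different indices.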

Re: Re: Re: Flink Window with multiple trigger condition

2020-05-28 Thread Yun Gao
Hi, I think you could use a timer to achieve that. In a ProcessFunction you could register a timer at a specific time (event time or processing time) and get a callback at that point. It could be registered like ctx.timerService().registerEventTimeTimer(current.lastModified + 6); More det
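
To show why a timer fixes the "nothing fires until the next event" problem, here is a small simulation of the idea in plain Python. It is not the Flink timer service: `InactivityTimers`, its methods, and the timeout value used in the test are hypothetical, and the only behaviour borrowed from Flink is that an event-time timer fires once the watermark reaches its timestamp.

```python
class InactivityTimers:
    """Per-key timers that fire once the watermark reaches last_seen + timeout."""

    def __init__(self, timeout_ms):
        self.timeout_ms = timeout_ms
        self.last_seen = {}  # key -> timestamp of the latest event

    def on_event(self, key, timestamp):
        # Comparable to updating keyed state and registering a timer for
        # timestamp + timeout; a newer event supersedes the older deadline.
        self.last_seen[key] = max(timestamp, self.last_seen.get(key, timestamp))

    def on_watermark(self, watermark):
        """Return the keys whose deadline has passed and clear their state."""
        fired = sorted(
            key for key, ts in self.last_seen.items()
            if ts + self.timeout_ms <= watermark
        )
        for key in fired:
            del self.last_seen[key]
        return fired
```

The point of the design is that firing is driven by the progress of time (the watermark), not by the arrival of another event for the same key.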

Re: Question on stream joins

2020-05-28 Thread Yun Gao
Hi Sudan, As far as I know, both join and cogroup require keys (namely partitioning), thus for the non-keyed scenario, you may have to use the low-level connect operator to achieve it. In my opinion it should be something like leftSource.connect(rightSource) .process(new TagCoprocessFu
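
The "connect, tag, then count" idea can be illustrated without Flink. The sketch below is plain Python over finite lists; `count_difference` and the `"L"`/`"R"` tags are hypothetical, and the windowing that a real job would need is left out.

```python
from itertools import chain


def count_difference(left, right):
    """Tag elements by origin, merge them, and return count(left) - count(right)."""
    tagged = chain((("L", x) for x in left), (("R", x) for x in right))
    diff = 0
    for tag, _ in tagged:
        diff += 1 if tag == "L" else -1
    return diff
```

Because each element carries its origin tag, no key is needed to tell the two inputs apart after they are merged.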

PyFlink Table API: question about connecting to external systems

2020-05-28 Thread 刘亚坤
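I am currently learning to use the PyFlink Table API and have a question: 1. When the Table API connects to Kafka, can an entire Kafka message be treated as a single table field? For example, the messages in the Kafka topic are JSON strings; treating the whole string as one field would make it convenient to process and transform the messages with PyFlink UDFs. 2. If the above is feasible, how should the data format for the Kafka connection be configured, i.e. how should with_format be set? There is currently little material on this on the official website. I am a beginner, so any guidance is appreciated. Thanks.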

Flink SQL tumbling window does not output the result set

2020-05-28 Thread steven chen
Data arrives every time and is aggregated, but why is the INSERT result not saved to MySQL? Is it a problem with the SQL, or something else? Any help would be appreciated. CREATE TABLE user_behavior ( itemCode VARCHAR, ts BIGINT COMMENT 'timestamp', t as TO_TIMESTAMP(FROM_UNIXTIME(ts /1000,'yyyy-MM-dd HH:mm:ss')), proctime as PROCTIME(), WATERMARK FOR t as t - INTERVAL '5' SECOND ) WITH ( 'connector.type' = '

Re: Running and Maintaining Multiple Jobs

2020-05-28 Thread Yun Tang
Hi Prasanna As far as I know, Flink does not allow submitting a new job graph without restarting it, and I actually do not understand what your 3rd question means. From: Prasanna kumar Sent: Friday, May 29, 2020 11:18 To: Yun Tang Cc: user Subject: Re: Running and