Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-30 Thread Xingbo Huang
Hi Pierre, Have you ever thought of declaring your entire JSON as a string field in `Table` and putting the parsing work in a UDF? Best, Xingbo Pierre Oberholzer wrote on Tue, Dec 1, 2020 at 4:13 AM: > Hi Xingbo, > > Many thanks for your follow-up. Yes you got it right. > So using Table API and a ROW object f
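A minimal sketch of the approach Xingbo suggests: the `Table` keeps the whole JSON payload as one string column, and a scalar UDF does the parsing. The field names and the `json.loads`-based logic below are illustrative assumptions, not Pierre's actual schema.

```python
import json

# Hypothetical parsing logic that a PyFlink scalar UDF could wrap.
# The Table declares a single STRING column; the UDF extracts only
# the fields actually needed downstream.
def parse_payload(raw: str):
    """Parse a JSON string column into an (id, nested_value) pair.

    Field names here are made up for illustration.
    """
    doc = json.loads(raw)
    return doc.get("id"), doc.get("payload", {}).get("value")

# In PyFlink this function would be registered roughly as:
#   @udf(result_type=DataTypes.ROW([...]))
# so no per-field type declaration is needed on the Table side.
```

For example, `parse_payload('{"id": 1, "payload": {"value": "x"}}')` yields `(1, "x")`, and missing fields come back as `None` rather than failing.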

Re: Best way to handle data streams for non-sealed trait hierarchies

2020-11-30 Thread Salva Alcántara
To be clear, the main idea would be to define the "general framework" in terms of DataStream[Event], where `Event` would be a trait capturing the required commonalities. Within each specific application, one would work with different types of events but would ultimately perform a union in order to

Best way to handle data streams for non-sealed trait hierarchies

2020-11-30 Thread Salva Alcántara
I know that for ADTs (sealed traits) there are some ongoing efforts to overcome the performance degradation caused by the Kryo fallback (see https://github.com/apache/flink/pull/12929). E.g., ``` sealed trait Event { def id: Int } case class Pageview(id: Int, page: String) extends Event case cla

Re: Filter Null in Array in SQL Connector

2020-11-30 Thread Rex Fenley
Hello, Any updates on this bug? Thanks! On Fri, Nov 20, 2020 at 11:51 AM Rex Fenley wrote: > Btw, this is what our source and sink essentially look like, with some > columns redacted. > > CREATE TABLE source_kafka_data ( > id BIGINT, > roles ARRAY, > PRIMARY KEY (id) NOT ENFORCED >
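The semantics Rex is after, sketched in plain Python as a stand-in for the SQL-side behavior (the connector bug itself is the open question in the thread): drop the NULL entries from an array-valued column before it reaches the sink.

```python
def drop_nulls(roles):
    """Drop None entries from an array column's value.

    Mirrors the intended ARRAY semantics: keep the element order,
    remove only the nulls.
    """
    return [r for r in roles if r is not None]
```

E.g. `drop_nulls(["admin", None, "user"])` keeps `["admin", "user"]` in order.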

Re: Anomalous spikes in aggregations of keyed data

2020-11-30 Thread Kegel, Mark
At the moment we checkpoint every minute. I can turn this frequency down, but I'm not sure that will fix/hide the issue. Mark From: Arvid Heise Date: Monday, November 30, 2020 at 2:33 PM To: Kegel, Mark Cc: user@flink.apache.org Subject: Re: Anomalous spikes in aggregations of keyed data Hi Ma

Re: Anomalous spikes in aggregations of keyed data

2020-11-30 Thread Arvid Heise
Hi Mark, could you double-check if these spikes co-occur with checkpointing? If there is an alignment, certain channels are blocked from taking in data. If all keys are more or less contained in a shard with less data, it would explain why only these keys are affected. On Mon, Nov 30, 2020 at 9:27 PM Keg

Re: problem with kafka on multiple partitions and large size message

2020-11-30 Thread Arvid Heise
Hi Youzha, could you double-check if your filter function is actually just filtering out all messages? For example, could you replace the filter with (record -> true)? A filter dropping all records would most certainly explain the behavior. If that doesn't work, we need to go back to the basics: whic
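Arvid's debugging experiment, mocked up in plain Python: run the same records through the suspect predicate and through a pass-through one. If only the pass-through variant lets records flow, the original filter was rejecting everything. The `suspect_filter` body below is an assumed stand-in, since the real filter logic isn't shown in the thread.

```python
# Stand-in for the real (unknown) filter logic; assumed for illustration.
def suspect_filter(record: dict) -> bool:
    return record.get("size", 0) < 1_000_000

# Equivalent of replacing the filter with (record -> true).
def pass_through_filter(record: dict) -> bool:
    return True

records = [{"size": 5_000_000}, {"size": 10}]
kept_real = [r for r in records if suspect_filter(r)]
kept_all = [r for r in records if pass_through_filter(r)]
# Comparing kept_real against kept_all shows exactly what the
# suspect predicate drops.
```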

Anomalous spikes in aggregations of keyed data

2020-11-30 Thread Kegel, Mark
We have a high volume (600-700 shards) kinesis data stream that we are doing a simple keying and aggregation on. The logic is very simple: kinesis source, key by fields (A,B,C), window (1-minute, tumbling), aggregate by summing over integer field R, connect to sink. We are seeing some anomalous
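The pipeline Mark describes, reduced to a plain-Python sketch over already-collected records: key by fields (A, B, C), assign each event to a 1-minute tumbling window, and sum the integer field R per key per window. The Kinesis source, event-time/watermark handling, and the sink are omitted; field names follow the letters used in the post.

```python
from collections import defaultdict

def tumbling_window_sums(events, window_ms=60_000):
    """Sum field R per (A, B, C) key per tumbling window.

    `events` are dicts with keys A, B, C, R and a timestamp `ts`
    in milliseconds; each event lands in exactly one window.
    """
    sums = defaultdict(int)
    for e in events:
        window_start = (e["ts"] // window_ms) * window_ms
        key = (e["A"], e["B"], e["C"], window_start)
        sums[key] += e["R"]
    return dict(sums)
```

Two events 30 seconds apart land in the same window and are summed together; an event past the minute boundary starts a new window, so a spike in one window never leaks into the next under this model.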

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-30 Thread Pierre Oberholzer
Hi Xingbo, Many thanks for your follow-up. Yes you got it right. So using Table API and a ROW object for the nested output of my UDF, and since types are mandatory, I guess this boils down to: - How to nicely specify the types for the 100k fields: shall I use TypeInformation [1] or better retriev

Re: problem with kafka on multiple partitions and large size message

2020-11-30 Thread Youzha
Hi, Sorry for my last picture attachment. There is still a filter transform process before producing to Kafka. When I checked it again: if I use the rebalance function, the job gets stuck on the first process after consuming messages. In my last post, the process got stuck on the filter step. After consuming from the topic

Re: problem with kafka on multiple partitions and large size message

2020-11-30 Thread Youzha
Hi Arvid, thanks for your reply. Yes, I'm using exactly-once mode and I've enabled checkpointing: env.enableCheckpointing(2, CheckpointingMode.EXACTLY_ONCE); This is my flow in the web UI. I'm trying to do a simple thing: read from Kafka and then sink to another topic. this is the web

Re: problem with kafka on multiple partitions and large size message

2020-11-30 Thread Arvid Heise
Hi Youzha, do you use exactly-once mode in the Kafka producer? Make sure that you have enabled checkpointing and set the interval appropriately. You can also see the message flow in the web UI. Check if something is reaching the sink. Aside from that, if you use window operators, make sure that they f

problem with kafka on multiple partitions and large size message

2020-11-30 Thread Youzha
Hi, I'm using a Kafka consumer and producer in Flink. I've succeeded in consuming, transforming, and producing to another topic in my development env (1 partition and a few dummy msgs), but when I try to submit the job in the production env (20 partitions and a size of about 100 GB), there is no message produ

Re: Flink 1.11 avro format question

2020-11-30 Thread Dawid Wysakowicz
Hi, I managed to backport the change to the 1.11 branch. It should be part of the 1.11.3 release. Best, Dawid On 25/11/2020 16:23, Hongjian Peng wrote: > Thanks for Danny and Dawid's quick reply. > > Dawid, I find your fix > at  > https://github.com/apache/flink/pull/14085/commits/bc9caab71f510