Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-27 Thread Xingbo Huang
Hi Pierre, sorry for the late reply. Your requirement is that your `Table` has a field in JSON format whose number of keys has reached 100k, and you want to use such a field as the input/output of a UDF, right? As to whether there is a limit on the number of nested keys, I am not quite sure.
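One way to read this: keep the 100k-key JSON as an opaque STRING column and parse it inside the UDF, so the key count never enters the table schema. A minimal Java sketch (the thread concerns a Scala UDF called from PyFlink, so the class name and the Jackson dependency are assumptions, not the poster's code):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical sketch: the large JSON field arrives as a plain STRING column
// and the UDF parses it, so no schema ever enumerates the 100k keys.
public class JsonGet extends ScalarFunction {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Returns the value under `key`, or null if the key is absent.
    public String eval(String json, String key) {
        try {
            JsonNode node = MAPPER.readTree(json).get(key);
            return node == null ? null : node.asText();
        } catch (Exception e) {
            return null; // malformed JSON is mapped to null in this sketch
        }
    }
}
```

It could be registered with `tEnv.createTemporarySystemFunction("json_get", JsonGet.class)` and called from SQL as `json_get(payload, 'some_key')`.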

Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Laurent Exsteens
Hi Leonard, you wrote: "From my understanding, your case is not a pure deduplication case but you want to both keep the previous record and the current record, thus the deduplication query cannot satisfy your requirement." Indeed, that's what I came to realise during our discussion on this email chain. I'm …

Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Konstantin Knauf
Hi Laurent, with respect to Ververica Platform, we will support Flink 1.12 and add "upsert-kafka" as a packaged connector in our next minor release, which we target for February. Cheers, Konstantin. On Thu, Nov 12, 2020 at 3:43 AM Jark Wu wrote: "Hi Laurent, 1. Deduplicate with keeping the …"
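For the archive, the Flink 1.12 "upsert-kafka" DDL being referred to has this shape; a minimal sketch in which topic, bootstrap servers, formats, and all table/column names are placeholders, not values from this thread:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // The PRIMARY KEY determines which Kafka message key the upsert
        // (and tombstone) records are written under.
        tEnv.executeSql(
                "CREATE TABLE dedup_sink (" +
                "  user_id STRING," +
                "  attribute STRING," +
                "  PRIMARY KEY (user_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'dedup-out'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'" +
                ")");
    }
}
```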

Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Leonard Xu
Hi Laurent, you wrote: "I'm not sure that would do what I want though. As far as I understand, the deduplication query will always remember any values it has seen. So if I have, for a specific primary key, the following values in another field: "a", "a", "b", "b", "a", "a", the deduplication query …"

Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Laurent Exsteens
Hi Leonard, thank you for your answer. I'm not sure that would do what I want though. As far as I understand, the deduplication query will always remember any values it has seen. So if I have, for a specific primary key, the following values in another field: "a", "a", "b", "b", "a", "a", the deduplication query …
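For reference, the deduplication pattern under discussion is the documented ROW_NUMBER() form; a minimal sketch (table and column names are placeholders, not from this thread):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DedupQueryExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Standard Flink SQL deduplication: keep the first row per partition key.
        // Partitioning by (client_number, attr) keeps the first "a" and the
        // first "b", but a later return to "a" is still swallowed -- exactly
        // the emit-on-change limitation discussed in this thread.
        tEnv.executeSql(
                "SELECT client_number, attr, proc_time FROM (" +
                "  SELECT *, ROW_NUMBER() OVER (" +
                "    PARTITION BY client_number, attr ORDER BY proc_time ASC) AS rn" +
                "  FROM changes" +
                ") WHERE rn = 1").print();
    }
}
```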

Re: Batch compressed file output

2020-11-27 Thread Matthias Pohl
Hi Flavio, others might have better ideas to solve this, but I'll give it a try: have you considered extending FileOutputFormat to achieve what you need? That approach (which is discussed in [1]) sounds like something you could do. Another pointer I want to give is the DefaultRollingPolicy [2]. It …
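As an illustration of pointer [2], a minimal sketch of a size-based rolling sink (DataStream API; the path, thresholds, and the String record type are placeholder assumptions):

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class RollingSinkSketch {
    public static StreamingFileSink<String> build() {
        // Roll a new part file once it exceeds ~128 MB or has been open 15 min,
        // which addresses the "split by max size" half of the question.
        DefaultRollingPolicy<String, String> policy = DefaultRollingPolicy
                .builder()
                .withMaxPartSize(128 * 1024 * 1024)
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                .build();

        return StreamingFileSink
                .forRowFormat(new Path("/tmp/out"), new SimpleStringEncoder<String>())
                .withRollingPolicy(policy)
                .build();
    }
}
```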

Re: queryLogicalType != sinkLogicalType when UDAF returns List

2020-11-27 Thread Dongwon Kim
Hi Timo, okay, then the aggregate function should look like this: public static class Agg extends AggregateFunction&lt;Integer[], ArrayList&lt;Integer&gt;&gt; { @Override public ArrayList&lt;Integer&gt; createAccumulator() { return new ArrayList&lt;&gt;(); } @Override public Integer[] getValue(ArrayList&lt;Integer&gt; acc) …
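Filled out, the function being sketched plausibly looks like this (the preview is cut off, so accumulate() and the Integer element type are assumptions; the explicit getResultType() override reflects the Types.OBJECT_ARRAY(Types.INT) pointer in Timo's message further down):

```java
import java.util.ArrayList;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.AggregateFunction;

public class Agg extends AggregateFunction<Integer[], ArrayList<Integer>> {

    @Override
    public ArrayList<Integer> createAccumulator() {
        return new ArrayList<>();
    }

    // The preview does not show accumulate(), so this signature is a guess.
    public void accumulate(ArrayList<Integer> acc, Integer value) {
        acc.add(value);
    }

    @Override
    public Integer[] getValue(ArrayList<Integer> acc) {
        return acc.toArray(new Integer[0]); // Integer[] maps to ARRAY<INT>
    }

    // Declare the result type explicitly so nothing falls back to RAW.
    @Override
    public TypeInformation<Integer[]> getResultType() {
        return Types.OBJECT_ARRAY(Types.INT);
    }
}
```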

Batch compressed file output

2020-11-27 Thread Flavio Pompermaier
Hello guys, I have to write my batch data (DataSet) to a file format. Actually, what I need to do is: 1. split the data if it exceeds some size threshold (by line count or max MB); 2. compress the output data (possibly without converting to the Hadoop format). Are there any suggestions …
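One way to attack point 2 along the lines of the FileOutputFormat suggestion in the reply above: wrap the part-file stream in a GZIPOutputStream. A sketch under assumptions (the class name is hypothetical, and the size-based splitting of point 1 is not shown):

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.core.fs.Path;

// Gzip each part file directly, without going through Hadoop formats.
public class GzipTextOutputFormat<T> extends FileOutputFormat<T> {

    private transient Writer writer;

    public GzipTextOutputFormat(Path outputPath) {
        super(outputPath);
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks); // opens the protected `stream` field
        writer = new OutputStreamWriter(
                new GZIPOutputStream(stream), StandardCharsets.UTF_8);
    }

    @Override
    public void writeRecord(T record) throws IOException {
        writer.write(record.toString());
        writer.write('\n');
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close(); // flushes and finishes the gzip trailer
        }
        super.close();
    }
}
```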

Re: Duplication error on Kafka Connector Libraries

2020-11-27 Thread Arvid Heise
The most common cause of such issues is class loading. You probably have added flink-connector-kafka to flink-dist/lib as well, but the connector is only meant to be bundled with your job jar afaik. Right now, you have the Kafka classes loaded in the user code classloader and in the system …
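A quick way to check this from inside the job's main() is to ask which classloader serves the connector class; this is a hypothetical diagnostic, not from the thread:

```java
// If the classloader printed here is the parent (system) classloader rather
// than Flink's user-code classloader, a copy of the connector is sitting in
// flink-dist/lib and should be removed in favor of the fat job jar.
public class WhoLoadedKafka {
    public static void main(String[] args) throws ClassNotFoundException {
        Class<?> c = Class.forName(
                "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer");
        System.out.println(c.getName() + " loaded by " + c.getClassLoader());
    }
}
```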

Re: PyFlink Table API and UDF Limitations

2020-11-27 Thread Niklas Wilcke
Hi Xingbo, thanks for sharing. This is very interesting. Regards, Niklas. On 27. Nov 2020, at 03:05, Xingbo Huang wrote: "Hi Niklas, thanks a lot for supporting PyFlink. In fact, your requirement of multiple inputs and multiple outputs is essentially a Table Aggregation Function [1]. …"
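Reference [1] presumably points at the Table Aggregation Functions docs; their canonical Java example is Top2, which takes multiple input rows via accumulate() and emits multiple output rows via emitValue() (shown here in Java even though the thread is about PyFlink):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

// Emits the two largest values per group, each as its own output row.
public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2.Acc> {

    public static class Acc {
        public Integer first = Integer.MIN_VALUE;
        public Integer second = Integer.MIN_VALUE;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    public void emitValue(Acc acc, Collector<Tuple2<Integer, Integer>> out) {
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1)); // value, rank
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}
```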

Re: Caching

2020-11-27 Thread Dongwon Kim
Hi Navneeth, you asked: "I didn't quite understand how async I/O can be used here. It would be great if you can share some info on it." You need to add an async operator in the middle of your pipeline in order to enrich your input data; [1] and [2] will help you. You also asked: "how are you propagating any changes to …"
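A sketch of the shape such an enrichment operator takes ([1] and [2] presumably point at the async I/O docs; the lookup client and all names here are placeholders):

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Looks up enrichment data for each record without blocking the task thread.
public class EnrichFunction extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> lookup(key)) // a real client would be truly non-blocking
                .thenAccept(v -> resultFuture.complete(
                        Collections.singleton(key + "," + v)));
    }

    private String lookup(String key) {
        return "enriched-" + key; // stand-in for the external store call
    }
}
```

It would be wired into the pipeline with something like `AsyncDataStream.unorderedWait(input, new EnrichFunction(), 1000, TimeUnit.MILLISECONDS, 100)`.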

Re: queryLogicalType != sinkLogicalType when UDAF returns List

2020-11-27 Thread Timo Walther
Hi, first of all, we don't support ListTypeInfo in the Table API; therefore, it is treated as a RAW type. The exception during exception creation is a bug that should be fixed in a future version. But the mismatch is valid: ARRAY is not a list type info but `Types.OBJECT_ARRAY(Types.INT)`. Can you …
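To see the mismatch Timo describes, the two type infos can be compared directly; a tiny sketch (class name hypothetical):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ListTypeInfo;

public class TypeMismatchDemo {
    public static void main(String[] args) {
        // The SQL ARRAY type corresponds to an object-array type info...
        System.out.println(Types.OBJECT_ARRAY(Types.INT));
        // ...while a java.util.List field gets a ListTypeInfo, which the
        // Table API does not support and therefore treats as RAW.
        System.out.println(new ListTypeInfo<>(Types.INT));
    }
}
```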