Re: Add customized metrics in Sink

2024-09-19 Thread Péter Váry
Hi Tengda, Which sink version do you use? I have recently (Flink 1.18, 1.19) added metrics to the initialization context of the committers in the new SinkV2 API [1]. With this you have a place to collect the desired metrics. I hope this helps, Peter [1] https://cwiki.apache.org/confluence/p
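
A minimal sketch of what a committer metric could look like under that change (the CountingCommitter class and metric name are illustrative, and the CommitterInitContext#metricGroup() wiring is an assumption based on FLIP-371, not taken from the thread):

    import java.util.Collection;

    import org.apache.flink.api.connector.sink2.Committer;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.metrics.MetricGroup;

    // Hypothetical committer that counts successful commits. The MetricGroup
    // would be obtained from the committer initialization context inside
    // the sink's createCommitter(...) method (Flink 1.18/1.19 SinkV2).
    class CountingCommitter implements Committer<String> {
        private final Counter commits;

        CountingCommitter(MetricGroup metrics) {
            this.commits = metrics.counter("myConnectorCommits");
        }

        @Override
        public void commit(Collection<CommitRequest<String>> requests) {
            for (CommitRequest<String> request : requests) {
                // ... perform the external commit for request.getCommittable() ...
                commits.inc();
            }
        }

        @Override
        public void close() {}
    }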

Re: Get types from Row

2024-08-26 Thread Péter Váry
Hi Jose, I faced a similar issue when working on schema evolution in the Iceberg connector. The RowData is optimized in a way that it is expected to have the same schema for the lifetime of the deployment. This avoids extra serialization for every record. To work around this I see 2 opt

Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-25 Thread Péter Váry
c7a27fc5/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java> and related code seem incredibly useful even outside of the context of the Paimon table format.

Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-23 Thread Péter Váry
Paimon table format is great, but at Wikimedia Foundation we are on the Iceberg train. Imagine if there was a flink-cdc full database sync to Flink IcebergSink! On Thu, May 23, 2024 at 3:47 PM Péter Váry wrote: > I will ask Marton about the sl

Re: "Self-service ingestion pipelines with evolving schema via Flink and Iceberg" presentation recording from Flink Forward Seattle 2023

2024-05-23 Thread Péter Váry
I will ask Marton about the slides. The solution was something like this in a nutshell:
- Make sure that on job start the latest Iceberg schema is read from the Iceberg table
- Throw a SuppressRestartsException when data arrives with the wrong schema
- Use Flink Kubernetes Operator to restart your
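
A minimal sketch of the fail-without-restart step above (the SchemaGuard class and the arity-based schema check are illustrative simplifications, not from the thread):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.runtime.execution.SuppressRestartsException;
    import org.apache.flink.table.data.RowData;

    // Fails the job, bypassing the restart strategy, when a record does not
    // match the schema that was read from the Iceberg table at job start.
    class SchemaGuard extends RichMapFunction<RowData, RowData> {
        private final int expectedArity; // derived from the Iceberg schema at startup

        SchemaGuard(int expectedArity) {
            this.expectedArity = expectedArity;
        }

        @Override
        public RowData map(RowData row) {
            if (row.getArity() != expectedArity) {
                // SuppressRestartsException prevents Flink from restarting the
                // job itself, so the Flink Kubernetes Operator can resubmit it
                // with the refreshed schema.
                throw new SuppressRestartsException(
                        new IllegalStateException("Schema changed, failing job"));
            }
            return row;
        }
    }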

Re: Iceberg connector

2024-04-16 Thread Péter Váry
ken by a task among these 2 parallel tasks). I expected the job vertex to have taken almost the same amount of time in both these scenarios because the split size is the same (same # of data files and each data file is of the same size) and all the tasks return 2500

Re: Pyflink w Nessie and Iceberg in S3 Jars

2024-04-16 Thread Péter Váry
Is it intentional that you are using iceberg-flink-runtime-1.16-1.3.1.jar with 1.18.0 PyFlink? This might cause issues later. I would try to synchronize the Flink versions across all the dependencies. On Tue, Apr 16, 2024, 11:23 Robert Prat wrote: > I finally managed to make it work followi

Re: Iceberg connector

2024-04-16 Thread Péter Váry
Hi Chetas, See my answers below: On Tue, Apr 16, 2024, 06:39 Chetas Joshi wrote: > Hello, I am running a batch Flink job to read an Iceberg table. I want to understand a few things. 1. How does the FlinkSplitPlanner decide which fileScanTasks (I think one task corresponds to one da

Re: Optimize exact deduplication for tens of billions data per day

2024-04-11 Thread Péter Váry
h(originalKey) % 10). Then in the KeyedProcessOperator use MapState instead of ValueState: MapState mapState. There are about 10 original keys for each mapState. Hope this will help. On Fri, Mar 29, 2024 at 9:24 PM Péte
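
A minimal sketch of the approach discussed in this thread, keying the stream by a hash bucket and tracking the full keys in MapState (the Dedup class, String keys, and bucket count are illustrative):

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Usage: events.keyBy(e -> Math.abs(e.hashCode()) % 10).process(new Dedup())
    // The operator key is the small hash bucket; the full keys live in state.
    class Dedup extends KeyedProcessFunction<Integer, String, String> {
        private transient MapState<String, Boolean> seen;

        @Override
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("seen", String.class, Boolean.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out)
                throws Exception {
            if (!seen.contains(value)) { // first time this full key is seen
                seen.put(value, Boolean.TRUE);
                out.collect(value);
            }
        }
    }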

Re: Optimize exact deduplication for tens of billions data per day

2024-03-29 Thread Péter Váry
Hi Lei, Have you tried to make the key smaller, and store a list of found keys as a value? Let's make the operator key a hash of your original key, and store a list of the full keys in the state. You can play with your hash length to achieve the optimal number of keys. I hope this helps, Peter

Re: IcebergSourceReader metrics

2024-03-28 Thread Péter Váry
Hi Chetas, Are you looking for this information?

    public IcebergSourceReaderMetrics(MetricGroup metrics, String fullTableName) {
        MetricGroup readerMetrics =
            metrics.addGroup("IcebergSourceReader").addGroup("table", fullTableName);
        this.assignedSplits = readerMetrics.counter

Re: Global connection open and close

2024-03-21 Thread Péter Váry
Hi Jacob, The tasks of a Flink job typically run on multiple nodes/servers. This means that it is not possible to share a single connection at the job level. You can read about the architecture in more detail in the docs. [1] I hope this helps, Péter [1] - https://nightlies.apache.org/flink/flink-docs-rele
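
Since each parallel task runs in its own JVM, the usual pattern is to open one connection per task instance in RichFunction#open() and release it in close(); a minimal sketch (the EnrichingMapper class and JDBC URL are illustrative):

    import java.sql.Connection;
    import java.sql.DriverManager;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // One connection per parallel task instance, not per job.
    class EnrichingMapper extends RichMapFunction<String, String> {
        private transient Connection connection;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:postgresql://db:5432/app");
        }

        @Override
        public String map(String value) throws Exception {
            // ... use the connection to enrich the record ...
            return value;
        }

        @Override
        public void close() throws Exception {
            if (connection != null) {
                connection.close();
            }
        }
    }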

Re: FlinkSource to read iceberg table in Batch mode

2024-03-15 Thread Péter Váry
Hi Chetas, How sure are you that your job has no other unbounded source? I have seen this working in several cases, but if you could provide a short executable example, I would like to take a look. Thanks, Peter On Wed, Mar 13, 2024, 20:55 Chetas Joshi wrote: > Hello, I am usin
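
For reference, a bounded Iceberg read is typically requested by building the source with streaming disabled; a minimal sketch using the iceberg-flink FlinkSource builder (the table location is illustrative):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.source.FlinkSource;

    public class BoundedIcebergRead {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            TableLoader loader =
                    TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/t");

            DataStream<RowData> batch = FlinkSource.forRowData()
                    .env(env)
                    .tableLoader(loader)
                    .streaming(false) // bounded: scan the current snapshot and finish
                    .build();

            batch.print();
            env.execute("bounded-iceberg-read");
        }
    }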

Re: Is the kafka-connector doc missing a dependency on flink-connector-base

2023-12-05 Thread Péter Váry
Hi JM, The dependency is set here: https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/pom.xml#L50-L55

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

We expect that the non-provided dep

Re: Connect to kerberos enabled hive cluster

2023-11-06 Thread Péter Váry
Hi Bo, You might be interested in using delegation tokens for connecting to Hive. The feature was added here: https://issues.apache.org/jira/browse/FLINK-32223 Peter On Tue, Nov 7, 2023, 03:16 Bo <99...@qq.com> wrote: > Hello community, Has anyone succeeded in using Flink with a K

Re: Side outputs from sinks

2023-10-18 Thread Péter Váry
Hi Aian, Which sink API are you using? Have you tried the Sink V2 API [1]? If you implement the WithPostCommitTopology interface [2], then you can provide a follow-up step after the commits are finished. I have not tried it yet, but I expect that the failed Committables are emitted as well, and avai
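
A minimal sketch of the WithPostCommitTopology hook (the AuditedSink class is illustrative and the writer/committer plumbing is left abstract; whether failed committables actually reach this topology is the open question in the thread):

    import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
    import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
    import org.apache.flink.streaming.api.datastream.DataStream;

    // Sketch of a sink mixing in WithPostCommitTopology to react after commits.
    // createWriter()/createCommitter() etc. must be supplied by the concrete sink.
    abstract class AuditedSink<T, C> implements WithPostCommitTopology<T, C> {

        @Override
        public void addPostCommitTopology(
                DataStream<CommittableMessage<C>> committables) {
            // Runs after the committer has finished. If failed committables are
            // emitted here as well, this is the place to route them to an
            // audit/side channel instead of just printing them.
            committables.print();
        }
    }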

Re: Using HybridSource

2023-07-04 Thread Péter Váry
Was it a conscious decision that HybridSource only accepts Sources, and does not allow mapping functions to be applied to them before combining them? On Tue, Jul 4, 2023, 23:53 Ken Krugler wrote: > Hi Oscar, Couldn’t you have both the Kafka and File sources return an Either from CSV File, Protobuf
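
For reference, the HybridSource builder indeed chains Source instances only, so both sources must already emit a common type and any mapping happens on the combined stream; a minimal sketch (topic, bootstrap servers, and path are illustrative):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.source.hybrid.HybridSource;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class HybridExample {
        public static void main(String[] args) throws Exception {
            // Historical data from files, then live data from Kafka; both
            // sources already produce the same type (String) because there is
            // no hook to map each source individually before combining.
            FileSource<String> history = FileSource
                    .forRecordStreamFormat(
                            new TextLineInputFormat(), new Path("s3://bucket/history"))
                    .build();
            KafkaSource<String> live = KafkaSource.<String>builder()
                    .setBootstrapServers("kafka:9092")
                    .setTopics("events")
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            HybridSource<String> hybrid =
                    HybridSource.builder(history).addSource(live).build();

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromSource(hybrid, WatermarkStrategy.noWatermarks(), "hybrid")
               .print();
            env.execute("hybrid-source-example");
        }
    }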