Hi Salva,
I've done exactly that (a union of N streams in order to perform a
join), and gave a talk at Flink Forward a few years ago:
https://www.youtube.com/watch?v=tiGxEGPyqCg&ab_channel=FlinkForward
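The core idea looks roughly like this (a minimal sketch; the Event type, the
"source" tags and the emit condition are hypothetical):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class UnionJoinExample {

  // Hypothetical event POJO shared by all input streams.
  public static class Event {
    public String key;     // join key
    public String source;  // which stream the event came from, e.g. "a", "b", "c"
    public String payload;
  }

  public static DataStream<String> join(
      DataStream<Event> a, DataStream<Event> b, DataStream<Event> c) {
    // Union all sides into one stream, key it by the join key, join in keyed state.
    return a.union(b, c)
        .keyBy(e -> e.key)
        .process(new KeyedProcessFunction<String, Event, String>() {
          // Latest event per side, kept until you add TTL or cleanup timers.
          private transient MapState<String, Event> sides;

          @Override
          public void open(Configuration parameters) {
            sides = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("sides", String.class, Event.class));
          }

          @Override
          public void processElement(Event value, Context ctx, Collector<String> out)
              throws Exception {
            sides.put(value.source, value);
            // Emit once all three sides have been seen for this key.
            if (sides.contains("a") && sides.contains("b") && sides.contains("c")) {
              out.collect(value.key + ": " + sides.get("a").payload + "|"
                  + sides.get("b").payload + "|" + sides.get("c").payload);
            }
          }
        });
  }
}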
On Wed, Dec 4, 2024 at 5:03 AM Salva Alcántara
wrote:
> I have a job which basically
Hi Ken,
Snapshotting is implemented differently in Flink CDC; it doesn't reuse
Debezium's implementation. So you can override some Debezium properties
using the "debezium." prefix, but not the "debezium.snapshot." ones.
On Wed, Oct 9, 2024 at 12:46 PM Ken CHUAN YU wrote:
> Hi there
> I have issue to use flink sql
Yes, quite well! :) We've been using it in production for many months now.
On Thu, Oct 3, 2024 at 10:50 AM Ilya Karpov wrote:
> Yaroslav,
> Yep, I saw it, did you try it yourself? Does it work?
>
> On Thu, Oct 3, 2024 at 7:54 PM, Yaroslav Tkachenko :
>
>> https://g
https://github.com/itinycheng/flink-connector-clickhouse is another one; it
supports at least Flink 1.17.
On Thu, Oct 3, 2024 at 7:52 AM Sachin Mittal wrote:
> It works for me with Flink version 1.8.
>
> I am using this in prod. Somehow it’s simpler to use this to ingest data
> into clickhouse t
Hi Ganesh,
Have you tried following this suggestion?
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
On Fri, Aug 23, 2024 at 10:39 AM Ganesh Walse
wrote:
> Hi All,
>
> I am using apache flink for
Hi John,
I've experienced this issue recently; it's likely caused either by:
- the size of the producer record batch; it can be reduced by configuring
lower linger.ms and batch.size values (see the sketch below)
- the size of an individual record
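A minimal sketch of tuning those producer properties on a KafkaSink (the
broker, topic and values are just placeholders to illustrate the idea):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ProducerTuningExample {

  public static KafkaSink<String> buildSink() {
    // Smaller batches are flushed sooner, which reduces the size of a single produce request.
    Properties producerProps = new Properties();
    producerProps.setProperty("linger.ms", "5");       // placeholder value
    producerProps.setProperty("batch.size", "65536");  // placeholder value (bytes)

    return KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")            // placeholder
        .setKafkaProducerConfig(producerProps)
        .setRecordSerializer(
            KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")              // placeholder
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();
  }
}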
On Fri, Aug 23, 2024 at 7:20 AM Ahmed Hamdy wrote:
> Why do you belie
Hi Lasse,
Have you seen this approach
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
?
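For example, a test along these lines (a rough sketch; MyStatefulFlatMap is a
hypothetical RichFlatMapFunction<Long, Long> keyed by the value itself):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.Test;

public class MyStatefulFlatMapTest {

  @Test
  public void testFlatMap() throws Exception {
    // Wrap the function in an operator and a keyed test harness.
    KeyedOneInputStreamOperatorTestHarness<Long, Long, Long> harness =
        new KeyedOneInputStreamOperatorTestHarness<>(
            new StreamFlatMap<>(new MyStatefulFlatMap()),
            value -> value,
            Types.LONG);

    harness.open();
    // (value, timestamp) - state and timers are exercised without a running cluster.
    harness.processElement(2L, 100L);

    // Assert on harness.extractOutputStreamRecords() / harness.numKeyedStateEntries() here.
    harness.close();
  }
}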
On Tue, Apr 9, 2024 at 7:09 AM Lasse Nedergaard <
lassenedergaardfl...@gmail.com> wrote:
> Hi.
>
> I my Integrati
14: did not find expected '-' indicator
> Thanks,
>
> Suyash Jaiswal
> *From:* Yaroslav Tkachenko
> *Sent:* Monday, March 18, 2024 3:19 PM
> *To:* Suyash Jaiswal
> *Cc:* user@flink.apache.org
> *Subject:* Re: Read configmap data i
Hi Suyash,
You can expose your configmap values as environment variables using the
*podTemplate* parameter (see:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/reference/).
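For example, something roughly like this in the FlinkDeployment spec (a
sketch; "my-app-config" is a hypothetical ConfigMap name):

spec:
  podTemplate:
    spec:
      containers:
        # The operator merges this into the main container by name.
        - name: flink-main-container
          envFrom:
            - configMapRef:
                name: my-app-config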
This can be configured individually for TaskManager, JobManager or for
bo
The Schema Registry support is provided in
the ConfluentRegistryAvroSerializationSchema class
(in the flink-avro-confluent-registry package).
On Thu, Feb 1, 2024 at 8:04 AM Yaroslav Tkachenko
wrote:
> You can also implement a custom KafkaRecordSerializationSchema, which
> allows creating a Producer
You can also implement a custom KafkaRecordSerializationSchema, which
allows creating a ProducerRecord (see the "serialize" method) - you can set
the message key, headers, etc. manually. It's supported in older versions.
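A minimal sketch of such a schema (the topic name, header and key/value
encoding are placeholders, using a Tuple2<key, payload> as the input type):

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class KeyedRecordSerializationSchema
    implements KafkaRecordSerializationSchema<Tuple2<String, String>> {

  @Override
  public ProducerRecord<byte[], byte[]> serialize(
      Tuple2<String, String> element, KafkaSinkContext context, Long timestamp) {
    RecordHeaders headers = new RecordHeaders();
    headers.add("source", "my-app".getBytes(StandardCharsets.UTF_8)); // placeholder header

    // Topic, partition (null = let Kafka pick), timestamp, key, value, headers.
    return new ProducerRecord<>(
        "output-topic", // placeholder
        null,
        timestamp,
        element.f0.getBytes(StandardCharsets.UTF_8),
        element.f1.getBytes(StandardCharsets.UTF_8),
        headers);
  }
}

Then you'd pass it to the sink builder via setRecordSerializer(new
KeyedRecordSerializationSchema()).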
On Thu, Feb 1, 2024 at 4:49 AM Jiabao Sun wrote:
> Sorry, I didn't notice the versi
Hi Oscar,
The only way you can define the Kafka message key is by providing a
custom KafkaRecordSerializationSchema to your Flink KafkaSink via the
"setRecordSerializer" method.
The KafkaRecordSerializationSchema.serialize method returns a ProducerRecord, so
you can set things like the message key, message
=".
>
>
> --
> Best!
> Xuyang
>
>
> On 2023-10-20 15:57:28, "Yaroslav Tkachenko" wrote:
>
> Hi Xuyang,
>
> A shuffle by join key is what I'd expect, but I don't see it. The issue
> only happens with parallelism > 1.
>
> >
eproduced.
> --
> Best!
> Xuyang
>
>
> At 2023-10-20 04:31:09, "Yaroslav Tkachenko" wrote:
>
> Hi everyone,
>
> I noticed that a simple INNER JOIN in Flink SQL behaves
> non-deterministically. I'd like to understand if it's
Hi everyone,
I noticed that a simple INNER JOIN in Flink SQL behaves
non-deterministically. I'd like to understand if it's expected and whether an
issue has already been created to address it.
In my example, I have the following query:
SELECT a.funder, a.amounts_added, r.amounts_removed FROM table_a AS a JOIN
It depends on your requirements. Personally, I don't use PVs; instead, I
mount a volume from the host with a fast instance-level SSD.
On Wed, Aug 30, 2023 at 11:26 AM Tony Chen wrote:
> We used to have a Persistent Volume (PV), attached to the pod, for storing
> the RocksDB data while using the
Hey Tony,
Pretty much all Flink configuration is supported, including the RocksDB
state backend.
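For example, a sketch of the relevant part of a FlinkDeployment (the bucket
is a placeholder):

spec:
  flinkConfiguration:
    state.backend: rocksdb
    state.backend.incremental: "true"
    state.checkpoints.dir: s3://my-bucket/checkpoints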
On Wed, Aug 30, 2023 at 9:05 AM Tony Chen wrote:
> Hi Flink Community,
>
> Does the flink-kubernetes-operator support RocksDB as the state backend
> for FlinkDeployment?
>
> We have some Flink appli
Hi Krzysztof,
You may want to check flink-kubernetes-operator-api (
https://mvnrepository.com/artifact/org.apache.flink/flink-kubernetes-operator-api);
here's an example of reading FlinkDeployments:
https://github.com/sap1ens/heimdall/blob/main/src/main/java/com/sap1ens/heimdall/kubernetes/FlinkDe
Hi Xiao,
I don't believe it's possible for the application mode.
On Fri, Jul 7, 2023 at 12:20 PM Xiao Ma wrote:
> Hi Alexey,
>
> Thank you for the reply.
>
> So the link pasted is to start a job though the kubectl command. Is there
> a way to start an Application Mode job through the RESTful Ja
Hi Yogesh,
Multiple kafka sources are supported. This warning only indicates that
multiple consumers won't be able to register JMX metrics. There are several
bugs reported about this, but I believe it should be fixed for consumers in
the newer Flink versions (1.14+).
On Wed, Jul 5, 2023 at 9:32 P
Hey Brendan,
Do you use a file source by any chance?
On Mon, Jun 26, 2023 at 4:31 AM Brendan Cortez
wrote:
> Hi all!
>
> I'm trying to submit a Flink Job in Application Mode in the Kubernetes
> cluster.
>
> I see some problems when an application has a big number of operators
> (more than 20 sa
27, 2023 at 6:06 PM Yaroslav Tkachenko
> wrote:
>
>> Hi Giannis,
>>
>> I'm curious, what tool did you use for this analysis (what the screenshot
>> shows)? Is it something custom?
>>
>> Thank you.
>>
>> On Wed, Apr 26, 2023 at 10:38 PM G
Hi Giannis,
I'm curious, what tool did you use for this analysis (what the screenshot
shows)? Is it something custom?
Thank you.
On Wed, Apr 26, 2023 at 10:38 PM Giannis Polyzos
wrote:
> This is really helpful,
>
> Thanks
>
> On Thu, Apr 27, 2023 at 5:46 AM Yanfei Lei wrote:
>
>> Hi Giannis,
Hi Ravi,
All of them should be production ready. I've personally used half of them
in production.
Do you have any specific concerns?
On Thu, Mar 9, 2023 at 9:39 AM ravi_suryavanshi.yahoo.com via user <
user@flink.apache.org> wrote:
> Hi,
> Can anyone help me here?
>
> Thanks and regards,
> Ravi
Hi everyone,
In my Flink application, I have a table created with ChangelogMode.all().
One of the sinks I want to use requires ChangelogMode.insertOnly().
The only solution that comes to mind is converting my table to a DataStream
of Rows, filtering by RowKind, and converting it back to a Table.
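Roughly, the conversion I have in mind (a sketch; schema details omitted):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class InsertOnlyConversion {

  public static Table toInsertOnly(StreamTableEnvironment tableEnv, Table changelogTable) {
    // Changelog table -> DataStream of Rows, keep only +I rows, -> insert-only table.
    DataStream<Row> changelog = tableEnv.toChangelogStream(changelogTable);
    DataStream<Row> inserts = changelog.filter(row -> row.getKind() == RowKind.INSERT);
    return tableEnv.fromChangelogStream(
        inserts, Schema.newBuilder().build(), ChangelogMode.insertOnly());
  }
}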
Hey everyone,
I'm wondering if anyone has done any experiments trying to use non-temporal
watermarks? For example, a dataset may contain some kind of virtual
timestamp / version field that behaves just like a regular timestamp
(monotonically increasing, etc.), but has a different scale / range.
A
Hi! According to this
https://beam.apache.org/documentation/runners/flink/#flink-version-compatibility,
1.14 is the latest supported version.
On Fri, Jan 27, 2023 at 9:19 AM P Singh wrote:
> Hi Team,
>
> I am trying to run apache beam pipeline on flink cluster. I have set up
> kubernetes locally
Hey Ken,
I have flink-s3-fs-hadoop as a provided dependency in my project, and I've
configured my IDE to include provided dependencies when starting
applications. Works just fine.
On Tue, Jan 3, 2023 at 5:06 PM Ken Krugler
wrote:
> Hi all,
>
> With Flink 1.15.x, is there a way to use the S3 Pre
Hi Noel,
It's definitely possible. You need to implement a
custom KafkaRecordDeserializationSchema: its "deserialize" method gives you
a ConsumerRecord as an argument so that you can extract Kafka message key,
headers, timestamp, etc.
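For example, a rough sketch that emits the key, one header, and the payload
(the header name and the output tuple type are arbitrary choices):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class KeyHeaderValueDeserializer
    implements KafkaRecordDeserializationSchema<Tuple3<String, String, String>> {

  @Override
  public void deserialize(
      ConsumerRecord<byte[], byte[]> record, Collector<Tuple3<String, String, String>> out)
      throws IOException {
    String key = record.key() == null ? null : new String(record.key(), StandardCharsets.UTF_8);
    Header header = record.headers().lastHeader("source"); // hypothetical header name
    String source = header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
    String value = new String(record.value(), StandardCharsets.UTF_8);
    out.collect(Tuple3.of(key, source, value));
  }

  @Override
  public TypeInformation<Tuple3<String, String, String>> getProducedType() {
    return Types.TUPLE(Types.STRING, Types.STRING, Types.STRING);
  }
}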
Then pass that when you create a KafkaSource via "setDeseriali
Hey Theodor,
That's pretty much it, assuming your Protobuf schema is more or less fixed.
But for a production workload, you'd need to add a Schema Registry lookup.
I guess the implementation for that would be similar to what's in the Avro
format.
On Tue, Nov 29, 2022 at 2:26 AM Theodor Wübker
wr
make use of FIFO compaction.
>
> I’m a bit surprised you were able to run with 10+TB state without
> unaligned checkpoints because the performance in my application degrades
> quite a lot. Can you share your checkpoint configurations?
>
>
> Thanks,
> Vishal
> On 15 Nov
Hi Vishal,
Just wanted to comment on this bit:
> My job has very large amount of state (>100GB) and I have no option but
to use unaligned checkpoints.
I successfully ran Flink jobs with 10+ TB of state and no unaligned
checkpoints enabled. Usually, you consider enabling them when there is some
k
gt; (possibly a complex way of doing it). I am relatively certain though that
> there is nothing that does the trick as straightforward as there is for
> Avro with AvroSchemaConverter in the flink-avro package.
>
> -Theo
>
> On 8. Nov 2022, at 16:34, Yaroslav Tkachenko wrote:
>
Hey Theo, have you looked at the flink-json and flink-protobuf packages?
On Tue, Nov 8, 2022 at 5:21 AM Theodor Wübker
wrote:
> Hello,
>
> I have a streaming use case, where I execute a query on a Table. I take
> the ResolvedSchema of the table and convert it to an Avro-Schema using the
> AvroSc
Hey Vishal,
I guess you're using the DataStream API? In this case, you have more
control over data serialization, so it makes sense to use custom
serialization logic.
IMO, flink-json (as well as flink-avro, flink-csv, etc.) is really helpful
when using Table API / SQL API, because it contains the
I'm confident I'm hitting a bug; I guess I'm the first one trying this
recovery in standalone mode :-D
Created https://issues.apache.org/jira/browse/FLINK-29633
On Thu, Oct 13, 2022 at 8:45 AM Yaroslav Tkachenko
wrote:
> Thanks folks, I understand this can be a limitatio
.CheckpointCoordinator[] - Restoring
> job from Savepoint 34 @ 0 for
> located at
> s3p://flink-checkpoints/k8s-checkpoint-test-k8s-deploy//chk-34.
>
> --
Hey everyone,
I'm trying to redeploy an application using a savepoint. The new version of
the application has a few operators with new uids and a few operators with
the old uids. I'd like to keep the state for the old ones.
I passed the allowNonRestoredState flag (using Apache Kubernetes Operator
Hi,
You can implement a custom KafkaRecordDeserializationSchema (example
https://docs.immerok.cloud/docs/cookbook/reading-apache-kafka-headers-with-apache-flink/#the-custom-deserializer)
and just avoid emitting the record if the header value matches what you
need.
On Wed, Oct 12, 2022 at 11:04 AM
Hi Sucheth,
The short answer is no: when deserializing Avro messages, you have to
provide the schema somehow, either directly or using the Schema Registry.
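For example, with the Schema Registry it looks roughly like this (the broker,
topic, group and registry URL are placeholders):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

public class AvroSourceExample {

  public static KafkaSource<GenericRecord> build(Schema readerSchema) {
    return KafkaSource.<GenericRecord>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("input-topic")
        .setGroupId("my-group")
        // The reader schema is provided directly; the writer schema is fetched
        // from the Schema Registry per record.
        .setValueOnlyDeserializer(
            ConfluentRegistryAvroDeserializationSchema.forGeneric(
                readerSchema, "http://schema-registry:8081"))
        .build();
  }
}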
On Mon, Oct 10, 2022 at 10:00 AM Sucheth S wrote:
> Hi,
>
> I'm trying to deserialize avro messages from the kafka topic as a
> consumer.
>
Hey Pavel,
I was looking for something similar a while back and the best thing I came
up with was using the DataStream API to do all the shuffling and THEN
converting the stream to a table using fromDataStream/fromChangelogStream.
On Wed, Oct 5, 2022 at 4:54 AM Pavel Penkov wrote:
> I have a ta
to deprecate and
> remove the Scala APIs?
>
> Best regards,
>
> Martijn
>
>
> On Tue, Oct 4, 2022 at 2:41 PM Yaroslav Tkachenko
> wrote:
>
>> Hi Martijn,
>>
>> The 2.0 argument makes sense (I agree it's easier to introduce more
>> breaking ch
ility guarantees already (depending if it's
> Public/PublicEvolving/Experimental). If a change would happen there, I
> think it would be smaller refactoring.
>
> Best regards,
>
> Martijn
>
> [1] https://issues.apache.org/jira/browse/FLINK-25000
>
> On Tue, Oct 4, 2
Hi Martijn,
As a Scala user, I would be affected by this change a lot; I'm not looking
forward to rewriting my codebase, and it's not even a very large one :)
I'd like to suggest supporting Java 17 as a prerequisite (
https://issues.apache.org/jira/browse/FLINK-15736). Things like switch
expressions
Hi Deepak,
You can use a union operator. I actually gave a talk on creating an
advanced join using the union operator and multiple streams:
-
https://www.slideshare.net/sap1ens/storing-state-forever-why-it-can-be-good-for-your-analytics
- https://www.youtube.com/watch?v=tiGxEGPyqCg
I hope this he
Interesting, do you see the /opt/flink/usrlib folder created as well? Also,
what Flink version do you use?
Thanks.
On Tue, Sep 20, 2022 at 4:04 PM Javier Vegas wrote:
>
> jarURI: local:///opt/flink/lib/MYJARNAME.jar
>
> On Tue, Sep 20, 2022 at 12:25 AM, Yaroslav Tkachenko () wrote:
p.
>
> On Fri, Sep 16, 2022 at 9:28 AM, Yaroslav Tkachenko () wrote:
>
>> Application mode. I've done a bit more research and created
>> https://issues.apache.org/jira/browse/FLINK-29288, planning to work on a
>> PR today.
>>
>> TLDR: currentl
es are included and the job jar itself ends
> up on the user classpath. I'm curious whether Chesnay (CC'd) has an answer
> to that one.
>
> On Tue, Sep 13, 2022 at 1:40 AM Yaroslav Tkachenko
> wrote:
>
>> Hey everyone,
>>
>> I’m migrating a Flink Kuber
Hey everyone,
I’m migrating a Flink Kubernetes standalone job to the Flink operator (with
Kubernetes native mode).
I have a lot of classloading issues when trying to run with the operator in
native mode. For example, I have a Postgres driver as a dependency (I can
confirm the files are included i
Hi Soumen,
I'd try parsing the input using the DataStream API (with a fast JSON
library) and then converting it to a Table.
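A rough sketch of that approach (Jackson and the event class are just
illustrative choices):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class JsonToTableExample {

  // Hypothetical flattened event extracted from the raw JSON payload.
  public static class MyEvent {
    public String id;
    public long timestamp;
    public double value;
  }

  public static Table parse(StreamTableEnvironment tableEnv, DataStream<String> rawJson) {
    ObjectMapper mapper = new ObjectMapper();
    // Parse (and optionally pre-filter/flatten) with a plain JSON library first...
    DataStream<MyEvent> events =
        rawJson
            .map(json -> mapper.readValue(json, MyEvent.class))
            .returns(MyEvent.class); // helps Flink's type extraction with the lambda
    // ...then hand the typed stream over to the Table API.
    return tableEnv.fromDataStream(events);
  }
}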
On Thu, Jul 21, 2022 at 6:22 AM Soumen Choudhury wrote:
> We have a requirement of parsing a very complex json (size around 25 kb
> per event) event with a predefined sche
Hi!
I'd try re-running the SSD test with the following config options:
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
On Thu, Jul 21, 2022 at 4:11 AM vtygoss wrote:
> Hi, community!
>
>
> I am doing some performance tests based on my scene.
>
Hi Alexis,
Do you use OffsetsInitializer.committedOffsets() to specify your Kafka
consumer offsets? In this case, it should get the offsets from Kafka and
not the state.
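For example (a minimal sketch; the fallback reset strategy and connection
details are just an illustration):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class CommittedOffsetsExample {

  public static KafkaSource<String> build() {
    return KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")   // placeholder
        .setTopics("input-topic")             // placeholder
        .setGroupId("my-consumer-group")      // placeholder
        // Start from the group's committed offsets in Kafka,
        // falling back to EARLIEST if none have been committed yet.
        .setStartingOffsets(
            OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
  }
}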
On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:
> Hello,
>
> Regarding the new Kafka
Hi John,
I've been using a path like
this: s3:savepoint- (no trailing slash).
I'm pretty sure you need to specify the full path.
Yes, you can see the savepoint restore in the logs. It's also fairly easy to see it
in the Flink UI, under the Checkpoints section (it shows the information
about the lates
Hi Devin,
I gave a talk called "Storing State Forever: Why It Can Be Good For Your
Analytics", which may be very relevant:
- https://www.youtube.com/watch?v=tiGxEGPyqCg
-
https://www.slideshare.net/sap1ens/storing-state-forever-why-it-can-be-good-for-your-analytics
On Wed, May 25, 2022 at 8:04 PM
hat was likely due to an under-optimized job where the bottleneck was
> elsewhere
>
> On Wed, Apr 20, 2022, 11:08 AM Yaroslav Tkachenko
> wrote:
>
>> Hey Trystan,
>>
>> Based on my personal experience, good disk IO for RocksDB matters a lot.
>> Are you using the
Hey Trystan,
Based on my personal experience, good disk IO for RocksDB matters a lot.
Are you using the fastest SSD storage you can get for the RocksDB folders?
For example, when running on GCP, we noticed *10x* throughput improvement
by switching RocksDB storage to
https://cloud.google.com/compute/d
Also https://shopify.engineering/optimizing-apache-flink-applications-tips
might be helpful (has a section on profiling, as well as classloading).
On Tue, Apr 19, 2022 at 4:35 AM Chesnay Schepler wrote:
> We have a very rough "guide" in the wiki (it's just the specific steps I
> took to debug an
Hey everyone,
I'm curious if anyone knows the reason behind choosing 38 as
a MAX_PRECISION for DecimalType in the Table API?
I needed to process decimal types larger than that and I ended up
monkey-patching DecimalType to use a higher precision. I understand it adds
a bit of overhead, but I haven
aStream API.
>> I don't know if using the PARTITIONED BY clause in the create statement
>> of the table can help to "balance" the data, see
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#partitioned-by
>> .
>>
>>
>
Hey everyone,
I'm trying to use Flink SQL to construct a set of transformations for my
application. Let's say the topology just has three steps:
- SQL Source
- SQL SELECT statement
- SQL Sink (via INSERT)
The sink I'm using (JDBC) would really benefit from data partitioning (by
PK ID) to avoid c
Hi, sorry for resurrecting the old thread, but I have precisely the same issue
with a different filesystem.
I've tried using the plugins dir, setting FLINK_PLUGINS_DIR, etc. - nothing works
locally. I added a breakpoint to the PluginConfig.getPluginsDir method and
confirmed it's not even called.
t; On Wed, Jun 23, 2021 at 6:36 AM Yaroslav Tkachenko <
> yaroslav.tkache...@shopify.com> wrote:
>
>> Hi everyone,
>>
>> I need to add support for the GCS filesystem. I have a working example
>> where I add two JARs to the */opt/flink/lib*/ folder:
>>
>
Hi everyone,
I need to add support for the GCS filesystem. I have a working example
where I add two JARs to the */opt/flink/lib*/ folder:
- GCS Hadoop connector
- *Shaded* Hadoop using flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
Now I'm trying to follow the advice from
https://ci.apache.org/projec
Forward talks .. but it is difficult
> to find answers (unless you spend a lot of time on YouTube).
>
> Best,
> Robert
>
>
> On Wed, May 19, 2021 at 8:01 PM Yaroslav Tkachenko <
> yaroslav.tkache...@shopify.com> wrote:
>
>> Hi everyone,
>>
>> I
aner way would probably be to do this
> re-deployment with explicit savepoints.
>
> We are doing this in kubernetes where both scaling options are really
> easy to carry out. But the same concepts should work on any other setup,
> too.
>
>
> Hope that helps
>
> Jan
>
>
Hi everyone,
I'd love to learn more about how different companies approach specifying
Flink parallelism. I'm specifically interested in real, production
workloads.
I can see a few common patterns:
- Rely on default parallelism, scale by changing parallelism for the whole
pipeline. I guess it onl
ry mechanism
>>> already. So I think there is no need for flink to retry.
>>> I am not very sure but from the log it seems that the gfs's retry is
>>> interrupted by some reason. So I think we could get more insight if we
>>> could find the first fail cause.
&
retry mechanism.
> >>> Interrupted while sleeping before retry. Giving up after 1/10 retries
> for 'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d
>
> Best,
> Guowei
>
>
> On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko <
> yaroslav.tka
Hi everyone,
I'm wondering if people have experienced issues with TaskManager failure
recovery when dealing with a lot of state.
I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints and
checkpoints. ~150 task managers with 4 slots each.
When I run a pipeline without much state
Hello everyone,
I have a question about implementing a join of N datastreams (where N > 2)
without any time guarantees. According to my requirements, late data is not
tolerable, so if I have a join between stream A and stream B and a message
with key X arrives in stream B one year after arriving i
active keys in
memory. I don't feel like LRU is doing a very good job here... I couldn't
find any option like that, but I'm wondering if someone could recommend
something similar.
Thank you!
--
Yaroslav Tkachenko
sap1ens.com