Hi,
I wanted to understand how KafkaSource stores the offsets of each partition in
state.
Does it keep the offsets of each topic-partition in its checkpointed state, or
does it just commit them back to Kafka on every checkpoint?
And then how does DynamicKafkaSource handle this? Does it store the offsets
for
Hi team!
We have observed strange behavior when using KafkaSource with
checkpointing enabled.
Even though we do not get checkpoint failures or errors of any kind, we see
the committed offset going up and down erratically instead of moving
up to reach the end offset of that topic.
We know
's any plan in releasing
> this feature?
>
> For our current situation, we are subscribing to hundreds of topics, and
> we add/remove topics quite often (every few days probably), adding topics
> seems to be okay at the moment, but with the current KafkaSource design, if
> re
ng
this feature?
For our current situation, we are subscribing to hundreds of topics, and we
add/remove topics quite often (every few days probably), adding topics
seems to be okay at the moment, but with the current KafkaSource design, if
removing a topic means we need to change the kafka source
viewpage.action?pageId=217389320
On Wed, Nov 1, 2023 at 7:29 AM Emily Li via user wrote:
>
> Hey
>
> We have a flinkapp which is subscribing to multiple topics, we recently
> upgraded our application from 1.13 to 1.15, which we started to use
> KafkaSource instead of FlinkKafka
Hey
We have a Flink app which subscribes to multiple topics. We recently
upgraded our application from 1.13 to 1.15, where we started to use
KafkaSource instead of FlinkKafkaConsumer (deprecated).
But we noticed a weird issue with KafkaSource after the upgrade. We are
setting the topics
I am using Flink 1.13.1 on AWS EMR and I seem to have hit this bug:
https://issues.apache.org/jira/browse/FLINK-27041. My job will fail when
there are empty partitions. I see it is fixed in a newer version of Flink
but I cannot update the Flink version at this time.
Suggestions on a workaround? I a
`KafkaPartitionSplitSerializer`.
Best,
Hang
Charles Tan wrote on Wed, May 24, 2023 at 06:27:
> Hi everyone,
>
> I have a few questions about reading KafkaSource state using the State
> Processor API. I have a simple Flink application which reads from a Kafka
> topic then produces to a different topic. After runnin
Hi everyone,
I have a few questions about reading KafkaSource state using the State
Processor API. I have a simple Flink application which reads from a Kafka
topic then produces to a different topic. After running the Flink job and
stopping it with a savepoint, I then write a few more records to
it may be useful for
> pinpointing the issue
>
> Best,
> Shammon FY
>
>
> On Fri, Apr 21, 2023 at 12:56 AM naga sudhakar
> wrote:
>
>> Hi Team,
>> Greetings of the day..
>> we are on flink 1.16.1 version and using flinkkafkaconsumer today.
>> When
> When I replaced it with KafkaSource, it fails to connect with the Kafka
> JAAS configuration. The error says the Kafka client entry was not found in the
> /tmp/jaas config file. We are passing the flink runtime arg for the
> security.auth.login conf details. Same was working g w
Hi Team,
Greetings of the day..
We are on Flink 1.16.1 and using FlinkKafkaConsumer today.
When I replaced it with KafkaSource, it fails to connect with the Kafka JAAS
configuration. The error says the Kafka client entry was not found in the
/tmp/jaas config file. We are passing the
Otherwise I guess we’ll need to just run it in
> a single cluster and be aware of the risks if we lost that cluster.
>
> Thanks,
> Ben
>
> On 2023/03/30 16:52:31 "Tzu-Li (Gordon) Tai" wrote:
> > Hi Robert,
> >
> > This is a design choice. Flin
guess we’ll need to just run it in a single
cluster and be aware of the risks if we lost that cluster.
Thanks,
Ben
On 2023/03/30 16:52:31 "Tzu-Li (Gordon) Tai" wrote:
> Hi Robert,
>
> This is a design choice. Flink's KafkaSource doesn't rely on consumer
>
Sorry, I meant to say "Hi Ben" :-)
On Thu, Mar 30, 2023 at 9:52 AM Tzu-Li (Gordon) Tai
wrote:
> Hi Robert,
>
> This is a design choice. Flink's KafkaSource doesn't rely on consumer
> groups for assigning partitions / rebalancing / offset tracking. It
> manua
Hi Robert,
This is a design choice. Flink's KafkaSource doesn't rely on consumer
groups for assigning partitions / rebalancing / offset tracking. It
manually assigns whatever partitions are in the specified topic across its
consumer instances, and rebalances only when the Flink job / Ka
Hi,
Is there a way to run multiple flink jobs with the same Kafka group.id and have
them join the same consumer group?
It seems that setting the group.id using KafkaSource.builder().set_group_id()
does not have the effect of creating an actual consumer group in Kafka.
Running the same flink jo
Hi,
I am using the KafkaSource API to read from 6 topics within Kafka. Flink version:
1.14.3. Every Kafka topic my Flink pipeline reads from has a
different load but the same number of partitions (let's say 30). For example,
partition 0 of topic 1 and partition 0 of topic 2 have different
nce/display/FLINK/FLIP-217%3A+Support+watermark+alignment+of+source+splits
>
> On Thu, Dec 8, 2022 at 6:21 PM Niklas Wilcke wrote:
>> Hi Flink Community,
>>
>> I have a few questions regarding the new KafkaSource and event time,
27+Sources
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-217%3A+Support+watermark+alignment+of+source+splits
On Thu, Dec 8, 2022 at 6:21 PM Niklas Wilcke
wrote:
> Hi Flink Community,
>
> I have a few questions regarding the new KafkaSource and event time, which
> I wasn'
Hi Flink Community,
I have a few questions regarding the new KafkaSource and event time, which I
wasn't able to answer myself via checking the docs, but please point me to the
right pages in case I missed something. I'm not entirely sure whether my
knowledge still holds for the new K
argument so that you can extract Kafka message key,
> headers, timestamp, etc.
>
> Then pass that when you create a KafkaSource via "setDeserializer" method.
>
> On Wed, Dec 7, 2022 at 6:14 AM Noel OConnor wrote:
>>
>> Hi,
>> I'm using a kafka source to re
Hi Noel,
It's definitely possible. You need to implement a
custom KafkaRecordDeserializationSchema: its "deserialize" method gives you
a ConsumerRecord as an argument so that you can extract Kafka message key,
headers, timestamp, etc.
Then pass that when you create a
Hi,
I'm using a kafka source to read in messages from kafka into a datastream.
However I can't seem to access the key of the kafka message in the datastream.
Is this even possible ?
cheers
Noel
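To illustrate the approach described above, here is a minimal sketch (Java; the class name and key/value handling are hypothetical) of a custom KafkaRecordDeserializationSchema that surfaces the message key alongside the value:
```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical deserializer that emits "key:value" strings; the ConsumerRecord
// also exposes headers() and timestamp() if those are needed downstream.
public class KeyedMessageDeserializer implements KafkaRecordDeserializationSchema<String> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out)
            throws IOException {
        String key = record.key() == null ? "" : new String(record.key(), StandardCharsets.UTF_8);
        String value = record.value() == null ? "" : new String(record.value(), StandardCharsets.UTF_8);
        out.collect(key + ":" + value);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
```
The schema is then passed to the builder via KafkaSource.<String>builder()...setDeserializer(new KeyedMessageDeserializer()).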
here's too much inter-node traffic happening, and the Flink tasks don't
> have any DC awareness.
I agree. Running a single Flink application across DCs tends to increase
costs significantly with public cloud providers, where you accrue cross-DC
costs. You can follow [1] to allow the
rs, Martijn
>>
>> On Tue, Oct 4, 2022 at 3:51 PM Mason Chen wrote:
>>
>>> Hi Martjin,
>>>
>>> I notice that this question comes up quite often. Would this be a good
>>> addition to the KafkaSource documentation? I'd be happy to contribu
l free to open a PR and ping me for a review.
>
> Cheers, Martijn
>
> On Tue, Oct 4, 2022 at 3:51 PM Mason Chen wrote:
>
>> Hi Martjin,
>>
>> I notice that this question comes up quite often. Would this be a good
>> addition to the KafkaSource documentatio
lso have to be managed from
> Kafka, not from Flink state.)
>
>
>
> On Wed, Oct 5, 2022 at 9:54 AM Andrew Otto wrote:
>
>> Hi all,
>>
>> *tl;dr: Would it be possible to make a KafkaSource that uses Kafka's
>> built in consumer assignment for Flink tasks?*
:
> Hi all,
>
> *tl;dr: Would it be possible to make a KafkaSource that uses Kafka's built
> in consumer assignment for Flink tasks?*
>
> At the Wikimedia Foundation we are evaluating
> <https://phabricator.wikimedia.org/T307944> whether we can use a Kafka
>
Hi all,
*tl;dr: Would it be possible to make a KafkaSource that uses Kafka's built
in consumer assignment for Flink tasks?*
At the Wikimedia Foundation we are evaluating
<https://phabricator.wikimedia.org/T307944> whether we can use a Kafka
'stretch' cluster to simplify
Hi Mason,
Definitely! Feel free to open a PR and ping me for a review.
Cheers, Martijn
On Tue, Oct 4, 2022 at 3:51 PM Mason Chen wrote:
> Hi Martjin,
>
> I notice that this question comes up quite often. Would this be a good
> addition to the KafkaSource documentation? I
Hi Martijn,
I notice that this question comes up quite often. Would this be a good
addition to the KafkaSource documentation? I'd be happy to contribute to
the documentation.
Best,
Mason
On Tue, Oct 4, 2022 at 11:23 AM Martijn Visser
wrote:
> Hi Robert,
>
> Ba
Hi Robert,
Based on
https://stackoverflow.com/questions/72870074/apache-flink-restoring-state-from-checkpoint-with-changes-kafka-topic
I think you'll need to change the UID for your KafkaSource and restart your
job with allowNonRestoredState enabled.
Best regards,
Martijn
On Tue, Oct 4,
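A minimal sketch of the suggested fix (topic, uid, and broker values are hypothetical): give the KafkaSource operator a new uid and resubmit the job from the savepoint with allowNonRestoredState, so the old source state is simply dropped.
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChangeTopicJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")     // hypothetical
                .setTopics("grok-failures-v2")          // the new topic
                .setGroupId("grok-failures-job")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Source: Grokfailures")
                // A new uid means the old source operator state has nothing to map to,
                // which is allowed when restoring with --allowNonRestoredState.
                .uid("kafka-source-new-topic-v2")
                .print();

        env.execute("change-topic-example");
    }
}
```
Resubmitting with `flink run -s <savepointPath> --allowNonRestoredState ...` then lets the job start even though the savepoint still contains state for the old source uid.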
We've changed the KafkaSource to ingest from a new topic but the old name
is still being referenced:
2022-10-04 07:03:41 org.apache.flink.util.FlinkException: Global
failure triggered by OperatorCoordinator for 'Source: Grokfailures'
(operator feca28aff5a3958840bee985e
Hello,
Is there a way to configure Flink to expose the watermarkLag metric per topic per
partition? I think it could be useful to detect data skew between partitions.
Thanks,
Alexey
Yan, I've created https://issues.apache.org/jira/browse/FLINK-28975 to
track this.
Regards,
David
On Sun, Aug 14, 2022 at 6:38 PM Yan Shen wrote:
> Thanks David,
>
> I am working on a flink datastream job that does a temporal join of two
> kafka topics based on watermarks. The problem was quite
Thanks David,
I am working on a Flink DataStream job that does a temporal join of two
Kafka topics based on watermarks. The problem was quite obvious when I
enabled idleness and data flowed through much faster with different results
even though the topics were not idle.
Regards.
On Mon, Aug 15,
Although I'm not very familiar with the design of the code involved, I also
looked at the code, and I'm inclined to agree with you that this is a bug.
Please do raise an issue.
I'm wondering how you noticed this. I was thinking about how to write a
failing test, and I'm wondering if this has some
Hi all,
After examining the source code further, I am quite sure
org.apache.flink.api.common.eventtime.WatermarksWithIdleness
does not work with FLIP-27 sources.
In org.apache.flink.streaming.api.operators.SourceOperator, there are
separate instances of WatermarksWithIdleness created for each spl
Hi,
I am using a org.apache.flink.connector.kafka.source.KafkaSource with a
watermark strategy like this:
WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(10))
I noticed that after a short while all the partitions seem to be marked as
idle even though there are message
15/2022 17:50
> To Martijn Visser
> Cc Jing Ge ,
> user
> Subject Re: New KafkaSource API : Change in default behavior regarding
> starting offset
> Hello Martijn,
>
> Thanks for the link to the release note, especially :
> "When resuming f
How do I unsubscribe from this mailing list?
Replied Message
From: bastien dine
Date: 06/15/2022 17:50
To: Martijn Visser
Cc: Jing Ge, user
Subject: Re: New KafkaSource API : Change in default behavior regarding
starting offset
Hello Martijn,
Thanks for the link to the release note
e for sure .committedOffsets - we have it by default in our custom
KafkaSource builder to be sure we do not read all the previous data
(earliest)
What bothers me is just this change in the default starting-offset behavior from
FlinkKafkaConsumer to KafkaSource (this can lead to mistakes).
In fact it happe
Flink in the master branch. Could you please point out the code that
>> committed offset is used as default?
>>
>> W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets()
>> is used, an exception will be thrown at runtime in case there is no
>> committed offset,
fsets() within
> Flink in the master branch. Could you please point out the code that
> committed offset is used as default?
>
> W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets()
> is used, an exception will be thrown at runtime in case there is no
> committed off
Hi Bastien,
Thanks for asking. I didn't find any call of setStartFromGroupOffsets() within
Flink in the master branch. Could you please point out the code where the
committed offset is used as the default?
W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets()
is used, an exception wi
om :
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
Before, in the old FlinkKafkaConsumer, it was from the committed offset (i.e. the
setStartFromGroupOffsets()
method),
which matches this behaviour in the new KafkaSource: OffsetsInitializer.
commit
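For readers following this thread, a minimal sketch (my reading of the usual workaround, with hypothetical broker/topic/group values) of pinning the starting offsets explicitly so the changed default no longer matters:
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Start from the committed group offsets and fall back to EARLIEST (or LATEST,
// if preferred) when no committed offset exists yet, roughly what
// setStartFromGroupOffsets() plus auto.offset.reset gave with FlinkKafkaConsumer.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")   // hypothetical
        .setTopics("my-topic")                // hypothetical
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
```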
Hi Frank,
This sounds like an interesting issue. Can you share a minimal working example?
Best regards,
Niklas
> On 9. Feb 2022, at 23:11, Frank Dekervel wrote:
>
> Hello,
>
> When trying to reproduce a bug, we made a DeserialisationSchema that throws
> an exception when a malformed message
Hi Santosh,
It’s best to avoid cross-posting. Let’s keep the discussion to SO.
Best regards,
Niklas
> On 12. Feb 2022, at 16:39, santosh joshi wrote:
>
> We are migrating to KafkaSource from FlinkKafkaConsumer. We have disabled
> auto commit of offset and instead committing them
We are migrating to KafkaSource
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java>
from FlinkKafkaConsumer
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connec
Hello,
When trying to reproduce a bug, we made a DeserialisationSchema that
throws an exception when a malformed message comes in.
Then, we sent a malformed message together with a number of well formed
messages to see what happens.
val source = KafkaSource.builder[OurMessage]()
  .setValueOnlyDe
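For context, a minimal Java sketch of that experiment (the original snippet is Scala; the message type and the validation rule are hypothetical) showing a value-only deserializer that throws on malformed input:
```java
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical value-only schema: any record that fails the (stand-in) validation
// makes deserialize() throw, which is exactly what the test above provokes.
public class StrictMessageSchema extends AbstractDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] message) throws IOException {
        String raw = new String(message, StandardCharsets.UTF_8);
        if (!raw.startsWith("{")) {   // stand-in for real parsing of OurMessage
            throw new IOException("Malformed message: " + raw);
        }
        return raw;
    }
}
```
Such a schema would be passed via KafkaSource.<String>builder()...setValueOnlyDeserializer(new StrictMessageSchema()).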
ation
(https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html),
it states to use FlinkKafkaSource when consuming from Kafka.
However, I noticed that the newer API uses KafkaSource, which uses
KafkaSourceBuilder and OffsetsInitializer.
Although I am on the Flink
According to the Flink 1.12 documentation (
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html),
it states to use FlinkKafkaSource when consuming from Kafka.
However, I noticed that the newer API uses KafkaSource, which uses
KafkaSourceBuilder and
in Flink 1.14 in favor of
KafkaSource, which implements the unified batch/streaming interface defined
in FLIP-27.
Regards,
David
On Tue, Feb 1, 2022 at 9:21 AM Francesco Guardiani
wrote:
> I think the FlinkKakfaConsumer010 you're talking about is the old source
> api. You shou
I think the FlinkKafkaConsumer010 you're talking about is the old source
API. You should use only KafkaSource now, as it uses the new source
infrastructure.
On Tue, Feb 1, 2022 at 9:02 AM HG wrote:
> Hello Francesco
> Perhaps I copied the wrong link of 1.2.
> But there i
Hello Francesco
Perhaps I copied the wrong link of 1.2.
But there is also
https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
It seems there are 2 ways to use Kafka
KafkaSource source = KafkaSource.builder
The latter link you posted refers to a very old Flink release. You should
use the first link, which refers to the latest release.
FG
On Tue, Feb 1, 2022 at 8:20 AM HG wrote:
> Hello all
>
> I am confused.
> What is the difference between KafkaSource as defined in :
> https://night
Hello all
I am confused.
What is the difference between KafkaSource as defined in :
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
and FlinkKafkaConsumer010 as defined in
https://nightlies.apache.org/flink/flink-docs-release-1.2/api/java/org/apache
nts from 2 sources: Parquet files stored in a GCS
>> > Bucket, and a Kafka topic.
>> > With the release of Hybrid Source in Flink 1.14, we were able to construct
>> > a Hybrid Source which produces events from two sources: a FileSource which
>> > reads data from
> > We have a requirement as follows:
> >
> > We want to stream events from 2 sources: Parquet files stored in a GCS
> Bucket, and a Kafka topic.
> > With the release of Hybrid Source in Flink 1.14, we were able to
> construct a Hybrid Source which produces events from two sources:
s data from a locally saved Parquet File, and a KafkaSource consuming
> events from a remote Kafka broker.
>
> I was wondering if instead of using a local Parquet file, whether it is
> possible to directly stream the file from a GCS bucket and construct a File
> Source out of it
data from a locally saved Parquet File, and a KafkaSource
consuming events from a remote Kafka broker.
I was wondering if instead of using a local Parquet file, whether it is
possible to directly stream the file from a GCS bucket and construct a File
Source out of it at runtime ? The Parquet Files
wrote:
> Thanks, Arvid.
>
> Can you clarify how the KafkaSource currently behaves in the situation
> where it starts with fewer partitions than subtasks? Is that the case
> described in FLIP-180 as case 1: "Static assignment + too few splits"? The
> implementation de
Thanks, Arvid.
Can you clarify how the KafkaSource currently behaves in the situation
where it starts with fewer partitions than subtasks? Is that the case
described in FLIP-180 as case 1: "Static assignment + too few splits"? The
implementation described there (emit MAX_WATERMARK) sh
Hi David,
yes, that's intentional [1] as it could lead to correctness issues and it
was inconsistently used across sources. Yes it should be documented.
For now I'd put it in the KafkaSource docs because I'm not sure in which
release notes it would fit best. In which release
I've seen a few questions recently from folks migrating from
FlinkKafkaConsumer to KafkaSource that make me suspect that something has
changed.
In FlinkKafkaConsumerBase we have this code which sets a source subtask to
idle if all of its partitions are empty when the subtask s
Hi Marco,
>
https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#additional-properties
In the new KafkaSource, you can configure it in your properties. You can
take a look at `KafkaSourceOptions#COMMIT_OFFSETS_ON_CHECKPOINT` for the
specific config, which
The FlinkKafkaConsumer that will be deprecated has the method
"setCommitOffsetsOnCheckpoints(boolan)" method.
However, that functionality is not in the new KafkaSource class.
How is this behavior / functionality configured in the new API?
-Marco A. Villalobos
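A minimal sketch of the property-based equivalent mentioned in the reply above (broker, topic, and group values are hypothetical):
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

// KafkaSourceOptions#COMMIT_OFFSETS_ON_CHECKPOINT corresponds to this property key;
// setting it to "false" mimics setCommitOffsetsOnCheckpoints(false). Offsets are
// only committed back to Kafka when a group id is configured.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")   // hypothetical
        .setTopics("my-topic")                // hypothetical
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("commit.offsets.on.checkpoint", "false")
        .build();
```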
With the legacy FlinkKafkaConsumer, overriding the isEndOfStream method of
DeserializationSchema can solve the problem.
But the new KafkaSource ignores that method (it is never called), and it
seems the setUnbounded method only accepts an offset or a time.
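For reference, a minimal sketch of the stopping conditions the new builder does support (this does not replicate isEndOfStream's per-record logic; values are hypothetical):
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Bounded mode: stop once the offsets that are "latest" at job start are reached.
// OffsetsInitializer.timestamp(...) would stop at a given record timestamp instead,
// and setUnbounded(...) keeps streaming semantics while still bounding the read.
KafkaSource<String> bounded = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")   // hypothetical
        .setTopics("my-topic")                // hypothetical
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.latest())
        .build();
```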
>>>>>>> system, so there's no way to craft a single regular expression that
>>>>>>> would
>>>>>>> fit the state of all potential topics. Additionally the documentation
>>>>>>> you
>>>>>>> linked seems to suggest
starts running". My understanding is it would not pick up new topics that
>> match the pattern after the job starts.
>>
>>
>> On Wed, Oct 13, 2021 at 8:51 PM Caizhi Weng wrote:
>> Hi!
>>
>> I suppose you
Thanks Fabian,
That was the information I was missing.
(Late reply ... same here, FlinkForward 😊 )
Thias
-Original Message-
From: Fabian Paul
Sent: Donnerstag, 28. Oktober 2021 08:38
To: Schwalbe Matthias
Cc: Mason Chen ; user
Subject: Re: FlinkKafkaConsumer -> KafkaSou
at would
>>>>>> fit the state of all potential topics. Additionally the documentation you
>>>>>> linked seems to suggest that the regular expression is evaluated only
>>>>>> once
>>>>>> "when the job starts run
Hi,
Sorry for the late reply, but most of us were involved in the Flink Forward
conference. The upgrade strategies for the Kafka sink and source are
pretty similar. Source and sink do not rely on state migration but leverage
Kafka as the source of truth.
When running with FlinkKafkaConsumer Maso
I would also be interested in instructions/discussion on how to state-migrate
from pre-unified sources/sinks to unified ones (Kafka) 😊
Thias
From: Mason Chen
Sent: Wednesday, 27 October 2021 01:52
To: user
Subject: FlinkKafkaConsumer -> KafkaSource State Migration
Hi all,
I read th
Oct 13, 2021 at 8:51 PM Caizhi Weng wrote:
> Hi!
>
> I suppose you want to read from different topics every now and then? Does the
> topic-pattern option [1] in Table API Kafka connector meet your needs?
>
> [1]
> https://ci.apache.org/
Hi all,
I read these instructions for migrating to the KafkaSource:
https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
.
Do we need to employ any uid/allowNonRestoredState tricks if our Flink job
is also stateful outside of the
>>> topics that match the pattern after the job starts.
>>>>>
>>>>>
>>>>> On Wed, Oct 13, 2021 at 8:51 PM Caizhi Weng
>>>>> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> I suppose yo
". My understanding is it would not pick up new
>>>> topics that match the pattern after the job starts.
>>>>
>>>>
>>>> On Wed, Oct 13, 2021 at 8:51 PM Caizhi Weng
>>>> wrote:
>>>>
>>>>> Hi!
>>>>>
>>> On Wed, Oct 13, 2021 at 8:51 PM Caizhi Weng
>>> wrote:
>>>
>>>> Hi!
>>>>
>>>> I suppose you want to read from different topics every now and then?
>>>> Does the topic-pattern option [1] in Table API Kafka connec
>>>
>>> I suppose you want to read from different topics every now and then?
>>> Does the topic-pattern option [1] in Table API Kafka connector meet your
>>> needs?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/do
ant to read from different topics every now and then? Does
>> the topic-pattern option [1] in Table API Kafka connector meet your needs?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/kafka/#topic-pattern
>>
>> Preston
suppose you want to read from different topics every now and then? Does
> the topic-pattern option [1] in Table API Kafka connector meet your needs?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/kafka/#topic-pattern
>
> Preston Price 于20
:
> The KafkaSource, and KafkaSourceBuilder appear to prevent users from
> providing their own KafkaSubscriber. Am I overlooking something?
>
> In my case I have an external system that controls which topics we should
> be ingesting, and it can change over time. I need to add, and rem
The KafkaSource and KafkaSourceBuilder appear to prevent users from
providing their own KafkaSubscriber. Am I overlooking something?
In my case I have an external system that controls which topics we should
be ingesting, and it can change over time. I need to add, and remove topics
as we refresh
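Not an answer to the custom-KafkaSubscriber part, but for the add/remove-topics use case, a minimal sketch (assuming the topics follow a naming convention; all values hypothetical) of the built-in pattern subscription with periodic partition discovery:
```java
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

// Topics matching the pattern are discovered at runtime; in the Flink versions
// discussed here, discovery is off unless the interval property is set.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")               // hypothetical
        .setTopicPattern(Pattern.compile("ingest-.*"))    // hypothetical convention
        .setGroupId("ingest-job")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("partition.discovery.interval.ms", "60000")
        .build();
```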
Thanks for the feedback.
> May I ask why you have less partitions than the parallelism? I would be
happy to learn more about your use-case to better understand the
> motivation.
The use case is that topic A contains just a few messages with product
metadata that rarely gets updated, while topic
Hi all,
The problem you are seeing, Lars, is somewhat intended behaviour, unfortunately.
With the batch/stream unification, every Kafka partition is treated
as a kind of workload assignment. If one subtask receives a signal that there is
no workload anymore, it goes into the FINISHED state.
As alread
execution-checkpointing-checkpoints-after-tasks-finish-enabled
>>
>> On Wed, Sep 15, 2021 at 11:26 AM Lars Skjærven wrote:
>>
>>> Using KafkaSource builder with a job parallelism larger than the number
>>> of kafka partitions, the job is unable to checkpoint.
>
Sep 15, 2021 at 11:26 AM Lars Skjærven wrote:
>
>> Using KafkaSource builder with a job parallelism larger than the number
>> of kafka partitions, the job is unable to checkpoint.
>>
>> With a job parallelism of 4, 3 of the tasks are marked as FINISHED for
>> th
/#execution-checkpointing-checkpoints-after-tasks-finish-enabled
On Wed, Sep 15, 2021 at 11:26 AM Lars Skjærven wrote:
> Using KafkaSource builder with a job parallelism larger than the number of
> kafka partitions, the job is unable to checkpoint.
>
> With a job parallelism of 4, 3 of t
Using the KafkaSource builder with a job parallelism larger than the number of
Kafka partitions, the job is unable to checkpoint.
With a job parallelism of 4, 3 of the tasks are marked as FINISHED for the
Kafka topic with one partition. For this reason checkpointing seems to be
disabled.
When using
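A minimal sketch of the workaround referenced above (assuming Flink 1.14+, where the option exists):
```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Allow checkpoints to continue even though some source subtasks are FINISHED
// because there are fewer Kafka partitions than the job parallelism.
Configuration conf = new Configuration();
conf.setString("execution.checkpointing.checkpoints-after-tasks-finish.enabled", "true");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.enableCheckpointing(60_000);
```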
On Tue, May 18, 2021 at 2:21 AM Alexey Trenikhun wrote:
> Hello,
>
> Is new KafkaSource/KafkaSourceBuilder ready to be used ? If so, is KafkaSource
> state compatible with legacy FlinkKafkaConsumer, for example if I replace
> FlinkKafkaConsumer
> by KafkaSource, will offsets
Found https://issues.apache.org/jira/browse/FLINK-22766
From: Alexey Trenikhun
Sent: Tuesday, May 25, 2021 3:25 PM
To: Ardhani Narasimha ; 陳樺威
; Flink User Mail List
Subject: Re: KafkaSource metrics
Looks like when KafkaSource is used instead of
Looks like when KafkaSource is used instead of FlinkKafkaConsumer, the metrics
listed below are not available. Bug? Work in progress?
Thanks,
Alexey
From: Ardhani Narasimha
Sent: Monday, May 24, 2021 9:08 AM
To: 陳樺威
Cc: user
Subject: Re: KafkaSource metrics
Use
Email: renqs...@gmail.com
On May 25, 2021, 2:35 PM +0800, 陳樺威 , wrote:
> Hi Ardhani,
>
> Thanks for your kindly reply.
>
> Our team use your provided metrics before, but the metrics disappear after
> migrate to new KafkaSource.
>
> We initialize KafkaSource in following
Hi Ardhani,
Thanks for your kind reply.
Our team used the metrics you provided before, but they disappeared after
migrating to the new KafkaSource.
We initialize KafkaSource with the following code.
```
val consumer: KafkaSource[T] = KafkaSource.builder()
.setProperties(properties)
.setTopics(topic
mode makes any difference.
On Mon, May 24, 2021 at 7:44 PM 陳樺威 wrote:
> Hello,
>
> Our team tries to test reactive mode and replace FlinkKafkaConsumer with
> the new KafkaSource.
> But we can’t find the KafkaSource metrics list. Does anyone have any idea?
> In our case, we want
Hello,
Our team is trying to test reactive mode and replace FlinkKafkaConsumer with
the new KafkaSource.
But we can't find the list of KafkaSource metrics. Does anyone have any idea?
In our case, we want to know the Kafka consume delay and consume rate.
Thanks,
Oscar
Hello,
Is the new KafkaSource/KafkaSourceBuilder ready to be used? If so, is KafkaSource
state compatible with the legacy FlinkKafkaConsumer? For example, if I replace
FlinkKafkaConsumer with KafkaSource, will offsets continue from what we had in
FlinkKafkaConsumer?
Thanks,
Alexey
at I could use to guarantee the processing of a finite set of
> data via a KafkaSource (i.e. send finite records to Kafka, read from the topic,
> process all records, apply assertion after processing).
>
> Any ideas/recommendations/workarounds would be greatly welcome and I’d be
> happy to share