Yep, but

"Consider this example: if you had a Kafka Consumer that was consuming from
topic A, you did a savepoint, then changed your Kafka consumer to instead
consume from topic B, and then restarted your job from the savepoint.
Before this change, your consumer would now consume from both topic A
and B because
it was stored in state that the consumer was consuming from topic A. With
the change, your consumer would only consume from topic B after restore
because it now filters the topics that are stored in state using the
*configured
topics*."

Yep, the issue is that our pattern is all-inclusive, so *configured topics*
implies all topics for the all-inclusive pattern, and thus all 1500-plus of
our topics. We use Flink to push all data in all topics to HDFS. We are now
in the process of pruning our topics (and, as you can imagine, their names
do not follow a specific pattern), removing some 500 of them. We would have
assumed that removing the topics from the cluster would be an indication to
Flink that, even though it has state associated with those topics and
irrespective of the pattern, the topics should no longer be consumed, very
akin to auto discovery of partitions (and thus topics) but in the opposite
direction.
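
For context, here is roughly how we subscribe today. This is a minimal
sketch with placeholder properties, not our exact job code:

    import java.util.Properties;
    import java.util.regex.Pattern;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
    props.setProperty("group.id", "hdfs-pusher");          // placeholder

    // All-inclusive subscription: "configured topics" is effectively every
    // topic on the cluster, including the ~500 we now want to prune.
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
        Pattern.compile(".*"),
        new SimpleStringSchema(),
        props);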

Does that make sense?

On Fri, Jun 14, 2019 at 3:24 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Visha,
>
> If I remember correctly, the behavior of the Kafka consumer was changed in
> Flink 1.8 to account for such situations.
> Please check the release notes [1] and the corresponding Jira issue [2].
>
> If this is not the behavior you need, please feel free to create a new
> Jira issue and start a discussion there.
>
> Thanks, Fabian
>
> [1]
> https://flink.apache.org/news/2019/04/09/release-1.8.0.html#important-changes
> [2] https://issues.apache.org/jira/browse/FLINK-10342
>
> On Thu, Jun 13, 2019 at 10:31 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> I guess adjusting the pattern (blacklisting the topic/s) would
>> work....
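>>
>> For instance, a negative lookahead could turn the all-inclusive pattern
>> into a blacklist. A hedged sketch with made-up topic names (the real
>> list would be our pruned topics):
>>
>>     import java.util.regex.Pattern;
>>
>>     // Match every topic except the explicitly excluded ones.
>>     // Topic names here are hypothetical.
>>     Pattern subscription =
>>         Pattern.compile("^(?!(dead-topic-1|dead-topic-2)$).*");
>>
>>     // subscription.matcher("live-topic").matches()   -> true
>>     // subscription.matcher("dead-topic-1").matches() -> false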
>>
>> On Thu, Jun 13, 2019 at 3:02 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Given
>>>
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L471
>>>
>>> it seems that if
>>>
>>> * I have a regex pattern for consuming a bunch of topics
>>> * auto create is turned on
>>>
>>> then even if I do this
>>>
>>> * suspend the pipe with a savepoint (SP)
>>> * delete the topic
>>> * start the pipe
>>>
>>> It will recreate the topic, because the state has it and the check is
>>> against the pattern rather than against the cluster metadata. In case
>>> auto create is turned off, it will throw an exception (I think).
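>>>
>>> To make that concrete, here is a rough sketch of the restore-time
>>> filtering that FLINK-10342 introduced (illustrative code, not the
>>> exact Flink internals). The configured pattern, not cluster metadata,
>>> decides which restored partitions survive:
>>>
>>>     import java.util.Map;
>>>     import java.util.regex.Pattern;
>>>
>>>     import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
>>>
>>>     // restoredState: partition -> offset, as recovered from the savepoint
>>>     static void filterRestoredState(
>>>             Map<KafkaTopicPartition, Long> restoredState,
>>>             Pattern subscribedPattern) {
>>>         // A deleted topic whose name still matches the pattern is kept,
>>>         // so with an all-inclusive ".*" nothing is ever dropped.
>>>         restoredState.keySet().removeIf(partition ->
>>>             !subscribedPattern.matcher(partition.getTopic()).matches());
>>>     }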
>>>
>>>
>>> The question, thus, is how we can safely delete a topic from our
>>> Kafka cluster without either a recreation (auto create on) or an
>>> exception (auto create off) when restarting from an SP whose restored
>>> state had that topic.
>>>
>>> Thanks much,
>>>
>>> Visha.