Hi Govindarajan,

You can broadcast the stream that carries the debug logger information by
calling `stream.broadcast`. Every record of that stream is then sent to all
sub-tasks of the downstream operator.
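
To illustrate the semantics, here is a rough plain-Java model. It is not Flink
code, and every class and method name in it is made up for this sketch. Control
events reach every parallel sub-task, while data records reach only one, so
whichever sub-task handles a user's records already knows whether to log for
that user. In a real job the same shape would roughly correspond to
`dataStream.connect(controlStream.broadcast())` followed by a
`CoFlatMapFunction`, once per operator that needs the logger changes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of one parallel sub-task of an operator. */
class DebugAwareSubtask {
    private final Set<String> debugUsers = new HashSet<>();
    final List<String> debugLog = new ArrayList<>();

    /** Control input: invoked for every control event, since that stream is broadcast. */
    void onControl(String userId, boolean enable) {
        if (enable) {
            debugUsers.add(userId);
        } else {
            debugUsers.remove(userId);
        }
    }

    /** Data input: invoked only for the records routed to this sub-task. */
    void onRecord(String userId, String payload) {
        if (debugUsers.contains(userId)) {
            debugLog.add(userId + ": " + payload);
        }
    }
}

/** Hypothetical stand-in for an operator running with a given parallelism. */
class BroadcastSketch {
    final DebugAwareSubtask[] subtasks;

    BroadcastSketch(int parallelism) {
        subtasks = new DebugAwareSubtask[parallelism];
        for (int i = 0; i < parallelism; i++) {
            subtasks[i] = new DebugAwareSubtask();
        }
    }

    /** Broadcast: every sub-task receives the control event. */
    void sendControl(String userId, boolean enable) {
        for (DebugAwareSubtask s : subtasks) {
            s.onControl(userId, enable);
        }
    }

    /** Partitioned by user id: exactly one sub-task receives the record. Returns its index. */
    int sendRecord(String userId, String payload) {
        int idx = Math.floorMod(userId.hashCode(), subtasks.length);
        subtasks[idx].onRecord(userId, payload);
        return idx;
    }

    public static void main(String[] args) {
        BroadcastSketch job = new BroadcastSketch(4);
        job.sendControl("user-42", true);            // enable debug logging for one user
        int idx = job.sendRecord("user-42", "clicked checkout");
        job.sendRecord("user-7", "viewed page");     // not logged, debugging is off for user-7
        System.out.println(job.subtasks[idx].debugLog);
    }
}
```

Note that in this model the set of debug users lives only in memory on each
sub-task; whether and how to checkpoint it in a real job is a separate
decision.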

Cheers,
Till

On Mon, Oct 3, 2016 at 5:13 PM, Govindarajan Srinivasaraghavan <
govindragh...@gmail.com> wrote:

> Hi Gordon,
>
> - I'm currently running my flink program on 1.2 SNAPSHOT with kafka source
> and I have checkpoint enabled. When I look at the consumer offsets in kafka
> it appears to be stagnant and there is a huge lag. But I can see my flink
> program is in pace with kafka source in JMX metrics and outputs. Is there a
> way to identify why the offsets are not committed to kafka?
>
> On which commit was your Kafka connector built? There was a recent change
> to the offset committing for Kafka 0.9 consumer, so identifying the exact
> commit will help clarify whether the recent change introduced any new
> problems. Also, what is your configured checkpoint interval? When
> checkpointing is enabled, the Kafka consumer only commits to Kafka when
> checkpoints are completed. So, offsets in Kafka are not updated until the
> next checkpoint is triggered.
>
> I am using 1.2-SNAPSHOT
> 'org.apache.flink', name: 'flink-connector-kafka-base_2.11', version:
> '1.2-SNAPSHOT'
> 'org.apache.flink', name: 'flink-connector-kafka-0.9_2.11', version:
> '1.2-SNAPSHOT'
>
> - In my current application we use custom loggers for debugging purposes.
> Let’s say we want to find what’s happening for a particular user, we fire
> an api request to add the custom logger for that particular user and use it
> for logging along the data path. Is there a way to achieve this in flink?
> Are there any global mutable parameters that I can use to achieve this
> functionality?
>
> I’m not sure if I understand the use case correctly, but it seems like you
> will need to change configuration / behaviour of a specific Flink operator
> at runtime. To my knowledge, the best way to do this in Flink right now is
> to translate your original logging-trigger api requests to a stream of
> events fed to Flink. This stream of events will then basically be changes
> of your user logger behaviour, and your operators can change their logging
> behaviour according to this stream.
>
> I can send the changes as streams, but I need this change for all the
> operators in my pipeline. Instead of using coflatmap at each operator to
> combine the streams, is there a way to send a change to all the operators?
>
> - Can I pass on state between operators? If I need the state stored on
> previous operators, how can I fetch it?
> I don’t think this is possible.
> Fine, thanks.
>
> Thanks.
>
> On Mon, Oct 3, 2016 at 12:23 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi!
>>
>> - I'm currently running my flink program on 1.2 SNAPSHOT with kafka
>> source and I have checkpoint enabled. When I look at the consumer offsets
>> in kafka it appears to be stagnant and there is a huge lag. But I can see
>> my flink program is in pace with kafka source in JMX metrics and outputs.
>> Is there a way to identify why the offsets are not committed to kafka?
>>
>> On which commit was your Kafka connector built? There was a recent change
>> to the offset committing for Kafka 0.9 consumer, so identifying the exact
>> commit will help clarify whether the recent change introduced any new
>> problems. Also, what is your configured checkpoint interval? When
>> checkpointing is enabled, the Kafka consumer only commits to Kafka when
>> checkpoints are completed. So, offsets in Kafka are not updated until the
>> next checkpoint is triggered.
>>
>> - In my current application we use custom loggers for debugging purposes.
>> Let’s say we want to find what’s happening for a particular user, we fire
>> an api request to add the custom logger for that particular user and use it
>> for logging along the data path. Is there a way to achieve this in flink?
>> Are there any global mutable parameters that I can use to achieve this
>> functionality?
>>
>> I’m not sure if I understand the use case correctly, but it seems like
>> you will need to change configuration / behaviour of a specific Flink
>> operator at runtime. To my knowledge, the best way to do this in Flink
>> right now is to translate your original logging-trigger api requests to
>> a stream of events fed to Flink. This stream of events will then basically
>> be changes of your user logger behaviour, and your operators can change their
>> logging behaviour according to this stream.
>>
>> - Can I pass on state between operators? If I need the state stored on
>> previous operators, how can I fetch it?
>>
>> I don’t think this is possible.
>>
>>
>> Best Regards,
>> Gordon
>>
>>
>> On October 3, 2016 at 2:08:31 PM, Govindarajan Srinivasaraghavan (
>> govindragh...@gmail.com) wrote:
>>
>> Hi,
>>
>>
>>
>> I have a few questions on how I need to model my use case in flink. Please
>> advise. Thanks for the help.
>>
>>
>>
>> - I'm currently running my flink program on 1.2 SNAPSHOT with kafka
>> source and I have checkpoint enabled. When I look at the consumer offsets
>> in kafka it appears to be stagnant and there is a huge lag. But I can see
>> my flink program is in pace with kafka source in JMX metrics and outputs.
>> Is there a way to identify why the offsets are not committed to kafka?
>>
>>
>>
>> - In my current application we use custom loggers for debugging purposes.
>> Let’s say we want to find what’s happening for a particular user, we fire
>> an api request to add the custom logger for that particular user and use it
>> for logging along the data path. Is there a way to achieve this in flink?
>> Are there any global mutable parameters that I can use to achieve this
>> functionality?
>>
>>
>>
>> - Can I pass on state between operators? If I need the state stored on
>> previous operators, how can I fetch it?
>>
>>
>>
>> Thanks
>>
>>
>
