Hi Govindarajan,

you can broadcast the stream that carries the debug logger information by calling `stream.broadcast`. Every record of that stream will then be sent to all sub-tasks of the downstream operator.
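
To make the delivery semantics concrete, here is a minimal plain-Java sketch. It is not Flink code: `BroadcastSketch`, `Operator`, and `SubTask` are hypothetical names that only model the behaviour. Control records reach every sub-task, while each data record reaches exactly one. In an actual job, one way to wire this up would be to broadcast the control stream, `connect` it with the data stream, and handle both inputs in a `CoFlatMapFunction`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BroadcastSketch {

    /** Models one parallel instance (sub-task) of an operator. */
    static final class SubTask {
        private final Set<String> debugUsers = new HashSet<>();
        private final List<String> log = new ArrayList<>();

        // Control input: switch debug logging on or off for one user.
        void onControl(String userId, boolean enable) {
            if (enable) {
                debugUsers.add(userId);
            } else {
                debugUsers.remove(userId);
            }
        }

        // Data input: log only if debugging is enabled for this user.
        void onData(String userId, String payload) {
            if (debugUsers.contains(userId)) {
                log.add("DEBUG user=" + userId + " payload=" + payload);
            }
        }
    }

    /** Models an operator running with a fixed parallelism. */
    static final class Operator {
        private final List<SubTask> subTasks = new ArrayList<>();

        Operator(int parallelism) {
            for (int i = 0; i < parallelism; i++) {
                subTasks.add(new SubTask());
            }
        }

        // Broadcast: every sub-task receives the control record.
        void broadcastControl(String userId, boolean enable) {
            for (SubTask t : subTasks) {
                t.onControl(userId, enable);
            }
        }

        // A data record goes to exactly one sub-task, chosen by key hash.
        void sendData(String userId, String payload) {
            int idx = Math.floorMod(userId.hashCode(), subTasks.size());
            subTasks.get(idx).onData(userId, payload);
        }

        // Collects the debug lines written by all sub-tasks.
        List<String> allLogs() {
            List<String> all = new ArrayList<>();
            for (SubTask t : subTasks) {
                all.addAll(t.log);
            }
            return all;
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator(4);
        op.sendData("alice", "before");      // debugging not enabled yet
        op.broadcastControl("alice", true);  // reaches all four sub-tasks
        op.sendData("alice", "click");       // logged by alice's sub-task
        op.sendData("bob", "click");         // bob is not being debugged
        op.broadcastControl("alice", false);
        op.sendData("alice", "after");       // debugging switched off again
        System.out.println(op.allLogs());
    }
}
```

Because the control record is broadcast, it does not matter which sub-task ends up processing a given user's data: each one already knows whether to log for that user.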

Cheers,
Till

On Mon, Oct 3, 2016 at 5:13 PM, Govindarajan Srinivasaraghavan <govindragh...@gmail.com> wrote:

> Hi Gordon,
>
> - I'm currently running my flink program on 1.2 SNAPSHOT with kafka source
> and I have checkpoint enabled. When I look at the consumer offsets in kafka
> it appears to be stagnant and there is a huge lag. But I can see my flink
> program is in pace with kafka source in JMX metrics and outputs. Is there a
> way to identify why the offsets are not committed to kafka?
>
> On which commit was your Kafka connector built? There was a recent change
> to the offset committing for Kafka 0.9 consumer, so identifying the exact
> commit will help clarify whether the recent change introduced any new
> problems. Also, what is your configured checkpoint interval? When
> checkpointing is enabled, the Kafka consumer only commits to Kafka when
> checkpoints are completed. So, offsets in Kafka are not updated until the
> next checkpoint is triggered.
>
> I am using 1.2-SNAPSHOT
> 'org.apache.flink', name: 'flink-connector-kafka-base_2.11', version:
> '1.2-SNAPSHOT'
> 'org.apache.flink', name: 'flink-connector-kafka-0.9_2.11', version:
> '1.2-SNAPSHOT'
>
> - In my current application we custom loggers for debugging purposes.
> Let’s say we want to find what’s happening for a particular user, we fire
> an api request to add the custom logger for that particular user and use it
> for logging along the data path. Is there a way to achieve this in flink?
> Are there any global mutable parameters that I can use to achieve this
> functionality?
>
> I’m not sure if I understand the use case correctly, but it seems like you
> will need to change configuration / behaviour of a specific Flink operator
> at runtime. To my knowledge, the best way to do this in Flink right now is
> to translate your original logging-trigger api requests to a stream of
> events fed to Flink.
> This stream of events will then basically be changes
> of your user logger behaviour, and your operators can change its logging
> behaviour according to this stream.
>
> I can send the changes as streams, but I need this change for all the
> operators in my pipeline. Instead of using coflatmap at each operator to
> combine the streams, is there a way to send a change to all the operators?
>
> - Can I pass on state between operators? If I need the state stored on
> previous operators, how can I fetch it?
>
> I don’t think this is possible.
>
> Fine, thanks.
>
> Thanks.
>
> On Mon, Oct 3, 2016 at 12:23 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi!
>>
>> - I'm currently running my flink program on 1.2 SNAPSHOT with kafka
>> source and I have checkpoint enabled. When I look at the consumer offsets
>> in kafka it appears to be stagnant and there is a huge lag. But I can see
>> my flink program is in pace with kafka source in JMX metrics and outputs.
>> Is there a way to identify why the offsets are not committed to kafka?
>>
>> On which commit was your Kafka connector built? There was a recent change
>> to the offset committing for Kafka 0.9 consumer, so identifying the exact
>> commit will help clarify whether the recent change introduced any new
>> problems. Also, what is your configured checkpoint interval? When
>> checkpointing is enabled, the Kafka consumer only commits to Kafka when
>> checkpoints are completed. So, offsets in Kafka are not updated until the
>> next checkpoint is triggered.
>>
>> - In my current application we custom loggers for debugging purposes.
>> Let’s say we want to find what’s happening for a particular user, we fire
>> an api request to add the custom logger for that particular user and use it
>> for logging along the data path. Is there a way to achieve this in flink?
>> Are there any global mutable parameters that I can use to achieve this
>> functionality?
>>
>> I’m not sure if I understand the use case correctly, but it seems like
>> you will need to change configuration / behaviour of a specific Flink
>> operator at runtime. To my knowledge, the best way to do this in Flink
>> right now is to translate your original logging-trigger api requests to
>> a stream of events fed to Flink. This stream of events will then basically
>> be changes of your user logger behaviour, and your operators can change its
>> logging behaviour according to this stream.
>>
>> - Can I pass on state between operators? If I need the state stored on
>> previous operators, how can I fetch it?
>>
>> I don’t think this is possible.
>>
>> Best Regards,
>> Gordon
>>
>> On October 3, 2016 at 2:08:31 PM, Govindarajan Srinivasaraghavan
>> (govindragh...@gmail.com) wrote:
>>
>> Hi,
>>
>> I have few questions on how I need to model my use case in flink. Please
>> advise. Thanks for the help.
>>
>> - I'm currently running my flink program on 1.2 SNAPSHOT with kafka
>> source and I have checkpoint enabled. When I look at the consumer offsets
>> in kafka it appears to be stagnant and there is a huge lag. But I can see
>> my flink program is in pace with kafka source in JMX metrics and outputs.
>> Is there a way to identify why the offsets are not committed to kafka?
>>
>> - In my current application we custom loggers for debugging purposes.
>> Let’s say we want to find what’s happening for a particular user, we fire
>> an api request to add the custom logger for that particular user and use it
>> for logging along the data path. Is there a way to achieve this in flink?
>> Are there any global mutable parameters that I can use to achieve this
>> functionality?
>>
>> - Can I pass on state between operators? If I need the state stored on
>> previous operators, how can I fetch it?
>>
>> Thanks