Hi Robert,

Awesome, thanks for the fast turnaround!
Cheers,
David

On Fri, Jan 22, 2016 at 11:17 AM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi David,
>
> you are right. I'll fix the issue in this pull request:
> https://github.com/apache/flink/pull/1541
>
> I guess everything in your topology runs with a parallelism of 1? Running
> it with a parallelism higher than 1 will also work around the issue
> (because then the two Sinks are not executed in one Task).
>
> On Fri, Jan 22, 2016 at 4:56 PM, David Kim <david....@braintreepayments.com> wrote:
>
>> Hi Robert,
>>
>> Thanks for the workaround. Unfortunately I think I found a bug in the
>> code that controls the metrics logic.
>>
>> Should Boolean.getBoolean be Boolean.valueOf instead?
>>
>> https://github.com/apache/flink/blob/81320c1c7ee98b9a663998df51cc4d5aa73d9b2a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java#L192
>>
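>> To spell out why I think this is a bug: Boolean.getBoolean(s) looks up a
>> JVM *system property* named s and parses that, while Boolean.valueOf(s)
>> parses the string s itself. A rough, untested sketch of the difference
>> (plain Scala, not the actual Flink code; the only thing it borrows is the
>> "flink.disable-metrics" key from the workaround below):
>>
>> // valueOf parses the argument string directly
>> java.lang.Boolean.valueOf("true")     // true
>>
>> // getBoolean treats its argument as the *name* of a system property
>> java.lang.Boolean.getBoolean("true")  // false, unless the JVM was started with -Dtrue=true
>>
>> // So a producer config following the suggested workaround...
>> val props = new java.util.Properties()
>> props.setProperty("flink.disable-metrics", "true")
>>
>> // ...would still not disable the metrics if the flag is read with
>> // getBoolean, because the property *value* ends up being used as a
>> // system property *name*:
>> java.lang.Boolean.getBoolean(props.getProperty("flink.disable-metrics", "false"))  // false
>>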
>> Thanks!
>> David
>>
>> On Fri, Jan 22, 2016 at 2:17 AM, Robert Metzger <rmetz...@apache.org> wrote:
>>
>>> Hi David,
>>>
>>> thank you for reporting the issue. I'll look into it. In the meantime,
>>> you can set "flink.disable-metrics" to "true" in the properties. This way,
>>> you disable the metrics.
>>> I'll probably have to introduce something like a client id to
>>> differentiate between the producers.
>>>
>>> Robert
>>>
>>> On Thu, Jan 21, 2016 at 11:51 PM, David Kim <david....@braintreepayments.com> wrote:
>>>
>>>> Hi Robert!
>>>>
>>>> Thanks for reaching out. I ran into an issue and wasn't sure if this
>>>> was due to a misconfiguration on my end or if this is a real bug. I have
>>>> one DataStream and I'm sinking to two different Kafka sinks. When the job
>>>> starts, I run into this error:
>>>>
>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.lang.UnsupportedOperationException: The accumulator
>>>> 'producer-record-retry-rate' already exists and cannot be added.
>>>>     at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.addAccumulator(AbstractRuntimeUDFContext.java:121)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:204)
>>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:89)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:305)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:227)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> The particular accumulator the exception complains about changes,
>>>> meaning it's not always 'producer-record-retry-rate' -- most likely due to
>>>> the non-deterministic ordering of the collection. Any guidance appreciated!
>>>>
>>>> I'm using 1.0-SNAPSHOT and my two sinks are FlinkKafkaProducer08.
>>>>
>>>> The Flink code looks something like this:
>>>>
>>>> val stream: DataStream[Foo] = ...
>>>>
>>>> val kafkaA = new FlinkKafkaProducer08[Foo]...
>>>> val kafkaB = new FlinkKafkaProducer08[Foo]...
>>>>
>>>> stream
>>>>   .addSink(kafkaA)
>>>>
>>>> stream
>>>>   .addSink(kafkaB)
>>>>
>>>> Thanks,
>>>> David
>>>>
>>>> On Wed, Jan 20, 2016 at 1:34 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>
>>>>> I've now merged the pull request. DeserializationSchema.isEndOfStream()
>>>>> should now be evaluated correctly by the Kafka 0.9 and 0.8 connectors.
>>>>>
>>>>> Please let me know if the updated code has any issues. I'll fix the
>>>>> issues asap.
>>>>>
>>>>> On Wed, Jan 13, 2016 at 5:06 PM, David Kim <david....@braintreepayments.com> wrote:
>>>>>
>>>>>> Thanks Robert! I'll be keeping tabs on the PR.
>>>>>>
>>>>>> Cheers,
>>>>>> David
>>>>>>
>>>>>> On Mon, Jan 11, 2016 at 4:04 PM, Robert Metzger <metrob...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> In theory isEndOfStream() is absolutely the right way to go for
>>>>>>> stopping data sources in Flink.
>>>>>>> That it's not working as expected is a bug. I have a pending pull
>>>>>>> request for adding a Kafka 0.9 connector, which fixes this issue as well
>>>>>>> (for all supported Kafka versions).
>>>>>>>
>>>>>>> Sorry for the inconvenience. If you want, you can check out the
>>>>>>> branch of the PR and build Flink yourself to get the fix.
>>>>>>> I hope that I can merge the connector to master this week; then the
>>>>>>> fix will be available in 1.0-SNAPSHOT as well.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Robert
>>>>>>>
>>>>>>> Sent from my iPhone
>>>>>>>
>>>>>>> On 11.01.2016, at 21:39, David Kim <david....@braintreepayments.com> wrote:
>>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> I saw that DeserializationSchema has an API "isEndOfStream()".
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
>>>>>>>
>>>>>>> Can *isEndOfStream* be utilized to somehow terminate a streaming Flink job?
>>>>>>>
>>>>>>> I was under the impression that if we return "true" we can control
>>>>>>> when a stream can close. The use case I had in mind was controlling when
>>>>>>> unit/integration tests would terminate a Flink job. We can rely on the fact
>>>>>>> that a test/spec would know how many items it expects to consume and then
>>>>>>> switch *isEndOfStream* to return true.
>>>>>>>
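>>>>>>> For example, a rough, untested sketch of what I mean (the class name
>>>>>>> BoundedStringSchema and the fixed "expected" count are made up; it only
>>>>>>> assumes the DeserializationSchema interface linked above, and the count
>>>>>>> is per parallel source instance, so it suits a parallelism-1 test):
>>>>>>>
>>>>>>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
>>>>>>> import org.apache.flink.streaming.util.serialization.DeserializationSchema
>>>>>>>
>>>>>>> // Counts the records it has deserialized and reports end-of-stream once
>>>>>>> // the test's expected number of elements has been consumed.
>>>>>>> class BoundedStringSchema(expected: Int) extends DeserializationSchema[String] {
>>>>>>>   private var count = 0
>>>>>>>
>>>>>>>   override def deserialize(message: Array[Byte]): String = {
>>>>>>>     count += 1
>>>>>>>     new String(message, "UTF-8")
>>>>>>>   }
>>>>>>>
>>>>>>>   // Once this returns true, the consumer should stop and the test job can finish.
>>>>>>>   override def isEndOfStream(nextElement: String): Boolean = count >= expected
>>>>>>>
>>>>>>>   override def getProducedType: TypeInformation[String] =
>>>>>>>     BasicTypeInfo.STRING_TYPE_INFO
>>>>>>> }
>>>>>>>
>>>>>>> The test would then hand it to the consumer, e.g.
>>>>>>> new FlinkKafkaConsumer082[String]("topic", new BoundedStringSchema(n), props),
>>>>>>> with the topic, n and props being whatever the spec uses.
>>>>>>>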
>>>>>>> Am I misunderstanding the intention for *isEndOfStream*?
>>>>>>>
>>>>>>> I also set a breakpoint on *isEndOfStream* and saw that it was never
>>>>>>> hit when passing a DeserializationSchema implementation to
>>>>>>> "FlinkKafkaConsumer082".
>>>>>>>
>>>>>>> Currently testing on 1.0-SNAPSHOT.
>>>>>>>
>>>>>>> Cheers!
>>>>>>> David
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Note: this information is confidential. It is prohibited to share,
>>>>>> post online or otherwise publicize without Braintree's prior written
>>>>>> consent.
>>>>>
>>>>
>>>> --
>>>> Note: this information is confidential. It is prohibited to share, post
>>>> online or otherwise publicize without Braintree's prior written consent.
>>>
>>
>> --
>> Note: this information is confidential. It is prohibited to share, post
>> online or otherwise publicize without Braintree's prior written consent.
>

--
Note: this information is confidential. It is prohibited to share, post
online or otherwise publicize without Braintree's prior written consent.