Hi Robert,

Awesome, thanks for the fast turnaround!

Cheers,
David

On Fri, Jan 22, 2016 at 11:17 AM, Robert Metzger <rmetz...@apache.org>
wrote:

> Hi David,
>
> you are right. I'll fix the issue in this pull request:
> https://github.com/apache/flink/pull/1541
>
> I guess everything in your topology runs with a parallelism of 1? Running
> it with a parallelism higher than 1 will also work around the issue
> (because then the two Sinks are not executed in one Task).
>
> On Fri, Jan 22, 2016 at 4:56 PM, David Kim <
> david....@braintreepayments.com> wrote:
>
>> Hi Robert,
>>
>> Thanks for the workaround. Unfortunately I think I found a bug in the
>> code that controls the metrics logic.
>>
>> Should Boolean.getBoolean be Boolean.valueOf instead?
>>
>>
>> https://github.com/apache/flink/blob/81320c1c7ee98b9a663998df51cc4d5aa73d9b2a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java#L192
>>
>> Thanks!
>> David
>>
>> On Fri, Jan 22, 2016 at 2:17 AM, Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Hi David,
>>>
>>> thank you for reporting the issue. I'll look into it. In the meantime,
>>> you can set "flink.disable-metrics" to "true" in the properties. This way,
>>> you disable the metrics.
>>> I'll probably have to introduce something like a client id to
>>> differentiate between the producers.
>>>
>>> Robert
>>>
>>> On Thu, Jan 21, 2016 at 11:51 PM, David Kim <
>>> david....@braintreepayments.com> wrote:
>>>
>>>> Hi Robert!
>>>>
>>>> Thanks for reaching out. I ran into an issue and wasn't sure if this
>>>> was due to a misconfiguration on my end or if this is a real bug. I have
>>>> one DataStream and I'm sinking to two different Kafka sinks. When the job
>>>> starts, I run into this error:
>>>>
>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution
>>>> failed.
>>>> at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
>>>> at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>>>> at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>>>> at
>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>> at
>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>> at
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.lang.UnsupportedOperationException: The accumulator
>>>> 'producer-record-retry-rate' already exists and cannot be added.
>>>> at
>>>> org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.addAccumulator(AbstractRuntimeUDFContext.java:121)
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:204)
>>>> at
>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:89)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:305)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:227)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
>>>> The particular accumulator the exception complains about changes,
>>>> meaning it's not always 'producer-record-retry-rate' -- most likely due to
>>>> the non-deterministic ordering of the collection. Any guidance appreciated!
>>>>
>>>> I'm using 1.0-SNAPSHOT and my two sinks are FlinkKafkaProducer08.
>>>>
>>>> The flink code looks something like this:
>>>>
>>>>
>>>> val stream: DataStream[Foo] = ...
>>>>
>>>> val kafkaA = new FlinkKafkaProducer08[Foo]...
>>>>
>>>> val kafkaB = new FlinkKafkaProducer08[Foo]...
>>>>
>>>>
>>>> stream
>>>>   .addSink(kafkaA)
>>>>
>>>> stream
>>>>   .addSink(kafkaB)
>>>>
>>>>
>>>> Thanks,
>>>> David
>>>>
>>>> On Wed, Jan 20, 2016 at 1:34 PM, Robert Metzger <rmetz...@apache.org>
>>>> wrote:
>>>>
>>>>> I've now merged the pull request. DeserializationSchema.isEndOfStream()
>>>>> should now be evaluated correctly by the Kafka 0.9 and 0.8 connectors.
>>>>>
>>>>> Please let me know if the updated code has any issues. I'll fix the
>>>>> issues asap.
>>>>>
>>>>> On Wed, Jan 13, 2016 at 5:06 PM, David Kim <
>>>>> david....@braintreepayments.com> wrote:
>>>>>
>>>>>> Thanks Robert! I'll be keeping tabs on the PR.
>>>>>>
>>>>>> Cheers,
>>>>>> David
>>>>>>
>>>>>> On Mon, Jan 11, 2016 at 4:04 PM, Robert Metzger <metrob...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> In theory isEndOfStream() is absolutely the right way to go for
>>>>>>> stopping data sources in Flink.
>>>>>>> That it's not working as expected is a bug. I have a pending pull
>>>>>>> request for adding a Kafka 0.9 connector, which fixes this issue as well
>>>>>>> (for all supported Kafka versions).
>>>>>>>
>>>>>>> Sorry for the inconvenience. If you want, you can check out the
>>>>>>> branch of the PR and build Flink yourself to get the fix.
>>>>>>> I hope that I can merge the connector to master this week; the fix
>>>>>>> will then be available in 1.0-SNAPSHOT as well.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Robert
>>>>>>>
>>>>>>> On 11.01.2016, at 21:39, David Kim <david....@braintreepayments.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> I saw that DeserializationSchema has an API "isEndOfStream()".
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
>>>>>>>
>>>>>>> Can *isEndOfStream* be utilized to somehow terminate a streaming
>>>>>>> flink job?
>>>>>>>
>>>>>>> I was under the impression that if we return "true" we can control
>>>>>>> when a stream can close. The use case I had in mind was controlling when
>>>>>>> unit/integration tests would terminate a flink job. We can rely on the
>>>>>>> fact that a test/spec would know how many items it expects to consume
>>>>>>> and then switch *isEndOfStream* to return true.
>>>>>>>
>>>>>>> Am I misunderstanding the intention for *isEndOfStream*?
>>>>>>>
>>>>>>> I also set a breakpoint on *isEndOfStream* and saw that it never
>>>>>>> was hit when using "FlinkKafkaConsumer082" to pass in a
>>>>>>> DeserializationSchema implementation.
>>>>>>>
>>>>>>> Currently testing on 1.0-SNAPSHOT.
>>>>>>>
>>>>>>> Cheers!
>>>>>>> David
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Note: this information is confidential. It is prohibited to share,
>>>>>> post online or otherwise publicize without Braintree's prior written
>>>>>> consent.
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>
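Note on the Boolean.getBoolean question in the quoted thread: Boolean.getBoolean(String) does not parse its argument. It looks up a JVM system property with that name, so passing it the value "true" read from the producer properties checks for a system property called "true" and normally yields false. Boolean.parseBoolean (or Boolean.valueOf) parses the string itself. The sketch below is a minimal illustration under that reading; the class and the two helper methods are hypothetical and are not the Flink source.

```java
import java.util.Properties;

// Hypothetical demo class; it only illustrates the JDK behaviour discussed in the thread.
class DisableMetricsFlag {

    // Mirrors the suspected bug: the property VALUE is used as a system property NAME.
    static boolean viaGetBoolean(Properties props) {
        return Boolean.getBoolean(props.getProperty("flink.disable-metrics", "false"));
    }

    // Parses the property value itself, which is what the flag check needs.
    static boolean viaParseBoolean(Properties props) {
        return Boolean.parseBoolean(props.getProperty("flink.disable-metrics", "false"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("flink.disable-metrics", "true");

        System.out.println("getBoolean:   " + viaGetBoolean(props));
        System.out.println("parseBoolean: " + viaParseBoolean(props));
    }
}
```

With Boolean.getBoolean the "flink.disable-metrics" workaround would only take effect if a system property literally named "true" happened to be set to "true", which would explain why setting the flag in the properties did not disable the metrics.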

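Note on the accumulator error in the quoted thread: per Robert's explanation, with a parallelism of 1 both Kafka sinks run in one Task, so both try to register metrics under the same accumulator names. The sketch below imitates that with a hypothetical in-memory registry (not Flink's runtime context) and shows how prefixing each name with a per-producer client id, the idea Robert mentions, avoids the clash. The names "kafkaA" and "kafkaB" are borrowed from the quoted snippet purely as illustrative ids.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo; AccumulatorRegistry is a stand-in, not a Flink class.
class ProducerMetricsDemo {

    // Rejects duplicate names, like the error message quoted in the thread.
    static final class AccumulatorRegistry {
        private final Map<String, Double> accumulators = new HashMap<>();

        void add(String name, double initialValue) {
            if (accumulators.containsKey(name)) {
                throw new UnsupportedOperationException(
                        "The accumulator '" + name + "' already exists and cannot be added.");
            }
            accumulators.put(name, initialValue);
        }

        int size() {
            return accumulators.size();
        }
    }

    // Registers one metric; a null clientId mimics the unprefixed naming that collides.
    static void register(AccumulatorRegistry registry, String clientId, String metric) {
        String name = (clientId == null) ? metric : clientId + "." + metric;
        registry.add(name, 0.0);
    }

    public static void main(String[] args) {
        // Two producers sharing one registry without a prefix: the second add fails.
        AccumulatorRegistry shared = new AccumulatorRegistry();
        register(shared, null, "producer-record-retry-rate");
        try {
            register(shared, null, "producer-record-retry-rate");
        } catch (UnsupportedOperationException e) {
            System.out.println("collision: " + e.getMessage());
        }

        // Same metric, distinct client ids: both registrations succeed.
        AccumulatorRegistry prefixed = new AccumulatorRegistry();
        register(prefixed, "kafkaA", "producer-record-retry-rate");
        register(prefixed, "kafkaB", "producer-record-retry-rate");
        System.out.println("registered with prefixes: " + prefixed.size());
    }
}
```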

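Note on the isEndOfStream use case from the start of the quoted thread: a test that knows how many items to expect can use a schema that counts elements and reports end-of-stream once the expected number has been seen. The sketch below uses a simplified stand-in interface with only deserialize and isEndOfStream (Flink's real DeserializationSchema has more to it), and a hypothetical consume loop that assumes the element triggering end-of-stream is not emitted.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo; the interface below is a simplified stand-in, not Flink's.
class CountingSchemaDemo {

    interface SimpleDeserializationSchema<T> {
        T deserialize(byte[] message);

        boolean isEndOfStream(T nextElement);
    }

    // Reports end-of-stream on the first element after `expected` elements were seen.
    static final class CountingStringSchema implements SimpleDeserializationSchema<String> {
        private final int expected;
        private int seen = 0;

        CountingStringSchema(int expected) {
            this.expected = expected;
        }

        @Override
        public String deserialize(byte[] message) {
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            seen++;
            return seen > expected;
        }
    }

    // Stand-in for a source loop: stops, without emitting, when the schema says so.
    static int consume(String[] records, SimpleDeserializationSchema<String> schema) {
        int emitted = 0;
        for (String record : records) {
            String value = schema.deserialize(record.getBytes(StandardCharsets.UTF_8));
            if (schema.isEndOfStream(value)) {
                break;
            }
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[] records = {"a", "b", "c", "d", "e"};
        int emitted = consume(records, new CountingStringSchema(3));
        System.out.println("emitted before end-of-stream: " + emitted);
    }
}
```

One consequence of this design is that the stream only ends when one more record arrives after the expected count, so a test may prefer a dedicated sentinel record instead of a counter.
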
