My pipeline uses the Combine.perKey transform, and I would like to add
withHotKeyFanout to keep the combine from becoming a bottleneck. To test this, I
verified that my mergeAccumulators function was correct and then added
withHotKeyFanout(2) to my Combine transform.
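To make the setup concrete, here is a minimal, Beam-free sketch of what I understand withHotKeyFanout(2) to do for a single key: the values are spread over two sub-keys, each sub-key is pre-combined into a partial accumulator, and the partial accumulators are then merged. The MeanAccum class, the round-robin shard assignment, and the mean computation are all hypothetical stand-ins for illustration; this is neither my actual CombineFn nor Beam's implementation.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HotKeyFanoutSketch {

    /** Hypothetical accumulator: running sum and count for one key. */
    static final class MeanAccum {
        long sum;
        long count;
    }

    // The four methods below mirror the shape of a CombineFn
    // (createAccumulator / addInput / mergeAccumulators / extractOutput).
    static MeanAccum createAccumulator() {
        return new MeanAccum();
    }

    static MeanAccum addInput(MeanAccum acc, int value) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    static MeanAccum mergeAccumulators(Iterable<MeanAccum> accums) {
        MeanAccum merged = createAccumulator();
        for (MeanAccum a : accums) {
            merged.sum += a.sum;
            merged.count += a.count;
        }
        return merged;
    }

    static double extractOutput(MeanAccum acc) {
        return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
    }

    /** Combines all values of one key through `fanout` intermediate shards. */
    public static double combineWithFanout(List<Integer> values, int fanout) {
        // Stage 1: assign each value to a sub-key (round-robin here, which is
        // an assumption) and pre-combine per sub-key. Shards that receive no
        // values never get an accumulator.
        Map<Integer, MeanAccum> partials = new LinkedHashMap<>();
        for (int i = 0; i < values.size(); i++) {
            MeanAccum acc = partials.computeIfAbsent(i % fanout, k -> createAccumulator());
            addInput(acc, values.get(i));
        }
        // In a real runner the partial accumulators may be serialized between
        // the two stages; this sketch keeps them in memory.
        // Stage 2: merge the partial accumulators and extract the result.
        return extractOutput(mergeAccumulators(partials.values()));
    }

    public static void main(String[] args) {
        System.out.println(combineWithFanout(Arrays.asList(1, 2, 3, 4), 2)); // 2.5
        System.out.println(combineWithFanout(Arrays.asList(7), 2));          // 7.0
    }
}
```

In this sketch a key with a single value produces only one partial accumulator, and the merge step still returns the expected result.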
When I launch the pipeline on Flink (v1.3.2), it runs for only a minute or so
before failing with this stack trace:
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
    ... 7 more
Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:74)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:113)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:758)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:527)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:496)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    ... 15 more
Caused by: org.apache.avro.AvroRuntimeException: Array data must be a Collection or Array
    at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:70)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
    at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:652)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:599)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:156)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.copy(CoderTypeSerializer.java:64)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:879)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:856)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:94)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:87)
    at org.apache.beam.runners.core.ReduceFnRunner$2.output(ReduceFnRunner.java:1061)
    at org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:428)
    at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:124)
    at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1066)
    at org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:779)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:134)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:74)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:113)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:758)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:527)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:496)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Is this occurring because a key has only one value associated with it?