[ 
https://issues.apache.org/jira/browse/FLINK-24062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-24062:
-----------------------------------
    Labels: pull-request-available release-testing  (was: release-testing)

> Exception encountered during timer serialization in Python DataStream API 
> --------------------------------------------------------------------------
>
>                 Key: FLINK-24062
>                 URL: https://issues.apache.org/jira/browse/FLINK-24062
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.14.0
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>              Labels: pull-request-available, release-testing
>             Fix For: 1.14.0
>
>
> For the following example:
> {code}
> ################################################################################
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #      http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> #  limitations under the License.
> ################################################################################
> import argparse
> import logging
> import sys
> from pyflink.common import WatermarkStrategy, Encoder, Types
> from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
> from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink,
>                                            OutputFileConfig, RollingPolicy)
> word_count_data = ["To be, or not to be,--that is the question:--",
>                    "Whether 'tis nobler in the mind to suffer",
>                    "The slings and arrows of outrageous fortune",
>                    "Or to take arms against a sea of troubles,",
>                    "And by opposing end them?--To die,--to sleep,--",
>                    "No more; and by a sleep to say we end",
>                    "The heartache, and the thousand natural shocks",
>                    "That flesh is heir to,--'tis a consummation",
>                    "Devoutly to be wish'd. To die,--to sleep;--",
>                    "To sleep! perchance to dream:--ay, there's the rub;",
>                    "For in that sleep of death what dreams may come,",
>                    "When we have shuffled off this mortal coil,",
>                    "Must give us pause: there's the respect",
>                    "That makes calamity of so long life;",
>                    "For who would bear the whips and scorns of time,",
>                    "The oppressor's wrong, the proud man's contumely,",
>                    "The pangs of despis'd love, the law's delay,",
>                    "The insolence of office, and the spurns",
>                    "That patient merit of the unworthy takes,",
>                    "When he himself might his quietus make",
>                    "With a bare bodkin? who would these fardels bear,",
>                    "To grunt and sweat under a weary life,",
>                    "But that the dread of something after death,--",
>                    "The undiscover'd country, from whose bourn",
>                    "No traveller returns,--puzzles the will,",
>                    "And makes us rather bear those ills we have",
>                    "Than fly to others that we know not of?",
>                    "Thus conscience does make cowards of us all;",
>                    "And thus the native hue of resolution",
>                    "Is sicklied o'er with the pale cast of thought;",
>                    "And enterprises of great pith and moment,",
>                    "With this regard, their currents turn awry,",
>                    "And lose the name of action.--Soft you now!",
>                    "The fair Ophelia!--Nymph, in thy orisons",
>                    "Be all my sins remember'd."]
> def word_count(input_path, output_path):
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_runtime_mode(RuntimeExecutionMode.BATCH)
>     # write all the data to one file
>     env.set_parallelism(1)
>     # define the source
>     if input_path is not None:
>         ds = env.from_source(
>             source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
>                                                        input_path)
>                              .process_static_file_set().build(),
>             watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
>             source_name="file_source"
>         )
>     else:
>         print("Executing word_count example with default input data set.")
>         print("Use --input to specify file input.")
>         ds = env.from_collection(word_count_data)
>     def split(line):
>         yield from line.split()
>     # compute word count
>     ds = ds.flat_map(split) \
>            .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
>            .key_by(lambda i: i[0]) \
>            .reduce(lambda i, j: (i[0], i[1] + j[1]))
>     # define the sink
>     if output_path is not None:
>         ds.sink_to(
>             sink=FileSink.for_row_format(
>                 base_path=output_path,
>                 encoder=Encoder.simple_string_encoder())
>             .with_output_file_config(
>                 OutputFileConfig.builder()
>                 .with_part_prefix("prefix")
>                 .with_part_suffix(".ext")
>                 .build())
>             .with_rolling_policy(RollingPolicy.default_rolling_policy())
>             .build()
>         )
>     else:
>         print("Printing result to stdout. Use --output to specify output path.")
>         ds.print()
>     # submit for execution
>     env.execute()
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         '--input',
>         dest='input',
>         required=False,
>         help='Input file to process.')
>     parser.add_argument(
>         '--output',
>         dest='output',
>         required=False,
>         help='Output file to write results to.')
>     argv = sys.argv[1:]
>     known_args, _ = parser.parse_known_args(argv)
>     word_count(known_args.input, known_args.output)
> {code}
> It will throw the following exception:
> {code}
> Traceback (most recent call last):
>   File "pyflink/examples/datastream/word_count.py", line 134, in <module>
>     word_count(known_args.input, known_args.output)
>   File "pyflink/examples/datastream/word_count.py", line 113, in word_count
>     env.execute()
>   File 
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/stream_execution_environment.py",
>  line 691, in execute
>     return 
> JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
>   File 
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/java_gateway.py",
>  line 1286, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File 
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>  line 146, in deco
>     return f(*a, **kw)
>   File 
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/protocol.py",
>  line 328, in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o3.execute.
> : org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>       at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
>       at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>       at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>       at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250)
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>       at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>       at 
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
>       at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
>       at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>       at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>       at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>       at 
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
>       at akka.dispatch.OnComplete.internal(Future.scala:300)
>       at akka.dispatch.OnComplete.internal(Future.scala:297)
>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
>       at 
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
>       at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
>       at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
>       at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
>       at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>       at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
>       at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
>       at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
>       at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
>       at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
>       at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
>       at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
>       at 
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>       at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
>       at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
>       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>       at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
>       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
>       at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>       at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>       at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
>       at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:441)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
>       at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
>       at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>       at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>       at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>       at akka.actor.Actor.aroundReceive(Actor.scala:537)
>       at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>       ... 4 more
> Caused by: java.lang.RuntimeException: Error while waiting for 
> BeamPythonFunctionRunner flush
>       at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:361)
>       at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processElementsOfCurrentKeyIfNeeded(AbstractPythonFunctionOperator.java:257)
>       at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.setCurrentKey(AbstractPythonFunctionOperator.java:246)
>       at 
> org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.setCurrentKey(PythonKeyedProcessOperator.java:225)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:504)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:491)
>       at 
> org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36)
>       at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:229)
>       at 
> org.apache.flink.streaming.api.operators.sort.SortingDataInput.emitNextSortedRecord(SortingDataInput.java:207)
>       at 
> org.apache.flink.streaming.api.operators.sort.SortingDataInput.emitNext(SortingDataInput.java:187)
>       at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:489)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:819)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:746)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:785)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:728)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:786)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Failed to close remote bundle
>       at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:377)
>       at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:361)
>       at 
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$invokeFinishBundle$2(AbstractPythonFunctionOperator.java:340)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       ... 1 more
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.beam.sdk.coders.CoderException: java.io.EOFException
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>       at 
> org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:48)
>       at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:73)
>       at 
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:542)
>       at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
>       at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:375)
>       ... 7 more
> Caused by: org.apache.beam.sdk.coders.CoderException: java.io.EOFException
>       at 
> org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:104)
>       at 
> org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:90)
>       at 
> org.apache.beam.runners.core.construction.Timer$Coder.decode(Timer.java:195)
>       at 
> org.apache.beam.runners.core.construction.Timer$Coder.decode(Timer.java:158)
>       at 
> org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)
>       at 
> org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)
>       at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:65)
>       at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:29)
>       at 
> org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:178)
>       at 
> org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:127)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>       at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>       ... 3 more
> Caused by: java.io.EOFException
>       at org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:73)
>       at org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:56)
>       at 
> org.apache.beam.sdk.coders.StringUtf8Coder.readString(StringUtf8Coder.java:55)
>       at 
> org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:100)
>       ... 20 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to