Hi,
 根据你提供的堆栈,真正导致报错的异常信息如下:
 `Parsing error for column 1 of row '锘�1,98' originated by LongParser: 
NUMERIC_VALUE_ILLEGAL_CHARACTER.`
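这是因为该值无法被解析为 long(BIGINT)类型:'1' 前面的 '锘�' 并不是数据本身,而是文件开头的 UTF-8 BOM(字节 EF BB BF)被按 GBK 解码后产生的乱码,通常是 Windows 记事本以“UTF-8”编码保存文件时自动加入的。建议将输入文件另存为不带 BOM 的 UTF-8 编码(或删除文件开头的 BOM)后再运行;或者你也可以将对应字段定义为 varchar,但由于 add 函数声明的参数类型是 BIGINT,这样做需要先把字段转换为数值类型才能求和。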
Best,
Hailong Wang.
在 2020-10-25 17:53:29,"洗你的头" <[email protected]> 写道:
>尊敬的开发者您好:我在刚开始使用 PyFlink 时,跑通了简单的单词统计示例,但在运行求和示例时报错了,不知道该如何解决。
>
>具体内容为:
>1.源代码
>from pyflink.table import StreamTableEnvironment, DataTypes, BatchTableEnvironment
>from pyflink.table.descriptors import Schema, OldCsv, FileSystem
>from pyflink.table.udf import udf
>from pyflink.datastream import StreamExecutionEnvironment
>#from pyflink.dataset import ExecutionEnvironment
>
>env = StreamExecutionEnvironment.get_execution_environment()
>#env = ExecutionEnvironment.get_execution_environment()
>env.set_parallelism(1)
>t_env = StreamTableEnvironment.create(env)
>#t_env = BatchTableEnvironment.create(env)
>t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')
>t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size", '2')
>
>t_env.register_function("add", udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT()))
>
>t_env.connect(FileSystem().path('input\input')) \
>    .with_format(OldCsv()
>                 .field('a', DataTypes.BIGINT())
>                 .field('b', DataTypes.BIGINT())) \
>    .with_schema(Schema()
>                 .field('a', DataTypes.BIGINT())
>                 .field('b', DataTypes.BIGINT())) \
>    .create_temporary_table('mySource')
>
>t_env.connect(FileSystem().path('output')) \
>    .with_format(OldCsv()
>                 .field('sum', DataTypes.BIGINT())) \
>    .with_schema(Schema()
>                 .field('sum', DataTypes.BIGINT())) \
>    .create_temporary_table('mySink')
>
>t_env.from_path('mySource')\
>    .select("add(a, b)") \
>    .insert_into('mySink')
>
>t_env.execute("tutorial_job")
>当我运行此代码时,会报出如下错误:
>2.错误信息
>Py4JJavaError                             Traceback (most recent call last) 
><ipython-input-5-4e5da54b7616> in <module>      35     
>.insert_into('mySink')      36  ---> 37 t_env.execute("tutorial_job") 
>F:\Anaconda3\envs\pyflink\lib\site-packages\pyflink\table\table_environment.py 
>in execute(self, job_name)    1055                       "use 
>create_statement_set for multiple sinks.", DeprecationWarning)    1056         
>self._before_execute() -> 1057         return 
>JobExecutionResult(self._j_tenv.execute(job_name))    1058     1059     def 
>from_elements(self, elements, schema=None, verify_schema=True): 
>F:\Anaconda3\envs\pyflink\lib\site-packages\py4j-0.10.8.1-py3.7.egg\py4j\java_gateway.py
> in __call__(self, *args)    1284         answer = 
>self.gateway_client.send_command(command)    1285         return_value = 
>get_return_value( -> 1286             answer, self.gateway_client, 
>self.target_id, self.name)    1287     1288         for temp_arg in temp_args: 
>F:\Anaconda3\envs\pyflink\lib\site-packages\pyflink\util\exceptions.py in 
>deco(*a, **kw)     145     def deco(*a, **kw):     146         try: --> 147 
>            return f(*a, **kw)     148         except Py4JJavaError as e:     
>149             s = e.java_exception.toString() 
>F:\Anaconda3\envs\pyflink\lib\site-packages\py4j-0.10.8.1-py3.7.egg\py4j\protocol.py
> in get_return_value(answer, gateway_client, target_id, name)     326          
>       raise Py4JJavaError(     327                     "An error occurred 
>while calling {0}{1}{2}.\n". --&gt; 328                     format(target_id, 
>".", name), value)     329             else:     330                 raise 
>Py4JError( Py4JJavaError: An error occurred while calling o374.execute. : 
>java.util.concurrent.ExecutionException: 
>org.apache.flink.runtime.client.JobExecutionException: Job execution failed.   
>   at 
>java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>       at 
>java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>    at 
>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1717)
>  at 
>org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
>    at 
>org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
>         at 
>org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
>Method)       at 
>java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
>java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)   at 
>org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>      at 
>org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>        at 
>org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)     
>at 
>org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>      at 
>org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>    at 
>org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>    at java.base/java.lang.Thread.run(Thread.java:834) Caused by: 
>org.apache.flink.runtime.client.JobExecutionException: Job execution failed.   
>   at 
>org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>        at 
>org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
>        at 
>java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
>        at 
>java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>    at 
>java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
>       at 
>org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
>   at 
>java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>         at 
>java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>         at 
>java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>    at 
>java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
>       at 
>org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
>   at akka.dispatch.OnComplete.internal(Future.scala:264)  at 
>akka.dispatch.OnComplete.internal(Future.scala:261)  at 
>akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)    at 
>akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)    at 
>scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)         at 
>org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>      at 
>scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)    
>at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) 
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)     at 
>akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>      at 
>akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>      at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)   
>at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)   at 
>scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)         at 
>akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>         at 
>akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>         at 
>akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>        at 
>akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>        at 
>scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)       
>at 
>akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)   
>      at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)        
>at 
>akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>      at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)    
>at 
>akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)  
>      at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
>       at 
>akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
>Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
>NoRestartBackoffTimeStrategy      at 
>org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>       at 
>org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>     at 
>org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>     at 
>org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
>        at 
>org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
>      at 
>org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
>    at 
>org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
>    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
>Method)       at 
>java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
>java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)   at 
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
>    at 
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>       at 
>org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>    at 
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)        at 
>akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)        at 
>scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)   at 
>akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)  at 
>scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)  at 
>scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)  at 
>scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)  at 
>akka.actor.Actor$class.aroundReceive(Actor.scala:517)        at 
>akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)      at 
>akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)     at 
>akka.actor.ActorCell.invoke(ActorCell.scala:561)     at 
>akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)      at 
>akka.dispatch.Mailbox.run(Mailbox.scala:225)         at 
>akka.dispatch.Mailbox.exec(Mailbox.scala:235)        ... 4 more Caused by: 
>org.apache.flink.api.common.io.ParseException: Parsing error for column 1 of 
>row '锘�1,98' originated by LongParser: NUMERIC_VALUE_ILLEGAL_CHARACTER.      
>at 
>org.apache.flink.api.java.io.RowCsvInputFormat.parseRecord(RowCsvInputFormat.java:182)
>       at 
>org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111)
>      at 
>org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
>        at 
>org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79) 
>      at 
>org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.readAndCollectRecord(ContinuousFileReaderOperator.java:359)
>     at 
>org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:326)
>    at 
>org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:225)
>     at 
>org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) 
>     at 
>org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:270)
>     at 
>org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
>  at 
>org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>  at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>      at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)       
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)         at 
>java.base/java.lang.Thread.run(Thread.java:834)
>
>
>我该如何解决呢?(或者您可以推荐几个国内比较活跃的答疑社区吗?)
>因这种可能较简单的问题打扰到您,万分歉意,祝您工作愉快,身体健康!

回复