????????????????????????????pyflink??????????????????????????????????????????????????????????????????????????????

????????????
1. Code:
from pyflink.table import StreamTableEnvironment, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf
from pyflink.datastream import StreamExecutionEnvironment
#from pyflink.dataset import ExecutionEnvironment

# Tutorial job: read two BIGINT columns from a CSV source, add them with a
# Python UDF, and write the result to a CSV sink.

# Streaming environment with a single parallel task.
env = StreamExecutionEnvironment.get_execution_environment()
# Batch alternative (requires the commented-out ExecutionEnvironment import):
#env = ExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

t_env = StreamTableEnvironment.create(env)
# Batch alternative:
#t_env = BatchTableEnvironment.create(env)

# Python UDF execution settings, applied on the table environment's
# underlying configuration object.
config = t_env.get_config().get_configuration()
config.set_string("python.fn-execution.memory.managed", 'true')
config.set_string("python.fn-execution.arrow.batch.size", '2')

# Scalar UDF "add": takes two BIGINT arguments and returns their BIGINT sum.
add = udf(
    lambda i, j: i + j,
    [DataTypes.BIGINT(), DataTypes.BIGINT()],
    DataTypes.BIGINT(),
)
t_env.register_function("add", add)

# Source table: CSV file with two BIGINT columns, 'a' and 'b'.
# The path is a raw string so the backslash stays a literal path separator
# instead of forming the invalid escape sequence '\i'.
# NOTE(review): every field is parsed as BIGINT; presumably a byte-order mark
# or any other non-digit character in the file makes the CSV reader fail with
# a ParseException (NUMERIC_VALUE_ILLEGAL_CHARACTER) - verify the input file
# is plain text containing only rows such as `1,2`.
t_env.connect(FileSystem().path(r'input\input')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

# Sink table: CSV output with a single BIGINT column, 'sum'.
t_env.connect(FileSystem().path('output')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

# Pipeline: apply the UDF to every source row and route the result to the sink.
t_env.from_path('mySource') \
    .select("add(a, b)") \
    .insert_into('mySink')

# Submit the job under the name "tutorial_job".
t_env.execute("tutorial_job")
??????????????????????????????????
2. Error message:
Py4JJavaError                             Traceback (most recent call last) 
<ipython-input-5-4e5da54b7616> in <module>      35     
.insert_into('mySink')      36  ---> 37 t_env.execute("tutorial_job") 
F:\Anaconda3\envs\pyflink\lib\site-packages\pyflink\table\table_environment.py 
in execute(self, job_name)    1055                       "use 
create_statement_set for multiple sinks.", DeprecationWarning)    1056         
self._before_execute() -> 1057         return 
JobExecutionResult(self._j_tenv.execute(job_name))    1058     1059     def 
from_elements(self, elements, schema=None, verify_schema=True): 
F:\Anaconda3\envs\pyflink\lib\site-packages\py4j-0.10.8.1-py3.7.egg\py4j\java_gateway.py
 in __call__(self, *args)    1284         answer = 
self.gateway_client.send_command(command)    1285         return_value = 
get_return_value( -> 1286             answer, self.gateway_client, 
self.target_id, self.name)    1287     1288         for temp_arg in temp_args: 
F:\Anaconda3\envs\pyflink\lib\site-packages\pyflink\util\exceptions.py in 
deco(*a, **kw)     145     def deco(*a, **kw):     146         try: --> 147  
           return f(*a, **kw)     148         except Py4JJavaError as e:     
149             s = e.java_exception.toString() 
F:\Anaconda3\envs\pyflink\lib\site-packages\py4j-0.10.8.1-py3.7.egg\py4j\protocol.py
 in get_return_value(answer, gateway_client, target_id, name)     326           
      raise Py4JJavaError(     327                     "An error occurred while 
calling {0}{1}{2}.\n". --> 328                     format(target_id, ".", 
name), value)     329             else:     330                 raise 
Py4JError( Py4JJavaError: An error occurred while calling o374.execute. : 
java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.    
   at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
       at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1717)
  at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
    at 
org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
         at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)       at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.base/java.lang.reflect.Method.invoke(Method.java:566)   at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)     at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834) Caused by: 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.    
  at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
        at 
org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
        at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
        at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
       at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
   at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
         at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
         at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
       at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
   at akka.dispatch.OnComplete.internal(Future.scala:264)  at 
akka.dispatch.OnComplete.internal(Future.scala:261)  at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)    at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)    at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)         at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
      at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)    at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)  at 
akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)     at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
      at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
      at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)   
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)   at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)         at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
         at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
         at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
        at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)       at 
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)    
     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)        at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
      at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)    
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)   
     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)   
     at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy      at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
       at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
     at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
     at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
      at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
    at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)       at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.base/java.lang.reflect.Method.invoke(Method.java:566)   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
       at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)        at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)        at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)   at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)  at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)  at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)  at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)  at 
akka.actor.Actor$class.aroundReceive(Actor.scala:517)        at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)      at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)     at 
akka.actor.ActorCell.invoke(ActorCell.scala:561)     at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)      at 
akka.dispatch.Mailbox.run(Mailbox.scala:225)         at 
akka.dispatch.Mailbox.exec(Mailbox.scala:235)        ... 4 more Caused by: 
org.apache.flink.api.common.io.ParseException: Parsing error for column 1 of 
row '???1?71,98' originated by LongParser: NUMERIC_VALUE_ILLEGAL_CHARACTER.  at 
org.apache.flink.api.java.io.RowCsvInputFormat.parseRecord(RowCsvInputFormat.java:182)
       at 
org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111) 
     at 
org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
        at 
org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79)  
     at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.readAndCollectRecord(ContinuousFileReaderOperator.java:359)
     at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:326)
    at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:225)
     at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)  
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:270)
     at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
  at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
      at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) 
     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)       at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)         at 
java.base/java.lang.Thread.run(Thread.java:834)


???????????????????????????????????????????? ????????????????????????????
??????????????????????????????????????????????????????????????????

回复