xuyangzhong created FLINK-24291:
-----------------------------------

             Summary: Decimal precision is lost when deserializing
                 Key: FLINK-24291
                 URL: https://issues.apache.org/jira/browse/FLINK-24291
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: xuyangzhong


When I add the following test case to FileSystemITCaseBase:
{code:java}
// create table
tableEnv.executeSql(
  s"""
     |create table test2 (
     |  c0 decimal(10,0), c1 int
     |) with (
     |  'connector' = 'filesystem',
     |  'path' = '/Users/zhongxuyang/test/test',
     |  'format' = 'testcsv'
     |)
   """.stripMargin
)

//test file content is:
//2113554011,1
//2113554022,2
{code}
and
{code:java}
// select sql
@Test
def myTest2(): Unit={
  check(
    "SELECT c0 FROM test2",
    Seq(
      row(2113554011),
      row(2113554022)
    ))
}
{code}
I got the following exception:

java.lang.RuntimeException: Failed to fetch next result at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 at 
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
 at java.util.Iterator.forEachRemaining(Iterator.java:115) at 
org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:109) at 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:300)
 at 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:140)
 at 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:106)
 at 
org.apache.flink.table.planner.runtime.batch.sql.BatchFileSystemITCaseBase.check(BatchFileSystemITCaseBase.scala:46)
 at 
org.apache.flink.table.planner.runtime.FileSystemITCaseBase$class.myTest2(FileSystemITCaseBase.scala:128)
 at 
org.apache.flink.table.planner.runtime.batch.sql.BatchFileSystemITCaseBase.myTest2(BatchFileSystemITCaseBase.scala:33)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) at 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at 
org.junit.rules.RunRules.evaluate(RunRules.java:20) at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413) at 
org.junit.runner.JUnitCore.run(JUnitCore.java:137) at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
 at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
 at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
 at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: java.io.IOException: Failed to fetch job execution result at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 ... 43 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
 ... 45 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
 at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) 
at 
java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
 at 
java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) 
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:181)
 ... 45 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
 at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
 at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
 at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
 at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at 
akka.actor.Actor.aroundReceive(Actor.scala:537) at 
akka.actor.Actor.aroundReceive$(Actor.scala:535) at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) at 
akka.actor.ActorCell.invoke(ActorCell.scala:548) at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) at 
akka.dispatch.Mailbox.run(Mailbox.scala:231) at 
akka.dispatch.Mailbox.exec(Mailbox.scala:243) at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: java.lang.AssertionError at 
org.apache.flink.table.data.writer.AbstractBinaryWriter.writeDecimal(AbstractBinaryWriter.java:166)
 at 
org.apache.flink.table.data.writer.BinaryRowWriter.writeDecimal(BinaryRowWriter.java:27)
 at 
org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:144) at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
 at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
 at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
 at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168)
 at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
 at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
 at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:130)
 at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
 at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
 at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
 at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
 at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
 at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73) 
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)

Process finished with exit code 255

This is caused by a failed assertion in _AbstractBinaryWriter.writeDecimal_. 
While debugging, I found that the value 2113554011 in the file is read as 
2113554011.000000000000000000, whose precision is 38. That makes the following 
assertion fail (value.precision() is 38, but precision is 10):
{code:java}
assert value == null || (value.precision() == precision)
{code}
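To make the mismatch concrete, here is a small stand-alone sketch. It does not use any Flink classes: `TypedDecimal` and `matchesColumn` are hypothetical names introduced only for illustration, standing in for a decimal value that carries the precision and scale it was created with, and for the check quoted above. The (38, 18) pair is taken from what I observed in the debugger.

```java
import java.math.BigDecimal;

public class PrecisionMismatchSketch {

    /** Hypothetical stand-in: a decimal plus the precision/scale it was created with. */
    static final class TypedDecimal {
        final BigDecimal value;
        final int precision;
        final int scale;

        TypedDecimal(BigDecimal value, int precision, int scale) {
            // Increasing the scale only pads with trailing zeros, so no rounding is needed here.
            this.value = value.setScale(scale);
            this.precision = precision;
            this.scale = scale;
        }
    }

    /** Mirrors the quoted check: the value's precision must equal the column's precision. */
    static boolean matchesColumn(TypedDecimal value, int columnPrecision) {
        return value == null || value.precision == columnPrecision;
    }

    public static void main(String[] args) {
        // The value from the test file, carried with precision 38 and scale 18.
        TypedDecimal read = new TypedDecimal(new BigDecimal("2113554011"), 38, 18);
        System.out.println(read.value);              // 2113554011.000000000000000000
        System.out.println(matchesColumn(read, 10)); // false: 38 != 10
    }
}
```

The numeric value is unchanged; only the precision and scale attached to it differ from the column's DECIMAL(10, 0), which is what the check rejects.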
I think this happens because the filesystem source reads the decimal value as 
a BigDecimal and does not convert it to the declared precision and scale 
(here DECIMAL(10, 0)). This looks like a bug.
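As a rough illustration of the conversion I would expect, the sketch below fits a parsed BigDecimal to a declared DECIMAL(precision, scale) using only java.math. The helper `toDeclaredType` is hypothetical, and both the HALF_UP rounding and the null-on-overflow behaviour are assumptions made for the example; this is not a claim about how the source or an eventual fix is implemented.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalConversionSketch {

    /**
     * Hypothetical helper: fits a parsed value to DECIMAL(precision, scale).
     * Returns null when the value needs more digits than the declared precision.
     */
    static BigDecimal toDeclaredType(BigDecimal parsed, int precision, int scale) {
        // Assumption: round half up when the declared scale is smaller than the parsed one.
        BigDecimal adjusted = parsed.setScale(scale, RoundingMode.HALF_UP);
        return adjusted.precision() > precision ? null : adjusted;
    }

    public static void main(String[] args) {
        BigDecimal parsed = new BigDecimal("2113554011.000000000000000000");
        BigDecimal fitted = toDeclaredType(parsed, 10, 0);
        System.out.println(fitted);             // 2113554011
        System.out.println(fitted.precision()); // 10

        // An 11-digit value does not fit DECIMAL(10, 0) in this sketch.
        System.out.println(toDeclaredType(new BigDecimal("12345678901"), 10, 0)); // null
    }
}
```

With a step like this applied before the row is serialized, the value handed to the writer would carry precision 10, matching the column definition.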


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
