[ https://issues.apache.org/jira/browse/FLINK-22900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-22900:
-----------------------------------
    Labels: auto-deprioritized-major auto-deprioritized-minor auto-unassigned pull-request-available  (was: auto-deprioritized-major auto-deprioritized-minor auto-unassigned)

> Flink 1.11.2: filesystem source table reading a filesystem sink table path with multiple partitions fails
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22900
>                 URL: https://issues.apache.org/jira/browse/FLINK-22900
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Ecosystem
>    Affects Versions: 1.11.2
>         Environment: 1. The failing code is in FileSystemTableSource:
> {code:java}
> public List<Map<String, String>> getPartitions() {
>     try {
>         return PartitionPathUtils
>                 .searchPartSpecAndPaths(this.path.getFileSystem(), this.path, this.partitionKeys.size())
>                 .stream()
>                 .map(tuple2 -> tuple2.f0)
>                 .map(spec -> {
>                     // Normalize the spec: the default partition name stands for a null value.
>                     Map<String, String> ret = new LinkedHashMap<>();
>                     spec.forEach((k, v) -> ret.put(k, this.defaultPartName.equals(v) ? null : v));
>                     return ret;
>                 })
>                 .collect(Collectors.toList());
>     } catch (Exception e) {
>         throw new TableException("Fetch partitions fail.", e);
>     }
> }
> {code}
> 2. searchPartSpecAndPaths:
> {code:java}
> public static List<Tuple2<LinkedHashMap<String, String>, Path>> searchPartSpecAndPaths(
>         FileSystem fs, Path path, int partitionNumber) {
>     FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, fs);
>     // e.g. generatedParts contains both of:
>     //   hdfs://xxx-hdfs/merge/all/.staging_1622167234684/cp-0
>     //   hdfs://xxx-hdfs/merge/all/dayno=20210531/hour=11
>     List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList<>();
>     for (FileStatus part : generatedParts) {
>         if (!isHiddenFile(part)) {
>             ret.add(new Tuple2<>(extractPartitionSpecFromPath(part.getPath()), part.getPath()));
>         }
>     }
>     return ret;
> }
> {code}
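> To make the failure mode concrete, here is a minimal sketch of what spec extraction yields for the two example directories (the class name SpecExtractionSketch is hypothetical; the sketch only assumes PartitionPathUtils.extractPartitionSpecFromPath from flink-table-common and org.apache.flink.core.fs.Path, both of which appear in the code above; Path is just a URI wrapper, so no HDFS connection is needed):
> {code:java}
> import java.util.LinkedHashMap;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.table.utils.PartitionPathUtils;
>
> public class SpecExtractionSketch {
>     public static void main(String[] args) {
>         // A real partition directory: both declared partition keys are recoverable.
>         LinkedHashMap<String, String> good = PartitionPathUtils.extractPartitionSpecFromPath(
>                 new Path("hdfs://xxx-hdfs/merge/all/dayno=20210531/hour=11"));
>         System.out.println(good); // expected: {dayno=20210531, hour=11}
>
>         // A leftover staging directory: no key=value segments, so the spec is empty,
>         // and later lookups of the declared partition keys fail.
>         LinkedHashMap<String, String> bad = PartitionPathUtils.extractPartitionSpecFromPath(
>                 new Path("hdfs://xxx-hdfs/merge/all/.staging_1622167234684/cp-0"));
>         System.out.println(bad); // expected: {}
>     }
> }
> {code}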
> 3. isHiddenFile only checks the last path component, so .staging_1622167234684/cp-0 passes the filter and an error is reported later: the spec extracted from such a directory is missing the declared partition keys, and partition pruning fails with java.util.NoSuchElementException: key not found: hour (see the stack trace below). I therefore suggest also checking the number of extracted partition keys, so that only complete partition directories are kept:
> {code:java}
> public static List<Tuple2<LinkedHashMap<String, String>, Path>> searchPartSpecAndPaths(
>         FileSystem fs, Path path, int partitionNumber) {
>     // Recursively collect candidate directories, descending one level per partition key.
>     FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, fs);
>     List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList<>();
>     for (FileStatus part : generatedParts) {
>         if (isHiddenFile(part)) {
>             continue;
>         }
>         LinkedHashMap<String, String> fullPartSpec = extractPartitionSpecFromPath(part.getPath());
>         // Keep only directories that yield a value for every declared partition key;
>         // this skips leftovers such as .staging_1622167234684/cp-0.
>         if (fullPartSpec.size() == partitionNumber) {
>             ret.add(new Tuple2<>(fullPartSpec, part.getPath()));
>         }
>     }
>     return ret;
> }
> {code}
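> A complementary idea, sketched below (the helper isInsideHiddenDirectory is hypothetical, not Flink API): instead of comparing spec sizes, reject any candidate whose path contains a hidden component anywhere above the leaf. Either check would exclude the staging directory.
> {code:java}
> import org.apache.flink.core.fs.Path;
>
> public class HiddenPathSketch {
>     // Hypothetical helper: walk up the path and reject it if ANY component is
>     // hidden, not just the leaf. Real code should stop climbing at the table's
>     // base path rather than at the filesystem root.
>     static boolean isInsideHiddenDirectory(Path path) {
>         for (Path p = path; p != null; p = p.getParent()) {
>             String name = p.getName();
>             if (name.startsWith(".") || name.startsWith("_")) {
>                 return true;
>             }
>         }
>         return false;
>     }
>
>     public static void main(String[] args) {
>         System.out.println(isInsideHiddenDirectory(
>                 new Path("hdfs://xxx-hdfs/merge/all/.staging_1622167234684/cp-0"))); // true
>         System.out.println(isInsideHiddenDirectory(
>                 new Path("hdfs://xxx-hdfs/merge/all/dayno=20210531/hour=11")));      // false
>     }
> }
> {code}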
>             Reporter: bigdataf
>             Priority: Not a Priority
>               Labels: auto-deprioritized-major, auto-deprioritized-minor, auto-unassigned, pull-request-available
>          Attachments: image-2021-06-08-19-55-59-174.png
>
>
> e.g., create a table based on the Flink filesystem connector:
> create table source_test (id string, name string, dayno string, `hour` string) partitioned by (dayno, `hour`) with ('connector' = 'filesystem', 'path' = 'xxxxx/data/')
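> For reference, a minimal reproduction sketch (the class name Repro is hypothetical; it adds 'format' = 'csv', which the DDL above omits but the 1.11 filesystem connector requires, and uses batch mode to match the BatchCommonSubGraphBasedOptimizer frames in the trace below):
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class Repro {
>     public static void main(String[] args) {
>         TableEnvironment tEnv = TableEnvironment.create(
>                 EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
>
>         tEnv.executeSql(
>                 "create table source_test (id string, name string, dayno string, `hour` string) "
>                         + "partitioned by (dayno, `hour`) "
>                         + "with ('connector' = 'filesystem', 'path' = 'xxxxx/data/', 'format' = 'csv')");
>
>         // Planning calls getPartitions() for partition pruning; it fails as soon as
>         // a stale .staging_*/cp-* directory is present under the table path.
>         tEnv.executeSql("select * from source_test where dayno = '20210531' and `hour` = '11'").print();
>     }
> }
> {code}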
> 1. Stack trace:
> {code:java}
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:66)
> Caused by: java.util.NoSuchElementException: key not found: hour
>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>     at scala.collection.AbstractMap.default(Map.scala:59)
>     at scala.collection.MapLike$class.apply(MapLike.scala:141)
>     at scala.collection.AbstractMap.apply(Map.scala:59)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:155)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:153)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$.org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow(PartitionPruner.scala:153)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:130)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:129)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at org.apache.flink.table.planner.plan.utils.PartitionPruner$.prunePartitions(PartitionPruner.scala:129)
>     at org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.internalPartitionPrune$1(PushPartitionIntoLegacyTableSourceScanRule.scala:134)
>     at org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.onMatch(PushPartitionIntoLegacyTableSourceScanRule.scala:144)
>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427)
>     at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264)
>     at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>     at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223)
>     at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.immutable.Range.foreach(Range.scala:160)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>     at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
>     at com.oppo.recdata.datapipe.flink.table.FlinkTableExecution.start(FlinkTableExecution.java:52)
>     at com.oppo.recdata.datapipe.Datapipe.entryPoint(Datapipe.java:110)
>     at com.oppo.recdata.datapipe.Datapipe.run(Datapipe.java:48)
>     at com.oppo.recdata.datapipe.DatapipeFlink.main(DatapipeFlink.java:13)
>     ... 5 more
> {code}


--
This message was sent by Atlassian Jira
(v8.20.10#820010)