Hi Talat, I managed to turn your test case into something that reproduces directly against Calcite. It looks like there is a bug affecting tables that contain one or more single-element structs and no multi-element structs. I've sent the details to the Calcite mailing list here: https://lists.apache.org/thread/tlr9hsmx09by79h91nwp2d4nv8jfwsto
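For reference, the shape that seems to trigger it is a table where every struct column has exactly one field, alongside ordinary scalar columns. Below is a minimal sketch of that shape using Beam's test utilities (schema and column names taken from the panwRowTestTable in your diff further down; the class name and values are just illustrative):

import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;

class SingleElementStructTable {
  // A struct with exactly one field -- the "single element struct" case.
  static final Schema USER_INFO_SCHEMA = Schema.builder().addStringField("name").build();

  // A table containing that single-field struct plus scalar columns, and no
  // multi-element struct anywhere in the schema.
  static final TestBoundedTable TABLE =
      TestBoundedTable.of(
              FieldType.row(USER_INFO_SCHEMA), "user_info",
              FieldType.INT64, "id",
              FieldType.STRING, "value")
          .addRows(
              Row.withSchema(USER_INFO_SCHEMA).addValues("innerStr").build(), 1L, "some_value");
}

Filtering on `user_info`.`name` inside a WITH clause over a table shaped like this is what ends up hitting the "fieldList must not be null" assertion in your logs.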
I'm experimenting with ideas on how to work around this but a fix will likely require a Calcite upgrade, which is not something I'd have time to help with. (I'm not on the Google Beam team anymore.) Andrew On Wed, Feb 22, 2023 at 12:18 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote: > > Hi @Andrew Pilloud > > Sorry for the late response. Yes your test is working fine. I changed the > test input structure like our input structure. Now this test also has the > same exception. > > Feb 21, 2023 2:02:28 PM > org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel > INFO: SQL: > WITH `tempTable` AS (SELECT `panwRowTestTable`.`user_info`, > `panwRowTestTable`.`id`, `panwRowTestTable`.`value` > FROM `beam`.`panwRowTestTable` AS `panwRowTestTable` > WHERE `panwRowTestTable`.`user_info`.`name` = 'innerStr') (SELECT > `tempTable`.`user_info`, `tempTable`.`id`, `tempTable`.`value` > FROM `tempTable` AS `tempTable`) > Feb 21, 2023 2:02:28 PM > org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel > INFO: SQLPlan> > LogicalProject(user_info=[ROW($0)], id=[$1], value=[$2]) > LogicalFilter(condition=[=($0.name, 'innerStr')]) > LogicalProject(name=[$0.name], id=[$1], value=[$2]) > BeamIOSourceRel(table=[[beam, panwRowTestTable]]) > > > fieldList must not be null, type = VARCHAR > java.lang.AssertionError: fieldList must not be null, type = VARCHAR > > I dont know what is different from yours. I am sharing my version of the test > also. > > > Index: > sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java > IDEA additional info: > Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP > <+>UTF-8 > =================================================================== > diff --git > a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java > > b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java > --- > a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java > (revision fd383fae1adc545b6b6a22b274902cda956fec49) > +++ > b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java > (date 1677017032324) > @@ -54,6 +54,9 @@ > private static final Schema innerRowSchema = > > Schema.builder().addStringField("string_field").addInt64Field("long_field").build(); > > + private static final Schema innerPanwRowSchema = > + Schema.builder().addStringField("name").build(); > + > private static final Schema innerRowWithArraySchema = > Schema.builder() > .addStringField("string_field") > @@ -127,8 +130,12 @@ > .build())) > .put( > "basicRowTestTable", > - TestBoundedTable.of(FieldType.row(innerRowSchema), "col") > - > .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build())) > + TestBoundedTable.of(FieldType.row(innerRowSchema), "col", > FieldType.INT64, "field") > + > .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(), > 1L)) > + .put( > + "panwRowTestTable", > + TestBoundedTable.of(FieldType.row(innerPanwRowSchema), > "user_info", FieldType.INT64, "id", FieldType.STRING, "value") > + > .addRows(Row.withSchema(innerRowSchema).addValues("name", 1L).build(), 1L, > "some_value")) > .put( > "rowWithArrayTestTable", > TestBoundedTable.of(FieldType.row(rowWithArraySchema), > "col") > @@ -219,6 +226,21 @@ > .build()); > pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); > } > + > + @Test > + public void 
testBasicRowWhereField() { > + BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider); > + PCollection<Row> stream = > + BeamSqlRelUtils.toPCollection( > + pipeline, sqlEnv.parseQuery("WITH tempTable AS (SELECT * FROM > panwRowTestTable WHERE panwRowTestTable.`user_info`.`name` = 'innerStr') > SELECT * FROM tempTable")); > + Schema outputSchema = Schema.builder().addRowField("col", > innerRowSchema).addInt64Field("field").build(); > + PAssert.that(stream) > + .containsInAnyOrder( > + Row.withSchema(outputSchema) > + .addValues(Row.withSchema(innerRowSchema).addValues("name", > 1L).build(), 1L) > + .build()); > + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); > + } > > @Test > public void testArrayConstructor() { > > > > > On Fri, Feb 10, 2023 at 6:14 PM Andrew Pilloud <apill...@google.com> wrote: >> >> I have a test case that I believe should reproduce this on both head and >> 2.43 but it ends up with a different logical plan. Can you provide your >> input types? >> >> We have a class of issues around compex types >> https://github.com/apache/beam/issues/19009 I don't believe the >> "LogicalFilter(condition=[=($2.name, 'User1')])" particularly "$2.name" is >> something that works, in my test it seems that the planner has flattened the >> complex input and reproduced a ROW at the output. >> >> INFO: SQLPlan> >> LogicalProject(col=[ROW($0, $1)], field=[$2]) >> LogicalFilter(condition=[=($0, 'innerStr')]) >> LogicalProject(string_field=[$0.string_field], >> long_field=[$0.long_field], field=[$1]) >> BeamIOSourceRel(table=[[beam, basicRowTestTable]]) >> >> Feb 10, 2023 6:07:35 PM >> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel >> INFO: BEAMPlan> >> BeamCalcRel(expr#0..1=[{inputs}], expr#2=[$t0.string_field], >> expr#3=[$t0.long_field], expr#4=[ROW($t2, $t3)], >> expr#5=['innerStr':VARCHAR], expr#6=[=($t2, $t5)], col=[$t4], field=[$t1], >> $condition=[$t6]) >> BeamIOSourceRel(table=[[beam, basicRowTestTable]]) >> >> --- >> a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java >> +++ >> b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java >> @@ -127,8 +127,8 @@ public class BeamComplexTypeTest { >> .build())) >> .put( >> "basicRowTestTable", >> - TestBoundedTable.of(FieldType.row(innerRowSchema), "col") >> - >> .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build())) >> + TestBoundedTable.of(FieldType.row(innerRowSchema), "col", >> FieldType.INT64, "field") >> + >> .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(), >> 1L)) >> .put( >> "rowWithArrayTestTable", >> TestBoundedTable.of(FieldType.row(rowWithArraySchema), >> "col") >> @@ -220,6 +220,21 @@ public class BeamComplexTypeTest { >> pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); >> } >> >> + @Test >> + public void testBasicRowWhereField() { >> + BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider); >> + PCollection<Row> stream = >> + BeamSqlRelUtils.toPCollection( >> + pipeline, sqlEnv.parseQuery("WITH tempTable AS (SELECT * FROM >> basicRowTestTable WHERE basicRowTestTable.col.string_field = 'innerStr') >> SELECT * FROM tempTable")); >> + Schema outputSchema = Schema.builder().addRowField("col", >> innerRowSchema).addInt64Field("field").build(); >> + PAssert.that(stream) >> + .containsInAnyOrder( >> + Row.withSchema(outputSchema) >> + >> .addValues(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(), >> 1L) >> + 
.build()); >> + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); >> + } >> + >> @Test >> public void testArrayConstructor() { >> BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider); >> >> >> On Fri, Feb 3, 2023 at 2:06 PM Talat Uyarer <tuya...@paloaltonetworks.com> >> wrote: >>> >>> Hi Andrew, >>> >>> Thank you for your MR. I appreciate your help in solving the issue. I >>> reran our tests and they are partially passing now with your fix. However, >>> there is one more issue with the WITH clause. >>> >>> When I run the following query, Beam somehow loses the type of a column: >>> >>> WITH tempTable AS (SELECT * FROM PCOLLECTION WHERE >>> PCOLLECTION.`user_info`.`name` = 'User1') SELECT * FROM tempTable >>> >>> I haven't tested on Beam master; I ran with your latest patch on our code >>> base. This is the output: >>> >>> 14:00:30.095 [Test worker] INFO >>> o.a.b.sdk.extensions.sql.impl.CalciteQueryPlanner - SQL: >>> WITH `tempTable` AS (SELECT `PCOLLECTION`.`id`, `PCOLLECTION`.`value`, >>> `PCOLLECTION`.`user_info` >>> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION` >>> WHERE `PCOLLECTION`.`user_info`.`name` = 'User1') (SELECT `tempTable`.`id`, >>> `tempTable`.`value`, `tempTable`.`user_info` >>> FROM `tempTable` AS `tempTable`) >>> 14:00:30.106 [Test worker] DEBUG >>> o.a.b.v.calcite.v1_28_0.org.apache.calcite.sql2rel - Plan after converting >>> SqlNode to RelNode >>> LogicalProject(id=[$0], value=[$1], user_info=[$2]) >>> LogicalFilter(condition=[=($2.name, 'User1')]) >>> BeamIOSourceRel(table=[[beam, PCOLLECTION]]) >>> >>> 14:00:30.107 [Test worker] DEBUG >>> o.a.b.v.calcite.v1_28_0.org.apache.calcite.sql2rel - Plan after converting >>> SqlNode to RelNode >>> LogicalProject(id=[$0], value=[$1], user_info=[$2]) >>> LogicalFilter(condition=[=($2.name, 'User1')]) >>> BeamIOSourceRel(table=[[beam, PCOLLECTION]]) >>> >>> 14:00:30.109 [Test worker] INFO >>> o.a.b.sdk.extensions.sql.impl.CalciteQueryPlanner - SQLPlan> >>> LogicalProject(id=[$0], value=[$1], user_info=[ROW($2)]) >>> LogicalFilter(condition=[=($2.name, 'User1')]) >>> LogicalProject(id=[$0], value=[$1], name=[$2.name]) >>> BeamIOSourceRel(table=[[beam, PCOLLECTION]]) >>> >>> 14:00:30.173 [Test worker] DEBUG >>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER = >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver@1c510081; >>> COST = {inf} >>> 14:00:30.173 [Test worker] DEBUG >>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Pop match: rule >>> [BeamEnumerableConverterRule(in:BEAM_LOGICAL,out:ENUMERABLE)] rels [#27] >>> 14:00:30.173 [Test worker] DEBUG >>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#118: Apply rule >>> [BeamEnumerableConverterRule(in:BEAM_LOGICAL,out:ENUMERABLE)] to >>> [rel#27:BeamIOSourceRel.BEAM_LOGICAL(table=[beam, PCOLLECTION])] >>> 14:00:30.174 [Test worker] DEBUG >>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Transform to: rel#41 >>> via BeamEnumerableConverterRule(in:BEAM_LOGICAL,out:ENUMERABLE) >>> 14:00:30.175 [Test worker] DEBUG >>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#118 generated 1 >>> successors: >>> [rel#41:BeamEnumerableConverter.ENUMERABLE(input=BeamIOSourceRel#27)] >>> 14:00:30.175 [Test worker] DEBUG >>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER = >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver@1c510081; >>> COST = {inf} >>> 14:00:30.175 [Test worker] DEBUG >>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Pop match: rule 
>>> [ProjectToCalcRule] rels [#33] >>> 14:00:30.175 [Test worker] DEBUG >>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#136: Apply rule >>> [ProjectToCalcRule] to >>> [rel#33:LogicalProject.NONE(input=RelSubset#32,inputs=0..1,exprs=[$2.name])] >>> 14:00:30.177 [Test worker] DEBUG >>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Transform to: rel#44 >>> via ProjectToCalcRule >>> 14:00:30.178 [Test worker] DEBUG >>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#136 generated 1 >>> successors: >>> [rel#44:LogicalCalc.NONE(input=RelSubset#32,expr#0..2={inputs},expr#3=$t2.name,proj#0..1={exprs},2=$t3)] >>> 14:00:30.178 [Test worker] DEBUG >>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER = >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver@1c510081; >>> COST = {inf} >>> 14:00:30.178 [Test worker] DEBUG >>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - Pop match: rule >>> [FilterToCalcRule] rels [#35] >>> 14:00:30.178 [Test worker] DEBUG >>> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - call#160: Apply rule >>> [FilterToCalcRule] to >>> [rel#35:LogicalFilter.NONE(input=RelSubset#34,condition==($2.name, >>> 'User1'))] >>> >>> fieldList must not be null, type = VARCHAR >>> java.lang.AssertionError: fieldList must not be null, type = VARCHAR >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.type.RelDataTypeImpl.getFieldList(RelDataTypeImpl.java:164) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexFieldAccess.checkValid(RexFieldAccess.java:76) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexFieldAccess.<init>(RexFieldAccess.java:64) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:208) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:911) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:894) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:94) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:161) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:113) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:896) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:894) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall.accept(RexCall.java:189) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:302) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexProgramBuilder.addCondition(RexProgramBuilder.java:277) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.rules.FilterToCalcRule.onMatch(FilterToCalcRule.java:76) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:239) >>> at >>> 
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:61) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:523) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:317) >>> at >>> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.prepare.PlannerImpl.transform(PlannerImpl.java:373) >>> at >>> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:211) >>> at >>> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:112) >>> at >>> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:171) >>> at >>> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:110) >>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548) >>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499) >>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:373) >>> at >>> com.paloaltonetworks.cortex.streamcompute.filter.Filter.expand(Filter.java:126) >>> at >>> com.paloaltonetworks.cortex.streamcompute.filter.Filter.expand(Filter.java:49) >>> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548) >>> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499) >>> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:373) >>> at >>> com.paloaltonetworks.cortex.streamcompute.filter.WithClauseFilterComplexBulkTest.testIt(WithClauseFilterComplexBulkTest.java:149) >>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native >>> Method) >>> at >>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> at >>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.base/java.lang.reflect.Method.invoke(Method.java:566) >>> at >>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) >>> at >>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) >>> at >>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) >>> at >>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) >>> at >>> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:323) >>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) >>> at >>> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) >>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) >>> at >>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) >>> at >>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) >>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) >>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) >>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) >>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) >>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) >>> at org.junit.runners.ParentRunner.run(ParentRunner.java:413) >>> at org.junit.runners.Suite.runChild(Suite.java:128) >>> at org.junit.runners.Suite.runChild(Suite.java:27) >>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) >>> at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) >>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) >>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) >>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) >>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) >>> at org.junit.runners.ParentRunner.run(ParentRunner.java:413) >>> at >>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) >>> at >>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) >>> at >>> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) >>> at >>> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) >>> at >>> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) >>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native >>> Method) >>> at >>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> at >>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.base/java.lang.reflect.Method.invoke(Method.java:566) >>> at >>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) >>> at >>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) >>> at >>> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) >>> at >>> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) >>> at com.sun.proxy.$Proxy5.processTestClass(Unknown Source) >>> at >>> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118) >>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native >>> Method) >>> at >>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> at >>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.base/java.lang.reflect.Method.invoke(Method.java:566) >>> at >>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) >>> at >>> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) >>> at >>> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175) >>> at >>> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157) >>> at >>> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404) >>> at >>> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63) >>> at >>> org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46) >>> at >>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) >>> at >>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) >>> at >>> org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55) >>> at 
java.base/java.lang.Thread.run(Thread.java:829) >>> >>> >>> >>> >>> >>> On Thu, Feb 2, 2023 at 1:06 PM Andrew Pilloud <apill...@google.com> wrote: >>>> >>>> It looks like Calcite stopped considering field names in RelNode equality >>>> as of Calcite 2.22 (which we use in Beam v2.34.0+). This can result in a >>>> planner state where two nodes that only differ by field name are >>>> considered equivalent. >>>> >>>> I have a fix for Beam in https://github.com/apache/beam/pull/25290 and >>>> I'll send an email to the Calcite dev list with more details. >>>> >>>> Andrew >>>> >>>> On Fri, Jan 27, 2023 at 11:33 AM Andrew Pilloud <apill...@google.com> >>>> wrote: >>>>> >>>>> Also this is at very least a Beam bug. You can file a Beam issue if you >>>>> want, otherwise I will when I get back. >>>>> >>>>> Andrew >>>>> >>>>> On Fri, Jan 27, 2023 at 11:27 AM Andrew Pilloud <apill...@google.com> >>>>> wrote: >>>>>> >>>>>> Hi Talat, >>>>>> >>>>>> I did get your test case running and added some logging to >>>>>> RexProgramBuilder.mergePrograms. There is only one merge that occurs >>>>>> during the test and it has an output type of RecordType(JavaType(int) >>>>>> ID, JavaType(class java.lang.String) V). This does seem like the correct >>>>>> output name but it doesn't match the final output name, so something is >>>>>> still different than the Beam test case. I also modified mergePrograms >>>>>> to purposely corrupt the output names, that did not cause the test to >>>>>> fail or trip the 'assert mergedProg.getOutputRowType() == >>>>>> topProgram.getOutputRowType();' in mergePrograms. I could not find any >>>>>> Calcite unit tests for RexProgramBuilder.mergePrograms or >>>>>> CoreRules.CALC_MERGE rule so I think it is still probable that the >>>>>> problem is in this area. >>>>>> >>>>>> One minor issue I encountered. It took me a while to get your test case >>>>>> running, it doesn't appear there are any calcite gradle rules to run >>>>>> CoreQuidemTest and constructing the classpath manually was tedious. Did >>>>>> I miss something? >>>>>> >>>>>> I'm still working on this but I'm out today and Monday, it will probably >>>>>> be Wednesday before I make any more progress. >>>>>> >>>>>> Andrew >>>>>> >>>>>> On Fri, Jan 27, 2023 at 10:40 AM Talat Uyarer >>>>>> <tuya...@paloaltonetworks.com> wrote: >>>>>>> >>>>>>> Hi Andrew, >>>>>>> >>>>>>> Yes This aligned also with my debugging. In My Kenn's reply you can see >>>>>>> a sql test which I wrote in Calcite. Somehow Calcite does not have this >>>>>>> issue with the 1.28 version. >>>>>>> >>>>>>> !use post >>>>>>> !set outputformat mysql >>>>>>> >>>>>>> #Test aliases with with clause >>>>>>> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, >>>>>>> "hr"."emps"."name" as v from "hr"."emps") >>>>>>> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE >>>>>>> tempTable.v <> '11' ; >>>>>>> +-----+-----------+ >>>>>>> | ID | value | >>>>>>> +-----+-----------+ >>>>>>> | 100 | Bill | >>>>>>> | 110 | Theodore | >>>>>>> | 150 | Sebastian | >>>>>>> | 200 | Eric | >>>>>>> +-----+-----------+ >>>>>>> (4 rows) >>>>>>> >>>>>>> !ok >>>>>>> >>>>>>> >>>>>>> On Wed, Jan 25, 2023 at 6:08 PM Andrew Pilloud <apill...@google.com> >>>>>>> wrote: >>>>>>>> >>>>>>>> Yes, that worked. 
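>>>>>>>>
>>>>>>>> To narrow down where the aliases get dropped, I experimented with the planner rule set. Conceptually the experiment is just planning with the calc-merge rules removed; a rough pure-Calcite sketch of the idea follows (the real Beam wiring goes through its own rule sets and the vendored Calcite classes, so treat the helper and its wiring here as illustrative only):
>>>>>>>>
>>>>>>>> import java.util.ArrayList;
>>>>>>>> import java.util.List;
>>>>>>>> import org.apache.calcite.plan.RelOptRule;
>>>>>>>> import org.apache.calcite.rel.rules.CoreRules;
>>>>>>>> import org.apache.calcite.tools.RuleSet;
>>>>>>>> import org.apache.calcite.tools.RuleSets;
>>>>>>>>
>>>>>>>> class CalcMergeExperiment {
>>>>>>>>   // Copy the planner's usual rule list and drop the rules that merge
>>>>>>>>   // projects/filters into a Calc. Beam's LogicalCalcMergeRule and
>>>>>>>>   // BeamCalcMergeRule wrap CoreRules.CALC_MERGE and would be removed
>>>>>>>>   // the same way.
>>>>>>>>   static RuleSet withoutCalcMerge(List<RelOptRule> baseRules) {
>>>>>>>>     List<RelOptRule> rules = new ArrayList<>(baseRules);
>>>>>>>>     rules.remove(CoreRules.CALC_MERGE);
>>>>>>>>     rules.remove(CoreRules.FILTER_CALC_MERGE);
>>>>>>>>     rules.remove(CoreRules.PROJECT_CALC_MERGE);
>>>>>>>>     return RuleSets.ofList(rules);
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>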
>>>>>>>> >>>>>>>> The issue does not occur if I disable all of the following planner >>>>>>>> rules: CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE, >>>>>>>> LogicalCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE), and >>>>>>>> BeamCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE). >>>>>>>> >>>>>>>> All the rules share a common call to RexProgramBuilder.mergePrograms, >>>>>>>> so I suspect the problem lies there. I spent some time looking but >>>>>>>> wasn't able to find it by code inspection, it looks like this code >>>>>>>> path is doing the right thing with names. I'll spend some time >>>>>>>> tomorrow trying to reproduce this on pure Calcite. >>>>>>>> >>>>>>>> Andrew >>>>>>>> >>>>>>>> >>>>>>>> On Tue, Jan 24, 2023 at 8:24 PM Talat Uyarer >>>>>>>> <tuya...@paloaltonetworks.com> wrote: >>>>>>>>> >>>>>>>>> Hi Andrew, >>>>>>>>> >>>>>>>>> Thanks for writing a test for this use case. Without Where clause it >>>>>>>>> works as expected on our test cases also too. Please add where clause >>>>>>>>> on second select. With the below query it does not return column >>>>>>>>> names. I tested on my local also. >>>>>>>>> >>>>>>>>> WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM >>>>>>>>> PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable >>>>>>>>> WHERE id > 1 >>>>>>>>> >>>>>>>>> Thanks >>>>>>>>> >>>>>>>>> On Tue, Jan 24, 2023 at 5:28 PM Andrew Pilloud <apill...@google.com> >>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>> +d...@beam.apache.org >>>>>>>>>> >>>>>>>>>> I tried reproducing this but was not successful, the output schema >>>>>>>>>> was as expected. I added the following to >>>>>>>>>> BeamSqlMultipleSchemasTest.java at head. (I did discover that >>>>>>>>>> PAssert.that(result).containsInAnyOrder(output) doesn't validate >>>>>>>>>> column names however.) >>>>>>>>>> >>>>>>>>>> @Test >>>>>>>>>> public void testSelectAs() { >>>>>>>>>> PCollection<Row> input = pipeline.apply(create(row(1, >>>>>>>>>> "strstr"))); >>>>>>>>>> >>>>>>>>>> PCollection<Row> result = >>>>>>>>>> input.apply(SqlTransform.query("WITH tempTable (id, v) AS >>>>>>>>>> (SELECT f_int as id, f_string as v FROM PCOLLECTION) SELECT id AS >>>>>>>>>> fout_int, v AS fout_string FROM tempTable")); >>>>>>>>>> >>>>>>>>>> Schema output_schema = >>>>>>>>>> >>>>>>>>>> Schema.builder().addInt32Field("fout_int").addStringField("fout_string").build(); >>>>>>>>>> assertThat(result.getSchema(), equalTo(output_schema)); >>>>>>>>>> >>>>>>>>>> Row output = Row.withSchema(output_schema).addValues(1, >>>>>>>>>> "strstr").build(); >>>>>>>>>> PAssert.that(result).containsInAnyOrder(output); >>>>>>>>>> pipeline.run(); >>>>>>>>>> } >>>>>>>>>> >>>>>>>>>> On Tue, Jan 24, 2023 at 8:13 AM Talat Uyarer >>>>>>>>>> <tuya...@paloaltonetworks.com> wrote: >>>>>>>>>>> >>>>>>>>>>> Hi Kenn, >>>>>>>>>>> >>>>>>>>>>> Thank you for replying back to my email. >>>>>>>>>>> >>>>>>>>>>> I was under the same impression about Calcite. But I wrote a test >>>>>>>>>>> on Calcite 1.28 too. It is working without issue that I see on BEAM >>>>>>>>>>> >>>>>>>>>>> Here is my test case. If you want you can also run on Calcite. >>>>>>>>>>> Please put under core/src/test/resources/sql as text file. and Run >>>>>>>>>>> CoreQuidemTest class. 
>>>>>>>>>>> >>>>>>>>>>> !use post >>>>>>>>>>> !set outputformat mysql >>>>>>>>>>> >>>>>>>>>>> #Test aliases with with clause >>>>>>>>>>> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, >>>>>>>>>>> "hr"."emps"."name" as v from "hr"."emps") >>>>>>>>>>> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable >>>>>>>>>>> WHERE tempTable.v <> '11' ; >>>>>>>>>>> +-----+-----------+ >>>>>>>>>>> | ID | value | >>>>>>>>>>> +-----+-----------+ >>>>>>>>>>> | 100 | Bill | >>>>>>>>>>> | 110 | Theodore | >>>>>>>>>>> | 150 | Sebastian | >>>>>>>>>>> | 200 | Eric | >>>>>>>>>>> +-----+-----------+ >>>>>>>>>>> (4 rows) >>>>>>>>>>> >>>>>>>>>>> !ok >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Mon, Jan 23, 2023 at 10:16 AM Kenneth Knowles <k...@apache.org> >>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Looking at the code that turns a logical CalcRel into a >>>>>>>>>>>> BeamCalcRel I do not see any obvious cause for this: >>>>>>>>>>>> https://github.com/apache/beam/blob/b3aa2e89489898f8c760294ba4dba2310ac53e70/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCalcRule.java#L69 >>>>>>>>>>>> >>>>>>>>>>>> I don't like to guess that upstream libraries have the bug, but in >>>>>>>>>>>> this case I wonder if the alias is lost in the Calcite optimizer >>>>>>>>>>>> rule for merging the projects and filters into a Calc. >>>>>>>>>>>> >>>>>>>>>>>> Kenn >>>>>>>>>>>> >>>>>>>>>>>> On Mon, Jan 23, 2023 at 10:13 AM Kenneth Knowles <k...@apache.org> >>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> I am not sure I understand the question, but I do see an issue. >>>>>>>>>>>>> >>>>>>>>>>>>> Context: "CalcRel" is an optimized relational operation that is >>>>>>>>>>>>> somewhat like ParDo, with a small snippet of a single-assignment >>>>>>>>>>>>> DSL embedded in it. Calcite will choose to merge all the projects >>>>>>>>>>>>> and filters into the node, and then generates Java bytecode to >>>>>>>>>>>>> directly execute the DSL. >>>>>>>>>>>>> >>>>>>>>>>>>> Problem: it looks like the CalcRel has output columns with >>>>>>>>>>>>> aliases "id" and "v" where it should have output columns with >>>>>>>>>>>>> aliases "id" and "value". >>>>>>>>>>>>> >>>>>>>>>>>>> Kenn >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Jan 19, 2023 at 6:01 PM Ahmet Altay <al...@google.com> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Adding: @Andrew Pilloud @Kenneth Knowles >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Thu, Jan 12, 2023 at 12:31 PM Talat Uyarer via user >>>>>>>>>>>>>> <user@beam.apache.org> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi All, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I am using Beam 2.43 with Calcite SQL with Java. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I have a query with a WITH clause and some aliasing. Looks like >>>>>>>>>>>>>>> Beam Query optimizer after optimizing my query, it drops Select >>>>>>>>>>>>>>> statement's aliases. Can you help me to identify where the >>>>>>>>>>>>>>> problem is ? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> This is my query >>>>>>>>>>>>>>> INFO: SQL: >>>>>>>>>>>>>>> WITH `tempTable` (`id`, `v`) AS (SELECT >>>>>>>>>>>>>>> `PCOLLECTION`.`f_nestedRow`.`f_nestedInt` AS `id`, >>>>>>>>>>>>>>> `PCOLLECTION`.`f_nestedRow`.`f_nestedString` AS `v` >>>>>>>>>>>>>>> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`) (SELECT >>>>>>>>>>>>>>> `tempTable`.`id` AS `id`, `tempTable`.`v` AS `value` >>>>>>>>>>>>>>> FROM `tempTable` AS `tempTable` >>>>>>>>>>>>>>> WHERE `tempTable`.`v` <> '11') >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> This is Calcite Plan look at LogicalProject(id=[$0], >>>>>>>>>>>>>>> value=[$1]) in SQL plan. 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Jan 12, 2023 12:19:08 PM >>>>>>>>>>>>>>> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner >>>>>>>>>>>>>>> convertToBeamRel >>>>>>>>>>>>>>> INFO: SQLPlan> >>>>>>>>>>>>>>> LogicalProject(id=[$0], value=[$1]) >>>>>>>>>>>>>>> LogicalFilter(condition=[<>($1, '11')]) >>>>>>>>>>>>>>> LogicalProject(id=[$1.f_nestedInt], v=[$1.f_nestedString]) >>>>>>>>>>>>>>> BeamIOSourceRel(table=[[beam, PCOLLECTION]]) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> But Beam Plan does not have a LogicalProject(id=[$0], >>>>>>>>>>>>>>> value=[$1]) or similar. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Jan 12, 2023 12:19:08 PM >>>>>>>>>>>>>>> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner >>>>>>>>>>>>>>> convertToBeamRel >>>>>>>>>>>>>>> INFO: BEAMPlan> >>>>>>>>>>>>>>> BeamCalcRel(expr#0..1=[{inputs}], expr#2=[$t1.f_nestedInt], >>>>>>>>>>>>>>> expr#3=[$t1.f_nestedString], expr#4=['11':VARCHAR], >>>>>>>>>>>>>>> expr#5=[<>($t3, $t4)], id=[$t2], v=[$t3], $condition=[$t5]) >>>>>>>>>>>>>>> BeamIOSourceRel(table=[[beam, PCOLLECTION]]) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks