[Peer Review] Pull Request Eliminate Finalize in Read

2023-01-27 Thread Dejan Spasic
Hello team,

I posted a pull request[0] some time ago for which I (we) need your support. In
general, it is about eliminating or replacing the finalize methods[1]. I am
looking forward to your advice.
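
For anyone less familiar with the pattern, here is a minimal sketch of the
replacement usually recommended for finalize(): an explicit close() backed by
java.lang.ref.Cleaner as a safety net. The class and field names below are
purely illustrative and not taken from the PR:

import java.lang.ref.Cleaner;

final class NativeResource implements AutoCloseable {
  private static final Cleaner CLEANER = Cleaner.create();

  // The cleanup state must not hold a reference back to NativeResource,
  // otherwise the object can never become phantom reachable.
  private static final class State implements Runnable {
    private final long handle;

    State(long handle) {
      this.handle = handle;
    }

    @Override
    public void run() {
      // Release the native handle here.
    }
  }

  private final Cleaner.Cleanable cleanable;

  NativeResource(long handle) {
    this.cleanable = CLEANER.register(this, new State(handle));
  }

  // Deterministic release path; the Cleaner only acts as a safety net.
  @Override
  public void close() {
    cleanable.clean();
  }
}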

Have a nice one!

[0] https://github.com/apache/beam/pull/24841
[1] https://github.com/apache/beam/issues/24181


Beam High Priority Issue Report (44)

2023-01-27 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/25189 [Failing Test]: Python PostCommit 
failing xlang tests
https://github.com/apache/beam/issues/25166 [Bug]: BigQueryIO with Storage 
Write API may fail with NullPointerException
https://github.com/apache/beam/issues/25140 [Bug]: GenerateSequence is broken 
on SDF
https://github.com/apache/beam/issues/24971 [Bug]: Messages are not published 
when a connection is closed with JmsIO
https://github.com/apache/beam/issues/24776 [Bug]: Race condition in Python SDK 
Harness ProcessBundleProgress
https://github.com/apache/beam/issues/24655 [Bug]: Pipeline fusion should break 
at @RequiresStableInput boundary
https://github.com/apache/beam/issues/24389 [Failing Test]: 
HadoopFormatIOElasticTest.classMethod ExceptionInInitializerError 
ContainerFetchException
https://github.com/apache/beam/issues/24367 [Bug]: workflow.tar.gz cannot be 
passed to flink runner
https://github.com/apache/beam/issues/24313 [Flaky]: 
apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
https://github.com/apache/beam/issues/24267 [Failing Test]: Timeout waiting to 
lock gradle
https://github.com/apache/beam/issues/23944  beam_PreCommit_Python_Cron 
regularly failing - test_pardo_large_input flaky
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/22969 Discrepancy in behavior of 
`DoFn.process()` when `yield` is combined with `return` statement, or vice versa
https://github.com/apache/beam/issues/22961 [Bug]: WriteToBigQuery silently 
skips most of records without job fail
https://github.com/apache/beam/issues/22913 [Bug]: 
beam_PostCommit_Java_ValidatesRunner_Flink flakes in 
org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output 
to Failed Inserts PCollection
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not 
raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21643 FnRunnerTest with non-trivial 
(order 1000 elements) numpy input flakes in non-cython environment
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21424 Java VR (Dataflow, V2, Streaming) 
failing: ParDoTest$TimestampTests/OnWindowExpirationTests
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not 
follow spec
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21121 
apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT.test_streaming_wordcount_it
 flakey
https://github.com/apache/beam/issues/21104 Flaky: 
apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers
https://github.com/apache/beam/issues/20976 
apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
 is flaky
https://github.com/apache/beam/issues/20974 Python GHA PreCommits flake with 
grpc.FutureTimeoutError on SDK harness startup
https://github.com/apache/beam/issues/20689 Kafka commitOffsetsInFinalize OOM 
on Flink
https://github.com/apache/beam/issues/20108 Python direct runner doesn't emit 
empty pane when it should
https://github.com/apache/beam/issues/19814 Flink streaming flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
https://github.com/apache/beam/issues/19241 Python Dataflow integration tests 
should export the pipeline Job ID and console output to Jenkins Test Result 
section


P1 Issues with no update in the last week:

https://github.com/apache/beam/issues/24953 [Bug]: Google LTS release for Beam 
2.45.0 to use LTS version of libraries-bom
https://github.com/apache/beam/issues/24464 [Epic]: Implement 
FileWriteSchemaTransformProvider
https://github.com/apache/beam/issues/23875 [Bug]: beam.Row.__eq__ returns true 
for unequal rows
https://github.com/apache/beam/issues/23525 [Bug]: Default PubsubMessage coder 
will drop message id and orderingKey
https://github.com/apache/beam/issues/22605 [Bug]: Beam Python failure for 
dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest.test_metrics_it
https://github.com/apache/beam/issues/22115 [Bug]: 
apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
 is flaky
https://github.com/

Re: Beam SQL Alias issue while using With Clause

2023-01-27 Thread Talat Uyarer via dev
Hi Andrew,

Yes, this also aligns with my debugging. In my reply to Kenn you can see a
SQL test which I wrote in Calcite. Somehow Calcite does not have this issue
in version 1.28.

!use post
!set outputformat mysql

#Test aliases with with clause
WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id,
"hr"."emps"."name" as v from "hr"."emps")
SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE
tempTable.v <> '11' ;
+-----+-----------+
| ID  | value     |
+-----+-----------+
| 100 | Bill      |
| 110 | Theodore  |
| 150 | Sebastian |
| 200 | Eric      |
+-----+-----------+
(4 rows)

!ok


On Wed, Jan 25, 2023 at 6:08 PM Andrew Pilloud  wrote:

> Yes, that worked.
>
> The issue does not occur if I disable all of the following planner rules:
> CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE,
> LogicalCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE),
> and BeamCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE).
>
> All the rules share a common call to RexProgramBuilder.mergePrograms, so I
> suspect the problem lies there. I spent some time looking but wasn't able
> to find it by code inspection, it looks like this code path is doing the
> right thing with names. I'll spend some time tomorrow trying to reproduce
> this on pure Calcite.
>
> Andrew
>
>
> On Tue, Jan 24, 2023 at 8:24 PM Talat Uyarer 
> wrote:
>
>> Hi Andrew,
>>
>> Thanks for writing a test for this use case. Without Where clause it
>> works as expected on our test cases also too. Please add where clause on
>> second select. With the below query it does not return column names. I
>> tested on my local also.
>>
>> WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM
>> PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable WHERE
>> id > 1
>>
>> Thanks
>>
>> On Tue, Jan 24, 2023 at 5:28 PM Andrew Pilloud 
>> wrote:
>>
>>> +dev@beam.apache.org 
>>>
>>> I tried reproducing this but was not successful, the output schema was
>>> as expected. I added the following to BeamSqlMultipleSchemasTest.java at
>>> head. (I did discover that  PAssert.that(result).containsInAnyOrder(output)
>>> doesn't validate column names however.)
>>>
>>>   @Test
>>>   public void testSelectAs() {
>>>     PCollection<Row> input = pipeline.apply(create(row(1, "strstr")));
>>>
>>>     PCollection<Row> result =
>>>         input.apply(
>>>             SqlTransform.query(
>>>                 "WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM PCOLLECTION) "
>>>                     + "SELECT id AS fout_int, v AS fout_string FROM tempTable"));
>>>
>>>     Schema output_schema =
>>>         Schema.builder().addInt32Field("fout_int").addStringField("fout_string").build();
>>>     assertThat(result.getSchema(), equalTo(output_schema));
>>>
>>>     Row output = Row.withSchema(output_schema).addValues(1, "strstr").build();
>>>     PAssert.that(result).containsInAnyOrder(output);
>>>     pipeline.run();
>>>   }
>>>
>>> On Tue, Jan 24, 2023 at 8:13 AM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
 Hi Kenn,

 Thank you for replying back to my email.

 I was under the same impression about Calcite. But I wrote a test on
 Calcite 1.28 too. It is working without issue that I see on BEAM

 Here is my test case. If you want you can also run on Calcite. Please
 put under core/src/test/resources/sql as text file. and Run CoreQuidemTest
 class.

 !use post
 !set outputformat mysql

 #Test aliases with with clause
 WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
 "hr"."emps"."name" as v from "hr"."emps")
 SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
 tempTable.v <> '11' ;
 +-----+-----------+
 | ID  | value     |
 +-----+-----------+
 | 100 | Bill      |
 | 110 | Theodore  |
 | 150 | Sebastian |
 | 200 | Eric      |
 +-----+-----------+
 (4 rows)

 !ok


 On Mon, Jan 23, 2023 at 10:16 AM Kenneth Knowles 
 wrote:

> Looking at the code that turns a logical CalcRel into a BeamCalcRel I
> do not see any obvious cause for this:
> https://github.com/apache/beam/blob/b3aa2e89489898f8c760294ba4dba2310ac53e70/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCalcRule.java#L69
> 
>
> I don't like to guess that upstream libraries have the bug, but in
> this case I wonder if the alias is lost in the Calcite optimizer rule for
> merging the projects and filters into a Calc.
>
> Kenn

Re: Beam SQL Alias issue while using With Clause

2023-01-27 Thread Andrew Pilloud via dev
Hi Talat,

I did get your test case running and added some logging to
RexProgramBuilder.mergePrograms. There is only one merge that occurs during
the test, and it has an output type of RecordType(JavaType(int) ID,
JavaType(class java.lang.String) V). That does seem like the correct output
name, but it doesn't match the final output name, so something is still
different from the Beam test case. I also modified mergePrograms to
purposely corrupt the output names; that did not cause the test to fail or
trip the 'assert mergedProg.getOutputRowType() ==
topProgram.getOutputRowType();' in mergePrograms. I could not find any
Calcite unit tests for RexProgramBuilder.mergePrograms or the
CoreRules.CALC_MERGE rule, so I still think it is probable that the problem
is in this area.
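
In case anyone wants to poke at the merge in isolation, here is a rough
standalone sketch of the kind of merge those rules perform, using only the
Calcite Rex API. The field names and the filter mirror the failing query;
this is illustrative only, not the actual rule code:

import java.math.BigDecimal;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexProgramBuilder;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;

public class MergeProgramsSketch {
  public static void main(String[] args) {
    RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    RexBuilder rexBuilder = new RexBuilder(typeFactory);

    // Input row type of the bottom program: PCOLLECTION(f_int, f_string).
    RelDataType inputType = typeFactory.builder()
        .add("f_int", SqlTypeName.INTEGER)
        .add("f_string", SqlTypeName.VARCHAR)
        .build();

    // Bottom program: SELECT f_int AS id, f_string AS v.
    RexProgramBuilder bottom = new RexProgramBuilder(inputType, rexBuilder);
    bottom.addProject(rexBuilder.makeInputRef(inputType, 0), "id");
    bottom.addProject(rexBuilder.makeInputRef(inputType, 1), "v");
    RexProgram bottomProgram = bottom.getProgram();

    // Top program: SELECT id AS fout_int, v AS fout_string WHERE id > 1.
    RelDataType midType = bottomProgram.getOutputRowType();
    RexProgramBuilder top = new RexProgramBuilder(midType, rexBuilder);
    top.addProject(rexBuilder.makeInputRef(midType, 0), "fout_int");
    top.addProject(rexBuilder.makeInputRef(midType, 1), "fout_string");
    top.addCondition(rexBuilder.makeCall(
        SqlStdOperatorTable.GREATER_THAN,
        rexBuilder.makeInputRef(midType, 0),
        rexBuilder.makeExactLiteral(BigDecimal.ONE)));
    RexProgram topProgram = top.getProgram();

    // Merge the programs the way the Calc merge rules do and inspect the names.
    RexProgram merged =
        RexProgramBuilder.mergePrograms(topProgram, bottomProgram, rexBuilder);
    System.out.println(merged.getOutputRowType());
  }
}

If the aliases were being dropped in mergePrograms itself, I would expect the
printed row type to come back with generated names (EXPR$0 style) rather than
fout_int and fout_string.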

One minor issue I encountered: it took me a while to get your test case
running. There don't appear to be any Calcite Gradle rules to run
CoreQuidemTest, and constructing the classpath manually was tedious. Did I
miss something?
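
(For what it's worth, I would have expected the generic Gradle test filter,
something like "./gradlew :core:test --tests org.apache.calcite.test.CoreQuidemTest",
to be enough, assuming CoreQuidemTest lives in Calcite's core module, but I
have not verified that against Calcite's build.)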

I'm still working on this, but I'm out today and Monday, so it will probably
be Wednesday before I make any more progress.

Andrew

On Fri, Jan 27, 2023 at 10:40 AM Talat Uyarer 
wrote:

> Hi Andrew,
>
> Yes This aligned also with my debugging. In My Kenn's reply you can see a
> sql test which I wrote in Calcite. Somehow Calcite does not have this issue
> with the 1.28 version.
>
> !use post
> !set outputformat mysql
>
> #Test aliases with with clause
> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
> "hr"."emps"."name" as v from "hr"."emps")
> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
> tempTable.v <> '11' ;
> +-----+-----------+
> | ID  | value     |
> +-----+-----------+
> | 100 | Bill      |
> | 110 | Theodore  |
> | 150 | Sebastian |
> | 200 | Eric      |
> +-----+-----------+
> (4 rows)
>
> !ok
>
>
> On Wed, Jan 25, 2023 at 6:08 PM Andrew Pilloud 
> wrote:
>
>> Yes, that worked.
>>
>> The issue does not occur if I disable all of the following planner rules:
>> CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE,
>> LogicalCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE),
>> and BeamCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE).
>>
>> All the rules share a common call to RexProgramBuilder.mergePrograms, so
>> I suspect the problem lies there. I spent some time looking but wasn't able
>> to find it by code inspection, it looks like this code path is doing the
>> right thing with names. I'll spend some time tomorrow trying to reproduce
>> this on pure Calcite.
>>
>> Andrew
>>
>>
>> On Tue, Jan 24, 2023 at 8:24 PM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> Hi Andrew,
>>>
>>> Thanks for writing a test for this use case. Without Where clause it
>>> works as expected on our test cases also too. Please add where clause on
>>> second select. With the below query it does not return column names. I
>>> tested on my local also.
>>>
>>> WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM
>>> PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable WHERE
>>> id > 1
>>>
>>> Thanks
>>>
>>> On Tue, Jan 24, 2023 at 5:28 PM Andrew Pilloud 
>>> wrote:
>>>
 +dev@beam.apache.org 

 I tried reproducing this but was not successful, the output schema was
 as expected. I added the following to BeamSqlMultipleSchemasTest.java at
 head. (I did discover that  PAssert.that(result).containsInAnyOrder(output)
 doesn't validate column names however.)

   @Test
   public void testSelectAs() {
     PCollection<Row> input = pipeline.apply(create(row(1, "strstr")));

     PCollection<Row> result =
         input.apply(
             SqlTransform.query(
                 "WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM PCOLLECTION) "
                     + "SELECT id AS fout_int, v AS fout_string FROM tempTable"));

     Schema output_schema =
         Schema.builder().addInt32Field("fout_int").addStringField("fout_string").build();
     assertThat(result.getSchema(), equalTo(output_schema));

     Row output = Row.withSchema(output_schema).addValues(1, "strstr").build();
     PAssert.that(result).containsInAnyOrder(output);
     pipeline.run();
   }

 On Tue, Jan 24, 2023 at 8:13 AM Talat Uyarer <
 tuya...@paloaltonetworks.com> wrote:

> Hi Kenn,
>
> Thank you for replying back to my email.
>
> I was under the same impression about Calcite. But I wrote a test on
> Calcite 1.28 too. It is working without issue that I see on BEAM
>
> Here is my test case. If you want you can also run on Calcite. Please
> put under core/src/test/resources/sql as text file. and Run CoreQuidemTest
> class.
>
> !use post
> !set outputformat mysql
>
> #Test aliases with with clause
> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
> "hr"."emps"."name" as v from "hr"."emps")
> SELECT tempTable.id as id, tempTable.v as 

Re: Beam SQL Alias issue while using With Clause

2023-01-27 Thread Andrew Pilloud via dev
Also, this is at the very least a Beam bug. You can file a Beam issue if you
want; otherwise I will when I get back.

Andrew

On Fri, Jan 27, 2023 at 11:27 AM Andrew Pilloud  wrote:

> Hi Talat,
>
> I did get your test case running and added some logging to
> RexProgramBuilder.mergePrograms. There is only one merge that occurs during
> the test and it has an output type of RecordType(JavaType(int) ID,
> JavaType(class java.lang.String) V). This does seem like the correct output
> name but it doesn't match the final output name, so something is still
> different than the Beam test case. I also modified mergePrograms to
> purposely corrupt the output names, that did not cause the test to fail or
> trip the 'assert mergedProg.getOutputRowType() ==
> topProgram.getOutputRowType();' in mergePrograms. I could not find any
> Calcite unit tests for RexProgramBuilder.mergePrograms or
> CoreRules.CALC_MERGE rule so I think it is still probable that the problem
> is in this area.
>
> One minor issue I encountered. It took me a while to get your test case
> running, it doesn't appear there are any calcite gradle rules to run
> CoreQuidemTest and constructing the classpath manually was tedious. Did I
> miss something?
>
> I'm still working on this but I'm out today and Monday, it will probably
> be Wednesday before I make any more progress.
>
> Andrew
>
> On Fri, Jan 27, 2023 at 10:40 AM Talat Uyarer <
> tuya...@paloaltonetworks.com> wrote:
>
>> Hi Andrew,
>>
>> Yes This aligned also with my debugging. In My Kenn's reply you can see a
>> sql test which I wrote in Calcite. Somehow Calcite does not have this issue
>> with the 1.28 version.
>>
>> !use post
>> !set outputformat mysql
>>
>> #Test aliases with with clause
>> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
>> "hr"."emps"."name" as v from "hr"."emps")
>> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
>> tempTable.v <> '11' ;
>> +-----+-----------+
>> | ID  | value     |
>> +-----+-----------+
>> | 100 | Bill      |
>> | 110 | Theodore  |
>> | 150 | Sebastian |
>> | 200 | Eric      |
>> +-----+-----------+
>> (4 rows)
>>
>> !ok
>>
>>
>> On Wed, Jan 25, 2023 at 6:08 PM Andrew Pilloud 
>> wrote:
>>
>>> Yes, that worked.
>>>
>>> The issue does not occur if I disable all of the following planner
>>> rules: CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE,
>>> LogicalCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE),
>>> and BeamCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE).
>>>
>>> All the rules share a common call to RexProgramBuilder.mergePrograms, so
>>> I suspect the problem lies there. I spent some time looking but wasn't able
>>> to find it by code inspection, it looks like this code path is doing the
>>> right thing with names. I'll spend some time tomorrow trying to reproduce
>>> this on pure Calcite.
>>>
>>> Andrew
>>>
>>>
>>> On Tue, Jan 24, 2023 at 8:24 PM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
 Hi Andrew,

 Thanks for writing a test for this use case. Without Where clause it
 works as expected on our test cases also too. Please add where clause on
 second select. With the below query it does not return column names. I
 tested on my local also.

 WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM
 PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable WHERE
 id > 1

 Thanks

 On Tue, Jan 24, 2023 at 5:28 PM Andrew Pilloud 
 wrote:

> +dev@beam.apache.org 
>
> I tried reproducing this but was not successful, the output schema was
> as expected. I added the following to BeamSqlMultipleSchemasTest.java at
> head. (I did discover that  
> PAssert.that(result).containsInAnyOrder(output)
> doesn't validate column names however.)
>
>   @Test
>   public void testSelectAs() {
>     PCollection<Row> input = pipeline.apply(create(row(1, "strstr")));
>
>     PCollection<Row> result =
>         input.apply(
>             SqlTransform.query(
>                 "WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM PCOLLECTION) "
>                     + "SELECT id AS fout_int, v AS fout_string FROM tempTable"));
>
>     Schema output_schema =
>         Schema.builder().addInt32Field("fout_int").addStringField("fout_string").build();
>     assertThat(result.getSchema(), equalTo(output_schema));
>
>     Row output = Row.withSchema(output_schema).addValues(1, "strstr").build();
>     PAssert.that(result).containsInAnyOrder(output);
>     pipeline.run();
>   }
>
> On Tue, Jan 24, 2023 at 8:13 AM Talat Uyarer <
> tuya...@paloaltonetworks.com> wrote:
>
>> Hi Kenn,
>>
>> Thank you for replying back to my email.
>>
>> I was under the same impression about Calcite. But I wrote a test on
>> Calcite 1.28 too. It is working without issue that I see on BEAM
>>
>> Here is my test case. If you want you can also