Beam High Priority Issue Report (40)

2023-01-24 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/24971 [Bug]: Messages are not published 
when a connection is closed with JmsIO
https://github.com/apache/beam/issues/24776 [Bug]: Race condition in Python SDK 
Harness ProcessBundleProgress
https://github.com/apache/beam/issues/24655 [Bug]: Pipeline fusion should break 
at @RequiresStableInput boundary
https://github.com/apache/beam/issues/24389 [Failing Test]: 
HadoopFormatIOElasticTest.classMethod ExceptionInInitializerError 
ContainerFetchException
https://github.com/apache/beam/issues/24367 [Bug]: workflow.tar.gz cannot be 
passed to flink runner
https://github.com/apache/beam/issues/24313 [Flaky]: 
apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
https://github.com/apache/beam/issues/24267 [Failing Test]: Timeout waiting to 
lock gradle
https://github.com/apache/beam/issues/23944 beam_PreCommit_Python_Cron 
regularly failing - test_pardo_large_input flaky
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/22969 Discrepancy in behavior of 
`DoFn.process()` when `yield` is combined with `return` statement, or vice versa
https://github.com/apache/beam/issues/22961 [Bug]: WriteToBigQuery silently 
skips most of records without job fail
https://github.com/apache/beam/issues/22913 [Bug]: 
beam_PostCommit_Java_ValidatesRunner_Flink flakes in 
org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output 
to Failed Inserts PCollection
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not 
raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21643 FnRunnerTest with non-trivial 
(order 1000 elements) numpy input flakes in non-cython environment
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21424 Java VR (Dataflow, V2, Streaming) 
failing: ParDoTest$TimestampTests/OnWindowExpirationTests
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not 
follow spec
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21121 
apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT.test_streaming_wordcount_it
 flaky
https://github.com/apache/beam/issues/21104 Flaky: 
apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers
https://github.com/apache/beam/issues/20976 
apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
 is flaky
https://github.com/apache/beam/issues/20974 Python GHA PreCommits flake with 
grpc.FutureTimeoutError on SDK harness startup
https://github.com/apache/beam/issues/20689 Kafka commitOffsetsInFinalize OOM 
on Flink
https://github.com/apache/beam/issues/20108 Python direct runner doesn't emit 
empty pane when it should
https://github.com/apache/beam/issues/19814 Flink streaming flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
https://github.com/apache/beam/issues/19241 Python Dataflow integration tests 
should export the pipeline Job ID and console output to Jenkins Test Result 
section


P1 Issues with no update in the last week:

https://github.com/apache/beam/issues/24953 [Bug]: Google LTS release for Beam 
2.45.0 to use LTS version of libraries-bom
https://github.com/apache/beam/issues/24464 [Epic]: Implement 
FileWriteSchemaTransformProvider
https://github.com/apache/beam/issues/23875 [Bug]: beam.Row.__eq__ returns true 
for unequal rows
https://github.com/apache/beam/issues/23525 [Bug]: Default PubsubMessage coder 
will drop message id and orderingKey
https://github.com/apache/beam/issues/22605 [Bug]: Beam Python failure for 
dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest.test_metrics_it
https://github.com/apache/beam/issues/22115 [Bug]: 
apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
 is flaky
https://github.com/apache/beam/issues/21714 
PulsarIOTest.testReadFromSimpleTopic is very flaky
https://github.com/apache/beam/issues/21708 beam_PostCommit_Java_DataflowV2, 
testBigQueryStorageWrite30MProto failing consistently
https://github.com/apache/beam/issues/21706 Flaky timeout in github Python unit 
test action 

Adding configurable gRPC channels to External transform

2023-01-24 Thread Sahith Nallapareddy via dev
Hello,

I made a PR, https://github.com/apache/beam/pull/25151, to add a configurable
gRPC channel to the ArtifactRetrievalService stub. We are hosting our
external transforms in different environments, and we were trying to host
them in Google Cloud Run. This requires that the gRPC calls use
TransportSecurity and also include additional authorization in each call.
This was possible to implement for the Expand stub, but not the
ArtifactRetrieval stub. I am wondering who the best people are to request
reviews for this. This is my naive implementation, and I am very open to
changing how this should be implemented!

Thanks,

Sahith
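
Since the PR itself is not quoted here, a stdlib-only sketch of the injection
pattern being proposed may help. All of the names below (make_artifact_stub,
the dict standing in for a channel) are hypothetical illustrations, not Beam's
or gRPC's actual API; real code would build a grpc channel with composite
credentials (TLS plus per-call authorization) instead of a dict.

```python
from typing import Any, Callable, Dict

class ArtifactRetrievalStub:
    """Hypothetical stand-in for the generated gRPC stub."""
    def __init__(self, channel: Dict[str, Any]):
        self.channel = channel

def insecure_channel_factory(target: str) -> Dict[str, Any]:
    # Stands in for grpc.insecure_channel(target), the hard-coded default.
    return {"target": target, "secure": False, "metadata": {}}

def cloud_run_channel_factory(target: str) -> Dict[str, Any]:
    # Stands in for grpc.secure_channel(target, composite_credentials),
    # layering transport security plus a per-call authorization header,
    # as Cloud Run requires.
    return {"target": target, "secure": True,
            "metadata": {"authorization": "Bearer <id-token>"}}

def make_artifact_stub(
    target: str,
    channel_factory: Callable[[str], Dict[str, Any]] = insecure_channel_factory,
) -> ArtifactRetrievalStub:
    # The PR's core idea: accept a factory instead of hard-coding channel
    # construction, so callers can inject security/auth configuration.
    return ArtifactRetrievalStub(channel_factory(target))

stub = make_artifact_stub("artifacts.example.run.app:443",
                          cloud_run_channel_factory)
print(stub.channel["secure"])  # True
```

The design choice is simply to move channel construction behind a caller
supplied factory, which is what makes the ArtifactRetrieval stub as
configurable as the Expand stub already is.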


Subscribe

2023-01-24 Thread Alan Zhang via dev



Re: Subscribe

2023-01-24 Thread Valentyn Tymofieiev via dev
Hello Alan,

To subscribe to the list, you should send an email to
dev-subscr...@beam.apache.org instead.

Best,
Valentyn

On Tue, Jan 24, 2023 at 5:19 PM Alan Zhang via dev 
wrote:

>


Re: Beam SQL Alias issue while using With Clause

2023-01-24 Thread Andrew Pilloud via dev
+dev@beam.apache.org 

I tried reproducing this but was not successful; the output schema was as
expected. I added the following to BeamSqlMultipleSchemasTest.java at head.
(I did discover, however, that PAssert.that(result).containsInAnyOrder(output)
doesn't validate column names.)

  @Test
  public void testSelectAs() {
    PCollection<Row> input = pipeline.apply(create(row(1, "strstr")));

    PCollection<Row> result =
        input.apply(
            SqlTransform.query(
                "WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM PCOLLECTION) "
                    + "SELECT id AS fout_int, v AS fout_string FROM tempTable"));

    Schema output_schema =
        Schema.builder().addInt32Field("fout_int").addStringField("fout_string").build();
    assertThat(result.getSchema(), equalTo(output_schema));

    Row output = Row.withSchema(output_schema).addValues(1, "strstr").build();
    PAssert.that(result).containsInAnyOrder(output);
    pipeline.run();
  }

On Tue, Jan 24, 2023 at 8:13 AM Talat Uyarer 
wrote:

> Hi Kenn,
>
> Thank you for replying back to my email.
>
> I was under the same impression about Calcite. But I wrote a test on
> Calcite 1.28 too. It is working without issue that I see on BEAM
>
> Here is my test case. If you want you can also run on Calcite. Please put
> under core/src/test/resources/sql as text file. and Run CoreQuidemTest
> class.
>
> !use post
> !set outputformat mysql
>
> #Test aliases with with clause
> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
> "hr"."emps"."name" as v from "hr"."emps")
> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
> tempTable.v <> '11' ;
> +-----+-----------+
> | ID  | value     |
> +-----+-----------+
> | 100 | Bill      |
> | 110 | Theodore  |
> | 150 | Sebastian |
> | 200 | Eric      |
> +-----+-----------+
> (4 rows)
>
> !ok
>
>
> On Mon, Jan 23, 2023 at 10:16 AM Kenneth Knowles  wrote:
>
>> Looking at the code that turns a logical CalcRel into a BeamCalcRel I do
>> not see any obvious cause for this:
>> https://github.com/apache/beam/blob/b3aa2e89489898f8c760294ba4dba2310ac53e70/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCalcRule.java#L69
>> 
>>
>> I don't like to guess that upstream libraries have the bug, but in this
>> case I wonder if the alias is lost in the Calcite optimizer rule for
>> merging the projects and filters into a Calc.
>>
>> Kenn
>>
>> On Mon, Jan 23, 2023 at 10:13 AM Kenneth Knowles  wrote:
>>
>>> I am not sure I understand the question, but I do see an issue.
>>>
>>> Context: "CalcRel" is an optimized relational operation that is somewhat
>>> like ParDo, with a small snippet of a single-assignment DSL embedded in it.
>>> Calcite will choose to merge all the projects and filters into the node,
>>> and then generates Java bytecode to directly execute the DSL.
>>>
>>> Problem: it looks like the CalcRel has output columns with aliases "id"
>>> and "v" where it should have output columns with aliases "id" and "value".
>>>
>>> Kenn
>>>
>>> On Thu, Jan 19, 2023 at 6:01 PM Ahmet Altay  wrote:
>>>
 Adding: @Andrew Pilloud  @Kenneth Knowles
 

 On Thu, Jan 12, 2023 at 12:31 PM Talat Uyarer via user <
 u...@beam.apache.org> wrote:

> Hi All,
>
> I am using Beam 2.43 with Calcite SQL with Java.
>
> I have a query with a WITH clause and some aliasing. Looks like Beam
> Query optimizer after optimizing my query, it drops Select statement's
> aliases. Can you help me to identify where the problem is ?
>
> This is my query
> INFO: SQL:
> WITH `tempTable` (`id`, `v`) AS (SELECT
> `PCOLLECTION`.`f_nestedRow`.`f_nestedInt` AS `id`,
> `PCOLLECTION`.`f_nestedRow`.`f_nestedString` AS `v`
> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`) (SELECT `tempTable`.`id`
> AS `id`, `tempTable`.`v` AS `value`
> FROM `tempTable` AS `tempTable`
> WHERE `tempTable`.`v` <> '11')
>
> This is Calcite Plan look at LogicalProject(id=[$0], value=[$1]) in
> SQL plan.
>
> Jan 12, 2023 12:19:08 PM
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner 
> convertToBeamRel
> INFO: SQLPlan>
> LogicalProject(id=[$0], value=[$1])
>   LogicalFilter(condition=[<>($1, '11')])
> LogicalProject(id=[$1.f_nestedInt], v=[$1.f_nestedString])
>   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
> But Beam Plan does not have a LogicalProject(id=[$0], value=[$1]) or
> similar.
>
> Jan 12, 2023 12:19:08 PM
> org.apache.beam.sdk.extensions.sql

Re: Beam SQL Alias issue while using With Clause

2023-01-24 Thread Talat Uyarer via dev
Hi Andrew,

Thanks for writing a test for this use case. Without the WHERE clause it
works as expected in our test cases too. Please add a WHERE clause on the
second select. With the query below it does not return the column names. I
tested locally as well.

WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM
PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable WHERE
id > 1
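
For comparison, a reference engine keeps the outer aliases even with the
WHERE clause. Here is a quick stdlib check of the analogous query against
SQLite as a stand-in engine (not Beam SQL/Calcite, so it only shows the
expected standard behavior, not the bug):

```python
import sqlite3

# Stand-in table mirroring the test's PCOLLECTION schema.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE PCOLLECTION (f_int INTEGER, f_string TEXT)")
conn.execute("INSERT INTO PCOLLECTION VALUES (2, 'strstr')")

# Same shape as the failing query: outer SELECT aliases plus a WHERE.
cur = conn.execute(
    "WITH tempTable (id, v) AS "
    "(SELECT f_int AS id, f_string AS v FROM PCOLLECTION) "
    "SELECT id AS fout_int, v AS fout_string FROM tempTable WHERE id > 1"
)
names = [d[0] for d in cur.description]  # output column names
rows = cur.fetchall()
print(names)  # ['fout_int', 'fout_string']
print(rows)   # [(2, 'strstr')]
```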

Thanks

On Tue, Jan 24, 2023 at 5:28 PM Andrew Pilloud  wrote:

> +dev@beam.apache.org 
>
> I tried reproducing this but was not successful, the output schema was as
> expected. I added the following to BeamSqlMultipleSchemasTest.java at head.
> (I did discover that  PAssert.that(result).containsInAnyOrder(output)
> doesn't validate column names however.)
>
>   @Test
>   public void testSelectAs() {
>     PCollection<Row> input = pipeline.apply(create(row(1, "strstr")));
>
>     PCollection<Row> result =
>         input.apply(
>             SqlTransform.query(
>                 "WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM PCOLLECTION) "
>                     + "SELECT id AS fout_int, v AS fout_string FROM tempTable"));
>
>     Schema output_schema =
>         Schema.builder().addInt32Field("fout_int").addStringField("fout_string").build();
>     assertThat(result.getSchema(), equalTo(output_schema));
>
>     Row output = Row.withSchema(output_schema).addValues(1, "strstr").build();
>     PAssert.that(result).containsInAnyOrder(output);
>     pipeline.run();
>   }
>
> On Tue, Jan 24, 2023 at 8:13 AM Talat Uyarer 
> wrote:
>
>> Hi Kenn,
>>
>> Thank you for replying back to my email.
>>
>> I was under the same impression about Calcite. But I wrote a test on
>> Calcite 1.28 too. It is working without issue that I see on BEAM
>>
>> Here is my test case. If you want you can also run on Calcite. Please put
>> under core/src/test/resources/sql as text file. and Run CoreQuidemTest
>> class.
>>
>> !use post
>> !set outputformat mysql
>>
>> #Test aliases with with clause
>> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
>> "hr"."emps"."name" as v from "hr"."emps")
>> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
>> tempTable.v <> '11' ;
>> +-----+-----------+
>> | ID  | value     |
>> +-----+-----------+
>> | 100 | Bill      |
>> | 110 | Theodore  |
>> | 150 | Sebastian |
>> | 200 | Eric      |
>> +-----+-----------+
>> (4 rows)
>>
>> !ok
>>
>>
>> On Mon, Jan 23, 2023 at 10:16 AM Kenneth Knowles  wrote:
>>
>>> Looking at the code that turns a logical CalcRel into a BeamCalcRel I do
>>> not see any obvious cause for this:
>>> https://github.com/apache/beam/blob/b3aa2e89489898f8c760294ba4dba2310ac53e70/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCalcRule.java#L69
>>> 
>>>
>>> I don't like to guess that upstream libraries have the bug, but in this
>>> case I wonder if the alias is lost in the Calcite optimizer rule for
>>> merging the projects and filters into a Calc.
>>>
>>> Kenn
>>>
>>> On Mon, Jan 23, 2023 at 10:13 AM Kenneth Knowles 
>>> wrote:
>>>
 I am not sure I understand the question, but I do see an issue.

 Context: "CalcRel" is an optimized relational operation that is
 somewhat like ParDo, with a small snippet of a single-assignment DSL
 embedded in it. Calcite will choose to merge all the projects and filters
 into the node, and then generates Java bytecode to directly execute the 
 DSL.

 Problem: it looks like the CalcRel has output columns with aliases "id"
 and "v" where it should have output columns with aliases "id" and "value".

 Kenn

 On Thu, Jan 19, 2023 at 6:01 PM Ahmet Altay  wrote:

> Adding: @Andrew Pilloud  @Kenneth Knowles
> 
>
> On Thu, Jan 12, 2023 at 12:31 PM Talat Uyarer via user <
> u...@beam.apache.org> wrote:
>
>> Hi All,
>>
>> I am using Beam 2.43 with Calcite SQL with Java.
>>
>> I have a query with a WITH clause and some aliasing. Looks like Beam
>> Query optimizer after optimizing my query, it drops Select statement's
>> aliases. Can you help me to identify where the problem is ?
>>
>> This is my query
>> INFO: SQL:
>> WITH `tempTable` (`id`, `v`) AS (SELECT
>> `PCOLLECTION`.`f_nestedRow`.`f_nestedInt` AS `id`,
>> `PCOLLECTION`.`f_nestedRow`.`f_nestedString` AS `v`
>> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`) (SELECT `tempTable`.`id`
>> AS `id`, `tempTable`.`v` AS `value`
>> FROM `tempTable` AS `tempTable`
>> WHERE `tempTable`.`v` <> '11')
>>
>> This is Ca

Re: Subscribe

2023-01-24 Thread Alan Zhang via dev
Thanks, Valentyn! I realized my mistake right after I sent the email to this 
address. Now I am able to receive the latest emails from both the user@ and 
dev@ mailing lists after correcting the addresses.

Best,
Alan Zhang

From: Valentyn Tymofieiev 
Date: Tuesday, January 24, 2023 at 5:23 PM
To: dev@beam.apache.org , Alan Zhang 

Subject: Re: Subscribe
Hello Alan,

To subscribe to the list, you should send an email to 
dev-subscr...@beam.apache.org instead.

Best,
Valentyn

On Tue, Jan 24, 2023 at 5:19 PM Alan Zhang via dev 
<dev@beam.apache.org> wrote:


MapState/SetState (aka MultimapUserState) are not fully supported in the Beam portability framework?

2023-01-24 Thread Alan Zhang via dev
Hi everyone,

Why don't we have some interfaces (e.g. MultimapUserStateHandler and 
MultimapUserStateHandlerFactory) for supporting MultimapUserState defined in 
the StateRequestHandlers class? Is this support planned but not yet 
implemented, or were there concerns such that we don't want to support it? Or 
is this class not the right place to define these MultimapUserState-related 
handler interfaces?

For example, for supporting BagUserState, I saw this class defines two 
related interfaces, BagUserStateHandler and BagUserStateHandlerFactory, and 
the runners (Samza/Flink/Spark) can have their own implementations (e.g. 
Samza's SamzaStateRequestHandlers) of these interfaces to support ValueState, 
BagState and CombiningState.

I saw that the existing Fn Harness implementation is able to handle MapState 
and SetState by using FnApiStateAccessor and building the right 
StateRequest/StateKey for them. So if the Beam Fn APIs can provide these 
interfaces for each runner to integrate with, then I would consider 
MultimapUserState fully supported in the Beam portability framework.
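
To make the shape of the missing interfaces concrete, here is a minimal
Python sketch mirroring the BagUserStateHandler/BagUserStateHandlerFactory
pattern. The interface name and methods below are hypothetical, since Beam
does not currently define them (which is exactly the question above):

```python
import abc
from collections import defaultdict
from typing import Any, Iterable

class MultimapUserStateHandler(abc.ABC):
    """Hypothetical analogue of BagUserStateHandler for multimap user state."""

    @abc.abstractmethod
    def get(self, map_key: Any) -> Iterable[Any]: ...

    @abc.abstractmethod
    def append(self, map_key: Any, value: Any) -> None: ...

    @abc.abstractmethod
    def clear(self, map_key: Any) -> None: ...

class InMemoryMultimapHandler(MultimapUserStateHandler):
    """Toy runner-side implementation backed by a dict of lists."""

    def __init__(self) -> None:
        self._data = defaultdict(list)

    def get(self, map_key):
        return list(self._data.get(map_key, []))

    def append(self, map_key, value):
        self._data[map_key].append(value)

    def clear(self, map_key):
        self._data.pop(map_key, None)

# MapState.put(k, v) can be expressed as clear(k) + append(k, v), and
# SetState.add(v) as append-if-absent, so one multimap handler could back
# both state types.
h = InMemoryMultimapHandler()
h.append("user1", "click")
h.append("user1", "view")
print(h.get("user1"))  # ['click', 'view']
```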


A little introduction for myself:

This is Alan from LinkedIn. We are building a new managed platform that is 
powered by the Samza runner and the Beam portability framework, and we want 
to let all LinkedIn Beam use cases benefit from this new portable 
architecture eventually.
But there are a few feature gaps between the classic Samza runner and the 
portable Samza runner; user state support is one of them. The classic Samza 
runner supports 5 major user state types: ValueState, BagState, 
CombiningState, MapState and SetState, while the existing portable Samza 
runner only supports ValueState, BagState and CombiningState. I'm trying to 
address this state feature gap now.



--
Best,
Alan Zhang