Beam High Priority Issue Report (40)

2023-02-10 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/25140 [Bug]: GenerateSequence is broken 
on SDF
https://github.com/apache/beam/issues/24971 [Bug]: Messages are not published 
when a connection is closed with JmsIO
https://github.com/apache/beam/issues/24776 [Bug]: Race condition in Python SDK 
Harness ProcessBundleProgress
https://github.com/apache/beam/issues/24655 [Bug]: Pipeline fusion should break 
at @RequiresStableInput boundary
https://github.com/apache/beam/issues/24389 [Failing Test]: 
HadoopFormatIOElasticTest.classMethod ExceptionInInitializerError 
ContainerFetchException
https://github.com/apache/beam/issues/24367 [Bug]: workflow.tar.gz cannot be 
passed to flink runner
https://github.com/apache/beam/issues/24313 [Flaky]: 
apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
https://github.com/apache/beam/issues/24267 [Failing Test]: Timeout waiting to 
lock gradle
https://github.com/apache/beam/issues/23944  beam_PreCommit_Python_Cron 
regularily failing - test_pardo_large_input flaky
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/22969 Discrepancy in behavior of 
`DoFn.process()` when `yield` is combined with `return` statement, or vice versa
https://github.com/apache/beam/issues/22961 [Bug]: WriteToBigQuery silently 
skips most of records without job fail
https://github.com/apache/beam/issues/22913 [Bug]: 
beam_PostCommit_Java_ValidatesRunner_Flink is flakes in 
org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output 
to Failed Inserts PCollection
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not 
raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21643 FnRunnerTest with non-trivial 
(order 1000 elements) numpy input flakes in non-cython environment
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21424 Java VR (Dataflow, V2, Streaming) 
failing: ParDoTest$TimestampTests/OnWindowExpirationTests
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not 
follow spec
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21121 
apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT.test_streaming_wordcount_it
 flakey
https://github.com/apache/beam/issues/21104 Flaky: 
apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers
https://github.com/apache/beam/issues/20976 
apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
 is flaky
https://github.com/apache/beam/issues/20974 Python GHA PreCommits flake with 
grpc.FutureTimeoutError on SDK harness startup
https://github.com/apache/beam/issues/20689 Kafka commitOffsetsInFinalize OOM 
on Flink
https://github.com/apache/beam/issues/20108 Python direct runner doesn't emit 
empty pane when it should
https://github.com/apache/beam/issues/19814 Flink streaming flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
https://github.com/apache/beam/issues/19465 Explore possibilities to lower 
in-use IP address quota footprint.
https://github.com/apache/beam/issues/19241 Python Dataflow integration tests 
should export the pipeline Job ID and console output to Jenkins Test Result 
section


P1 Issues with no update in the last week:

https://github.com/apache/beam/issues/23875 [Bug]: beam.Row.__eq__ returns true 
for unequal rows
https://github.com/apache/beam/issues/23525 [Bug]: Default PubsubMessage coder 
will drop message id and orderingKey
https://github.com/apache/beam/issues/22605 [Bug]: Beam Python failure for 
dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest.test_metrics_it
https://github.com/apache/beam/issues/22115 [Bug]: 
apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
 is flaky
https://github.com/apache/beam/issues/21714 
PulsarIOTest.testReadFromSimpleTopic is very flaky
https://github.com/apache/beam/issues/21708 beam_PostCommit_Java_DataflowV2, 
testBigQueryStorageWrite30MProto failing consistently
https://github.com/apache/beam/issues/21706 Flaky timeout in github Python unit 
test action 
StatefulDoFnOnDirectRunne

Beam Website Feedback

2023-02-10 Thread Julian Ogando via dev
Hi,
I'm reading the documentation found in
https://beam.apache.org/documentation/pipelines/test-your-pipeline/

from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

class CountTest(unittest.TestCase):

  def test_count(self):
    # Our static input data, which will make up the initial PCollection.
    WORDS = [
      "hi", "there", "hi", "hi", "sue", "bob",
      "hi", "sue", "", "", "ZOW", "bob", ""
    ]
    # Create a test pipeline.
    with TestPipeline() as p:

      # Create an input PCollection.
      input = p | beam.Create(WORDS)

      # Apply the Count transform under test.
      output = input | beam.combiners.Count.PerElement()

      # Assert on the results.
      assert_that(
        output,
        equal_to([
          ("hi", 4),
          ("there", 1),
          ("sue", 2),
          ("bob", 2),
          ("", 3),
          ("ZOW", 1)]))


It's a little bit unclear how to run the unit test. If I execute it with
"python3 test.py --runner DirectRunner" I get an error message:
class CountTest(unittest.TestCase):
NameError: name 'unittest' is not defined

Thanks!
-- 

Julian Ogando

Cloud Data Engineer

Latam Eng Delivery Center

+54 911 65715933

julianoga...@google.com


Re: Beam Website Feedback

2023-02-10 Thread Anand Inguva via dev
Some of the imports are missing.
```
# try importing

import unittest
import apache_beam as beam

# rest of the code.
```
Try running it with pytest ("pytest test.py") and it should work.

Thanks,
Anand
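
The NameError reported above is raised while the module is being loaded, so it
appears regardless of the runner flag. A minimal sketch of why, using only the
standard library: the base class in a class statement is evaluated when the
statement executes, so a missing "import unittest" fails before any test can
run. The names BROKEN, FIXED, and try_load are hypothetical helpers for this
illustration.

```python
# Module source that uses unittest.TestCase without importing unittest,
# mirroring the shape of the snippet quoted in the thread.
BROKEN = """
class CountTest(unittest.TestCase):
    def test_count(self):
        pass
"""

# The same source with the missing import added at the top.
FIXED = "import unittest\n" + BROKEN


def try_load(source):
    """Execute module source and report whether the class statement succeeded."""
    namespace = {}
    try:
        exec(source, namespace)
    except NameError as err:
        return f"NameError: {err}"
    return "loaded " + namespace["CountTest"].__name__


print(try_load(BROKEN))
print(try_load(FIXED))
```

The first call reports a NameError for 'unittest'; the second loads the class,
which is the effect of adding the import suggested in this reply.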


Re: Beam Website Feedback

2023-02-10 Thread Bruno Volpato via dev
Hi Julian,

That's a good point. I think the focus of
https://beam.apache.org/documentation/pipelines/test-your-pipeline/ is
showing how to test Beam, but probably linking to working examples would be
helpful to reduce friction.

Related to your code, I believe you are missing 2 imports:

import unittest

import apache_beam as beam


You can then invoke the test with:

python3 -m unittest test.py


It is possible to start tests using the given command "python3 test.py
--runner DirectRunner", but you'll need to add the following lines to your
code:

if __name__ == '__main__':

  unittest.main()



Best,
Bruno
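
The advice in this message can be tried without installing Beam. The sketch
below is an assumption-laden stand-in, not the documentation's example:
count_per_element is a hypothetical pure-Python substitute for
beam.combiners.Count.PerElement(), and run_count_test is a hypothetical helper
that loads and runs the test case programmatically, which is roughly what
"python3 -m unittest test.py" does for you.

```python
import collections
import io
import unittest


def count_per_element(words):
    """Hypothetical stand-in for beam.combiners.Count.PerElement()."""
    return sorted(collections.Counter(words).items())


class CountTest(unittest.TestCase):
    def test_count(self):
        words = ["hi", "there", "hi", "hi", "sue", "bob",
                 "hi", "sue", "", "", "ZOW", "bob", ""]
        expected = sorted([("hi", 4), ("there", 1), ("sue", 2),
                           ("bob", 2), ("", 3), ("ZOW", 1)])
        self.assertEqual(count_per_element(words), expected)


def run_count_test():
    """Load CountTest and run it, capturing the runner's text output."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(CountTest)
    runner = unittest.TextTestRunner(stream=io.StringIO(), verbosity=0)
    return runner.run(suite)


result = run_count_test()
print(result.testsRun, result.wasSuccessful())
```

In a real test file, the last three lines would typically be replaced by the
"if __name__ == '__main__'" guard shown in this message, so that running the
file directly starts the tests.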


[VOTE] Release 2.45.0, Release Candidate #1

2023-02-10 Thread John Casey via dev
Hi everyone,
Please review and vote on the release candidate #1 for the version 2.45.0,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


Reviewers are encouraged to test their own use cases with the release
candidate, and vote +1 if no issues are found.

The complete staging area is available for your review, which includes:
* GitHub Release notes [1],
* the official Apache source release to be deployed to dist.apache.org [2],
which is signed with the key with fingerprint 921F35F5EC5F5DDE [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "v2.45.0-RC1" [5],
* website pull request listing the release [6], the blog post [6], and
publishing the API reference manual [7].
* Java artifacts were built with Gradle GRADLE_VERSION and OpenJDK/Oracle
JDK JDK_VERSION.
* Python artifacts are deployed along with the source release to
dist.apache.org [2] and PyPI [8].
* Go artifacts and documentation are available at pkg.go.dev [9]
* Validation sheet with a tab for the 2.45.0 release to help with validation
[10].
* Docker images published to Docker Hub [11].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

For guidelines on how to try the release in your projects, check out our
blog post at /blog/validate-beam-release/.

Thanks,
John Casey

[1] https://github.com/apache/beam/milestone/8
[2] https://dist.apache.org/repos/dist/dev/beam/2.45.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4] https://repository.apache.org/content/repositories/orgapachebeam-1293/
[5] https://github.com/apache/beam/tree/v2.45.0-RC1
[6] https://github.com/apache/beam/pull/25407
[7] https://github.com/apache/beam-site/pull/640
[8] https://pypi.org/project/apache-beam/2.45.0rc1/
[9]
https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.45.0-RC1/go/pkg/beam
[10]
https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=2030665842
[11] https://hub.docker.com/search?q=apache%2Fbeam&type=image
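
The adoption rule stated in this email (open for at least 72 hours, majority
approval, at least 3 PMC affirmative votes) can be written down as a small
check. This is a sketch under stated assumptions: "majority approval" is read
here as more binding +1 votes than binding -1 votes, and the Vote record, the
function name, and the sample ballots are all hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Vote:
    voter: str     # hypothetical voter label
    value: int     # +1 to approve, -1 to reject
    binding: bool  # True when cast by a PMC member


def vote_passes(votes, hours_open, min_hours=72, min_binding_approvals=3):
    """Apply the rule from the vote email to a list of Vote records.

    Assumption: only binding votes count toward the majority.
    """
    binding = [v for v in votes if v.binding]
    approvals = sum(1 for v in binding if v.value > 0)
    rejections = sum(1 for v in binding if v.value < 0)
    return (
        hours_open >= min_hours
        and approvals >= min_binding_approvals
        and approvals > rejections
    )


# Hypothetical ballots, not the actual results of this vote.
ballots = [
    Vote("pmc-a", +1, True),
    Vote("pmc-b", +1, True),
    Vote("pmc-c", +1, True),
    Vote("contributor-d", +1, False),
]
print(vote_passes(ballots, hours_open=80))
print(vote_passes(ballots, hours_open=24))
print(vote_passes(ballots[:2], hours_open=80))
```

Only the first call satisfies all three conditions; the second closes too
early and the third has too few binding approvals.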


Re: Beam Website Feedback

2023-02-10 Thread Julian Ogando via dev
Hi Bruno,
Thanks for the quick response!
Yes, I already have the code running by reading the "unittest"
documentation, but it would be good to include those changes in the Beam
documentation :)

Thanks!
Julian


Re: Beam Website Feedback

2023-02-10 Thread Danny McCormick via dev
I just added https://github.com/apache/beam/pull/25425 to do this


Re: [VOTE] Release 2.45.0, Release Candidate #1

2023-02-10 Thread John Casey via dev
Addendum to above email.

Java artifacts were built with Gradle 7.5.1 and OpenJDK 1.8.0_362


Re: [VOTE] Release 2.45.0, Release Candidate #1

2023-02-10 Thread Luke Cwik via dev
+1

Validated release artifact signatures and verified the Java Flink and Spark
quickstarts.
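
Alongside signature validation, a downloaded artifact can also be compared
against a published SHA-512 checksum. The sketch below is a checksum
comparison only, not the GPG signature check mentioned above. It assumes the
checksum file's first whitespace-separated token is the hex digest; the file
names and contents are hypothetical stand-ins created in a temporary
directory.

```python
import hashlib
import tempfile
from pathlib import Path


def sha512_of(path, chunk_size=1 << 20):
    """Return the hex SHA-512 digest of a file, read in chunks."""
    digest = hashlib.sha512()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_checksum_file(artifact, checksum_file):
    """Compare an artifact against a checksum file.

    Assumption: the first whitespace-separated token is the hex digest.
    """
    expected = Path(checksum_file).read_text().split()[0].lower()
    return sha512_of(artifact) == expected


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical file names; a real check would use the downloaded files.
    artifact = Path(tmp) / "example-source-release.zip"
    artifact.write_bytes(b"stand-in artifact contents")
    checksum = Path(tmp) / "example-source-release.zip.sha512"
    checksum.write_text(
        hashlib.sha512(b"stand-in artifact contents").hexdigest()
        + "  " + artifact.name + "\n"
    )
    ok = matches_checksum_file(artifact, checksum)

    # Changing the artifact afterwards must make the comparison fail.
    artifact.write_bytes(b"tampered contents")
    tampered_ok = matches_checksum_file(artifact, checksum)

print(ok, tampered_ok)
```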


Re: A user-deployable Beam Transform Service

2023-02-10 Thread Robert Bradshaw via dev
Thanks. I added some comments to the doc.

On Mon, Feb 6, 2023 at 1:33 PM Chamikara Jayalath via dev
 wrote:
>
> Hi All,
>
> Beam PTransforms are currently primarily identified as operations in a 
> pipeline that perform specific tasks. PTransform implementations were 
> traditionally linked to specific Beam SDKs.
>
> With the advent of portability framework, multi-language pipelines, and 
> expansion services that can be used to build/expand and discover transforms, 
> we have an opportunity to make this more general and re-introduce Beam 
> PTransforms as computation units that can serve any use-case that needs to 
> discover or use Beam transforms. For example, any Beam SDK that runs a 
> pipeline using a portable Beam runner should be able to use a transform 
> offered through an expansion service irrespective of the implementation SDK 
> of the transform or the pipeline.
>
> I believe we can make such use-cases much easier to manage by introducing a 
> user-deployable service that encapsulates existing Beam expansion services in 
> the form of a Kubernetes cluster. The service will offer a single gRPC 
> endpoint and will include Beam expansion services developed in different 
> languages. Any Beam pipeline, irrespective of the pipeline SDK, should be 
> able to use any transform offered by the service.
>
> This will also offer a way to make multi-language pipeline execution, which 
> currently relies on locally downloaded large dependencies and locally started 
> expansion service processes, more robust.
>
> I have written a proposal for implementing such a service and it's available 
> at https://s.apache.org/beam-transform-service.
>
> Please take a look and let me know if you have any comments or questions.
>
> Thanks,
> Cham


Re: A user-deployable Beam Transform Service

2023-02-10 Thread Luke Cwik via dev
Seems like a useful thing to me and will make it easier for Beam users
overall.


Re: Beam SQL Alias issue while using With Clause

2023-02-10 Thread Andrew Pilloud via dev
I have a test case that I believe should reproduce this on both head
and 2.43, but it ends up with a different logical plan. Can you provide your
input types?

We have a class of issues around complex types
(https://github.com/apache/beam/issues/19009). I don't believe that
"LogicalFilter(condition=[=($2.name, 'User1')])", and in particular "$2.name",
is something that works. In my test, the planner appears to have flattened
the complex input and reproduced a ROW at the output.

INFO: SQLPlan>
LogicalProject(col=[ROW($0, $1)], field=[$2])
  LogicalFilter(condition=[=($0, 'innerStr')])
    LogicalProject(string_field=[$0.string_field], long_field=[$0.long_field], field=[$1])
      BeamIOSourceRel(table=[[beam, basicRowTestTable]])

Feb 10, 2023 6:07:35 PM org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
INFO: BEAMPlan>
BeamCalcRel(expr#0..1=[{inputs}], expr#2=[$t0.string_field], expr#3=[$t0.long_field], expr#4=[ROW($t2, $t3)], expr#5=['innerStr':VARCHAR], expr#6=[=($t2, $t5)], col=[$t4], field=[$t1], $condition=[$t6])
  BeamIOSourceRel(table=[[beam, basicRowTestTable]])
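
The three steps in the SQLPlan above (flatten the nested row, filter on the
flattened field, rebuild a ROW at the output) can be mimicked in plain Python
to make the data flow easier to follow. This is an analogy only, not Beam or
Calcite code; the function names and the second input row are hypothetical.

```python
# Rows shaped like basicRowTestTable: a nested "col" row plus a scalar "field".
rows = [
    {"col": {"string_field": "innerStr", "long_field": 1}, "field": 1},
    {"col": {"string_field": "other", "long_field": 2}, "field": 2},  # hypothetical
]


def flatten(row):
    """Inner LogicalProject: pull the nested fields up to ($0, $1, $2)."""
    return (row["col"]["string_field"], row["col"]["long_field"], row["field"])


def keep(flat):
    """LogicalFilter: condition =($0, 'innerStr')."""
    return flat[0] == "innerStr"


def rebuild(flat):
    """Outer LogicalProject: col=ROW($0, $1), field=$2."""
    return {
        "col": {"string_field": flat[0], "long_field": flat[1]},
        "field": flat[2],
    }


result = [rebuild(flat) for flat in map(flatten, rows) if keep(flat)]
print(result)
```

Because the filter runs on the flattened tuple, it refers to position $0
rather than to a nested path such as "$2.name".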

--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
@@ -127,8 +127,8 @@ public class BeamComplexTypeTest {
                   .build()))
           .put(
               "basicRowTestTable",
-              TestBoundedTable.of(FieldType.row(innerRowSchema), "col")
-                  .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()))
+              TestBoundedTable.of(FieldType.row(innerRowSchema), "col", FieldType.INT64, "field")
+                  .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(), 1L))
           .put(
               "rowWithArrayTestTable",
               TestBoundedTable.of(FieldType.row(rowWithArraySchema), "col")
@@ -220,6 +220,21 @@ public class BeamComplexTypeTest {
     pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
   }

+  @Test
+  public void testBasicRowWhereField() {
+    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
+    PCollection<Row> stream =
+        BeamSqlRelUtils.toPCollection(
+            pipeline, sqlEnv.parseQuery("WITH tempTable AS (SELECT * FROM basicRowTestTable WHERE basicRowTestTable.col.string_field = 'innerStr') SELECT * FROM tempTable"));
+    Schema outputSchema = Schema.builder().addRowField("col", innerRowSchema).addInt64Field("field").build();
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(outputSchema)
+                .addValues(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(), 1L)
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
   @Test
   public void testArrayConstructor() {
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);


On Fri, Feb 3, 2023 at 2:06 PM Talat Uyarer 
wrote:

> Hi Andrew,
>
> Thank you for your MR. I appreciate your help in solving the issue. I
> reran our tests and they are partially passing now with your fix. However,
> there is one more issue with the WITH clause.
>
> When I run the following query, Beam somehow loses the type of the column:
>
> WITH tempTable AS (SELECT * FROM PCOLLECTION WHERE
> PCOLLECTION.`user_info`.`name` = 'User1') SELECT * FROM tempTable
>
> I haven't tested on Beam master. I ran with your latest patch on our code
> base. This is the output:
>
> 14:00:30.095 [Test worker] INFO
>  o.a.b.sdk.extensions.sql.impl.CalciteQueryPlanner - SQL:
> WITH `tempTable` AS (SELECT `PCOLLECTION`.`id`, `PCOLLECTION`.`value`,
> `PCOLLECTION`.`user_info`
> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
> WHERE `PCOLLECTION`.`user_info`.`name` = 'User1') (SELECT
> `tempTable`.`id`, `tempTable`.`value`, `tempTable`.`user_info`
> FROM `tempTable` AS `tempTable`)
> 14:00:30.106 [Test worker] DEBUG
> o.a.b.v.calcite.v1_28_0.org.apache.calcite.sql2rel - Plan after converting
> SqlNode to RelNode
> LogicalProject(id=[$0], value=[$1], user_info=[$2])
>   LogicalFilter(condition=[=($2.name, 'User1')])
>     BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
> 14:00:30.107 [Test worker] DEBUG
> o.a.b.v.calcite.v1_28_0.org.apache.calcite.sql2rel - Plan after converting
> SqlNode to RelNode
> LogicalProject(id=[$0], value=[$1], user_info=[$2])
>   LogicalFilter(condition=[=($2.name, 'User1')])
>     BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
> 14:00:30.109 [Test worker] INFO
>  o.a.b.sdk.extensions.sql.impl.CalciteQueryPlanner - SQLPlan>
> LogicalProject(id=[$0], value=[$1], user_info=[ROW($2)])
>   LogicalFilter(condition=[=($2.name, 'User1')])
>     LogicalProject(id=[$0], value=[$1], name=[$2.name])
>       BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
> 14:00:30.173 [Test worker] DEBUG
> o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER =
> org.apache.beam.vendor.calcite.v1_28_