Beam High Priority Issue Report (40)
This is your daily summary of Beam's current high priority issues that may need attention. See https://beam.apache.org/contribute/issue-priorities for the meaning and expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/25140 [Bug]: GenerateSequence is broken on SDF
https://github.com/apache/beam/issues/24971 [Bug]: Messages are not published when a connection is closed with JmsIO
https://github.com/apache/beam/issues/24776 [Bug]: Race condition in Python SDK Harness ProcessBundleProgress
https://github.com/apache/beam/issues/24655 [Bug]: Pipeline fusion should break at @RequiresStableInput boundary
https://github.com/apache/beam/issues/24389 [Failing Test]: HadoopFormatIOElasticTest.classMethod ExceptionInInitializerError ContainerFetchException
https://github.com/apache/beam/issues/24367 [Bug]: workflow.tar.gz cannot be passed to flink runner
https://github.com/apache/beam/issues/24313 [Flaky]: apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
https://github.com/apache/beam/issues/24267 [Failing Test]: Timeout waiting to lock gradle
https://github.com/apache/beam/issues/23944 beam_PreCommit_Python_Cron regularily failing - test_pardo_large_input flaky
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/22969 Discrepancy in behavior of `DoFn.process()` when `yield` is combined with `return` statement, or vice versa
https://github.com/apache/beam/issues/22961 [Bug]: WriteToBigQuery silently skips most of records without job fail
https://github.com/apache/beam/issues/22913 [Bug]: beam_PostCommit_Java_ValidatesRunner_Flink is flakes in org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output to Failed Inserts PCollection
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21643 FnRunnerTest with non-trivial (order 1000 elements) numpy input flakes in non-cython environment
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: Connection refused
https://github.com/apache/beam/issues/21424 Java VR (Dataflow, V2, Streaming) failing: ParDoTest$TimestampTests/OnWindowExpirationTests
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not follow spec
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit data at GC time
https://github.com/apache/beam/issues/21121 apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT.test_streaming_wordcount_it flakey
https://github.com/apache/beam/issues/21104 Flaky: apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers
https://github.com/apache/beam/issues/20976 apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics is flaky
https://github.com/apache/beam/issues/20974 Python GHA PreCommits flake with grpc.FutureTimeoutError on SDK harness startup
https://github.com/apache/beam/issues/20689 Kafka commitOffsetsInFinalize OOM on Flink
https://github.com/apache/beam/issues/20108 Python direct runner doesn't emit empty pane when it should
https://github.com/apache/beam/issues/19814 Flink streaming flakes in ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful and ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
https://github.com/apache/beam/issues/19465 Explore possibilities to lower in-use IP address quota footprint.
https://github.com/apache/beam/issues/19241 Python Dataflow integration tests should export the pipeline Job ID and console output to Jenkins Test Result section

P1 Issues with no update in the last week:

https://github.com/apache/beam/issues/23875 [Bug]: beam.Row.__eq__ returns true for unequal rows
https://github.com/apache/beam/issues/23525 [Bug]: Default PubsubMessage coder will drop message id and orderingKey
https://github.com/apache/beam/issues/22605 [Bug]: Beam Python failure for dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest.test_metrics_it
https://github.com/apache/beam/issues/22115 [Bug]: apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses is flaky
https://github.com/apache/beam/issues/21714 PulsarIOTest.testReadFromSimpleTopic is very flaky
https://github.com/apache/beam/issues/21708 beam_PostCommit_Java_DataflowV2, testBigQueryStorageWrite30MProto failing consistently
https://github.com/apache/beam/issues/21706 Flaky timeout in github Python unit test action StatefulDoFnOnDirectRunne
Beam Website Feedback
Hi,
I'm reading the documentation found in https://beam.apache.org/documentation/pipelines/test-your-pipeline/

from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

class CountTest(unittest.TestCase):

    def test_count(self):
        # Our static input data, which will make up the initial PCollection.
        WORDS = [
            "hi", "there", "hi", "hi", "sue", "bob",
            "hi", "sue", "", "", "ZOW", "bob", ""
        ]
        # Create a test pipeline.
        with TestPipeline() as p:

            # Create an input PCollection.
            input = p | beam.Create(WORDS)

            # Apply the Count transform under test.
            output = input | beam.combiners.Count.PerElement()

            # Assert on the results.
            assert_that(
                output,
                equal_to([
                    ("hi", 4),
                    ("there", 1),
                    ("sue", 2),
                    ("bob", 2),
                    ("", 3),
                    ("ZOW", 1)]))

It's a little bit unclear how to run the unit test. If I execute it with "python3 test.py --runner DirectRunner" I get an error message:

class CountTest(unittest.TestCase):
NameError: name 'unittest' is not defined

Thanks!
--
Julian Ogando
Cloud Data Engineer
Latam Eng Delivery Center
+54 911 65715933
julianoga...@google.com
Re: Beam Website Feedback
Some of the imports are missing:

```
# try importing
import unittest
import apache_beam as beam
# rest of the code.
```

Try running with pytest as "pytest test.py" and it should work.

Thanks,
Anand

On Fri, Feb 10, 2023 at 10:09 AM Julian Ogando via dev wrote:
Re: Beam Website Feedback
Hi Julian,

That's a good point. I think the focus of https://beam.apache.org/documentation/pipelines/test-your-pipeline/ is showing how to test Beam, but probably linking to working examples would be helpful to reduce friction.

Related to your code, I believe you are missing 2 imports:

import unittest
import apache_beam as beam

And you should invoke the test through:

python3 -m unittest test.py

Alternatively, it is possible to start tests using the given command "python3 test.py --runner DirectRunner", but you'll need to add the following lines to your code:

if __name__ == '__main__':
    unittest.main()

Best,
Bruno

On Fri, Feb 10, 2023 at 10:09 AM Julian Ogando via dev wrote:
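Putting both fixes together, the unittest mechanics Bruno describes can be sketched with the standard library alone. This is a hedged stand-in, not the documentation's example: collections.Counter substitutes for Beam's Count.PerElement so the file runs without apache_beam installed; the structure (importing unittest, the TestCase, the __main__ guard) is the part being illustrated.

```python
import unittest
from collections import Counter

# Stdlib-only sketch of the pattern discussed above; Counter stands in
# for Beam's Count.PerElement so the sketch needs no apache_beam install.
class CountTest(unittest.TestCase):
    def test_count(self):
        words = ["hi", "there", "hi", "hi", "sue", "bob",
                 "hi", "sue", "", "", "ZOW", "bob", ""]
        counts = Counter(words)
        self.assertEqual(
            counts,
            {"hi": 4, "there": 1, "sue": 2, "bob": 2, "": 3, "ZOW": 1})

if __name__ == "__main__":
    # This guard is what lets `python3 test.py` run the tests directly;
    # without it the file only defines the TestCase and exits silently.
    # (exit=False is used here only so the sketch can also be scripted;
    # plain unittest.main() is the usual form.)
    unittest.main(exit=False)
```

Running `python3 -m unittest test.py` works even without the guard, because the unittest CLI discovers the TestCase itself.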
[VOTE] Release 2.45.0, Release Candidate #1
Hi everyone,
Please review and vote on the release candidate #1 for the version 2.45.0, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

Reviewers are encouraged to test their own use cases with the release candidate, and vote +1 if no issues are found.

The complete staging area is available for your review, which includes:
* GitHub Release notes [1],
* the official Apache source release to be deployed to dist.apache.org [2], which is signed with the key with fingerprint 921F35F5EC5F5DDE [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "v2.45.0-RC1" [5],
* website pull request listing the release [6], the blog post [6], and publishing the API reference manual [7].
* Java artifacts were built with Gradle GRADLE_VERSION and OpenJDK/Oracle JDK JDK_VERSION.
* Python artifacts are deployed along with the source release to dist.apache.org [2] and PyPI [8].
* Go artifacts and documentation are available at pkg.go.dev [9].
* Validation sheet with a tab for the 2.45.0 release to help with validation [10].
* Docker images published to Docker Hub [11].

The vote will be open for at least 72 hours. It is adopted by majority approval, with at least 3 PMC affirmative votes.

For guidelines on how to try the release in your projects, check out our blog post at /blog/validate-beam-release/.

Thanks,
John Casey

[1] https://github.com/apache/beam/milestone/8
[2] https://dist.apache.org/repos/dist/dev/beam/2.45.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4] https://repository.apache.org/content/repositories/orgapachebeam-1293/
[5] https://github.com/apache/beam/tree/v2.45.0-RC1
[6] https://github.com/apache/beam/pull/25407
[7] https://github.com/apache/beam-site/pull/640
[8] https://pypi.org/project/apache-beam/2.45.0rc1/
[9] https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.45.0-RC1/go/pkg/beam
[10] https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=2030665842
[11] https://hub.docker.com/search?q=apache%2Fbeam&type=image
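For reviewers validating the candidate, the checksum part of the artifact checks can be sketched as follows. This is a hedged, self-contained illustration: "artifact.zip" is a local stand-in, since for a real check you would download an artifact and its .sha512 companion from the dist.apache.org staging area above, and additionally verify the .asc signature with gpg against the published KEYS file.

```shell
# "artifact.zip" is a local stand-in for a staged release artifact.
echo "release candidate contents" > artifact.zip

# The release manager publishes a matching .sha512 file next to each
# artifact; one is generated locally here so the sketch is runnable.
sha512sum artifact.zip > artifact.zip.sha512

# The actual validation step: recompute the digest and compare.
sha512sum -c artifact.zip.sha512
```

The same shape applies to the signature check (gpg --import on KEYS, then gpg --verify on the .asc file), which is what a "+1 validated signatures" vote typically covers.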
Re: Beam Website Feedback
Hi Bruno,

Thanks for the quick response! Yes, I already have the code running by reading the "unittest" documentation, but it would be good to include those changes in the Beam documentation :)

Thanks!
Julian

On Fri, Feb 10, 2023 at 12:28 PM Bruno Volpato wrote:

--
Julian Ogando
Cloud Data Engineer
Latam Eng Delivery Center
+54 911 65715933
julianoga...@google.com
Re: Beam Website Feedback
I just added https://github.com/apache/beam/pull/25425 to do this

On Fri, Feb 10, 2023 at 11:42 AM Julian Ogando via dev wrote:
Re: [VOTE] Release 2.45.0, Release Candidate #1
Addendum to above email.

Java artifacts were built with Gradle 7.5.1 and OpenJDK 1.8.0_362

On Fri, Feb 10, 2023 at 11:14 AM John Casey wrote:
Re: [VOTE] Release 2.45.0, Release Candidate #1
+1

Validated release artifact signatures and verified the Java Flink and Spark quickstarts.

On Fri, Feb 10, 2023 at 9:27 AM John Casey via dev wrote:
Re: A user-deployable Beam Transform Service
Thanks. I added some comments to the doc.

On Mon, Feb 6, 2023 at 1:33 PM Chamikara Jayalath via dev wrote:
> Hi All,
>
> Beam PTransforms are currently primarily identified as operations in a pipeline that perform specific tasks. PTransform implementations were traditionally linked to specific Beam SDKs.
>
> With the advent of the portability framework, multi-language pipelines, and expansion services that can be used to build/expand and discover transforms, we have an opportunity to make this more general and re-introduce Beam PTransforms as computation units that can serve any use-case that needs to discover or use Beam transforms. For example, any Beam SDK that runs a pipeline using a portable Beam runner should be able to use a transform offered through an expansion service irrespective of the implementation SDK of the transform or the pipeline.
>
> I believe we can make such use-cases much easier to manage by introducing a user-deployable service that encapsulates existing Beam expansion services in the form of a Kubernetes cluster. The service will offer a single gRPC endpoint and will include Beam expansion services developed in different languages. Any Beam pipeline, irrespective of the pipeline SDK, should be able to use any transform offered by the service.
>
> This will also offer a way to make multi-language pipeline execution, which currently relies on locally downloaded large dependencies and locally started expansion service processes, more robust.
>
> I have written a proposal for implementing such a service and it's available at https://s.apache.org/beam-transform-service.
>
> Please take a look and let me know if you have any comments or questions.
>
> Thanks,
> Cham
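To make the single-endpoint idea concrete: a front-end transform service receiving an expansion request would need to dispatch it to the language-specific expansion service that owns the transform. The routing sketch below is purely hypothetical and not from the proposal doc; the URN prefixes, backend addresses, and function name are all illustrative assumptions.

```python
# Hypothetical routing a front-end transform service might perform:
# map a transform URN to the backing language-specific expansion service.
# URN prefixes and service addresses below are illustrative assumptions.
BACKENDS = {
    "beam:transform:org.apache.beam:java": "java-expansion:8097",
    "beam:transform:org.apache.beam:python": "python-expansion:8097",
}

def route_expansion_request(urn: str) -> str:
    """Return the backend expansion service whose prefix matches the URN."""
    for prefix, address in BACKENDS.items():
        if urn.startswith(prefix):
            return address
    raise KeyError(f"No expansion service registered for URN: {urn}")
```

In the proposed design this dispatch would sit behind the single gRPC endpoint, so a pipeline author only ever configures one service address regardless of the transform's implementation SDK.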
Re: A user-deployable Beam Transform Service
Seems like a useful thing to me and will make it easier for Beam users overall.

On Fri, Feb 10, 2023 at 3:56 PM Robert Bradshaw via dev wrote:
> Thanks. I added some comments to the doc.
Re: Beam SQL Alias issue while using With Clause
I have a test case that I believe should reproduce this on both head and 2.43, but it ends up with a different logical plan. Can you provide your input types? We have a class of issues around complex types: https://github.com/apache/beam/issues/19009

I don't believe the "LogicalFilter(condition=[=($2.name, 'User1')])", particularly "$2.name", is something that works; in my test it seems that the planner has flattened the complex input and reproduced a ROW at the output.

INFO: SQLPlan>
LogicalProject(col=[ROW($0, $1)], field=[$2])
  LogicalFilter(condition=[=($0, 'innerStr')])
    LogicalProject(string_field=[$0.string_field], long_field=[$0.long_field], field=[$1])
      BeamIOSourceRel(table=[[beam, basicRowTestTable]])

Feb 10, 2023 6:07:35 PM org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
INFO: BEAMPlan>
BeamCalcRel(expr#0..1=[{inputs}], expr#2=[$t0.string_field], expr#3=[$t0.long_field], expr#4=[ROW($t2, $t3)], expr#5=['innerStr':VARCHAR], expr#6=[=($t2, $t5)], col=[$t4], field=[$t1], $condition=[$t6])
  BeamIOSourceRel(table=[[beam, basicRowTestTable]])

--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
@@ -127,8 +127,8 @@ public class BeamComplexTypeTest {
                 .build()))
         .put(
             "basicRowTestTable",
-            TestBoundedTable.of(FieldType.row(innerRowSchema), "col")
-                .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()))
+            TestBoundedTable.of(FieldType.row(innerRowSchema), "col", FieldType.INT64, "field")
+                .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(), 1L))
         .put(
             "rowWithArrayTestTable",
             TestBoundedTable.of(FieldType.row(rowWithArraySchema), "col")
@@ -220,6 +220,21 @@ public class BeamComplexTypeTest {
     pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
   }

+  @Test
+  public void testBasicRowWhereField() {
+    BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
+    PCollection<Row> stream =
+        BeamSqlRelUtils.toPCollection(
+            pipeline,
+            sqlEnv.parseQuery(
+                "WITH tempTable AS (SELECT * FROM basicRowTestTable WHERE basicRowTestTable.col.string_field = 'innerStr') SELECT * FROM tempTable"));
+    Schema outputSchema =
+        Schema.builder().addRowField("col", innerRowSchema).addInt64Field("field").build();
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(outputSchema)
+                .addValues(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(), 1L)
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
   @Test
   public void testArrayConstructor() {
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);

On Fri, Feb 3, 2023 at 2:06 PM Talat Uyarer wrote:
> Hi Andrew,
>
> Thank you for your MR. I appreciate you helping us solve the issue. I reran our tests and they are partially passing now with your fix. However, there is one more issue with the WITH clause.
>
> When I run the following query, somehow Beam loses the type of the column:
>
> WITH tempTable AS (SELECT * FROM PCOLLECTION WHERE PCOLLECTION.`user_info`.`name` = 'User1') SELECT * FROM tempTable
>
> I haven't tested on Beam master. I ran with your latest patch on our code base.
> This is the output:
>
> 14:00:30.095 [Test worker] INFO o.a.b.sdk.extensions.sql.impl.CalciteQueryPlanner - SQL:
> WITH `tempTable` AS (SELECT `PCOLLECTION`.`id`, `PCOLLECTION`.`value`, `PCOLLECTION`.`user_info`
>   FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
>   WHERE `PCOLLECTION`.`user_info`.`name` = 'User1') (SELECT `tempTable`.`id`, `tempTable`.`value`, `tempTable`.`user_info`
>   FROM `tempTable` AS `tempTable`)
>
> 14:00:30.106 [Test worker] DEBUG o.a.b.v.calcite.v1_28_0.org.apache.calcite.sql2rel - Plan after converting SqlNode to RelNode
> LogicalProject(id=[$0], value=[$1], user_info=[$2])
>   LogicalFilter(condition=[=($2.name, 'User1')])
>     BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
> 14:00:30.107 [Test worker] DEBUG o.a.b.v.calcite.v1_28_0.org.apache.calcite.sql2rel - Plan after converting SqlNode to RelNode
> LogicalProject(id=[$0], value=[$1], user_info=[$2])
>   LogicalFilter(condition=[=($2.name, 'User1')])
>     BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
> 14:00:30.109 [Test worker] INFO o.a.b.sdk.extensions.sql.impl.CalciteQueryPlanner - SQLPlan>
> LogicalProject(id=[$0], value=[$1], user_info=[ROW($2)])
>   LogicalFilter(condition=[=($2.name, 'User1')])
>     LogicalProject(id=[$0], value=[$1], name=[$2.name])
>       BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
> 14:00:30.173 [Test worker] DEBUG o.a.b.v.c.v.org.apache.calcite.plan.RelOptPlanner - PLANNER = org.apache.beam.vendor.calcite.v1_28_