stankiewicz opened a new pull request, #37535:
URL: https://github.com/apache/beam/pull/37535

   # [BigQuery] Fix temporary table leakage during continuous dynamic reads
   
   ## ๐Ÿšจ Problem
   
   When using `BigQueryIO` for continuous dynamic reads in unbounded streaming pipelines, temporary tables created to hold query results were not reliably cleaned up. The original implementation either deleted the temporary table too early (before parallel workers had fully consumed the streams) or lacked any mechanism to track stream completions across parallel workers. As a result, long-running unbounded streaming jobs leaked BigQuery storage resources, leaving orphaned temporary datasets and tables.
   
   Global Window-based cleanup is insufficient for this unbounded streaming scenario; a more granular stream-tracking mechanism is required.
   
   ## ๐Ÿ› ๏ธ Solution
   
   This PR implements a robust, state-based cleanup mechanism that tracks when parallel workers finish reading from BigQuery Storage API streams, and deletes the temporary datasets and tables only once they are truly no longer needed.
   
   ### Key Changes:
   1. **Stateful Cleanup Tracking (`CleanupTempTableDoFn`)**:
      - Introduced a stateful `DoFn` that uses `ValueState` to track the total number of streams created for a given query job, alongside a counter for how many streams have completed successfully.
      - When the completed stream count equals the total expected streams, the `DoFn` safely drops the temporary BigQuery table (and the temporary dataset, if it was created by Beam).
   2. **Stream Initialization & Side Outputs (`CreateBoundedSourceForTable`)**:
      - Augmented the source creation step to emit a side output, `CleanupOperationMessage.initialize()`, containing metadata (a `CleanupInfo` with `projectId`, `datasetId`, `tableId`, and `totalStreams`).
   3. **Completion Signaling (`ReadDynamicStreamSource`)**:
      - Wrapped the underlying read operations so that, as each parallel stream is fully consumed, it emits a `CleanupOperationMessage.streamComplete()` signal to the cleanup `DoFn`.
      - Added a `.withFromQuery()` context toggle to `BigQueryStorageStreamSource` so consumers know whether the stream is bound to a temporary query table that requires tracking.
   4. **Serialization Safety**:
      - Replaced the non-serializable Google API `TableReference` inside `CleanupInfo` with primitive `String` fields to prevent `NotSerializableException`s during state persistence and shuffling.
   
   ## ๐Ÿงช Testing
   
   - **Added `CleanupTempTableDoFnTest`**: a dedicated unit test using `FakeDatasetService` and `FakeBigQueryServices`. It validates that:
     - Sequential or out-of-order stream completion signals properly aggregate 
in the stateful DoFn.
     - The mock BigQuery API `deleteTable` / `deleteDataset` methods are 
invoked exactly once after the final stream completes.
     - Attempting to access the tables post-cleanup correctly results in a 404 
Not Found exception.
   - Verified that `spotlessApply` and `compileJava` pass locally without 
format or syntax violations.
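   The aggregation property the unit test exercises can be modeled in plain Java, independent of Beam. The class below is a hypothetical, Beam-free stand-in for the `ValueState` bookkeeping: signals may arrive in any order (a `streamComplete()` can land before the `initialize()` message), and cleanup fires exactly once, when the completed count reaches the expected total.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical model of the per-query-job counting logic the stateful DoFn
   // keeps in ValueState: one expected-total value plus a completed-stream
   // counter. Cleanup fires exactly once, when completed >= total, regardless
   // of the order in which initialize/complete messages arrive.
   class StreamCompletionTracker {
     private final Map<String, Integer> expectedTotals = new HashMap<>();
     private final Map<String, Integer> completedCounts = new HashMap<>();
     private final Map<String, Boolean> cleaned = new HashMap<>();

     /** Handles the initialize message carrying totalStreams; returns true if cleanup should run now. */
     boolean onInitialize(String jobId, int totalStreams) {
       expectedTotals.put(jobId, totalStreams);
       return maybeCleanup(jobId);
     }

     /** Handles one stream-complete signal; returns true if this was the final stream. */
     boolean onStreamComplete(String jobId) {
       completedCounts.merge(jobId, 1, Integer::sum);
       return maybeCleanup(jobId);
     }

     private boolean maybeCleanup(String jobId) {
       Integer total = expectedTotals.get(jobId);
       int done = completedCounts.getOrDefault(jobId, 0);
       if (total != null && done >= total && !cleaned.getOrDefault(jobId, false)) {
         cleaned.put(jobId, true); // in the real DoFn: delete the temp table/dataset here
         return true;
       }
       return false;
     }
   }
   ```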
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: 
`addresses #123`), if applicable. This will automatically add a link to the 
pull request in the issue. If you would like the issue to automatically close 
on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit 
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   
------------------------------------------------------------------------------------------------
   [![Build python source distribution and 
wheels](https://github.com/apache/beam/actions/workflows/build_wheels.yml/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python 
tests](https://github.com/apache/beam/actions/workflows/python_tests.yml/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java 
tests](https://github.com/apache/beam/actions/workflows/java_tests.yml/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go 
tests](https://github.com/apache/beam/actions/workflows/go_tests.yml/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more 
information about GitHub Actions CI or the [workflows 
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) 
to see a list of phrases to trigger workflows.
   

