stankiewicz opened a new pull request, #37535:
URL: https://github.com/apache/beam/pull/37535
# [BigQuery] Fix temporary table leakage during continuous dynamic reads
## 🚨 Problem
When using `BigQueryIO` for continuous dynamic reads in unbounded streaming
pipelines, the temporary tables created to hold query results were not
reliably cleaned up. The original implementation had one of two faults. It
either deleted the temporary table too early, before parallel workers had
fully consumed its streams, or it had no mechanism at all for tracking stream
completions across parallel workers. As a result, long-running unbounded
streaming jobs gradually leaked BigQuery storage, leaving orphaned temporary
datasets and tables behind.
Cleanup tied to the global window is insufficient for this scenario. In an
unbounded pipeline the global window does not close while the job is running,
so a more granular, per-stream tracking mechanism is required.
## 🛠️ Solution
This PR implements a state-based cleanup mechanism that tracks when parallel
workers finish reading from BigQuery Storage API streams. It deletes the
temporary datasets and tables only after every stream has been fully read.
### Key Changes:
1. **Stateful Cleanup Tracking (`CleanupTempTableDoFn`)**:
   - Introduced a stateful `DoFn` that uses `ValueState` to track the total
     number of streams created for a given query job, alongside a counter of
     how many streams have successfully completed.
   - When the completed stream count equals the total number of expected
     streams, the DoFn drops the temporary BigQuery table (and the temporary
     dataset, if Beam created it).
2. **Stream Initialization & Side Outputs (`CreateBoundedSourceForTable`)**:
   - Augmented the source creation step to emit a
     `CleanupOperationMessage.initialize()` side output carrying a
     `CleanupInfo` (with `projectId`, `datasetId`, `tableId`, and
     `totalStreams`).
3. **Completion Signaling (`ReadDynamicStreamSource`)**:
   - Wrapped the underlying read operations. As each parallel stream is fully
     consumed, it emits a `CleanupOperationMessage.streamComplete()` signal to
     the cleanup `DoFn`.
   - Added a `.withFromQuery()` toggle to `BigQueryStorageStreamSource` so
     consumers know whether a stream is bound to a temporary query table that
     requires tracking.
4. **Serialization Safety**:
   - Replaced the non-serializable Google API `TableReference` inside
     `CleanupInfo` with plain `String` fields to prevent
     `NotSerializableException`s during state persistence and shuffling.
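The failure mode behind item 4 can be reproduced with plain Java serialization. This assumes the cleanup metadata is encoded that way, for example through Beam's `SerializableCoder`. The sketch below is hypothetical and is not the PR's code:

- `OpaqueTableRef` stands in for a non-`Serializable` class such as `TableReference`.
- `BrokenCleanupInfo` is an invented "before" shape.
- `CleanupInfo` only mirrors the fields listed in item 2.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

public class CleanupInfoSerializationSketch {

  /** Hypothetical stand-in for a model class that does not implement Serializable. */
  static final class OpaqueTableRef {
    final String tableSpec;

    OpaqueTableRef(String tableSpec) {
      this.tableSpec = tableSpec;
    }
  }

  /** Hypothetical "before" shape: the non-serializable reference is kept as a field. */
  static final class BrokenCleanupInfo implements Serializable {
    private static final long serialVersionUID = 1L;
    final OpaqueTableRef table;
    final int totalStreams;

    BrokenCleanupInfo(OpaqueTableRef table, int totalStreams) {
      this.table = table;
      this.totalStreams = totalStreams;
    }
  }

  /** Hypothetical "after" shape: only String and int fields, so serialization succeeds. */
  static final class CleanupInfo implements Serializable {
    private static final long serialVersionUID = 1L;
    final String projectId;
    final String datasetId;
    final String tableId;
    final int totalStreams;

    CleanupInfo(String projectId, String datasetId, String tableId, int totalStreams) {
      this.projectId = projectId;
      this.datasetId = datasetId;
      this.tableId = tableId;
      this.totalStreams = totalStreams;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof CleanupInfo)) {
        return false;
      }
      CleanupInfo other = (CleanupInfo) o;
      return totalStreams == other.totalStreams
          && projectId.equals(other.projectId)
          && datasetId.equals(other.datasetId)
          && tableId.equals(other.tableId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(projectId, datasetId, tableId, totalStreams);
    }
  }

  /** Encodes an object with standard Java serialization. */
  static byte[] serialize(Object value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    return bytes.toByteArray();
  }

  /** Decodes bytes produced by serialize(). */
  static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    CleanupInfo info = new CleanupInfo("my-project", "temp_dataset", "temp_table", 4);
    System.out.println(info.equals(deserialize(serialize(info)))); // true

    try {
      serialize(new BrokenCleanupInfo(new OpaqueTableRef("my-project:temp_dataset.temp_table"), 4));
    } catch (NotSerializableException e) {
      // The exception message names the class that could not be serialized.
      System.out.println("NotSerializableException: " + e.getMessage());
    }
  }
}
```

Running `main` first prints `true` for the round trip. It then reports a `NotSerializableException` naming `OpaqueTableRef`. Java serialization rejects any non-transient field holding an object whose class does not implement `Serializable`.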
## 🧪 Testing
- **Added `CleanupTempTableDoFnTest`**: a dedicated unit test that uses
  `FakeDatasetService` and `FakeBigQueryServices`. It validates that:
  - Both sequential and out-of-order stream completion signals aggregate
    correctly in the stateful DoFn.
  - The fake BigQuery service's `deleteTable` / `deleteDataset` methods are
    invoked exactly once, after the final stream completes.
  - Accessing the tables after cleanup correctly results in a 404 Not Found
    error.
- Verified that `spotlessApply` and `compileJava` pass locally without
  formatting or compilation errors.
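The aggregation the test validates can be modeled without Beam. This is a hypothetical plain-Java sketch, not `CleanupTempTableDoFn` itself. The stand-ins are:

- A `HashMap` stands in for the per-key `ValueState` a stateful `DoFn` would hold.
- `InMemoryTables` stands in for `FakeDatasetService`.
- An `IllegalStateException` stands in for the 404.

Dataset deletion is left out. The sketch also assumes a completion signal may reach the cleanup step before the initialize message, so it defers the check until the total is known.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CleanupCountingSketch {

  /** Hypothetical in-memory stand-in for FakeDatasetService: live tables plus a delete counter. */
  static final class InMemoryTables {
    final Set<String> liveTables = new HashSet<>();
    int deleteCalls = 0;

    void create(String tableSpec) {
      liveTables.add(tableSpec);
    }

    void delete(String tableSpec) {
      deleteCalls++;
      liveTables.remove(tableSpec);
    }

    /** Models a 404: looking up a table that no longer exists fails. */
    String get(String tableSpec) {
      if (!liveTables.contains(tableSpec)) {
        throw new IllegalStateException("404 Not Found: " + tableSpec);
      }
      return tableSpec;
    }
  }

  /** Per-table state; a stateful DoFn would keep these values in ValueState cells per key. */
  static final class TableState {
    Integer totalStreams; // null until the initialize message has been seen
    int completedStreams;
    boolean cleanedUp;
  }

  private final Map<String, TableState> stateByTable = new HashMap<>();
  private final InMemoryTables tables;

  CleanupCountingSketch(InMemoryTables tables) {
    this.tables = tables;
  }

  /** Handles the initialize message that carries the expected stream count. */
  void onInitialize(String tableSpec, int totalStreams) {
    TableState state = stateByTable.computeIfAbsent(tableSpec, k -> new TableState());
    state.totalStreams = totalStreams;
    maybeCleanUp(tableSpec, state);
  }

  /** Handles one stream-complete signal. */
  void onStreamComplete(String tableSpec) {
    TableState state = stateByTable.computeIfAbsent(tableSpec, k -> new TableState());
    state.completedStreams++;
    maybeCleanUp(tableSpec, state);
  }

  /** Deletes the table once, when every expected stream has reported completion. */
  private void maybeCleanUp(String tableSpec, TableState state) {
    if (!state.cleanedUp
        && state.totalStreams != null
        && state.completedStreams >= state.totalStreams) {
      tables.delete(tableSpec);
      state.cleanedUp = true;
    }
  }

  public static void main(String[] args) {
    InMemoryTables tables = new InMemoryTables();
    String table = "my-project:temp_dataset.temp_table";
    tables.create(table);
    CleanupCountingSketch tracker = new CleanupCountingSketch(tables);

    tracker.onStreamComplete(table); // completion arrives before initialize
    tracker.onInitialize(table, 3);
    tracker.onStreamComplete(table);
    System.out.println(tables.deleteCalls); // 0: two of three streams done
    tracker.onStreamComplete(table);
    System.out.println(tables.deleteCalls); // 1: last stream triggers the delete
    tracker.onStreamComplete(table); // late duplicate signal
    System.out.println(tables.deleteCalls); // 1: no second delete
    try {
      tables.get(table);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // 404 Not Found: my-project:temp_dataset.temp_table
    }
  }
}
```

The `cleanedUp` flag is a design choice of this sketch. It keeps a late or duplicated completion signal from triggering a second delete. The PR description does not say how `CleanupTempTableDoFn` handles such signals.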
------------------------
Thank you for your contribution! Follow this checklist to help us
incorporate your contribution quickly and easily:
- [ ] Mention the appropriate issue in your description (for example:
`addresses #123`), if applicable. This will automatically add a link to the
pull request in the issue. If you would like the issue to automatically close
on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See the [Contributor Guide](https://beam.apache.org/contribute) for more
tips on [how to make review process
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
To check the build health, please visit
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more
information about GitHub Actions CI or the [workflows
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md)
to see a list of phrases to trigger workflows.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.