claudevdm commented on code in PR #37724: URL: https://github.com/apache/beam/pull/37724#discussion_r2875542677
########## sdks/python/apache_beam/io/gcp/bigquery_change_history.py: ########## @@ -0,0 +1,1187 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Streaming source for BigQuery change history (APPENDS/CHANGES functions). + +This module provides ``ReadBigQueryChangeHistory``, a streaming PTransform +that continuously polls BigQuery APPENDS() or CHANGES() functions and emits +changed rows as an unbounded PCollection. + +**Status: Experimental**: API may change without notice. + +Usage:: + + import apache_beam as beam + from apache_beam.io.gcp.bigquery_change_history import ReadBigQueryChangeHistory + + with beam.Pipeline(options=pipeline_options) as p: + changes = ( + p + | ReadBigQueryChangeHistory( + table='my-project:my_dataset.my_table', + change_function='APPENDS', + poll_interval_sec=60)) + +Architecture: + Poll: Polling SDF emits lightweight _QueryRange instructions. + Query: _ExecuteQueryFn runs the BQ query, writes to a temp table. + Read: SDF reads temp table via Storage Read API with dynamic splitting. + Cleanup: Stateful DoFn tracks stream completion, deletes temp tables. 
+""" + +import dataclasses +import datetime +import logging +import sys +import time +import uuid +from typing import Any +from typing import Dict +from typing import Iterable +from typing import List +from typing import Optional +from typing import Tuple +from typing import Union + +import apache_beam as beam +from apache_beam.io.gcp import bigquery_tools +from apache_beam.io.gcp.internal.clients import bigquery +from apache_beam.io.iobase import WatermarkEstimator +from apache_beam.io.restriction_trackers import OffsetRange +from apache_beam.io.restriction_trackers import OffsetRestrictionTracker +from apache_beam.io.watermark_estimators import ManualWatermarkEstimator +from apache_beam.metrics import Metrics +from apache_beam.transforms.core import WatermarkEstimatorProvider +from apache_beam.transforms.window import TimestampedValue +from apache_beam.utils.timestamp import MAX_TIMESTAMP +from apache_beam.utils.timestamp import Timestamp + +try: + from apitools.base.py.exceptions import HttpError +except ImportError: + HttpError = None # type: ignore + +try: + from google.cloud import bigquery_storage_v1 as bq_storage +except ImportError: + bq_storage = None # type: ignore + +try: + import pyarrow +except ImportError: + pyarrow = None # type: ignore + +_LOGGER = logging.getLogger(__name__) + +__all__ = ['ReadBigQueryChangeHistory'] + +# Max time range for CHANGES() queries: 1 day in seconds. +_MAX_CHANGES_RANGE_SEC = 86400 + +# Side output tag for cleanup signals between the Read SDF and Cleanup DoFn. +_CLEANUP_TAG = 'cleanup' + +# Default number of Storage Read API streams to request. +# Matches ReadFromBigQuery's MIN_SPLIT_COUNT to enable parallelism. +# The server may return fewer streams if the table is small. +_DEFAULT_MAX_STREAMS = 10 + +# Default table expiration for auto-created temp datasets: 24 hours in ms. 
# Tables created in the dataset auto-expire after this duration if not
# explicitly deleted, acting as a safety net for orphaned temp tables
# (e.g. pipeline crash before cleanup runs).
_DEFAULT_TABLE_EXPIRATION_MS = 24 * 60 * 60 * 1000


@dataclasses.dataclass
class _QueryResult:
  """Bridges the Query step (query execution) to the Read SDF.

  After _ExecuteQueryFn runs a CHANGES/APPENDS query, it emits a _QueryResult
  pointing to the temp table containing query results. The Read SDF reads
  rows from that temp table via the Storage Read API.

  range_start/range_end define the time window this query covers.
  The Read SDF uses range_start to set an initial watermark hold so the runner
  doesn't advance the watermark past the data's timestamps.
  """
  # Reference to the temp table holding the query results.
  temp_table_ref: 'bigquery.TableReference'
  # Inclusive start of the queried window (epoch seconds).
  range_start: float
  # Exclusive end of the queried window (epoch seconds).
  range_end: float


@dataclasses.dataclass
class _PollConfig:
  """Input element for the polling SDF.

  Only contains start_time, which _PollWatermarkEstimatorProvider uses
  to initialize the watermark hold. All other config is passed via
  _PollChangeHistoryFn.__init__.
  """
  # Epoch seconds; first poll queries from this timestamp.
  start_time: float


@dataclasses.dataclass
class _QueryRange:
  """Lightweight instruction emitted by the polling SDF.

  Contains only the time range to query. Static config (table, project,
  etc.) is held by _ExecuteQueryFn which receives these after a Reshuffle
  commit boundary, preventing duplicate queries on SDF re-dispatch.
  """
  # Inclusive chunk start (epoch seconds).
  chunk_start: float
  # Exclusive chunk end (epoch seconds).
  chunk_end: float


class _StreamRestriction:
  """Restriction carrying BQ Storage stream names for cross-worker safety.

  Unlike a plain OffsetRange(0, N), this restriction is self-contained:
  each split carries the actual stream name strings so it can be processed
  on any worker. Composes an OffsetRange for offset logic.
  """
  __slots__ = ('stream_names', 'range')

  def __init__(
      self, stream_names: Tuple[str, ...], start: int, stop: int) -> None:
    self.stream_names = stream_names  # tuple of BQ stream name strings
    self.range = OffsetRange(start, stop)

  @property
  def start(self) -> int:
    return self.range.start

  @property
  def stop(self) -> int:
    return self.range.stop

  def __eq__(self, other: object) -> bool:
    if not isinstance(other, _StreamRestriction):
      return False
    return (
        self.stream_names == other.stream_names and self.range == other.range)

  def __hash__(self) -> int:
    # Include the type so a _StreamRestriction never collides with a plain
    # tuple of the same fields.
    return hash((type(self), self.stream_names, self.range))

  def __repr__(self) -> str:
    return (
        '_StreamRestriction(streams=%d, start=%d, stop=%d)' %
        (len(self.stream_names), self.start, self.stop))

  def size(self) -> int:
    return self.range.size()


class _StreamRestrictionTracker(beam.io.iobase.RestrictionTracker):
  """Tracker for _StreamRestriction, delegating offset logic to
  OffsetRestrictionTracker."""
  def __init__(self, restriction: _StreamRestriction) -> None:
    self._stream_names = restriction.stream_names
    self._offset_tracker = OffsetRestrictionTracker(restriction.range)

  def current_restriction(self) -> _StreamRestriction:
    r = self._offset_tracker.current_restriction()
    return _StreamRestriction(self._stream_names, r.start, r.stop)

  def try_claim(self, position: int) -> bool:
    return self._offset_tracker.try_claim(position)

  def try_split(
      self, fraction_of_remainder: float
  ) -> Optional[Tuple[_StreamRestriction, _StreamRestriction]]:
    # Both halves of a split must carry the stream names so either half can
    # be processed on any worker.
    result = self._offset_tracker.try_split(fraction_of_remainder)
    if result is not None:
      primary, residual = result
      return (
          _StreamRestriction(self._stream_names, primary.start, primary.stop),
          _StreamRestriction(self._stream_names, residual.start, residual.stop))
    return None

  def check_done(self) -> None:
    self._offset_tracker.check_done()

  def current_progress(self):
    return self._offset_tracker.current_progress()

  def is_bounded(self) -> bool:
    return True


class _NonSplittableOffsetTracker(OffsetRestrictionTracker):
  """OffsetRestrictionTracker that allows checkpointing but prevents splitting.

  Checkpointing (fraction=0) is required for defer_remainder(). All other
  split fractions are refused, ensuring the polling SDF runs as a singleton.
  """
  def try_split(
      self, fraction_of_remainder: float
  ) -> Optional[Tuple[OffsetRange, OffsetRange]]:
    if fraction_of_remainder == 0:
      return super().try_split(fraction_of_remainder)
    return None


class _PollWatermarkEstimator(WatermarkEstimator):
  """Watermark estimator that tracks both a watermark hold and poll cursor.

  The watermark hold (reported via current_watermark) is set to start_ts:
  the earliest data timestamp emitted by the current poll. This prevents
  downstream stages from seeing data as late.

  The poll cursor (last_end_ts) tracks where the next poll should start.
  This is separate from the watermark so we can hold the watermark back
  at start_ts while still advancing the poll cursor to end_ts.

  State is checkpointed as (watermark_hold, last_end_ts) so
  both values survive SDF re-dispatch.
  """
  def __init__(self, state: Tuple[Timestamp, float]) -> None:
    # state is (watermark_hold: Timestamp, last_end_ts: float)
    self._watermark_hold, self._last_end_ts = state

  def observe_timestamp(self, timestamp: Timestamp) -> None:
    # Element timestamps do not drive the watermark; it is set explicitly
    # via set_watermark.
    pass

  def current_watermark(self) -> Timestamp:
    return self._watermark_hold

  def get_estimator_state(self) -> Tuple[Timestamp, float]:
    return (self._watermark_hold, self._last_end_ts)

  def set_watermark(self, timestamp: Timestamp) -> None:
    if not isinstance(timestamp, Timestamp):
      raise ValueError('set_watermark expects a Timestamp as input')
    if self._watermark_hold and self._watermark_hold > timestamp:
      raise ValueError(
          'Watermark must be monotonically increasing. '
          'Provided %s < current %s' % (timestamp, self._watermark_hold))
    self._watermark_hold = timestamp

  def advance_poll_cursor(self, end_ts: float) -> None:
    """Record end_ts so the next poll starts from here."""
    self._last_end_ts = end_ts

  def poll_cursor(self) -> float:
    """Return the start_ts for the next poll."""
    return self._last_end_ts


class _PollWatermarkEstimatorProvider(WatermarkEstimatorProvider):
  """Provider for _PollWatermarkEstimator.

  Initializes with watermark hold at start_time and poll cursor at
  start_time (first poll will query from start_time).
  """
  def initial_estimator_state(
      self, element: _PollConfig,
      restriction: OffsetRange) -> Tuple[Timestamp, float]:
    return (Timestamp(element.start_time), element.start_time)

  def create_watermark_estimator(
      self, estimator_state: Tuple[Timestamp,
                                   float]) -> _PollWatermarkEstimator:
    return _PollWatermarkEstimator(estimator_state)


def _table_key(table_ref: 'bigquery.TableReference') -> str:
  """Convert a TableReference to a 'project.dataset.table' string."""
  return f'{table_ref.projectId}.{table_ref.datasetId}.{table_ref.tableId}'


def build_changes_query(
    table: str,
    start_ts: float,
    end_ts: float,
    change_function: str,
    change_type_column: str = 'change_type',
    change_timestamp_column: str = 'change_timestamp',
    columns: Optional[List[str]] = None,
    row_filter: Optional[str] = None) -> str:
  """Build a CHANGES() or APPENDS() SQL query.

  Args:
    table: Table name as 'project.dataset.table' or 'project:dataset.table'.
    start_ts: Start timestamp (float, seconds since epoch). Inclusive.
    end_ts: End timestamp (float, seconds since epoch). Exclusive.
    change_function: 'CHANGES' or 'APPENDS'.
    change_type_column: Output column name for _CHANGE_TYPE pseudo-column.
    change_timestamp_column: Output column name for _CHANGE_TIMESTAMP
      pseudo-column.
    columns: Optional list of column names to select. If None or empty,
      selects all columns. Pseudo-columns are always appended regardless.
    row_filter: Optional SQL WHERE clause (without the WHERE keyword).
      Applied after the CHANGES/APPENDS function.

  Returns:
    SQL string.
  """
  # Normalize 'project:dataset.table' to 'project.dataset.table'
  table = table.replace(':', '.')
  start_iso = datetime.datetime.fromtimestamp(
      start_ts, tz=datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
  end_iso = datetime.datetime.fromtimestamp(
      end_ts, tz=datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
  # Pseudo-columns (_CHANGE_TYPE, _CHANGE_TIMESTAMP) can't be written to
  # destination tables with their original names. Rename them so they can
  # be persisted to the temp table for Storage Read API reading.
  pseudo = (
      f"_CHANGE_TYPE AS {change_type_column}, "
      f"_CHANGE_TIMESTAMP AS {change_timestamp_column}")
  # Treat an empty column list like None: a bare `SELECT , ...` would be
  # invalid SQL, so fall back to selecting all columns.
  if not columns:
    select = f"SELECT * EXCEPT(_CHANGE_TYPE, _CHANGE_TIMESTAMP), {pseudo}"
  else:
    select = f"SELECT {', '.join(columns)}, {pseudo}"
  from_clause = (
      f"FROM {change_function}"
      f"(TABLE `{table}`, "
      f"TIMESTAMP '{start_iso}', "
      f"TIMESTAMP '{end_iso}')")
  where = f" WHERE {row_filter}" if row_filter else ""
  return f"{select} {from_clause}{where}"


def compute_ranges(start_ts: float, end_ts: float,
                   change_function: str) -> List[Tuple[float, float]]:
  """Split [start_ts, end_ts) into query-safe chunks.

  CHANGES() has a max 1-day range. APPENDS() has no limit.

  Args:
    start_ts: Start timestamp (float, seconds since epoch).
    end_ts: End timestamp (float, seconds since epoch).
    change_function: 'CHANGES' or 'APPENDS'.

  Returns:
    List of (start, end) float tuples. Empty if end_ts <= start_ts.
  """
  if end_ts <= start_ts:
    return []

  if change_function != 'CHANGES':
    return [(start_ts, end_ts)]

  # CHANGES: chunk into <=1-day ranges
  ranges = []
  current = start_ts
  while current < end_ts:
    chunk_end = min(current + _MAX_CHANGES_RANGE_SEC, end_ts)
    ranges.append((current, chunk_end))
    current = chunk_end
  return ranges


def _utc(ts: Union[float, Timestamp]) -> str:
  """Format an epoch-seconds float or Timestamp as a UTC string."""
  if isinstance(ts, Timestamp):
    ts = ts.seconds()
  return datetime.datetime.fromtimestamp(
      ts, tz=datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S')


# =============================================================================
# Poll: _PollChangeHistoryFn (Polling SDF)
# =============================================================================
Defers to next poll interval + """ + def __init__( + self, + table: str, + project: str, + change_function: str, + buffer_sec: float, + start_time: float, + stop_time: Union[float, Timestamp], + poll_interval_sec: float, + location: Optional[str] = None) -> None: + self._table = table + self._project = project + self._change_function = change_function + self._buffer_sec = buffer_sec + self._start_time = start_time + self._stop_time = stop_time + self._poll_interval_sec = poll_interval_sec + self._location = location + + def initial_restriction(self, element: _PollConfig) -> OffsetRange: + return OffsetRange(0, sys.maxsize) + + def create_tracker( + self, restriction: OffsetRange) -> _NonSplittableOffsetTracker: + # Guarantee at least one poll cycle: restriction.start == 0 on the first + # invocation (from initial_restriction). After the first try_claim(0) + + # defer_remainder, subsequent invocations arrive with start >= 1. + if restriction.start > 0 and time.time() >= self._stop_time: + _LOGGER.info( + '[Poll] create_tracker: stop_time reached, ' + 'returning empty range to terminate SDF') + return _NonSplittableOffsetTracker( + OffsetRange(restriction.start, restriction.start)) + return _NonSplittableOffsetTracker(restriction) + + def restriction_size( + self, element: _PollConfig, restriction: OffsetRange) -> int: + return 1 + + def split(self, element: _PollConfig, + restriction: OffsetRange) -> Iterable[OffsetRange]: + yield restriction + + def truncate(self, element: _PollConfig, restriction: OffsetRange) -> None: + return None + + def _next_poll_time(self, start_ts: float, now: float) -> Optional[Timestamp]: + """Return a Timestamp to defer to, or None if we should poll now.""" + earliest = start_ts + self._buffer_sec + self._poll_interval_sec + if now < earliest: + return Timestamp.of(earliest) + return None + + def _emit_query_ranges( + self, + start_ts: float, + end_ts: float, + now: float, + watermark_estimator: _PollWatermarkEstimator) -> 
Iterable[_QueryRange]: + """Compute and yield _QueryRange elements, advancing estimator state.""" + ranges = compute_ranges(start_ts, end_ts, self._change_function) + _LOGGER.info( + '[Poll] %d chunks for [%s, %s)', + len(ranges), + _utc(start_ts), + _utc(end_ts)) + Metrics.counter('BigQueryChangeHistory', 'polls').inc() + + watermark_estimator.advance_poll_cursor(end_ts) + watermark_estimator.set_watermark(Timestamp(start_ts)) Review Comment: Correct. I will file a bug. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
