The code works without the custom TensorRT image. I only get this:

…ject to benefit from faster worker startup and autoscaling. If you experience container-startup related issues, pass the "disable_image_streaming" experiment to disable image streaming for the job.
INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:10.327Z: JOB_MESSAGE_BASIC: Worker configuration: g2-standard-4 in us-east4-c.
INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.480Z: JOB_MESSAGE_BASIC: Executing operation [10]: Create URIs/Impulse+[10]: Create URIs/FlatMap(<lambda at core.py:3994>)+[10]: Create URIs/Map(decode)+[10]: Download PDFs+[10]: Load PDF Pages+[10]: Preprocess Images+[10]: Run Inference/BatchElements/ParDo(_GlobalWindowsBatchingDoFn)+[10]: Run Inference/BeamML_RunInference
INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.543Z: JOB_MESSAGE_BASIC: Starting 1 workers in us-east4...
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2025-09-16_17_52_06-10817935125972705087 is in state JOB_STATE_RUNNING
WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.

That warning just keeps recurring, and after an hour this message pops up:

Workflow failed. Causes: The Dataflow job appears to be stuck because no worker activity has been seen in the last 1h. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors#error-syncing-pod. You can also get help with Cloud Dataflow at https://cloud.google.com/dataflow/support.

On Tue, 16 Sept 2025 at 21:35, XQ Hu <[email protected]> wrote:

Does the code work without using TensorRT? Any logs?

On Tue, Sep 16, 2025 at 9:28 PM Sai Shashank <[email protected]> wrote:

import apache_beam as beam
from apache_beam.ml.inference.tensorrt_inference import TensorRTEngineHandlerNumPy
from apache_beam.ml.inference.base import RunInference

#!/usr/bin/env python3
"""
Apache Beam pipeline for processing PDFs with Triton server and saving results to BigQuery.
This pipeline combines functionality from test_triton_document.py, create_bigquery_tables.py,
and save_to_bigquery.py into a single workflow.
"""

import os
import sys
import json
import uuid
import argparse
import logging
import tempfile
import datetime
import requests
import numpy as np
import cv2
from PIL import Image
import fitz  # PyMuPDF
from pathlib import Path
from typing import Dict, List, Tuple, Any, Optional, Iterator

# Apache Beam imports
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.ml.inference.base import RemoteModelHandler, PredictionResult
from apache_beam.ml.inference.utils import _convert_to_result
from apache_beam.ml.inference.base import RunInference
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp.gcsio import GcsIO

# Google Cloud imports
from google.cloud import storage
from google.cloud import bigquery

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# DocLayNet classes
CLASS_ID_TO_NAME = {
    0: 'Caption',
    1: 'Footnote',
    2: 'Formula',
    3: 'List-item',
    4: 'Page-footer',
    5: 'Page-header',
    6: 'Picture',
    7: 'Section-header',
    8: 'Table',
    9: 'Text',
    10: 'Title'
}


class DownloadPDFFromGCS(beam.DoFn):
    """Download a PDF from Google Cloud Storage."""

    def __init__(self, temp_dir=None):
        self.temp_dir = temp_dir or tempfile.gettempdir()

    def process(self, gcs_uri):
        try:
            # Parse GCS URI
            if not gcs_uri.startswith("gs://"):
                raise ValueError(f"Invalid GCS URI: {gcs_uri}")

            # Remove gs:// prefix and split into bucket and blob path
            path_parts = gcs_uri[5:].split("/", 1)
            bucket_name = path_parts[0]
            blob_path = path_parts[1]

            # Get filename from blob path
            filename = os.path.basename(blob_path)
            local_path = os.path.join(self.temp_dir, filename)

            # Create temp directory if it doesn't exist
            os.makedirs(self.temp_dir, exist_ok=True)

            try:
                # Download using Beam's GcsIO
                with FileSystems.open(gcs_uri, 'rb') as gcs_file:
                    with open(local_path, 'wb') as local_file:
                        local_file.write(gcs_file.read())

                logger.info(f"Downloaded {gcs_uri} to {local_path}")

                # Return a dictionary with the local path and original URI
                yield {
                    'local_path': local_path,
                    'gcs_uri': gcs_uri,
                    'filename': filename
                }
            except Exception as e:
                logger.error(f"Error reading from GCS: {str(e)}")
                # Try alternative download method
                logger.info(f"Trying alternative download method for {gcs_uri}")

                # For testing with local files
                if os.path.exists(gcs_uri.replace("gs://", "")):
                    local_path = gcs_uri.replace("gs://", "")
                    logger.info(f"Using local file: {local_path}")
                    yield {
                        'local_path': local_path,
                        'gcs_uri': gcs_uri,
                        'filename': os.path.basename(local_path)
                    }
                else:
                    # Try using gsutil command
                    import subprocess
                    try:
                        subprocess.run(["gsutil", "cp", gcs_uri, local_path], check=True)
                        logger.info(f"Downloaded {gcs_uri} to {local_path} using gsutil")
                        yield {
                            'local_path': local_path,
                            'gcs_uri': gcs_uri,
                            'filename': filename
                        }
                    except Exception as e2:
                        logger.error(f"Failed to download using gsutil: {str(e2)}")

        except Exception as e:
            logger.error(f"Error downloading {gcs_uri}: {str(e)}")


class LoadPDFPages(beam.DoFn):
    """Load PDF pages as images."""

    def __init__(self, dpi=200):
        self.dpi = dpi

    def process(self, element):
        doc = None
        try:
            # Make sure we have all required fields
            if not isinstance(element, dict):
                logger.error(f"Expected dictionary, got {type(element)}")
                return

            if 'local_path' not in element:
                logger.error("Missing 'local_path' in element")
                return

            local_path = element['local_path']
            gcs_uri = element.get('gcs_uri', '')

            # Extract filename from local_path if not provided
            filename = element.get('filename', os.path.basename(local_path))

            logger.info(f"Loading PDF: {local_path}, filename: {filename}")

            # Check if file exists and is accessible
            if not os.path.exists(local_path):
                logger.error(f"File not found: {local_path}")
                return

            if not os.access(local_path, os.R_OK):
                logger.error(f"File not readable: {local_path}")
                return

            # Open the PDF
            try:
                doc = fitz.open(local_path)
                if doc.is_closed:
                    logger.error(f"Failed to open PDF: {local_path}")
                    return
            except Exception as e:
                logger.error(f"Error opening PDF {local_path}: {str(e)}")
                return

            # Process each page
            page_count = len(doc)
            logger.info(f"Processing {page_count} pages from {local_path}")

            for i in range(page_count):
                try:
                    if doc.is_closed:
                        logger.error(f"Document was closed unexpectedly while processing page {i}")
                        break

                    page = doc[i]
                    if page is None:
                        logger.error(f"Failed to get page {i} from document")
                        continue

                    # Use a higher resolution for better quality
                    scale = self.dpi / 72.0
                    mat = fitz.Matrix(scale, scale)

                    try:
                        pix = page.get_pixmap(matrix=mat, alpha=False)
                    except Exception as e:
                        logger.error(f"Error getting pixmap for page {i}: {str(e)}")
                        continue

                    # Check pixmap dimensions
                    if pix.height <= 0 or pix.width <= 0 or pix.n <= 0:
                        logger.error(f"Invalid pixmap dimensions: {pix.width}x{pix.height}x{pix.n}")
                        continue

                    # Convert to numpy array
                    try:
                        arr = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
                    except Exception as e:
                        logger.error(f"Error converting pixmap to numpy array: {str(e)}")
                        continue

                    # Convert BGR to RGB if needed
                    if pix.n == 3:  # RGB
                        try:
                            arr = cv2.cvtColor(arr, cv2.COLOR_BGR2RGB)
                        except Exception as e:
                            logger.error(f"Error converting BGR to RGB: {str(e)}")
                            continue

                    # Store original size for later use
                    original_size = (arr.shape[0], arr.shape[1])

                    # Create page info
                    page_info = {
                        'page_num': i,
                        'image': arr,
                        'original_size': original_size,
                        'local_path': local_path,
                        'gcs_uri': gcs_uri,
                        'filename': filename
                    }

                    # Use document ID and page number as key
                    doc_id = os.path.splitext(filename)[0]
                    key = f"{doc_id}_{i}"

                    yield (key, page_info)
                except Exception as e:
                    import traceback
                    logger.error(f"Error processing page {i}: {str(e)}")
                    logger.error(traceback.format_exc())

            logger.info(f"Loaded {len(doc)} pages from {local_path}")

        except Exception as e:
            import traceback
            logger.error(f"Error loading PDF: {str(e)}")
            logger.error(traceback.format_exc())
        finally:
            # Make sure to close the document only if it was successfully opened
            if doc is not None:
                try:
                    if not doc.is_closed:
                        doc.close()
                except Exception as e:
                    logger.debug(f"Error closing document: {str(e)}")


class PreprocessImage(beam.DoFn):
    """Preprocess image for Triton server."""

    def __init__(self, size=1024):
        self.size = size

    def letterbox(self, img, new_shape=1024, color=(114, 114, 114)):
        """Resize and pad image to target size."""
        h, w = img.shape[:2]
        r = min(new_shape / h, new_shape / w)
        nh, nw = int(round(h * r)), int(round(w * r))
        pad_h, pad_w = new_shape - nh, new_shape - nw
        top = pad_h // 2
        bottom = pad_h - top
        left = pad_w // 2
        right = pad_w - left
        img = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_LINEAR)
        img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)
        return img, r, left, top

    def process(self, element):
        try:
            if not isinstance(element, tuple) or len(element) != 2:
                logger.error(f"Expected (key, value) tuple, got {type(element)}")
                return

            key, page_info = element

            if not isinstance(page_info, dict):
                logger.error(f"Expected dictionary for page_info, got {type(page_info)}")
                return

            if 'image' not in page_info:
                logger.error("Missing 'image' in page_info")
                return

            # Create a new dictionary to avoid modifying the input
            new_page_info = dict(page_info)

            # Apply letterbox resize
            img = new_page_info['image']
            lb, r, left, top = self.letterbox(img, new_shape=self.size)

            # Convert to float32 and normalize to [0,1]
            x = lb.astype(np.float32) / 255.0

            # Convert to CHW format
            x = np.transpose(x, (2, 0, 1))

            # Add batch dimension
            batched_img = np.expand_dims(x, axis=0)

            # Update page info
            new_page_info['preprocessed_image'] = batched_img
            new_page_info['letterbox_info'] = (r, left, top)

            yield (key, new_page_info)

        except Exception as e:
            import traceback
            logger.error(f"Error preprocessing image: {str(e)}")
            logger.error(traceback.format_exc())


class ExtractBoxes(beam.DoFn):
    """Extract bounding boxes from Triton response."""

    def __init__(self, conf_th=0.25, iou_th=0.7, model_size=1024):
        self.conf_th = conf_th
        self.iou_th = iou_th
        self.model_size = model_size

    def _nms(self, boxes, scores, iou_th=0.7):
        """Non-Maximum Suppression"""
        if len(boxes) == 0:
            return []

        boxes = boxes.astype(np.float32)
        x1, y1, x2, y2 = boxes.T
        areas = (x2 - x1) * (y2 - y1)
        order = scores.argsort()[::-1]

        keep = []
        while order.size > 0:
            i = order[0]
            keep.append(i)

            xx1 = np.maximum(x1[i], x1[order[1:]])
            yy1 = np.maximum(y1[i], y1[order[1:]])
            xx2 = np.minimum(x2[i], x2[order[1:]])
            yy2 = np.minimum(y2[i], y2[order[1:]])

            w = np.maximum(0.0, xx2 - xx1)
            h = np.maximum(0.0, yy2 - yy1)
            inter = w * h

            iou = inter / (areas[i] + areas[order[1:]] - inter + 1e-9)
            inds = np.where(iou <= iou_th)[0]
            order = order[inds + 1]

        return keep

    def process(self, page_info):
        try:
            triton_response = page_info['triton_response']
            original_size = page_info['original_size']
            r, left, top = page_info['letterbox_info']

            if "outputs" not in triton_response or not triton_response["outputs"]:
                logger.error("Invalid response from Triton server")
                return []

            out_meta = triton_response["outputs"][0]
            shape = out_meta["shape"]
            data = np.array(out_meta["data"], dtype=np.float32).reshape(shape)

            logger.info(f"Output shape: {shape}")

            # For YOLO output [B, C, P] where C is channels (box coords + objectness + classes)
            B, C, P = shape

            # Assuming 4 box coordinates + class probabilities (no objectness)
            has_objectness = False
            num_classes = C - 5 if has_objectness else C - 4

            # Extract data
            xywh = data[:, 0:4, :]
            if has_objectness:
                obj = data[:, 4:5, :]
                cls = data[:, 5:5 + num_classes, :]
            else:
                obj = None
                cls = data[:, 4:4 + num_classes, :]

            # Process batch item (we only have one)
            b = 0
            h, w = original_size

            xywh_b = xywh[b].T  # (P,4)
            if obj is not None:
                obj_b = obj[b].T.squeeze(1)  # (P,)
            else:
                obj_b = np.ones((P,), dtype=np.float32)
            cls_b = cls[b].T  # (P,nc)

            # Get scores and labels
            scores_all = (obj_b[:, None] * cls_b) if obj is not None else cls_b
            labels = scores_all.argmax(axis=1)
            scores = scores_all.max(axis=1)

            # Filter by confidence threshold
            keep = scores >= self.conf_th
            if not np.any(keep):
                logger.info(f"No detections above threshold {self.conf_th}")
                return []

            xywh_k = xywh_b[keep]
            scores_k = scores[keep]
            labels_k = labels[keep]

            # xywh -> xyxy in model space
            cx, cy, ww, hh = xywh_k.T
            xyxy_model = np.stack([cx - ww / 2, cy - hh / 2, cx + ww / 2, cy + hh / 2], axis=1)

            # Apply NMS per class
            final_boxes = []
            final_scores = []
            final_labels = []

            for c in np.unique(labels_k):
                idxs = np.where(labels_k == c)[0]
                if idxs.size == 0:
                    continue
                keep_idx = self._nms(xyxy_model[idxs], scores_k[idxs], iou_th=self.iou_th)
                final_boxes.append(xyxy_model[idxs][keep_idx])
                final_scores.append(scores_k[idxs][keep_idx])
                final_labels.append(np.full(len(keep_idx), c, dtype=int))

            if not final_boxes:
                logger.info("No detections after NMS")
                return []

            xyxy_model = np.vstack(final_boxes)
            scores_k = np.concatenate(final_scores)
            labels_k = np.concatenate(final_labels)

            # Map boxes from model space to original image space
            xyxy_orig = xyxy_model.copy()

            # Remove padding
            xyxy_orig[:, [0, 2]] -= left
            xyxy_orig[:, [1, 3]] -= top

            # Scale back to original size
            xyxy_orig /= r

            # Clip to image boundaries
            xyxy_orig[:, 0::2] = np.clip(xyxy_orig[:, 0::2], 0, w - 1)
            xyxy_orig[:, 1::2] = np.clip(xyxy_orig[:, 1::2], 0, h - 1)

            # Format as requested: x_min, y_min, x_max, y_max, class, probability
            boxes = []
            for (x1, y1, x2, y2), label, score in zip(xyxy_orig, labels_k, scores_k):
                class_name = CLASS_ID_TO_NAME.get(int(label))
                box_info = {
                    "page": page_info['page_num'],
                    "x_min": float(x1),
                    "y_min": float(y1),
                    "x_max": float(x2),
                    "y_max": float(y2),
                    "class": int(label),
                    "class_name": class_name,
                    "probability": float(score),
                    "filename": page_info['filename'],
                    "local_path": page_info['local_path'],
                    "gcs_uri": page_info['gcs_uri']
                }
                boxes.append(box_info)

            logger.info(f"Extracted {len(boxes)} boxes from page {page_info['page_num']}")

            return boxes

        except Exception as e:
            logger.error(f"Error extracting boxes: {str(e)}")
            return []


class PrepareForBigQuery(beam.DoFn):
    """Prepare data for BigQuery insertion."""

    def process(self, box_info):
        try:
            # Generate UUIDs for primary keys
            v_note_id = str(uuid.uuid4())
            page_ocr_id = str(uuid.uuid4())
            class_prediction_id = str(uuid.uuid4())

            # Create timestamp
            processing_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            # Create ocr_results row
            ocr_results_row = {
                "v_note_id": v_note_id,
                "filename": box_info['filename'],
                "file_path": box_info['gcs_uri'],
                "processing_time": processing_time,
                "file_type": "pdf"
            }

            # Create page_ocr row
            page_ocr_row = {
                "page_ocr_id": page_ocr_id,
                "v_note_id": v_note_id,
                "page_number": box_info['page']
            }

            # Create class_prediction row
            class_prediction_row = {
                "class_prediction_id": class_prediction_id,
                "page_ocr_id": page_ocr_id,
                "xmin": box_info['x_min'],
                "ymin": box_info['y_min'],
                "xmax": box_info['x_max'],
                "ymax": box_info['y_max'],
                "class": box_info['class_name'] if box_info['class_name'] else str(box_info['class']),
                "confidence": box_info['probability']
            }

            # Return all three rows with table names
            return [
                ('ocr_results', ocr_results_row),
                ('page_ocr', page_ocr_row),
                ('class_prediction', class_prediction_row)
            ]

        except Exception as e:
            logger.error(f"Error preparing for BigQuery: {str(e)}")
            return []


model_handler = TensorRTEngineHandlerNumPy(
    min_batch_size=1,
    max_batch_size=1,
    engine_path="gs://temp/yolov11l-doclaynet.engine",
)


with beam.Pipeline(options=options) as pipeline:

    # Create PCollection from input URIs
    pdf_uris = (
        pipeline
        | "Create URIs" >> beam.Create(["tmp.pdf"])
    )

    # Download PDFs
    local_pdfs = (
        pdf_uris
        | "Download PDFs" >> beam.ParDo(DownloadPDFFromGCS())
    )

    # Load PDF pages
    pdf_pages = (
        local_pdfs
        | "Load PDF Pages" >> beam.ParDo(LoadPDFPages())
        # | "Flatten Pages" >> beam.FlatMap(lambda x: x)
    )

    # Preprocess images
    preprocessed_pages = (
        pdf_pages
        | "Preprocess Images" >> beam.ParDo(PreprocessImage())
    )

    inference_results = (
        preprocessed_pages
        | "Run Inference" >> RunInference(model_handler=model_handler)
    )
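A side note on the snippet above: TensorRTEngineHandlerNumPy works on NumPy examples, while PreprocessImage emits (key, dict) pairs. A minimal sketch of one way this could be wired up once workers do start — the KeyedModelHandler wrapper, the dummy element, and the dropped batch dimension are this sketch's assumptions, not the original code; it also assumes the engine accepts a single CHW float32 input and that the handler batches elements itself.

import numpy as np
import apache_beam as beam
from apache_beam.ml.inference.base import KeyedModelHandler, RunInference
from apache_beam.ml.inference.tensorrt_inference import TensorRTEngineHandlerNumPy

# Same handler as above; the engine path is the placeholder used in the original mail.
engine_handler = TensorRTEngineHandlerNumPy(
    min_batch_size=1,
    max_batch_size=1,
    engine_path="gs://temp/yolov11l-doclaynet.engine",
)

with beam.Pipeline() as p:
    keyed_images = (
        p
        # Stand-in for the real preprocessed pages: one fake CHW float32 page keyed by doc/page id.
        | "Fake page" >> beam.Create([("doc_0", np.zeros((3, 1024, 1024), dtype=np.float32))])
    )
    results = (
        keyed_images
        # KeyedModelHandler forwards the array to the TensorRT handler and keeps the key on the result.
        | "Run Inference" >> RunInference(model_handler=KeyedModelHandler(engine_handler))
    )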
On Tue, 16 Sept 2025 at 21:23, XQ Hu <[email protected]> wrote:

Can you share your commands and outputs?

On Tue, Sep 16, 2025 at 9:02 PM Sai Shashank <[email protected]> wrote:

Okay, I have changed the Docker image to now RUN the python command, but it is still halting without any errors or warnings.

On Tue, 16 Sept 2025 at 17:38, XQ Hu via dev <[email protected]> wrote:

The CMD is not necessary, as it will be overridden by the ENTRYPOINT, just like your comment says.

If you ssh into your Docker container with `docker run --rm -it --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE`, can you run python and some Beam pipelines with a direct runner in the container? This can help test that the environment works fine.

I have one old Dockerfile that used to work with the old Beam: https://github.com/google/dataflow-ml-starter/blob/main/tensor_rt.Dockerfile .

On Tue, Sep 16, 2025 at 4:56 PM Sai Shashank <[email protected]> wrote:

---------- Forwarded message ---------
From: Sai Shashank <[email protected]>
Date: Tue, Sep 16, 2025 at 4:27 PM
Subject: TensorRT inference not starting
To: <[email protected]>

Hey everyone,
I was trying to use TensorRT within Apache Beam on Dataflow, but somehow Dataflow didn't start; it did not even give me worker logs. Below is the Dockerfile that I use to create the custom image. At first I thought it was a version mismatch, but that usually gives me a harness error.

ARG BUILD_IMAGE=nvcr.io/nvidia/tensorrt:25.08-py3
FROM ${BUILD_IMAGE}
ENV PATH="/usr/src/tensorrt/bin:${PATH}"

WORKDIR /workspace

RUN apt-get update -y && apt-get install -y python3-venv
RUN pip install --no-cache-dir apache-beam[gcp]==2.67.0

COPY --from=apache/beam_python3.10_sdk:2.67.0 /opt/apache/beam /opt/apache/beam

# Install additional dependencies
RUN pip install --upgrade pip \
    && pip install torch \
    && pip install torchvision \
    && pip install pillow>=8.0.0 \
    && pip install transformers>=4.18.0 \
    && pip install cuda-python \
    && pip install opencv-python==4.7.0.72 \
    && pip install PyMuPDF==1.22.5 \
    && pip install requests==2.31.0

# Set the default command to run the inference script
# This will be overridden by the Apache Beam boot script
CMD ["python", "/workspace/inference.py"]

# Use the Apache Beam boot script as the entrypoint
ENTRYPOINT ["/opt/apache/beam/boot"]
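For the in-container check suggested above (`docker run --rm -it --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE`), a minimal DirectRunner smoke test that could be run inside the image; the version prints and the tiny pipeline are illustrative, not from the thread. It can help confirm that Beam is importable and which interpreter the pip installs landed in, since the TensorRT base image may ship a different default Python than the 3.10 SDK copied in the Dockerfile.

# smoke_test.py - run inside the container started with the bash entrypoint above.
import sys
import apache_beam as beam

print("Python:", sys.version.split()[0])   # the image's default interpreter version
print("Beam:", beam.__version__)           # should report 2.67.0 if the install hit this interpreter

# Tiny DirectRunner pipeline (the default runner when no options are given).
with beam.Pipeline() as p:
    (
        p
        | "Create" >> beam.Create([1, 2, 3])
        | "Square" >> beam.Map(lambda x: x * x)
        | "Print" >> beam.Map(print)
    )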
