Sounds good. Thanks!

On Sat, Sep 27, 2025 at 12:18 AM Sai Shashank <[email protected]> wrote:
So, can I make an issue and raise a PR for the updated TensorRT handler?

On Sat, 20 Sept 2025 at 08:46, XQ Hu <[email protected]> wrote:

This is what the Beam tests use:
https://github.com/apache/beam/blob/master/sdks/python/test-suites/containers/tensorrt_runinference/tensor_rt.dockerfile#L17

nvcr.io/nvidia/tensorrt:23.05-py3

From the latest doc,
https://docs.nvidia.com/deeplearning/tensorrt/latest/_static/python-api/infer/Core/Engine.html#icudaengine,
and https://github.com/NVIDIA/TensorRT/issues/4216, num_io_tensors should be used now.

You can either use the TensorRT version Beam supports now, or you can define your own TensorRTEngine model handler with the new TensorRT.
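For reference, a minimal sketch of what the engine-introspection part of such a handler might look like against the TensorRT 10.x Python API, where num_bindings is gone and I/O tensors are enumerated by name. This is illustrative only, not the Beam handler's actual code; the helper names are made up:

# Sketch: enumerate I/O tensors with the TensorRT 10.x API instead of num_bindings.
import numpy as np
import tensorrt as trt

TRT_LOGGER = trt.Logger(trt.Logger.WARNING)

def load_engine(plan_path):
    # plan_path is assumed to be a local path to a serialized engine file.
    with open(plan_path, "rb") as f:
        return trt.Runtime(TRT_LOGGER).deserialize_cuda_engine(f.read())

def describe_io_tensors(engine):
    """Yield (name, shape, numpy dtype, is_input) for every I/O tensor."""
    for i in range(engine.num_io_tensors):        # replaces engine.num_bindings
        name = engine.get_tensor_name(i)          # replaces engine.get_binding_name(i)
        shape = tuple(engine.get_tensor_shape(name))
        dtype = np.dtype(trt.nptype(engine.get_tensor_dtype(name)))
        is_input = engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT
        yield name, shape, dtype, is_input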
On Sat, Sep 20, 2025 at 1:10 AM Sai Shashank <[email protected]> wrote:

So finally, I was able to resolve the Docker image issue, but now I see this error:

  File "/usr/local/lib/python3.12/dist-packages/apache_beam/ml/inference/tensorrt_inference.py", line 132, in __init__
    for i in range(self.engine.num_bindings):
                   ^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'tensorrt.tensorrt.ICudaEngine' object has no attribute 'num_bindings'
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1562, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
  File "apache_beam/runners/common.py", line 602, in apache_beam.runners.common.DoFnInvoker.invoke_setup
  File "/usr/local/lib/python3.12/dist-packages/apache_beam/ml/inference/base.py", line 1882, in setup
    self._model = self._load_model()
                  ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/apache_beam/ml/inference/base.py", line 1848, in _load_model
    model = self._shared_model_handle.acquire(load, tag=self._cur_tag)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

I have seen this error when the code assumes a TensorRT version older than 10.x. Is there an updated TensorRT handler, or am I doing something wrong?

On Fri, 19 Sept 2025 at 09:15, XQ Hu <[email protected]> wrote:

GPU driver: DRIVER_VERSION=535.261.03

From the log, the driver was installed correctly (make sure this version can be used with your TensorRT).

"Error syncing pod, skipping" err="failed to \"StartContainer\" for \"sdk-0-0\" with ErrImagePull: \"failed to pull and unpack image \\\" us-east4-docker.pkg.dev/anbc-dev-suspecting/suspecting-docker/tensorrt_ss:latest\\\": failed to extract layer sha256:a848022a4558c435b349317630b139960b44ae09f218ab7f93f764ba4661607d: write /var/lib/containerd/io.containerd.snapshotter.v1.gcfs/snapshotter/snapshots/55/fs/usr/local/cuda-13.0/targets/x86_64-linux/lib/libcusparseLt.so.0.8.0.4: no space left on device: unknown\"" pod="default/df-inference-pipeline-4-09182012-1a3n-harness-rvxc" podUID="3a9b74645db23e77932d981439f1d3cc"

The Dataflow worker cannot unpack the image: no space left on device.

Try https://cloud.google.com/dataflow/docs/guides/configure-worker-vm#disk-size
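For example, the worker boot disk can be enlarged with the disk_size_gb worker option (a sketch only; 100 GB is an arbitrary placeholder, size it to your image):

# Sketch: enlarge the Dataflow worker boot disk so the custom image layers fit.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="DataflowRunner",
    disk_size_gb=100,   # same as passing --disk_size_gb=100 on the command line
    # ... project, region, temp_location, sdk_container_image, etc. as usual
)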
On Thu, Sep 18, 2025 at 11:27 PM Sai Shashank <[email protected]> wrote:

So, I was able to start the Dataflow job: 2025-09-18_20_12_41-10973298801093076892, but now it is having this error:

2025-09-18 23:21:25.401 EDT SDK harnesses are not healthy after 5 minutes, status: Waiting for 4 of 4 SDK Harnesses to register.

I have noticed this error when there is a mismatch of environments. As advised by you, I tried running the Direct Runner in the Docker image and it ran perfectly. Are there any tips you could give me to correct this error?

On Wed, 17 Sept 2025 at 09:42, XQ Hu <[email protected]> wrote:

From the worker log of 2025-09-16_17_52_06-10817935125972705087:

"Failed to read pods from URL" err="invalid pod: [spec.containers[3].image: Invalid value: \" us-east4-docker.pkg.dev/anbc-dev-suspecting/suspecting-docker/tensorrt_ss:latest\": must not have leading or trailing whitespace]"

Looks like you specified the image URL with a leading whitespace. Remove it and give it a try.

And if you have any further questions about GPUs, I highly recommend you start a VM with an L4 GPU, pull your image, ssh into it, and run your pipeline locally with DirectRunner. That can make sure all your code works.

On Wed, Sep 17, 2025 at 9:25 AM XQ Hu <[email protected]> wrote:

I saw it. Let me follow up internally.

On Tue, Sep 16, 2025 at 10:10 PM Sai Shashank <[email protected]> wrote:

I already have one open, but I am opening one more: the case number is 63121285.

On Tue, Sep 16, 2025 at 10:05 PM XQ Hu <[email protected]> wrote:

Can you open a cloud support ticket? That can give us the permission to access your job.

On Tue, Sep 16, 2025, 9:57 PM Sai Shashank <[email protected]> wrote:

Hey, can we connect on my office mail? I could share more details like pipeline options there, and since I work at CVS, that way it would be under compliance too.

On Tue, 16 Sept 2025 at 21:54, Sai Shashank <[email protected]> wrote:

The code works without the custom image of TensorRT. I only get this:

ject to benefit from faster worker startup and autoscaling. If you experience container-startup related issues, pass the "disable_image_streaming" experiment to disable image streaming for the job.
INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:10.327Z: JOB_MESSAGE_BASIC: Worker configuration: g2-standard-4 in us-east4-c.
INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.480Z: JOB_MESSAGE_BASIC: Executing operation [10]: Create URIs/Impulse+[10]: Create URIs/FlatMap(<lambda at core.py:3994>)+[10]: Create URIs/Map(decode)+[10]: Download PDFs+[10]: Load PDF Pages+[10]: Preprocess Images+[10]: Run Inference/BatchElements/ParDo(_GlobalWindowsBatchingDoFn)+[10]: Run Inference/BeamML_RunInference
INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.543Z: JOB_MESSAGE_BASIC: Starting 1 workers in us-east4...
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2025-09-16_17_52_06-10817935125972705087 is in state JOB_STATE_RUNNING
WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.

This just keeps recurring, and after an hour this message pops up:

Workflow failed. Causes: The Dataflow job appears to be stuck because no worker activity has been seen in the last 1h. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors#error-syncing-pod. You can also get help with Cloud Dataflow at https://cloud.google.com/dataflow/support.

On Tue, 16 Sept 2025 at 21:35, XQ Hu <[email protected]> wrote:

Does the code work without using TensorRT? Any logs?
On Tue, Sep 16, 2025 at 9:28 PM Sai Shashank <[email protected]> wrote:

#!/usr/bin/env python3
"""
Apache Beam pipeline for processing PDFs with Triton server and saving results to BigQuery.
This pipeline combines functionality from test_triton_document.py, create_bigquery_tables.py,
and save_to_bigquery.py into a single workflow.
"""

import os
import sys
import json
import uuid
import argparse
import logging
import tempfile
import datetime
import requests
import numpy as np
import cv2
from PIL import Image
import fitz  # PyMuPDF
from pathlib import Path
from typing import Dict, List, Tuple, Any, Optional, Iterator

# Apache Beam imports
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.ml.inference.base import RemoteModelHandler, PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.utils import _convert_to_result
from apache_beam.ml.inference.tensorrt_inference import TensorRTEngineHandlerNumPy
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp.gcsio import GcsIO

# Google Cloud imports
from google.cloud import storage
from google.cloud import bigquery

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# DocLayNet classes
CLASS_ID_TO_NAME = {
    0: 'Caption',
    1: 'Footnote',
    2: 'Formula',
    3: 'List-item',
    4: 'Page-footer',
    5: 'Page-header',
    6: 'Picture',
    7: 'Section-header',
    8: 'Table',
    9: 'Text',
    10: 'Title'
}


class DownloadPDFFromGCS(beam.DoFn):
    """Download a PDF from Google Cloud Storage."""

    def __init__(self, temp_dir=None):
        self.temp_dir = temp_dir or tempfile.gettempdir()

    def process(self, gcs_uri):
        try:
            # Parse GCS URI
            if not gcs_uri.startswith("gs://"):
                raise ValueError(f"Invalid GCS URI: {gcs_uri}")

            # Remove gs:// prefix and split into bucket and blob path
            path_parts = gcs_uri[5:].split("/", 1)
            bucket_name = path_parts[0]
            blob_path = path_parts[1]

            # Get filename from blob path
            filename = os.path.basename(blob_path)
            local_path = os.path.join(self.temp_dir, filename)

            # Create temp directory if it doesn't exist
            os.makedirs(self.temp_dir, exist_ok=True)

            try:
                # Download using Beam's FileSystems
                with FileSystems.open(gcs_uri, 'rb') as gcs_file:
                    with open(local_path, 'wb') as local_file:
                        local_file.write(gcs_file.read())

                logger.info(f"Downloaded {gcs_uri} to {local_path}")

                # Return a dictionary with the local path and original URI
                yield {
                    'local_path': local_path,
                    'gcs_uri': gcs_uri,
                    'filename': filename
                }
            except Exception as e:
                logger.error(f"Error reading from GCS: {str(e)}")
                # Try alternative download method
                logger.info(f"Trying alternative download method for {gcs_uri}")

                # For testing with local files
                if os.path.exists(gcs_uri.replace("gs://", "")):
                    local_path = gcs_uri.replace("gs://", "")
                    logger.info(f"Using local file: {local_path}")
                    yield {
                        'local_path': local_path,
                        'gcs_uri': gcs_uri,
                        'filename': os.path.basename(local_path)
                    }
                else:
                    # Try using the gsutil command
                    import subprocess
                    try:
                        subprocess.run(["gsutil", "cp", gcs_uri, local_path], check=True)
                        logger.info(f"Downloaded {gcs_uri} to {local_path} using gsutil")
                        yield {
                            'local_path': local_path,
                            'gcs_uri': gcs_uri,
                            'filename': filename
                        }
                    except Exception as e2:
                        logger.error(f"Failed to download using gsutil: {str(e2)}")

        except Exception as e:
            logger.error(f"Error downloading {gcs_uri}: {str(e)}")


class LoadPDFPages(beam.DoFn):
    """Load PDF pages as images."""

    def __init__(self, dpi=200):
        self.dpi = dpi

    def process(self, element):
        doc = None
        try:
            # Make sure we have all required fields
            if not isinstance(element, dict):
                logger.error(f"Expected dictionary, got {type(element)}")
                return

            if 'local_path' not in element:
                logger.error("Missing 'local_path' in element")
                return

            local_path = element['local_path']
            gcs_uri = element.get('gcs_uri', '')

            # Extract filename from local_path if not provided
            filename = element.get('filename', os.path.basename(local_path))

            logger.info(f"Loading PDF: {local_path}, filename: {filename}")

            # Check if file exists and is accessible
            if not os.path.exists(local_path):
                logger.error(f"File not found: {local_path}")
                return

            if not os.access(local_path, os.R_OK):
                logger.error(f"File not readable: {local_path}")
                return

            # Open the PDF
            try:
                doc = fitz.open(local_path)
                if doc.is_closed:
                    logger.error(f"Failed to open PDF: {local_path}")
                    return
            except Exception as e:
                logger.error(f"Error opening PDF {local_path}: {str(e)}")
                return

            # Process each page
            page_count = len(doc)
            logger.info(f"Processing {page_count} pages from {local_path}")

            for i in range(page_count):
                try:
                    if doc.is_closed:
                        logger.error(f"Document was closed unexpectedly while processing page {i}")
                        break

                    page = doc[i]
                    if page is None:
                        logger.error(f"Failed to get page {i} from document")
                        continue

                    # Use a higher resolution for better quality
                    scale = self.dpi / 72.0
                    mat = fitz.Matrix(scale, scale)

                    try:
                        pix = page.get_pixmap(matrix=mat, alpha=False)
                    except Exception as e:
                        logger.error(f"Error getting pixmap for page {i}: {str(e)}")
                        continue

                    # Check pixmap dimensions
                    if pix.height <= 0 or pix.width <= 0 or pix.n <= 0:
                        logger.error(f"Invalid pixmap dimensions: {pix.width}x{pix.height}x{pix.n}")
                        continue

                    # Convert to numpy array
                    try:
                        arr = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
                    except Exception as e:
                        logger.error(f"Error converting pixmap to numpy array: {str(e)}")
                        continue

                    # Convert BGR to RGB if needed
                    if pix.n == 3:  # RGB
                        try:
                            arr = cv2.cvtColor(arr, cv2.COLOR_BGR2RGB)
                        except Exception as e:
                            logger.error(f"Error converting BGR to RGB: {str(e)}")
                            continue

                    # Store original size for later use
                    original_size = (arr.shape[0], arr.shape[1])

                    # Create page info
                    page_info = {
                        'page_num': i,
                        'image': arr,
                        'original_size': original_size,
                        'local_path': local_path,
                        'gcs_uri': gcs_uri,
                        'filename': filename
                    }

                    # Use document ID and page number as key
                    doc_id = os.path.splitext(filename)[0]
                    key = f"{doc_id}_{i}"

                    yield (key, page_info)
                except Exception as e:
                    import traceback
                    logger.error(f"Error processing page {i}: {str(e)}")
                    logger.error(traceback.format_exc())

            logger.info(f"Loaded {len(doc)} pages from {local_path}")

        except Exception as e:
            import traceback
            logger.error(f"Error loading PDF: {str(e)}")
            logger.error(traceback.format_exc())
        finally:
            # Make sure to close the document only if it was successfully opened
            if doc is not None:
                try:
                    if not doc.is_closed:
                        doc.close()
                except Exception as e:
                    logger.debug(f"Error closing document: {str(e)}")


class PreprocessImage(beam.DoFn):
    """Preprocess image for Triton server."""

    def __init__(self, size=1024):
        self.size = size

    def letterbox(self, img, new_shape=1024, color=(114, 114, 114)):
        """Resize and pad image to target size."""
        h, w = img.shape[:2]
        r = min(new_shape / h, new_shape / w)
        nh, nw = int(round(h * r)), int(round(w * r))
        pad_h, pad_w = new_shape - nh, new_shape - nw
        top = pad_h // 2
        bottom = pad_h - top
        left = pad_w // 2
        right = pad_w - left
        img = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_LINEAR)
        img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)
        return img, r, left, top

    def process(self, element):
        try:
            if not isinstance(element, tuple) or len(element) != 2:
                logger.error(f"Expected (key, value) tuple, got {type(element)}")
                return

            key, page_info = element

            if not isinstance(page_info, dict):
                logger.error(f"Expected dictionary for page_info, got {type(page_info)}")
                return

            if 'image' not in page_info:
                logger.error("Missing 'image' in page_info")
                return

            # Create a new dictionary to avoid modifying the input
            new_page_info = dict(page_info)

            # Apply letterbox resize
            img = new_page_info['image']
            lb, r, left, top = self.letterbox(img, new_shape=self.size)

            # Convert to float32 and normalize to [0,1]
            x = lb.astype(np.float32) / 255.0

            # Convert to CHW format
            x = np.transpose(x, (2, 0, 1))

            # Add batch dimension
            batched_img = np.expand_dims(x, axis=0)

            # Update page info
            new_page_info['preprocessed_image'] = batched_img
            new_page_info['letterbox_info'] = (r, left, top)

            yield (key, new_page_info)

        except Exception as e:
            import traceback
            logger.error(f"Error preprocessing image: {str(e)}")
            logger.error(traceback.format_exc())


class ExtractBoxes(beam.DoFn):
    """Extract bounding boxes from the Triton response."""

    def __init__(self, conf_th=0.25, iou_th=0.7, model_size=1024):
        self.conf_th = conf_th
        self.iou_th = iou_th
        self.model_size = model_size

    def _nms(self, boxes, scores, iou_th=0.7):
        """Non-Maximum Suppression."""
        if len(boxes) == 0:
            return []

        boxes = boxes.astype(np.float32)
        x1, y1, x2, y2 = boxes.T
        areas = (x2 - x1) * (y2 - y1)
        order = scores.argsort()[::-1]

        keep = []
        while order.size > 0:
            i = order[0]
            keep.append(i)

            xx1 = np.maximum(x1[i], x1[order[1:]])
            yy1 = np.maximum(y1[i], y1[order[1:]])
            xx2 = np.minimum(x2[i], x2[order[1:]])
            yy2 = np.minimum(y2[i], y2[order[1:]])

            w = np.maximum(0.0, xx2 - xx1)
            h = np.maximum(0.0, yy2 - yy1)
            inter = w * h

            iou = inter / (areas[i] + areas[order[1:]] - inter + 1e-9)
            inds = np.where(iou <= iou_th)[0]
            order = order[inds + 1]

        return keep

    def process(self, page_info):
        try:
            triton_response = page_info['triton_response']
            original_size = page_info['original_size']
            r, left, top = page_info['letterbox_info']

            if "outputs" not in triton_response or not triton_response["outputs"]:
                logger.error("Invalid response from Triton server")
                return []

            out_meta = triton_response["outputs"][0]
            shape = out_meta["shape"]
            data = np.array(out_meta["data"], dtype=np.float32).reshape(shape)

            logger.info(f"Output shape: {shape}")

            # For YOLO output [B, C, P] where C is channels (box coords + objectness + classes)
            B, C, P = shape

            # Assuming 4 box coordinates + class probabilities (no objectness)
            has_objectness = False
            num_classes = C - 5 if has_objectness else C - 4

            # Extract data
            xywh = data[:, 0:4, :]
            if has_objectness:
                obj = data[:, 4:5, :]
                cls = data[:, 5:5 + num_classes, :]
            else:
                obj = None
                cls = data[:, 4:4 + num_classes, :]

            # Process batch item (we only have one)
            b = 0
            h, w = original_size

            xywh_b = xywh[b].T  # (P,4)
            if obj is not None:
                obj_b = obj[b].T.squeeze(1)  # (P,)
            else:
                obj_b = np.ones((P,), dtype=np.float32)
            cls_b = cls[b].T  # (P,nc)

            # Get scores and labels
            scores_all = (obj_b[:, None] * cls_b) if obj is not None else cls_b
            labels = scores_all.argmax(axis=1)
            scores = scores_all.max(axis=1)

            # Filter by confidence threshold
            keep = scores >= self.conf_th
            if not np.any(keep):
                logger.info(f"No detections above threshold {self.conf_th}")
                return []

            xywh_k = xywh_b[keep]
            scores_k = scores[keep]
            labels_k = labels[keep]

            # xywh -> xyxy in model space
            cx, cy, ww, hh = xywh_k.T
            xyxy_model = np.stack([cx - ww / 2, cy - hh / 2, cx + ww / 2, cy + hh / 2], axis=1)

            # Apply NMS per class
            final_boxes = []
            final_scores = []
            final_labels = []

            for c in np.unique(labels_k):
                idxs = np.where(labels_k == c)[0]
                if idxs.size == 0:
                    continue
                keep_idx = self._nms(xyxy_model[idxs], scores_k[idxs], iou_th=self.iou_th)
                final_boxes.append(xyxy_model[idxs][keep_idx])
                final_scores.append(scores_k[idxs][keep_idx])
                final_labels.append(np.full(len(keep_idx), c, dtype=int))

            if not final_boxes:
                logger.info("No detections after NMS")
                return []

            xyxy_model = np.vstack(final_boxes)
            scores_k = np.concatenate(final_scores)
            labels_k = np.concatenate(final_labels)

            # Map boxes from model space to original image space
            xyxy_orig = xyxy_model.copy()

            # Remove padding
            xyxy_orig[:, [0, 2]] -= left
            xyxy_orig[:, [1, 3]] -= top

            # Scale back to original size
            xyxy_orig /= r

            # Clip to image boundaries
            xyxy_orig[:, 0::2] = np.clip(xyxy_orig[:, 0::2], 0, w - 1)
            xyxy_orig[:, 1::2] = np.clip(xyxy_orig[:, 1::2], 0, h - 1)

            # Format as requested: x_min, y_min, x_max, y_max, class, probability
            boxes = []
            for (x1, y1, x2, y2), label, score in zip(xyxy_orig, labels_k, scores_k):
                class_name = CLASS_ID_TO_NAME.get(int(label))
                box_info = {
                    "page": page_info['page_num'],
                    "x_min": float(x1),
                    "y_min": float(y1),
                    "x_max": float(x2),
                    "y_max": float(y2),
                    "class": int(label),
                    "class_name": class_name,
                    "probability": float(score),
                    "filename": page_info['filename'],
                    "local_path": page_info['local_path'],
                    "gcs_uri": page_info['gcs_uri']
                }
                boxes.append(box_info)

            logger.info(f"Extracted {len(boxes)} boxes from page {page_info['page_num']}")

            return boxes

        except Exception as e:
            logger.error(f"Error extracting boxes: {str(e)}")
            return []


class PrepareForBigQuery(beam.DoFn):
    """Prepare data for BigQuery insertion."""

    def process(self, box_info):
        try:
            # Generate UUIDs for primary keys
            v_note_id = str(uuid.uuid4())
            page_ocr_id = str(uuid.uuid4())
            class_prediction_id = str(uuid.uuid4())

            # Create timestamp
            processing_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            # Create ocr_results row
            ocr_results_row = {
                "v_note_id": v_note_id,
                "filename": box_info['filename'],
                "file_path": box_info['gcs_uri'],
                "processing_time": processing_time,
                "file_type": "pdf"
            }

            # Create page_ocr row
            page_ocr_row = {
                "page_ocr_id": page_ocr_id,
                "v_note_id": v_note_id,
                "page_number": box_info['page']
            }

            # Create class_prediction row
            class_prediction_row = {
                "class_prediction_id": class_prediction_id,
                "page_ocr_id": page_ocr_id,
                "xmin": box_info['x_min'],
                "ymin": box_info['y_min'],
                "xmax": box_info['x_max'],
                "ymax": box_info['y_max'],
                "class": box_info['class_name'] if box_info['class_name'] else str(box_info['class']),
                "confidence": box_info['probability']
            }

            # Return all three rows with table names
            return [
                ('ocr_results', ocr_results_row),
                ('page_ocr', page_ocr_row),
                ('class_prediction', class_prediction_row)
            ]

        except Exception as e:
            logger.error(f"Error preparing for BigQuery: {str(e)}")
            return []


model_handler = TensorRTEngineHandlerNumPy(
    min_batch_size=1,
    max_batch_size=1,
    engine_path="gs://temp/yolov11l-doclaynet.engine",
)


# (pipeline options are defined elsewhere and were not included in this snippet)
with beam.Pipeline(options=options) as pipeline:

    # Create PCollection from input URIs
    pdf_uris = (
        pipeline
        | "Create URIs" >> beam.Create(["tmp.pdf"])
    )

    # Download PDFs
    local_pdfs = (
        pdf_uris
        | "Download PDFs" >> beam.ParDo(DownloadPDFFromGCS())
    )

    # Load PDF pages
    pdf_pages = (
        local_pdfs
        | "Load PDF Pages" >> beam.ParDo(LoadPDFPages())
        # | "Flatten Pages" >> beam.FlatMap(lambda x: x)
    )

    # Preprocess images
    preprocessed_pages = (
        pdf_pages
        | "Preprocess Images" >> beam.ParDo(PreprocessImage())
    )

    inference_results = (
        preprocessed_pages
        | "Run Inference" >> RunInference(model_handler=model_handler)
    )

On Tue, 16 Sept 2025 at 21:23, XQ Hu <[email protected]> wrote:

Can you share your commands and outputs?

On Tue, Sep 16, 2025 at 9:02 PM Sai Shashank <[email protected]> wrote:

Okay, I have changed the Docker image to now RUN the python command, but it is still halting without any errors or warnings.

On Tue, 16 Sept 2025 at 17:38, XQ Hu via dev <[email protected]> wrote:

The CMD is not necessary, as it will be overridden by the ENTRYPOINT, just like your comment says.

If you ssh into your Docker container with `docker run --rm -it --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE`, can you run python and some Beam pipelines with a direct runner in the container? This can help test that the environment works fine.
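For instance, something as small as this is enough for that check (a throwaway sketch, not part of the pipeline above):

# Tiny smoke test to run inside the container; DirectRunner is the default runner.
import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | beam.Create([1, 2, 3])
     | beam.Map(lambda x: x * x)
     | beam.Map(print))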
I have one old Dockerfile that used to work with the old Beam:
https://github.com/google/dataflow-ml-starter/blob/main/tensor_rt.Dockerfile

On Tue, Sep 16, 2025 at 4:56 PM Sai Shashank <[email protected]> wrote:

---------- Forwarded message ---------
From: Sai Shashank <[email protected]>
Date: Tue, Sep 16, 2025 at 4:27 PM
Subject: TensorRT inference not starting
To: <[email protected]>


Hey Everyone,
I was trying to use TensorRT within Apache Beam on Dataflow, but somehow Dataflow didn't start; it did not even give me worker logs. Below is the Dockerfile that I use to create the custom image. At first I thought it was a version mismatch, but that usually gives me a harness error.

ARG BUILD_IMAGE=nvcr.io/nvidia/tensorrt:25.08-py3
FROM ${BUILD_IMAGE}
ENV PATH="/usr/src/tensorrt/bin:${PATH}"

WORKDIR /workspace

RUN apt-get update -y && apt-get install -y python3-venv
RUN pip install --no-cache-dir apache-beam[gcp]==2.67.0

COPY --from=apache/beam_python3.10_sdk:2.67.0 /opt/apache/beam /opt/apache/beam

# Install additional dependencies
RUN pip install --upgrade pip \
    && pip install torch \
    && pip install torchvision \
    && pip install pillow>=8.0.0 \
    && pip install transformers>=4.18.0 \
    && pip install cuda-python \
    && pip install opencv-python==4.7.0.72 \
    && pip install PyMuPDF==1.22.5 \
    && pip install requests==2.31.0

# Set the default command to run the inference script
# This will be overridden by the Apache Beam boot script
CMD ["python", "/workspace/inference.py"]

# Use the Apache Beam boot script as the entrypoint
ENTRYPOINT ["/opt/apache/beam/boot"]
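For context, a custom SDK container like this is typically handed to the Dataflow job through pipeline options along these lines (a sketch only; every value below is a placeholder, not the actual job configuration):

# Sketch: pointing a Dataflow job at a custom SDK container on a GPU worker.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="DataflowRunner",
    project="my-project",                # placeholder
    region="us-east4",                   # placeholder
    temp_location="gs://my-bucket/tmp",  # placeholder
    machine_type="g2-standard-4",
    sdk_container_image="us-east4-docker.pkg.dev/my-project/my-repo/tensorrt_ss:latest",  # placeholder
    dataflow_service_options=["worker_accelerator=type:nvidia-l4;count:1;install-nvidia-driver"],
)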
