So, can I make an issue and raise PR for the updated TensorRT handler?

On Sat, 20 Sept 2025 at 08:46, XQ Hu <[email protected]> wrote:

> This is what Beam tests use
> https://github.com/apache/beam/blob/master/sdks/python/test-suites/containers/tensorrt_runinference/tensor_rt.dockerfile#L17
>
> nvcr.io/nvidia/tensorrt:23.05-py3
>
> From the latest doc:
> https://docs.nvidia.com/deeplearning/tensorrt/latest/_static/python-api/infer/Core/Engine.html#icudaengine
> and https://github.com/NVIDIA/TensorRT/issues/4216, num_io_tensors should
> be used now.
>
> You can either use the tensorRT version Beam supports now or you can
> define your own TensorRTEngine model handler with the new tensorRT.
>
>
> On Sat, Sep 20, 2025 at 1:10 AM Sai Shashank <[email protected]>
> wrote:
>
>> So finally, I was able to resolve the issue  of docker image but now, I
>> saw this error           ^^^^^^^^^^^^^^^^^^^^^^
>>   File
>> "/usr/local/lib/python3.12/dist-packages/apache_beam/ml/inference/tensorrt_inference.py",
>> line 132, in __init__
>>     for i in range(self.engine.num_bindings):
>>                    ^^^^^^^^^^^^^^^^^^^^^^^^
>> AttributeError: 'tensorrt.tensorrt.ICudaEngine' object has no attribute
>> 'num_bindings'
>> Traceback (most recent call last):
>>   File "apache_beam/runners/common.py", line 1562, in
>> apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
>>   File "apache_beam/runners/common.py", line 602, in
>> apache_beam.runners.common.DoFnInvoker.invoke_setup
>>   File
>> "/usr/local/lib/python3.12/dist-packages/apache_beam/ml/inference/base.py",
>> line 1882, in setup
>>     self._model = self._load_model()
>>                   ^^^^^^^^^^^^^^^^^^
>>   File
>> "/usr/local/lib/python3.12/dist-packages/apache_beam/ml/inference/base.py",
>> line 1848, in _load_model
>>     model = self._shared_model_handle.acquire(load, tag=self._cur_tag)
>>             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>>
>> I have seen this error where the tensorrt  version is older than 10.x .
>> Is there any undated tensorRT handler , or am I doing something wrong ?
>>
>> On Fri, 19 Sept 2025 at 09:15, XQ Hu <[email protected]> wrote:
>>
>>> GPU driver: DRIVER_VERSION=535.261.03
>>> From the log, the driver was installed correctly (make sure this can be
>>> used for your tensor RT.)
>>>
>>> "Error syncing pod, skipping" err="failed to \"StartContainer\" for
>>> \"sdk-0-0\" with ErrImagePull: \"failed to pull and unpack image \\\"
>>> us-east4-docker.pkg.dev/anbc-dev-suspecting/suspecting-docker/tensorrt_ss:latest\\\
>>> <http://us-east4-docker.pkg.dev/anbc-dev-suspecting/suspecting-docker/tensorrt_ss:latest%5C%5C%5C>":
>>> failed to extract layer
>>> sha256:a848022a4558c435b349317630b139960b44ae09f218ab7f93f764ba4661607d:
>>> write
>>> /var/lib/containerd/io.containerd.snapshotter.v1.gcfs/snapshotter/snapshots/55/fs/usr/local/cuda-13.0/targets/x86_64-linux/lib/libcusparseLt.so.0.8.0.4:
>>> no space left on device: unknown\""
>>> pod="default/df-inference-pipeline-4-09182012-1a3n-harness-rvxc"
>>> podUID="3a9b74645db23e77932d981439f1d3cc"
>>>
>>> The Dataflow worker cannot unpack the image:  no space left on device
>>>
>>> Try
>>> https://cloud.google.com/dataflow/docs/guides/configure-worker-vm#disk-size
>>>
>>>
>>>
>>>
>>> On Thu, Sep 18, 2025 at 11:27 PM Sai Shashank <[email protected]>
>>> wrote:
>>>
>>>> So, I was able the start, dataflow :  
>>>> 2025-09-18_20_12_41-10973298801093076892
>>>> , but not it having this error: 2025-09-18 23:21:25.401 EDT
>>>> SDK harnesses are not healthy after 5 minutes, status: Waiting for 4 of
>>>> 4 SDK Harnesses to register. I have noticed this error when there is a
>>>> mismatch of environments. As advice by you I try running Direct Runner in
>>>> the docker image and it was running perfectly. is there any tips which you
>>>> would give me to correct this error ?
>>>>
>>>>
>>>> On Wed, 17 Sept 2025 at 09:42, XQ Hu <[email protected]> wrote:
>>>>
>>>>> From the worker log,
>>>>>
>>>>> "Failed to read pods from URL" err="invalid pod:
>>>>> [spec.containers[3].image: Invalid value: \"
>>>>> us-east4-docker.pkg.dev/anbc-dev-suspecting/suspecting-docker/tensorrt_ss:latest\
>>>>> <http://us-east4-docker.pkg.dev/anbc-dev-suspecting/suspecting-docker/tensorrt_ss:latest%5C>":
>>>>> must not have leading or trailing whitespace]"
>>>>>
>>>>> 2025-09-16_17_52_06-10817935125972705087
>>>>>
>>>>> Looks like you specify the image URL with the leading whitespace.
>>>>> Remove it and give it a try.
>>>>>
>>>>> And if you have any further questions about GPUs, I highly recommend
>>>>> you start the VM with L4 GPU and pull your image and ssh into it and run
>>>>> your pipeline locally with DirectRunner. That can make sure all your code
>>>>> works.
>>>>>
>>>>>
>>>>> On Wed, Sep 17, 2025 at 9:25 AM XQ Hu <[email protected]> wrote:
>>>>>
>>>>>> I saw it. Let me follow up internally.
>>>>>>
>>>>>> On Tue, Sep 16, 2025 at 10:10 PM Sai Shashank <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> I already I have open one but opening one more : case number is
>>>>>>> 63121285
>>>>>>>
>>>>>>> On Tue, Sep 16, 2025 at 10:05 PM XQ Hu <[email protected]> wrote:
>>>>>>>
>>>>>>>> Can you open a cloud support ticket? That can give us the
>>>>>>>> permission to access your job.
>>>>>>>>
>>>>>>>> On Tue, Sep 16, 2025, 9:57 PM Sai Shashank <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hey , can we connect on my office mail? since I could share more
>>>>>>>>> stuff like pipeline options and other stuff there better and I work 
>>>>>>>>> at CVS
>>>>>>>>> so that way it would be under compliance too
>>>>>>>>>
>>>>>>>>> On Tue, 16 Sept 2025 at 21:54, Sai Shashank <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> The code works without the custom image of TensorRT, I will only
>>>>>>>>>> get this:
>>>>>>>>>>
>>>>>>>>>> ject to benefit from faster worker startup and autoscaling. If you 
>>>>>>>>>> experience container-startup related issues, pass the 
>>>>>>>>>> "disable_image_streaming" experiment to disable image streaming for 
>>>>>>>>>> the job.
>>>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:10.327Z:
>>>>>>>>>>  JOB_MESSAGE_BASIC: Worker configuration: g2-standard-4 in 
>>>>>>>>>> us-east4-c.
>>>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.480Z:
>>>>>>>>>>  JOB_MESSAGE_BASIC: Executing operation [10]: Create 
>>>>>>>>>> URIs/Impulse+[10]: Create URIs/FlatMap(<lambda at 
>>>>>>>>>> core.py:3994>)+[10]: Create URIs/Map(decode)+[10]: Download 
>>>>>>>>>> PDFs+[10]: Load PDF Pages+[10]: Preprocess Images+[10]: Run 
>>>>>>>>>> Inference/BatchElements/ParDo(_GlobalWindowsBatchingDoFn)+[10]: Run 
>>>>>>>>>> Inference/BeamML_RunInference
>>>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.543Z:
>>>>>>>>>>  JOB_MESSAGE_BASIC: Starting 1 workers in us-east4...
>>>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:Job 
>>>>>>>>>> 2025-09-16_17_52_06-10817935125972705087 is in state 
>>>>>>>>>> JOB_STATE_RUNNING
>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support 
>>>>>>>>>> per-request timeout. Set the timeout when constructing the 
>>>>>>>>>> httplib2.Http instance.
>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support 
>>>>>>>>>> per-request timeout. Set the timeout when constructing the 
>>>>>>>>>> httplib2.Http instance.
>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support 
>>>>>>>>>> per-request timeout. Set the timeout when constructing the 
>>>>>>>>>> httplib2.Http instance.
>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support 
>>>>>>>>>> per-request timeout. Set the timeout when constructing the 
>>>>>>>>>> httplib2.Http instance.
>>>>>>>>>>
>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support
>>>>>>>>>> per-request timeout. Set the timeout when constructing the 
>>>>>>>>>> httplib2.Http
>>>>>>>>>> instance.
>>>>>>>>>>
>>>>>>>>>> just recurring and after an hour this message pops up  Workflow
>>>>>>>>>> failed. Causes: The Dataflow job appears to be stuck because no 
>>>>>>>>>> worker
>>>>>>>>>> activity has been seen in the last 1h. For more information, see
>>>>>>>>>> https://cloud.google.com/dataflow/docs/guides/common-errors#error-syncing-pod.
>>>>>>>>>> You can also get help with Cloud Dataflow at
>>>>>>>>>> https://cloud.google.com/dataflow/support.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, 16 Sept 2025 at 21:35, XQ Hu <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Does the code work without using  TensorRT? Any logs?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Sep 16, 2025 at 9:28 PM Sai Shashank <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> import apache_beam as beam
>>>>>>>>>>>> from apache_beam.ml.inference.tensorrt_inference import
>>>>>>>>>>>> TensorRTEngineHandlerNumPy
>>>>>>>>>>>> from apache_beam.ml.inference.base import RunInference
>>>>>>>>>>>>
>>>>>>>>>>>> #!/usr/bin/env python3
>>>>>>>>>>>> """
>>>>>>>>>>>> Apache Beam pipeline for processing PDFs with Triton server and
>>>>>>>>>>>> saving results to BigQuery.
>>>>>>>>>>>> This pipeline combines functionality from
>>>>>>>>>>>> test_triton_document.py, create_bigquery_tables.py,
>>>>>>>>>>>> and save_to_bigquery.py into a single workflow.
>>>>>>>>>>>> """
>>>>>>>>>>>>
>>>>>>>>>>>> import os
>>>>>>>>>>>> import sys
>>>>>>>>>>>> import json
>>>>>>>>>>>> import uuid
>>>>>>>>>>>> import argparse
>>>>>>>>>>>> import logging
>>>>>>>>>>>> import tempfile
>>>>>>>>>>>> import datetime
>>>>>>>>>>>> import requests
>>>>>>>>>>>> import numpy as np
>>>>>>>>>>>> import cv2
>>>>>>>>>>>> from PIL import Image
>>>>>>>>>>>> import fitz  # PyMuPDF
>>>>>>>>>>>> from pathlib import Path
>>>>>>>>>>>> from typing import Dict, List, Tuple, Any, Optional, Iterator
>>>>>>>>>>>>
>>>>>>>>>>>> # Apache Beam imports
>>>>>>>>>>>> import apache_beam as beam
>>>>>>>>>>>> from apache_beam.options.pipeline_options import
>>>>>>>>>>>> PipelineOptions, SetupOptions
>>>>>>>>>>>> from apache_beam.ml.inference.base import RemoteModelHandler,
>>>>>>>>>>>> PredictionResult
>>>>>>>>>>>> from apache_beam.ml.inference.utils import _convert_to_result
>>>>>>>>>>>> from apache_beam.ml.inference.base import RunInference
>>>>>>>>>>>> from apache_beam.io.gcp.bigquery import WriteToBigQuery
>>>>>>>>>>>> from apache_beam.io.filesystems import FileSystems
>>>>>>>>>>>> from apache_beam.io.gcp.gcsio import GcsIO
>>>>>>>>>>>>
>>>>>>>>>>>> # Google Cloud imports
>>>>>>>>>>>> from google.cloud import storage
>>>>>>>>>>>> from google.cloud import bigquery
>>>>>>>>>>>>
>>>>>>>>>>>> # Set up logging
>>>>>>>>>>>> logging.basicConfig(level=logging.INFO)
>>>>>>>>>>>> logger = logging.getLogger(__name__)
>>>>>>>>>>>>
>>>>>>>>>>>> # DocLayNet classes
>>>>>>>>>>>> CLASS_ID_TO_NAME = {
>>>>>>>>>>>>     0: 'Caption',
>>>>>>>>>>>>     1: 'Footnote',
>>>>>>>>>>>>     2: 'Formula',
>>>>>>>>>>>>     3: 'List-item',
>>>>>>>>>>>>     4: 'Page-footer',
>>>>>>>>>>>>     5: 'Page-header',
>>>>>>>>>>>>     6: 'Picture',
>>>>>>>>>>>>     7: 'Section-header',
>>>>>>>>>>>>     8: 'Table',
>>>>>>>>>>>>     9: 'Text',
>>>>>>>>>>>>     10: 'Title'
>>>>>>>>>>>> }
>>>>>>>>>>>> class DownloadPDFFromGCS(beam.DoFn):
>>>>>>>>>>>>     """Download a PDF from Google Cloud Storage."""
>>>>>>>>>>>>
>>>>>>>>>>>>     def __init__(self, temp_dir=None):
>>>>>>>>>>>>         self.temp_dir = temp_dir or tempfile.gettempdir()
>>>>>>>>>>>>
>>>>>>>>>>>>     def process(self, gcs_uri):
>>>>>>>>>>>>         try:
>>>>>>>>>>>>             # Parse GCS URI
>>>>>>>>>>>>             if not gcs_uri.startswith("gs://"):
>>>>>>>>>>>>                 raise ValueError(f"Invalid GCS URI: {gcs_uri}")
>>>>>>>>>>>>
>>>>>>>>>>>>             # Remove gs:// prefix and split into bucket and
>>>>>>>>>>>> blob path
>>>>>>>>>>>>             path_parts = gcs_uri[5:].split("/", 1)
>>>>>>>>>>>>             bucket_name = path_parts[0]
>>>>>>>>>>>>             blob_path = path_parts[1]
>>>>>>>>>>>>
>>>>>>>>>>>>             # Get filename from blob path
>>>>>>>>>>>>             filename = os.path.basename(blob_path)
>>>>>>>>>>>>             local_path = os.path.join(self.temp_dir, filename)
>>>>>>>>>>>>
>>>>>>>>>>>>             # Create temp directory if it doesn't exist
>>>>>>>>>>>>             os.makedirs(self.temp_dir, exist_ok=True)
>>>>>>>>>>>>
>>>>>>>>>>>>             try:
>>>>>>>>>>>>                 # Download using Beam's GcsIO
>>>>>>>>>>>>                 with FileSystems.open(gcs_uri, 'rb') as
>>>>>>>>>>>> gcs_file:
>>>>>>>>>>>>                     with open(local_path, 'wb') as local_file:
>>>>>>>>>>>>                         local_file.write(gcs_file.read())
>>>>>>>>>>>>
>>>>>>>>>>>>                 logger.info(f"Downloaded {gcs_uri} to
>>>>>>>>>>>> {local_path}")
>>>>>>>>>>>>
>>>>>>>>>>>>                 # Return a dictionary with the local path and
>>>>>>>>>>>> original URI
>>>>>>>>>>>>                 yield {
>>>>>>>>>>>>                     'local_path': local_path,
>>>>>>>>>>>>                     'gcs_uri': gcs_uri,
>>>>>>>>>>>>                     'filename': filename
>>>>>>>>>>>>                 }
>>>>>>>>>>>>             except Exception as e:
>>>>>>>>>>>>                 logger.error(f"Error reading from GCS:
>>>>>>>>>>>> {str(e)}")
>>>>>>>>>>>>                 # Try alternative download method
>>>>>>>>>>>>                 logger.info(f"Trying alternative download
>>>>>>>>>>>> method for {gcs_uri}")
>>>>>>>>>>>>
>>>>>>>>>>>>                 # For testing with local files
>>>>>>>>>>>>                 if os.path.exists(gcs_uri.replace("gs://", "")):
>>>>>>>>>>>>                     local_path = gcs_uri.replace("gs://", "")
>>>>>>>>>>>>                     logger.info(f"Using local file:
>>>>>>>>>>>> {local_path}")
>>>>>>>>>>>>                     yield {
>>>>>>>>>>>>                         'local_path': local_path,
>>>>>>>>>>>>                         'gcs_uri': gcs_uri,
>>>>>>>>>>>>                         'filename': os.path.basename(local_path)
>>>>>>>>>>>>                     }
>>>>>>>>>>>>                 else:
>>>>>>>>>>>>                     # Try using gsutil command
>>>>>>>>>>>>                     import subprocess
>>>>>>>>>>>>                     try:
>>>>>>>>>>>>                         subprocess.run(["gsutil", "cp",
>>>>>>>>>>>> gcs_uri, local_path], check=True)
>>>>>>>>>>>>                         logger.info(f"Downloaded {gcs_uri} to
>>>>>>>>>>>> {local_path} using gsutil")
>>>>>>>>>>>>                         yield {
>>>>>>>>>>>>                             'local_path': local_path,
>>>>>>>>>>>>                             'gcs_uri': gcs_uri,
>>>>>>>>>>>>                             'filename': filename
>>>>>>>>>>>>                         }
>>>>>>>>>>>>                     except Exception as e2:
>>>>>>>>>>>>                         logger.error(f"Failed to download using
>>>>>>>>>>>> gsutil: {str(e2)}")
>>>>>>>>>>>>
>>>>>>>>>>>>         except Exception as e:
>>>>>>>>>>>>             logger.error(f"Error downloading {gcs_uri}:
>>>>>>>>>>>> {str(e)}")
>>>>>>>>>>>> class LoadPDFPages(beam.DoFn):
>>>>>>>>>>>>     """Load PDF pages as images."""
>>>>>>>>>>>>
>>>>>>>>>>>>     def __init__(self, dpi=200):
>>>>>>>>>>>>         self.dpi = dpi
>>>>>>>>>>>>
>>>>>>>>>>>>     def process(self, element):
>>>>>>>>>>>>         doc = None
>>>>>>>>>>>>         try:
>>>>>>>>>>>>             # Make sure we have all required fields
>>>>>>>>>>>>             if not isinstance(element, dict):
>>>>>>>>>>>>                 logger.error(f"Expected dictionary, got
>>>>>>>>>>>> {type(element)}")
>>>>>>>>>>>>                 return
>>>>>>>>>>>>
>>>>>>>>>>>>             if 'local_path' not in element:
>>>>>>>>>>>>                 logger.error("Missing 'local_path' in element")
>>>>>>>>>>>>                 return
>>>>>>>>>>>>
>>>>>>>>>>>>             local_path = element['local_path']
>>>>>>>>>>>>             gcs_uri = element.get('gcs_uri', '')
>>>>>>>>>>>>
>>>>>>>>>>>>             # Extract filename from local_path if not provided
>>>>>>>>>>>>             filename = element.get('filename',
>>>>>>>>>>>> os.path.basename(local_path))
>>>>>>>>>>>>
>>>>>>>>>>>>             logger.info(f"Loading PDF: {local_path}, filename:
>>>>>>>>>>>> {filename}")
>>>>>>>>>>>>
>>>>>>>>>>>>             # Check if file exists and is accessible
>>>>>>>>>>>>             if not os.path.exists(local_path):
>>>>>>>>>>>>                 logger.error(f"File not found: {local_path}")
>>>>>>>>>>>>                 return
>>>>>>>>>>>>
>>>>>>>>>>>>             if not os.access(local_path, os.R_OK):
>>>>>>>>>>>>                 logger.error(f"File not readable: {local_path}")
>>>>>>>>>>>>                 return
>>>>>>>>>>>>
>>>>>>>>>>>>             # Open the PDF
>>>>>>>>>>>>             try:
>>>>>>>>>>>>                 doc = fitz.open(local_path)
>>>>>>>>>>>>                 if doc.is_closed:
>>>>>>>>>>>>                     logger.error(f"Failed to open PDF:
>>>>>>>>>>>> {local_path}")
>>>>>>>>>>>>                     return
>>>>>>>>>>>>             except Exception as e:
>>>>>>>>>>>>                 logger.error(f"Error opening PDF {local_path}:
>>>>>>>>>>>> {str(e)}")
>>>>>>>>>>>>                 return
>>>>>>>>>>>>
>>>>>>>>>>>>             # Process each page
>>>>>>>>>>>>             page_count = len(doc)
>>>>>>>>>>>>             logger.info(f"Processing {page_count} pages from
>>>>>>>>>>>> {local_path}")
>>>>>>>>>>>>
>>>>>>>>>>>>             for i in range(page_count):
>>>>>>>>>>>>                 try:
>>>>>>>>>>>>                     if doc.is_closed:
>>>>>>>>>>>>                         logger.error(f"Document was closed
>>>>>>>>>>>> unexpectedly while processing page {i}")
>>>>>>>>>>>>                         break
>>>>>>>>>>>>
>>>>>>>>>>>>                     page = doc[i]
>>>>>>>>>>>>                     if page is None:
>>>>>>>>>>>>                         logger.error(f"Failed to get page {i}
>>>>>>>>>>>> from document")
>>>>>>>>>>>>                         continue
>>>>>>>>>>>>
>>>>>>>>>>>>                     # Use a higher resolution for better quality
>>>>>>>>>>>>                     scale = self.dpi / 72.0
>>>>>>>>>>>>                     mat = fitz.Matrix(scale, scale)
>>>>>>>>>>>>
>>>>>>>>>>>>                     try:
>>>>>>>>>>>>                         pix = page.get_pixmap(matrix=mat,
>>>>>>>>>>>> alpha=False)
>>>>>>>>>>>>                     except Exception as e:
>>>>>>>>>>>>                         logger.error(f"Error getting pixmap for
>>>>>>>>>>>> page {i}: {str(e)}")
>>>>>>>>>>>>                         continue
>>>>>>>>>>>>
>>>>>>>>>>>>                     # Check pixmap dimensions
>>>>>>>>>>>>                     if pix.height <= 0 or pix.width <= 0 or
>>>>>>>>>>>> pix.n <= 0:
>>>>>>>>>>>>                         logger.error(f"Invalid pixmap
>>>>>>>>>>>> dimensions: {pix.width}x{pix.height}x{pix.n}")
>>>>>>>>>>>>                         continue
>>>>>>>>>>>>
>>>>>>>>>>>>                     # Convert to numpy array
>>>>>>>>>>>>                     try:
>>>>>>>>>>>>                         arr = np.frombuffer(pix.samples,
>>>>>>>>>>>> dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
>>>>>>>>>>>>                     except Exception as e:
>>>>>>>>>>>>                         logger.error(f"Error converting pixmap
>>>>>>>>>>>> to numpy array: {str(e)}")
>>>>>>>>>>>>                         continue
>>>>>>>>>>>>
>>>>>>>>>>>>                     # Convert BGR to RGB if needed
>>>>>>>>>>>>                     if pix.n == 3:  # RGB
>>>>>>>>>>>>                         try:
>>>>>>>>>>>>                             arr = cv2.cvtColor(arr,
>>>>>>>>>>>> cv2.COLOR_BGR2RGB)
>>>>>>>>>>>>                         except Exception as e:
>>>>>>>>>>>>                             logger.error(f"Error converting BGR
>>>>>>>>>>>> to RGB: {str(e)}")
>>>>>>>>>>>>                             continue
>>>>>>>>>>>>
>>>>>>>>>>>>                     # Store original size for later use
>>>>>>>>>>>>                     original_size = (arr.shape[0], arr.shape[1])
>>>>>>>>>>>>
>>>>>>>>>>>>                     # Create page info
>>>>>>>>>>>>                     page_info = {
>>>>>>>>>>>>                         'page_num': i,
>>>>>>>>>>>>                         'image': arr,
>>>>>>>>>>>>                         'original_size': original_size,
>>>>>>>>>>>>                         'local_path': local_path,
>>>>>>>>>>>>                         'gcs_uri': gcs_uri,
>>>>>>>>>>>>                         'filename': filename
>>>>>>>>>>>>                     }
>>>>>>>>>>>>
>>>>>>>>>>>>                     # Use document ID and page number as key
>>>>>>>>>>>>                     doc_id = os.path.splitext(filename)[0]
>>>>>>>>>>>>                     key = f"{doc_id}_{i}"
>>>>>>>>>>>>
>>>>>>>>>>>>                     yield (key, page_info)
>>>>>>>>>>>>                 except Exception as e:
>>>>>>>>>>>>                     import traceback
>>>>>>>>>>>>                     logger.error(f"Error processing page {i}:
>>>>>>>>>>>> {str(e)}")
>>>>>>>>>>>>                     logger.error(traceback.format_exc())
>>>>>>>>>>>>
>>>>>>>>>>>>             logger.info(f"Loaded {len(doc)} pages from
>>>>>>>>>>>> {local_path}")
>>>>>>>>>>>>
>>>>>>>>>>>>         except Exception as e:
>>>>>>>>>>>>             import traceback
>>>>>>>>>>>>             logger.error(f"Error loading PDF: {str(e)}")
>>>>>>>>>>>>             logger.error(traceback.format_exc())
>>>>>>>>>>>>         finally:
>>>>>>>>>>>>             # Make sure to close the document only if it was
>>>>>>>>>>>> successfully opened
>>>>>>>>>>>>             if doc is not None:
>>>>>>>>>>>>                 try:
>>>>>>>>>>>>                     if not doc.is_closed:
>>>>>>>>>>>>                         doc.close()
>>>>>>>>>>>>                 except Exception as e:
>>>>>>>>>>>>                     logger.debug(f"Error closing document:
>>>>>>>>>>>> {str(e)}")
>>>>>>>>>>>>
>>>>>>>>>>>> class PreprocessImage(beam.DoFn):
>>>>>>>>>>>>     """Preprocess image for Triton server."""
>>>>>>>>>>>>
>>>>>>>>>>>>     def __init__(self, size=1024):
>>>>>>>>>>>>         self.size = size
>>>>>>>>>>>>
>>>>>>>>>>>>     def letterbox(self, img, new_shape=1024,
>>>>>>>>>>>> color=(114,114,114)):
>>>>>>>>>>>>         """Resize and pad image to target size."""
>>>>>>>>>>>>         h, w = img.shape[:2]
>>>>>>>>>>>>         r = min(new_shape / h, new_shape / w)
>>>>>>>>>>>>         nh, nw = int(round(h * r)), int(round(w * r))
>>>>>>>>>>>>         pad_h, pad_w = new_shape - nh, new_shape - nw
>>>>>>>>>>>>         top = pad_h // 2
>>>>>>>>>>>>         bottom = pad_h - top
>>>>>>>>>>>>         left = pad_w // 2
>>>>>>>>>>>>         right = pad_w - left
>>>>>>>>>>>>         img = cv2.resize(img, (nw, nh),
>>>>>>>>>>>> interpolation=cv2.INTER_LINEAR)
>>>>>>>>>>>>         img = cv2.copyMakeBorder(img, top, bottom, left, right,
>>>>>>>>>>>> cv2.BORDER_CONSTANT, value=color)
>>>>>>>>>>>>         return img, r, left, top
>>>>>>>>>>>>
>>>>>>>>>>>>     def process(self, element):
>>>>>>>>>>>>         try:
>>>>>>>>>>>>             if not isinstance(element, tuple) or len(element)
>>>>>>>>>>>> != 2:
>>>>>>>>>>>>                 logger.error(f"Expected (key, value) tuple, got
>>>>>>>>>>>> {type(element)}")
>>>>>>>>>>>>                 return
>>>>>>>>>>>>
>>>>>>>>>>>>             key, page_info = element
>>>>>>>>>>>>
>>>>>>>>>>>>             if not isinstance(page_info, dict):
>>>>>>>>>>>>                 logger.error(f"Expected dictionary for
>>>>>>>>>>>> page_info, got {type(page_info)}")
>>>>>>>>>>>>                 return
>>>>>>>>>>>>
>>>>>>>>>>>>             if 'image' not in page_info:
>>>>>>>>>>>>                 logger.error("Missing 'image' in page_info")
>>>>>>>>>>>>                 return
>>>>>>>>>>>>
>>>>>>>>>>>>             # Create a new dictionary to avoid modifying the
>>>>>>>>>>>> input
>>>>>>>>>>>>             new_page_info = dict(page_info)
>>>>>>>>>>>>
>>>>>>>>>>>>             # Apply letterbox resize
>>>>>>>>>>>>             img = new_page_info['image']
>>>>>>>>>>>>             lb, r, left, top = self.letterbox(img,
>>>>>>>>>>>> new_shape=self.size)
>>>>>>>>>>>>
>>>>>>>>>>>>             # Convert to float32 and normalize to [0,1]
>>>>>>>>>>>>             x = lb.astype(np.float32) / 255.0
>>>>>>>>>>>>
>>>>>>>>>>>>             # Convert to CHW format
>>>>>>>>>>>>             x = np.transpose(x, (2, 0, 1))
>>>>>>>>>>>>
>>>>>>>>>>>>             # Add batch dimension
>>>>>>>>>>>>             batched_img = np.expand_dims(x, axis=0)
>>>>>>>>>>>>
>>>>>>>>>>>>             # Update page info
>>>>>>>>>>>>             new_page_info['preprocessed_image'] = batched_img
>>>>>>>>>>>>             new_page_info['letterbox_info'] = (r, left, top)
>>>>>>>>>>>>
>>>>>>>>>>>>             yield (key, new_page_info)
>>>>>>>>>>>>
>>>>>>>>>>>>         except Exception as e:
>>>>>>>>>>>>             import traceback
>>>>>>>>>>>>             logger.error(f"Error preprocessing image: {str(e)}")
>>>>>>>>>>>>             logger.error(traceback.format_exc())
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> class ExtractBoxes(beam.DoFn):
>>>>>>>>>>>>     """Extract bounding boxes from Triton response."""
>>>>>>>>>>>>
>>>>>>>>>>>>     def __init__(self, conf_th=0.25, iou_th=0.7,
>>>>>>>>>>>> model_size=1024):
>>>>>>>>>>>>         self.conf_th = conf_th
>>>>>>>>>>>>         self.iou_th = iou_th
>>>>>>>>>>>>         self.model_size = model_size
>>>>>>>>>>>>
>>>>>>>>>>>>     def _nms(self, boxes, scores, iou_th=0.7):
>>>>>>>>>>>>         """Non-Maximum Suppression"""
>>>>>>>>>>>>         if len(boxes) == 0:
>>>>>>>>>>>>             return []
>>>>>>>>>>>>
>>>>>>>>>>>>         boxes = boxes.astype(np.float32)
>>>>>>>>>>>>         x1, y1, x2, y2 = boxes.T
>>>>>>>>>>>>         areas = (x2 - x1) * (y2 - y1)
>>>>>>>>>>>>         order = scores.argsort()[::-1]
>>>>>>>>>>>>
>>>>>>>>>>>>         keep = []
>>>>>>>>>>>>         while order.size > 0:
>>>>>>>>>>>>             i = order[0]
>>>>>>>>>>>>             keep.append(i)
>>>>>>>>>>>>
>>>>>>>>>>>>             xx1 = np.maximum(x1[i], x1[order[1:]])
>>>>>>>>>>>>             yy1 = np.maximum(y1[i], y1[order[1:]])
>>>>>>>>>>>>             xx2 = np.minimum(x2[i], x2[order[1:]])
>>>>>>>>>>>>             yy2 = np.minimum(y2[i], y2[order[1:]])
>>>>>>>>>>>>
>>>>>>>>>>>>             w = np.maximum(0.0, xx2 - xx1)
>>>>>>>>>>>>             h = np.maximum(0.0, yy2 - yy1)
>>>>>>>>>>>>             inter = w * h
>>>>>>>>>>>>
>>>>>>>>>>>>             iou = inter / (areas[i] + areas[order[1:]] - inter
>>>>>>>>>>>> + 1e-9)
>>>>>>>>>>>>             inds = np.where(iou <= iou_th)[0]
>>>>>>>>>>>>             order = order[inds + 1]
>>>>>>>>>>>>
>>>>>>>>>>>>         return keep
>>>>>>>>>>>>
>>>>>>>>>>>>     def process(self, page_info):
>>>>>>>>>>>>         try:
>>>>>>>>>>>>             triton_response = page_info['triton_response']
>>>>>>>>>>>>             original_size = page_info['original_size']
>>>>>>>>>>>>             r, left, top = page_info['letterbox_info']
>>>>>>>>>>>>
>>>>>>>>>>>>             if "outputs" not in triton_response or not
>>>>>>>>>>>> triton_response["outputs"]:
>>>>>>>>>>>>                 logger.error("Invalid response from Triton
>>>>>>>>>>>> server")
>>>>>>>>>>>>                 return []
>>>>>>>>>>>>
>>>>>>>>>>>>             out_meta = triton_response["outputs"][0]
>>>>>>>>>>>>             shape = out_meta["shape"]
>>>>>>>>>>>>             data = np.array(out_meta["data"],
>>>>>>>>>>>> dtype=np.float32).reshape(shape)
>>>>>>>>>>>>
>>>>>>>>>>>>             logger.info(f"Output shape: {shape}")
>>>>>>>>>>>>
>>>>>>>>>>>>             # For YOLO output [B, C, P] where C is channels
>>>>>>>>>>>> (box coords + objectness + classes)
>>>>>>>>>>>>             B, C, P = shape
>>>>>>>>>>>>
>>>>>>>>>>>>             # Assuming 4 box coordinates + class probabilities
>>>>>>>>>>>> (no objectness)
>>>>>>>>>>>>             has_objectness = False
>>>>>>>>>>>>             num_classes = C - 5 if has_objectness else C - 4
>>>>>>>>>>>>
>>>>>>>>>>>>             # Extract data
>>>>>>>>>>>>             xywh = data[:, 0:4, :]
>>>>>>>>>>>>             if has_objectness:
>>>>>>>>>>>>                 obj = data[:, 4:5, :]
>>>>>>>>>>>>                 cls = data[:, 5:5 + num_classes, :]
>>>>>>>>>>>>             else:
>>>>>>>>>>>>                 obj = None
>>>>>>>>>>>>                 cls = data[:, 4:4 + num_classes, :]
>>>>>>>>>>>>
>>>>>>>>>>>>             # Process batch item (we only have one)
>>>>>>>>>>>>             b = 0
>>>>>>>>>>>>             h, w = original_size
>>>>>>>>>>>>
>>>>>>>>>>>>             xywh_b = xywh[b].T  # (P,4)
>>>>>>>>>>>>             if obj is not None:
>>>>>>>>>>>>                 obj_b = obj[b].T.squeeze(1)  # (P,)
>>>>>>>>>>>>             else:
>>>>>>>>>>>>                 obj_b = np.ones((P,), dtype=np.float32)
>>>>>>>>>>>>             cls_b = cls[b].T  # (P,nc)
>>>>>>>>>>>>
>>>>>>>>>>>>             # Get scores and labels
>>>>>>>>>>>>             scores_all = (obj_b[:, None] * cls_b) if obj is not
>>>>>>>>>>>> None else cls_b
>>>>>>>>>>>>             labels = scores_all.argmax(axis=1)
>>>>>>>>>>>>             scores = scores_all.max(axis=1)
>>>>>>>>>>>>
>>>>>>>>>>>>             # Filter by confidence threshold
>>>>>>>>>>>>             keep = scores >= self.conf_th
>>>>>>>>>>>>             if not np.any(keep):
>>>>>>>>>>>>                 logger.info(f"No detections above threshold
>>>>>>>>>>>> {self.conf_th}")
>>>>>>>>>>>>                 return []
>>>>>>>>>>>>
>>>>>>>>>>>>             xywh_k = xywh_b[keep]
>>>>>>>>>>>>             scores_k = scores[keep]
>>>>>>>>>>>>             labels_k = labels[keep]
>>>>>>>>>>>>
>>>>>>>>>>>>             # xywh -> xyxy in model space
>>>>>>>>>>>>             cx, cy, ww, hh = xywh_k.T
>>>>>>>>>>>>             xyxy_model = np.stack([cx - ww / 2, cy - hh / 2, cx
>>>>>>>>>>>> + ww / 2, cy + hh / 2], axis=1)
>>>>>>>>>>>>
>>>>>>>>>>>>             # Apply NMS per class
>>>>>>>>>>>>             final_boxes = []
>>>>>>>>>>>>             final_scores = []
>>>>>>>>>>>>             final_labels = []
>>>>>>>>>>>>
>>>>>>>>>>>>             for c in np.unique(labels_k):
>>>>>>>>>>>>                 idxs = np.where(labels_k == c)[0]
>>>>>>>>>>>>                 if idxs.size == 0:
>>>>>>>>>>>>                     continue
>>>>>>>>>>>>                 keep_idx = self._nms(xyxy_model[idxs],
>>>>>>>>>>>> scores_k[idxs], iou_th=self.iou_th)
>>>>>>>>>>>>                 final_boxes.append(xyxy_model[idxs][keep_idx])
>>>>>>>>>>>>                 final_scores.append(scores_k[idxs][keep_idx])
>>>>>>>>>>>>                 final_labels.append(np.full(len(keep_idx), c,
>>>>>>>>>>>> dtype=int))
>>>>>>>>>>>>
>>>>>>>>>>>>             if not final_boxes:
>>>>>>>>>>>>                 logger.info("No detections after NMS")
>>>>>>>>>>>>                 return []
>>>>>>>>>>>>
>>>>>>>>>>>>             xyxy_model = np.vstack(final_boxes)
>>>>>>>>>>>>             scores_k = np.concatenate(final_scores)
>>>>>>>>>>>>             labels_k = np.concatenate(final_labels)
>>>>>>>>>>>>
>>>>>>>>>>>>             # Map boxes from model space to original image space
>>>>>>>>>>>>             xyxy_orig = xyxy_model.copy()
>>>>>>>>>>>>
>>>>>>>>>>>>             # Remove padding
>>>>>>>>>>>>             xyxy_orig[:, [0, 2]] -= left
>>>>>>>>>>>>             xyxy_orig[:, [1, 3]] -= top
>>>>>>>>>>>>
>>>>>>>>>>>>             # Scale back to original size
>>>>>>>>>>>>             xyxy_orig /= r
>>>>>>>>>>>>
>>>>>>>>>>>>             # Clip to image boundaries
>>>>>>>>>>>>             xyxy_orig[:, 0::2] = np.clip(xyxy_orig[:, 0::2], 0,
>>>>>>>>>>>> w - 1)
>>>>>>>>>>>>             xyxy_orig[:, 1::2] = np.clip(xyxy_orig[:, 1::2], 0,
>>>>>>>>>>>> h - 1)
>>>>>>>>>>>>
>>>>>>>>>>>>             # Format as requested: x_min, y_min, x_max, y_max,
>>>>>>>>>>>> class, probability
>>>>>>>>>>>>             boxes = []
>>>>>>>>>>>>             for (x1, y1, x2, y2), label, score in
>>>>>>>>>>>> zip(xyxy_orig, labels_k, scores_k):
>>>>>>>>>>>>                 class_name = CLASS_ID_TO_NAME.get(int(label))
>>>>>>>>>>>>                 box_info = {
>>>>>>>>>>>>                     "page": page_info['page_num'],
>>>>>>>>>>>>                     "x_min": float(x1),
>>>>>>>>>>>>                     "y_min": float(y1),
>>>>>>>>>>>>                     "x_max": float(x2),
>>>>>>>>>>>>                     "y_max": float(y2),
>>>>>>>>>>>>                     "class": int(label),
>>>>>>>>>>>>                     "class_name": class_name,
>>>>>>>>>>>>                     "probability": float(score),
>>>>>>>>>>>>                     "filename": page_info['filename'],
>>>>>>>>>>>>                     "local_path": page_info['local_path'],
>>>>>>>>>>>>                     "gcs_uri": page_info['gcs_uri']
>>>>>>>>>>>>                 }
>>>>>>>>>>>>                 boxes.append(box_info)
>>>>>>>>>>>>
>>>>>>>>>>>>             logger.info(f"Extracted {len(boxes)} boxes from
>>>>>>>>>>>> page {page_info['page_num']}")
>>>>>>>>>>>>
>>>>>>>>>>>>             return boxes
>>>>>>>>>>>>
>>>>>>>>>>>>         except Exception as e:
>>>>>>>>>>>>             logger.error(f"Error extracting boxes: {str(e)}")
>>>>>>>>>>>>             return []
>>>>>>>>>>>>
>>>>>>>>>>>> class PrepareForBigQuery(beam.DoFn):
>>>>>>>>>>>>     """Prepare data for BigQuery insertion."""
>>>>>>>>>>>>
>>>>>>>>>>>>     def process(self, box_info):
>>>>>>>>>>>>         try:
>>>>>>>>>>>>             # Generate UUIDs for primary keys
>>>>>>>>>>>>             v_note_id = str(uuid.uuid4())
>>>>>>>>>>>>             page_ocr_id = str(uuid.uuid4())
>>>>>>>>>>>>             class_prediction_id = str(uuid.uuid4())
>>>>>>>>>>>>
>>>>>>>>>>>>             # Create timestamp
>>>>>>>>>>>>             processing_time =
>>>>>>>>>>>> datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
>>>>>>>>>>>>
>>>>>>>>>>>>             # Create ocr_results row
>>>>>>>>>>>>             ocr_results_row = {
>>>>>>>>>>>>                 "v_note_id": v_note_id,
>>>>>>>>>>>>                 "filename": box_info['filename'],
>>>>>>>>>>>>                 "file_path": box_info['gcs_uri'],
>>>>>>>>>>>>                 "processing_time": processing_time,
>>>>>>>>>>>>                 "file_type": "pdf"
>>>>>>>>>>>>             }
>>>>>>>>>>>>
>>>>>>>>>>>>             # Create page_ocr row
>>>>>>>>>>>>             page_ocr_row = {
>>>>>>>>>>>>                 "page_ocr_id": page_ocr_id,
>>>>>>>>>>>>                 "v_note_id": v_note_id,
>>>>>>>>>>>>                 "page_number": box_info['page']
>>>>>>>>>>>>             }
>>>>>>>>>>>>
>>>>>>>>>>>>             # Create class_prediction row
>>>>>>>>>>>>             class_prediction_row = {
>>>>>>>>>>>>                 "class_prediction_id": class_prediction_id,
>>>>>>>>>>>>                 "page_ocr_id": page_ocr_id,
>>>>>>>>>>>>                 "xmin": box_info['x_min'],
>>>>>>>>>>>>                 "ymin": box_info['y_min'],
>>>>>>>>>>>>                 "xmax": box_info['x_max'],
>>>>>>>>>>>>                 "ymax": box_info['y_max'],
>>>>>>>>>>>>                 "class": box_info['class_name'] if
>>>>>>>>>>>> box_info['class_name'] else str(box_info['class']),
>>>>>>>>>>>>                 "confidence": box_info['probability']
>>>>>>>>>>>>             }
>>>>>>>>>>>>
>>>>>>>>>>>>             # Return all three rows with table names
>>>>>>>>>>>>             return [
>>>>>>>>>>>>                 ('ocr_results', ocr_results_row),
>>>>>>>>>>>>                 ('page_ocr', page_ocr_row),
>>>>>>>>>>>>                 ('class_prediction', class_prediction_row)
>>>>>>>>>>>>             ]
>>>>>>>>>>>>
>>>>>>>>>>>>         except Exception as e:
>>>>>>>>>>>>             logger.error(f"Error preparing for BigQuery:
>>>>>>>>>>>> {str(e)}")
>>>>>>>>>>>>             return []
>>>>>>>>>>>>
>>>>>>>>>>>> model_handler = TensorRTEngineHandlerNumPy(
>>>>>>>>>>>>   min_batch_size=1,
>>>>>>>>>>>>   max_batch_size=1,
>>>>>>>>>>>>   engine_path="gs://temp/yolov11l-doclaynet.engine",
>>>>>>>>>>>> )
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> with beam.Pipeline(options=options) as pipeline:
>>>>>>>>>>>>
>>>>>>>>>>>>         # Create PCollection from input URIs
>>>>>>>>>>>>         pdf_uris = (
>>>>>>>>>>>>             pipeline
>>>>>>>>>>>>             | "Create URIs" >> beam.Create(["tmp.pdf"])
>>>>>>>>>>>>         )
>>>>>>>>>>>>
>>>>>>>>>>>>         # Download PDFs
>>>>>>>>>>>>         local_pdfs = (
>>>>>>>>>>>>             pdf_uris
>>>>>>>>>>>>             | "Download PDFs" >>
>>>>>>>>>>>> beam.ParDo(DownloadPDFFromGCS())
>>>>>>>>>>>>         )
>>>>>>>>>>>>
>>>>>>>>>>>>          # Load PDF pages
>>>>>>>>>>>>         pdf_pages = (
>>>>>>>>>>>>             local_pdfs
>>>>>>>>>>>>             | "Load PDF Pages" >> beam.ParDo(LoadPDFPages())
>>>>>>>>>>>>             #| "Flatten Pages" >> beam.FlatMap(lambda x: x)
>>>>>>>>>>>>         )
>>>>>>>>>>>>
>>>>>>>>>>>>         # Preprocess images
>>>>>>>>>>>>         preprocessed_pages = (
>>>>>>>>>>>>             pdf_pages
>>>>>>>>>>>>             | "Preprocess Images" >>
>>>>>>>>>>>> beam.ParDo(PreprocessImage())
>>>>>>>>>>>>         )
>>>>>>>>>>>>         inference_results = (
>>>>>>>>>>>>             preprocessed_pages
>>>>>>>>>>>>             | "Run Inference" >>
>>>>>>>>>>>> RunInference(model_handler=model_handler)
>>>>>>>>>>>>         )
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, 16 Sept 2025 at 21:23, XQ Hu <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Can you share your commands and outputs?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Sep 16, 2025 at 9:02 PM Sai Shashank <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Okay I have changed the docker image but  to now to RUN the
>>>>>>>>>>>>>> python command but it is still halting without are error or 
>>>>>>>>>>>>>> warnings or
>>>>>>>>>>>>>> errors
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, 16 Sept 2025 at 17:38, XQ Hu via dev <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The CMD is not necessary as it will be overridden by the
>>>>>>>>>>>>>>> ENTRYPOINT just like your comment.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If you ssh to your Docker container like `docker run --rm
>>>>>>>>>>>>>>> -it --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE`, can you 
>>>>>>>>>>>>>>> run python and
>>>>>>>>>>>>>>> some Beam pipelines with a direct runner in the container? This 
>>>>>>>>>>>>>>> can help
>>>>>>>>>>>>>>> test the environment works fine.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have one old Dockerfile that used to work with the old
>>>>>>>>>>>>>>> Beam:
>>>>>>>>>>>>>>> https://github.com/google/dataflow-ml-starter/blob/main/tensor_rt.Dockerfile
>>>>>>>>>>>>>>> .
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Sep 16, 2025 at 4:56 PM Sai Shashank <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ---------- Forwarded message ---------
>>>>>>>>>>>>>>>> From: Sai Shashank <[email protected]>
>>>>>>>>>>>>>>>> Date: Tue, Sep 16, 2025 at 4:27 PM
>>>>>>>>>>>>>>>> Subject: TensorRT inference not starting
>>>>>>>>>>>>>>>> To: <[email protected]>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hey Everyone,
>>>>>>>>>>>>>>>>                          I was trying to use tensorRT
>>>>>>>>>>>>>>>> within the apache beam on dataflow but somehow , dataflow 
>>>>>>>>>>>>>>>> didn't start like
>>>>>>>>>>>>>>>> it did not even give me Worker logs. Below is the docker file 
>>>>>>>>>>>>>>>> that , use to
>>>>>>>>>>>>>>>> create a custom  image, at first I thought it is the version 
>>>>>>>>>>>>>>>> mismatched but
>>>>>>>>>>>>>>>> usually it gives me a harness error .
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ARG BUILD_IMAGE=nvcr.io/nvidia/tensorrt:25.08-py3
>>>>>>>>>>>>>>>> FROM ${BUILD_IMAGE}
>>>>>>>>>>>>>>>> ENV PATH="/usr/src/tensorrt/bin:${PATH}"
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> WORKDIR /workspace
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> RUN apt-get update -y && apt-get install -y python3-venv
>>>>>>>>>>>>>>>> RUN pip install --no-cache-dir apache-beam[gcp]==2.67.0
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> COPY --from=apache/beam_python3.10_sdk:2.67.0
>>>>>>>>>>>>>>>> /opt/apache/beam /opt/apache/beam
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> # Install additional dependencies
>>>>>>>>>>>>>>>> RUN pip install --upgrade pip \
>>>>>>>>>>>>>>>>     && pip install torch \
>>>>>>>>>>>>>>>>     && pip install torchvision \
>>>>>>>>>>>>>>>>     && pip install pillow>=8.0.0 \
>>>>>>>>>>>>>>>>     && pip install transformers>=4.18.0 \
>>>>>>>>>>>>>>>>     && pip install cuda-python \
>>>>>>>>>>>>>>>>     && pip install opencv-python==4.7.0.72 \
>>>>>>>>>>>>>>>>     && pip install PyMuPDF==1.22.5 \
>>>>>>>>>>>>>>>>     && pip install requests==2.31.0
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> # Set the default command to run the inference script
>>>>>>>>>>>>>>>> # This will be overridden by the Apache Beam boot script
>>>>>>>>>>>>>>>> CMD ["python", "/workspace/inference.py"]
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> # Use the Apache Beam boot script as the entrypoint
>>>>>>>>>>>>>>>> ENTRYPOINT ["/opt/apache/beam/boot"]
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>

Reply via email to