Sounds good. Thanks!

On Sat, Sep 27, 2025 at 12:18 AM Sai Shashank <[email protected]>
wrote:

> So, can I make an issue and raise PR for the updated TensorRT handler?
>
> On Sat, 20 Sept 2025 at 08:46, XQ Hu <[email protected]> wrote:
>
>> This is what Beam tests use
>> https://github.com/apache/beam/blob/master/sdks/python/test-suites/containers/tensorrt_runinference/tensor_rt.dockerfile#L17
>>
>> nvcr.io/nvidia/tensorrt:23.05-py3
>>
>> From the latest doc:
>> https://docs.nvidia.com/deeplearning/tensorrt/latest/_static/python-api/infer/Core/Engine.html#icudaengine
>> and https://github.com/NVIDIA/TensorRT/issues/4216, num_io_tensors
>> should be used now.
>>
>> You can either use the tensorRT version Beam supports now or you can
>> define your own TensorRTEngine model handler with the new tensorRT.
>>
>>
>> On Sat, Sep 20, 2025 at 1:10 AM Sai Shashank <[email protected]>
>> wrote:
>>
>>> So finally, I was able to resolve the issue  of docker image but now, I
>>> saw this error           ^^^^^^^^^^^^^^^^^^^^^^
>>>   File
>>> "/usr/local/lib/python3.12/dist-packages/apache_beam/ml/inference/tensorrt_inference.py",
>>> line 132, in __init__
>>>     for i in range(self.engine.num_bindings):
>>>                    ^^^^^^^^^^^^^^^^^^^^^^^^
>>> AttributeError: 'tensorrt.tensorrt.ICudaEngine' object has no attribute
>>> 'num_bindings'
>>> Traceback (most recent call last):
>>>   File "apache_beam/runners/common.py", line 1562, in
>>> apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
>>>   File "apache_beam/runners/common.py", line 602, in
>>> apache_beam.runners.common.DoFnInvoker.invoke_setup
>>>   File
>>> "/usr/local/lib/python3.12/dist-packages/apache_beam/ml/inference/base.py",
>>> line 1882, in setup
>>>     self._model = self._load_model()
>>>                   ^^^^^^^^^^^^^^^^^^
>>>   File
>>> "/usr/local/lib/python3.12/dist-packages/apache_beam/ml/inference/base.py",
>>> line 1848, in _load_model
>>>     model = self._shared_model_handle.acquire(load, tag=self._cur_tag)
>>>             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>>>
>>> I have seen this error where the tensorrt  version is older than 10.x .
>>> Is there any undated tensorRT handler , or am I doing something wrong ?
>>>
>>> On Fri, 19 Sept 2025 at 09:15, XQ Hu <[email protected]> wrote:
>>>
>>>> GPU driver: DRIVER_VERSION=535.261.03
>>>> From the log, the driver was installed correctly (make sure this can be
>>>> used for your tensor RT.)
>>>>
>>>> "Error syncing pod, skipping" err="failed to \"StartContainer\" for
>>>> \"sdk-0-0\" with ErrImagePull: \"failed to pull and unpack image \\\"
>>>> us-east4-docker.pkg.dev/anbc-dev-suspecting/suspecting-docker/tensorrt_ss:latest\\\
>>>> <http://us-east4-docker.pkg.dev/anbc-dev-suspecting/suspecting-docker/tensorrt_ss:latest%5C%5C%5C>":
>>>> failed to extract layer
>>>> sha256:a848022a4558c435b349317630b139960b44ae09f218ab7f93f764ba4661607d:
>>>> write
>>>> /var/lib/containerd/io.containerd.snapshotter.v1.gcfs/snapshotter/snapshots/55/fs/usr/local/cuda-13.0/targets/x86_64-linux/lib/libcusparseLt.so.0.8.0.4:
>>>> no space left on device: unknown\""
>>>> pod="default/df-inference-pipeline-4-09182012-1a3n-harness-rvxc"
>>>> podUID="3a9b74645db23e77932d981439f1d3cc"
>>>>
>>>> The Dataflow worker cannot unpack the image:  no space left on device
>>>>
>>>> Try
>>>> https://cloud.google.com/dataflow/docs/guides/configure-worker-vm#disk-size
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Sep 18, 2025 at 11:27 PM Sai Shashank <[email protected]>
>>>> wrote:
>>>>
>>>>> So, I was able the start, dataflow :  
>>>>> 2025-09-18_20_12_41-10973298801093076892
>>>>> , but not it having this error: 2025-09-18 23:21:25.401 EDT
>>>>> SDK harnesses are not healthy after 5 minutes, status: Waiting for 4
>>>>> of 4 SDK Harnesses to register. I have noticed this error when there is a
>>>>> mismatch of environments. As advice by you I try running Direct Runner in
>>>>> the docker image and it was running perfectly. is there any tips which you
>>>>> would give me to correct this error ?
>>>>>
>>>>>
>>>>> On Wed, 17 Sept 2025 at 09:42, XQ Hu <[email protected]> wrote:
>>>>>
>>>>>> From the worker log,
>>>>>>
>>>>>> "Failed to read pods from URL" err="invalid pod:
>>>>>> [spec.containers[3].image: Invalid value: \"
>>>>>> us-east4-docker.pkg.dev/anbc-dev-suspecting/suspecting-docker/tensorrt_ss:latest\
>>>>>> <http://us-east4-docker.pkg.dev/anbc-dev-suspecting/suspecting-docker/tensorrt_ss:latest%5C>":
>>>>>> must not have leading or trailing whitespace]"
>>>>>>
>>>>>> 2025-09-16_17_52_06-10817935125972705087
>>>>>>
>>>>>> Looks like you specify the image URL with the leading whitespace.
>>>>>> Remove it and give it a try.
>>>>>>
>>>>>> And if you have any further questions about GPUs, I highly recommend
>>>>>> you start the VM with L4 GPU and pull your image and ssh into it and run
>>>>>> your pipeline locally with DirectRunner. That can make sure all your code
>>>>>> works.
>>>>>>
>>>>>>
>>>>>> On Wed, Sep 17, 2025 at 9:25 AM XQ Hu <[email protected]> wrote:
>>>>>>
>>>>>>> I saw it. Let me follow up internally.
>>>>>>>
>>>>>>> On Tue, Sep 16, 2025 at 10:10 PM Sai Shashank <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> I already I have open one but opening one more : case number is
>>>>>>>> 63121285
>>>>>>>>
>>>>>>>> On Tue, Sep 16, 2025 at 10:05 PM XQ Hu <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Can you open a cloud support ticket? That can give us the
>>>>>>>>> permission to access your job.
>>>>>>>>>
>>>>>>>>> On Tue, Sep 16, 2025, 9:57 PM Sai Shashank <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hey , can we connect on my office mail? since I could share more
>>>>>>>>>> stuff like pipeline options and other stuff there better and I work 
>>>>>>>>>> at CVS
>>>>>>>>>> so that way it would be under compliance too
>>>>>>>>>>
>>>>>>>>>> On Tue, 16 Sept 2025 at 21:54, Sai Shashank <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> The code works without the custom image of TensorRT, I will only
>>>>>>>>>>> get this:
>>>>>>>>>>>
>>>>>>>>>>> ject to benefit from faster worker startup and autoscaling. If you 
>>>>>>>>>>> experience container-startup related issues, pass the 
>>>>>>>>>>> "disable_image_streaming" experiment to disable image streaming for 
>>>>>>>>>>> the job.
>>>>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:10.327Z:
>>>>>>>>>>>  JOB_MESSAGE_BASIC: Worker configuration: g2-standard-4 in 
>>>>>>>>>>> us-east4-c.
>>>>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.480Z:
>>>>>>>>>>>  JOB_MESSAGE_BASIC: Executing operation [10]: Create 
>>>>>>>>>>> URIs/Impulse+[10]: Create URIs/FlatMap(<lambda at 
>>>>>>>>>>> core.py:3994>)+[10]: Create URIs/Map(decode)+[10]: Download 
>>>>>>>>>>> PDFs+[10]: Load PDF Pages+[10]: Preprocess Images+[10]: Run 
>>>>>>>>>>> Inference/BatchElements/ParDo(_GlobalWindowsBatchingDoFn)+[10]: Run 
>>>>>>>>>>> Inference/BeamML_RunInference
>>>>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.543Z:
>>>>>>>>>>>  JOB_MESSAGE_BASIC: Starting 1 workers in us-east4...
>>>>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:Job 
>>>>>>>>>>> 2025-09-16_17_52_06-10817935125972705087 is in state 
>>>>>>>>>>> JOB_STATE_RUNNING
>>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support 
>>>>>>>>>>> per-request timeout. Set the timeout when constructing the 
>>>>>>>>>>> httplib2.Http instance.
>>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support 
>>>>>>>>>>> per-request timeout. Set the timeout when constructing the 
>>>>>>>>>>> httplib2.Http instance.
>>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support 
>>>>>>>>>>> per-request timeout. Set the timeout when constructing the 
>>>>>>>>>>> httplib2.Http instance.
>>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support 
>>>>>>>>>>> per-request timeout. Set the timeout when constructing the 
>>>>>>>>>>> httplib2.Http instance.
>>>>>>>>>>>
>>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support
>>>>>>>>>>> per-request timeout. Set the timeout when constructing the 
>>>>>>>>>>> httplib2.Http
>>>>>>>>>>> instance.
>>>>>>>>>>>
>>>>>>>>>>> just recurring and after an hour this message pops up  Workflow
>>>>>>>>>>> failed. Causes: The Dataflow job appears to be stuck because no 
>>>>>>>>>>> worker
>>>>>>>>>>> activity has been seen in the last 1h. For more information, see
>>>>>>>>>>> https://cloud.google.com/dataflow/docs/guides/common-errors#error-syncing-pod.
>>>>>>>>>>> You can also get help with Cloud Dataflow at
>>>>>>>>>>> https://cloud.google.com/dataflow/support.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, 16 Sept 2025 at 21:35, XQ Hu <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Does the code work without using  TensorRT? Any logs?
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Sep 16, 2025 at 9:28 PM Sai Shashank <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> import apache_beam as beam
>>>>>>>>>>>>> from apache_beam.ml.inference.tensorrt_inference import
>>>>>>>>>>>>> TensorRTEngineHandlerNumPy
>>>>>>>>>>>>> from apache_beam.ml.inference.base import RunInference
>>>>>>>>>>>>>
>>>>>>>>>>>>> #!/usr/bin/env python3
>>>>>>>>>>>>> """
>>>>>>>>>>>>> Apache Beam pipeline for processing PDFs with Triton server
>>>>>>>>>>>>> and saving results to BigQuery.
>>>>>>>>>>>>> This pipeline combines functionality from
>>>>>>>>>>>>> test_triton_document.py, create_bigquery_tables.py,
>>>>>>>>>>>>> and save_to_bigquery.py into a single workflow.
>>>>>>>>>>>>> """
>>>>>>>>>>>>>
>>>>>>>>>>>>> import os
>>>>>>>>>>>>> import sys
>>>>>>>>>>>>> import json
>>>>>>>>>>>>> import uuid
>>>>>>>>>>>>> import argparse
>>>>>>>>>>>>> import logging
>>>>>>>>>>>>> import tempfile
>>>>>>>>>>>>> import datetime
>>>>>>>>>>>>> import requests
>>>>>>>>>>>>> import numpy as np
>>>>>>>>>>>>> import cv2
>>>>>>>>>>>>> from PIL import Image
>>>>>>>>>>>>> import fitz  # PyMuPDF
>>>>>>>>>>>>> from pathlib import Path
>>>>>>>>>>>>> from typing import Dict, List, Tuple, Any, Optional, Iterator
>>>>>>>>>>>>>
>>>>>>>>>>>>> # Apache Beam imports
>>>>>>>>>>>>> import apache_beam as beam
>>>>>>>>>>>>> from apache_beam.options.pipeline_options import
>>>>>>>>>>>>> PipelineOptions, SetupOptions
>>>>>>>>>>>>> from apache_beam.ml.inference.base import RemoteModelHandler,
>>>>>>>>>>>>> PredictionResult
>>>>>>>>>>>>> from apache_beam.ml.inference.utils import _convert_to_result
>>>>>>>>>>>>> from apache_beam.ml.inference.base import RunInference
>>>>>>>>>>>>> from apache_beam.io.gcp.bigquery import WriteToBigQuery
>>>>>>>>>>>>> from apache_beam.io.filesystems import FileSystems
>>>>>>>>>>>>> from apache_beam.io.gcp.gcsio import GcsIO
>>>>>>>>>>>>>
>>>>>>>>>>>>> # Google Cloud imports
>>>>>>>>>>>>> from google.cloud import storage
>>>>>>>>>>>>> from google.cloud import bigquery
>>>>>>>>>>>>>
>>>>>>>>>>>>> # Set up logging
>>>>>>>>>>>>> logging.basicConfig(level=logging.INFO)
>>>>>>>>>>>>> logger = logging.getLogger(__name__)
>>>>>>>>>>>>>
>>>>>>>>>>>>> # DocLayNet classes
>>>>>>>>>>>>> CLASS_ID_TO_NAME = {
>>>>>>>>>>>>>     0: 'Caption',
>>>>>>>>>>>>>     1: 'Footnote',
>>>>>>>>>>>>>     2: 'Formula',
>>>>>>>>>>>>>     3: 'List-item',
>>>>>>>>>>>>>     4: 'Page-footer',
>>>>>>>>>>>>>     5: 'Page-header',
>>>>>>>>>>>>>     6: 'Picture',
>>>>>>>>>>>>>     7: 'Section-header',
>>>>>>>>>>>>>     8: 'Table',
>>>>>>>>>>>>>     9: 'Text',
>>>>>>>>>>>>>     10: 'Title'
>>>>>>>>>>>>> }
>>>>>>>>>>>>> class DownloadPDFFromGCS(beam.DoFn):
>>>>>>>>>>>>>     """Download a PDF from Google Cloud Storage."""
>>>>>>>>>>>>>
>>>>>>>>>>>>>     def __init__(self, temp_dir=None):
>>>>>>>>>>>>>         self.temp_dir = temp_dir or tempfile.gettempdir()
>>>>>>>>>>>>>
>>>>>>>>>>>>>     def process(self, gcs_uri):
>>>>>>>>>>>>>         try:
>>>>>>>>>>>>>             # Parse GCS URI
>>>>>>>>>>>>>             if not gcs_uri.startswith("gs://"):
>>>>>>>>>>>>>                 raise ValueError(f"Invalid GCS URI: {gcs_uri}")
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Remove gs:// prefix and split into bucket and
>>>>>>>>>>>>> blob path
>>>>>>>>>>>>>             path_parts = gcs_uri[5:].split("/", 1)
>>>>>>>>>>>>>             bucket_name = path_parts[0]
>>>>>>>>>>>>>             blob_path = path_parts[1]
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Get filename from blob path
>>>>>>>>>>>>>             filename = os.path.basename(blob_path)
>>>>>>>>>>>>>             local_path = os.path.join(self.temp_dir, filename)
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Create temp directory if it doesn't exist
>>>>>>>>>>>>>             os.makedirs(self.temp_dir, exist_ok=True)
>>>>>>>>>>>>>
>>>>>>>>>>>>>             try:
>>>>>>>>>>>>>                 # Download using Beam's GcsIO
>>>>>>>>>>>>>                 with FileSystems.open(gcs_uri, 'rb') as
>>>>>>>>>>>>> gcs_file:
>>>>>>>>>>>>>                     with open(local_path, 'wb') as local_file:
>>>>>>>>>>>>>                         local_file.write(gcs_file.read())
>>>>>>>>>>>>>
>>>>>>>>>>>>>                 logger.info(f"Downloaded {gcs_uri} to
>>>>>>>>>>>>> {local_path}")
>>>>>>>>>>>>>
>>>>>>>>>>>>>                 # Return a dictionary with the local path and
>>>>>>>>>>>>> original URI
>>>>>>>>>>>>>                 yield {
>>>>>>>>>>>>>                     'local_path': local_path,
>>>>>>>>>>>>>                     'gcs_uri': gcs_uri,
>>>>>>>>>>>>>                     'filename': filename
>>>>>>>>>>>>>                 }
>>>>>>>>>>>>>             except Exception as e:
>>>>>>>>>>>>>                 logger.error(f"Error reading from GCS:
>>>>>>>>>>>>> {str(e)}")
>>>>>>>>>>>>>                 # Try alternative download method
>>>>>>>>>>>>>                 logger.info(f"Trying alternative download
>>>>>>>>>>>>> method for {gcs_uri}")
>>>>>>>>>>>>>
>>>>>>>>>>>>>                 # For testing with local files
>>>>>>>>>>>>>                 if os.path.exists(gcs_uri.replace("gs://",
>>>>>>>>>>>>> "")):
>>>>>>>>>>>>>                     local_path = gcs_uri.replace("gs://", "")
>>>>>>>>>>>>>                     logger.info(f"Using local file:
>>>>>>>>>>>>> {local_path}")
>>>>>>>>>>>>>                     yield {
>>>>>>>>>>>>>                         'local_path': local_path,
>>>>>>>>>>>>>                         'gcs_uri': gcs_uri,
>>>>>>>>>>>>>                         'filename':
>>>>>>>>>>>>> os.path.basename(local_path)
>>>>>>>>>>>>>                     }
>>>>>>>>>>>>>                 else:
>>>>>>>>>>>>>                     # Try using gsutil command
>>>>>>>>>>>>>                     import subprocess
>>>>>>>>>>>>>                     try:
>>>>>>>>>>>>>                         subprocess.run(["gsutil", "cp",
>>>>>>>>>>>>> gcs_uri, local_path], check=True)
>>>>>>>>>>>>>                         logger.info(f"Downloaded {gcs_uri} to
>>>>>>>>>>>>> {local_path} using gsutil")
>>>>>>>>>>>>>                         yield {
>>>>>>>>>>>>>                             'local_path': local_path,
>>>>>>>>>>>>>                             'gcs_uri': gcs_uri,
>>>>>>>>>>>>>                             'filename': filename
>>>>>>>>>>>>>                         }
>>>>>>>>>>>>>                     except Exception as e2:
>>>>>>>>>>>>>                         logger.error(f"Failed to download
>>>>>>>>>>>>> using gsutil: {str(e2)}")
>>>>>>>>>>>>>
>>>>>>>>>>>>>         except Exception as e:
>>>>>>>>>>>>>             logger.error(f"Error downloading {gcs_uri}:
>>>>>>>>>>>>> {str(e)}")
>>>>>>>>>>>>> class LoadPDFPages(beam.DoFn):
>>>>>>>>>>>>>     """Load PDF pages as images."""
>>>>>>>>>>>>>
>>>>>>>>>>>>>     def __init__(self, dpi=200):
>>>>>>>>>>>>>         self.dpi = dpi
>>>>>>>>>>>>>
>>>>>>>>>>>>>     def process(self, element):
>>>>>>>>>>>>>         doc = None
>>>>>>>>>>>>>         try:
>>>>>>>>>>>>>             # Make sure we have all required fields
>>>>>>>>>>>>>             if not isinstance(element, dict):
>>>>>>>>>>>>>                 logger.error(f"Expected dictionary, got
>>>>>>>>>>>>> {type(element)}")
>>>>>>>>>>>>>                 return
>>>>>>>>>>>>>
>>>>>>>>>>>>>             if 'local_path' not in element:
>>>>>>>>>>>>>                 logger.error("Missing 'local_path' in element")
>>>>>>>>>>>>>                 return
>>>>>>>>>>>>>
>>>>>>>>>>>>>             local_path = element['local_path']
>>>>>>>>>>>>>             gcs_uri = element.get('gcs_uri', '')
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Extract filename from local_path if not provided
>>>>>>>>>>>>>             filename = element.get('filename',
>>>>>>>>>>>>> os.path.basename(local_path))
>>>>>>>>>>>>>
>>>>>>>>>>>>>             logger.info(f"Loading PDF: {local_path},
>>>>>>>>>>>>> filename: {filename}")
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Check if file exists and is accessible
>>>>>>>>>>>>>             if not os.path.exists(local_path):
>>>>>>>>>>>>>                 logger.error(f"File not found: {local_path}")
>>>>>>>>>>>>>                 return
>>>>>>>>>>>>>
>>>>>>>>>>>>>             if not os.access(local_path, os.R_OK):
>>>>>>>>>>>>>                 logger.error(f"File not readable:
>>>>>>>>>>>>> {local_path}")
>>>>>>>>>>>>>                 return
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Open the PDF
>>>>>>>>>>>>>             try:
>>>>>>>>>>>>>                 doc = fitz.open(local_path)
>>>>>>>>>>>>>                 if doc.is_closed:
>>>>>>>>>>>>>                     logger.error(f"Failed to open PDF:
>>>>>>>>>>>>> {local_path}")
>>>>>>>>>>>>>                     return
>>>>>>>>>>>>>             except Exception as e:
>>>>>>>>>>>>>                 logger.error(f"Error opening PDF {local_path}:
>>>>>>>>>>>>> {str(e)}")
>>>>>>>>>>>>>                 return
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Process each page
>>>>>>>>>>>>>             page_count = len(doc)
>>>>>>>>>>>>>             logger.info(f"Processing {page_count} pages from
>>>>>>>>>>>>> {local_path}")
>>>>>>>>>>>>>
>>>>>>>>>>>>>             for i in range(page_count):
>>>>>>>>>>>>>                 try:
>>>>>>>>>>>>>                     if doc.is_closed:
>>>>>>>>>>>>>                         logger.error(f"Document was closed
>>>>>>>>>>>>> unexpectedly while processing page {i}")
>>>>>>>>>>>>>                         break
>>>>>>>>>>>>>
>>>>>>>>>>>>>                     page = doc[i]
>>>>>>>>>>>>>                     if page is None:
>>>>>>>>>>>>>                         logger.error(f"Failed to get page {i}
>>>>>>>>>>>>> from document")
>>>>>>>>>>>>>                         continue
>>>>>>>>>>>>>
>>>>>>>>>>>>>                     # Use a higher resolution for better
>>>>>>>>>>>>> quality
>>>>>>>>>>>>>                     scale = self.dpi / 72.0
>>>>>>>>>>>>>                     mat = fitz.Matrix(scale, scale)
>>>>>>>>>>>>>
>>>>>>>>>>>>>                     try:
>>>>>>>>>>>>>                         pix = page.get_pixmap(matrix=mat,
>>>>>>>>>>>>> alpha=False)
>>>>>>>>>>>>>                     except Exception as e:
>>>>>>>>>>>>>                         logger.error(f"Error getting pixmap
>>>>>>>>>>>>> for page {i}: {str(e)}")
>>>>>>>>>>>>>                         continue
>>>>>>>>>>>>>
>>>>>>>>>>>>>                     # Check pixmap dimensions
>>>>>>>>>>>>>                     if pix.height <= 0 or pix.width <= 0 or
>>>>>>>>>>>>> pix.n <= 0:
>>>>>>>>>>>>>                         logger.error(f"Invalid pixmap
>>>>>>>>>>>>> dimensions: {pix.width}x{pix.height}x{pix.n}")
>>>>>>>>>>>>>                         continue
>>>>>>>>>>>>>
>>>>>>>>>>>>>                     # Convert to numpy array
>>>>>>>>>>>>>                     try:
>>>>>>>>>>>>>                         arr = np.frombuffer(pix.samples,
>>>>>>>>>>>>> dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
>>>>>>>>>>>>>                     except Exception as e:
>>>>>>>>>>>>>                         logger.error(f"Error converting pixmap
>>>>>>>>>>>>> to numpy array: {str(e)}")
>>>>>>>>>>>>>                         continue
>>>>>>>>>>>>>
>>>>>>>>>>>>>                     # Convert BGR to RGB if needed
>>>>>>>>>>>>>                     if pix.n == 3:  # RGB
>>>>>>>>>>>>>                         try:
>>>>>>>>>>>>>                             arr = cv2.cvtColor(arr,
>>>>>>>>>>>>> cv2.COLOR_BGR2RGB)
>>>>>>>>>>>>>                         except Exception as e:
>>>>>>>>>>>>>                             logger.error(f"Error converting
>>>>>>>>>>>>> BGR to RGB: {str(e)}")
>>>>>>>>>>>>>                             continue
>>>>>>>>>>>>>
>>>>>>>>>>>>>                     # Store original size for later use
>>>>>>>>>>>>>                     original_size = (arr.shape[0],
>>>>>>>>>>>>> arr.shape[1])
>>>>>>>>>>>>>
>>>>>>>>>>>>>                     # Create page info
>>>>>>>>>>>>>                     page_info = {
>>>>>>>>>>>>>                         'page_num': i,
>>>>>>>>>>>>>                         'image': arr,
>>>>>>>>>>>>>                         'original_size': original_size,
>>>>>>>>>>>>>                         'local_path': local_path,
>>>>>>>>>>>>>                         'gcs_uri': gcs_uri,
>>>>>>>>>>>>>                         'filename': filename
>>>>>>>>>>>>>                     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>                     # Use document ID and page number as key
>>>>>>>>>>>>>                     doc_id = os.path.splitext(filename)[0]
>>>>>>>>>>>>>                     key = f"{doc_id}_{i}"
>>>>>>>>>>>>>
>>>>>>>>>>>>>                     yield (key, page_info)
>>>>>>>>>>>>>                 except Exception as e:
>>>>>>>>>>>>>                     import traceback
>>>>>>>>>>>>>                     logger.error(f"Error processing page {i}:
>>>>>>>>>>>>> {str(e)}")
>>>>>>>>>>>>>                     logger.error(traceback.format_exc())
>>>>>>>>>>>>>
>>>>>>>>>>>>>             logger.info(f"Loaded {len(doc)} pages from
>>>>>>>>>>>>> {local_path}")
>>>>>>>>>>>>>
>>>>>>>>>>>>>         except Exception as e:
>>>>>>>>>>>>>             import traceback
>>>>>>>>>>>>>             logger.error(f"Error loading PDF: {str(e)}")
>>>>>>>>>>>>>             logger.error(traceback.format_exc())
>>>>>>>>>>>>>         finally:
>>>>>>>>>>>>>             # Make sure to close the document only if it was
>>>>>>>>>>>>> successfully opened
>>>>>>>>>>>>>             if doc is not None:
>>>>>>>>>>>>>                 try:
>>>>>>>>>>>>>                     if not doc.is_closed:
>>>>>>>>>>>>>                         doc.close()
>>>>>>>>>>>>>                 except Exception as e:
>>>>>>>>>>>>>                     logger.debug(f"Error closing document:
>>>>>>>>>>>>> {str(e)}")
>>>>>>>>>>>>>
>>>>>>>>>>>>> class PreprocessImage(beam.DoFn):
>>>>>>>>>>>>>     """Preprocess image for Triton server."""
>>>>>>>>>>>>>
>>>>>>>>>>>>>     def __init__(self, size=1024):
>>>>>>>>>>>>>         self.size = size
>>>>>>>>>>>>>
>>>>>>>>>>>>>     def letterbox(self, img, new_shape=1024,
>>>>>>>>>>>>> color=(114,114,114)):
>>>>>>>>>>>>>         """Resize and pad image to target size."""
>>>>>>>>>>>>>         h, w = img.shape[:2]
>>>>>>>>>>>>>         r = min(new_shape / h, new_shape / w)
>>>>>>>>>>>>>         nh, nw = int(round(h * r)), int(round(w * r))
>>>>>>>>>>>>>         pad_h, pad_w = new_shape - nh, new_shape - nw
>>>>>>>>>>>>>         top = pad_h // 2
>>>>>>>>>>>>>         bottom = pad_h - top
>>>>>>>>>>>>>         left = pad_w // 2
>>>>>>>>>>>>>         right = pad_w - left
>>>>>>>>>>>>>         img = cv2.resize(img, (nw, nh),
>>>>>>>>>>>>> interpolation=cv2.INTER_LINEAR)
>>>>>>>>>>>>>         img = cv2.copyMakeBorder(img, top, bottom, left,
>>>>>>>>>>>>> right, cv2.BORDER_CONSTANT, value=color)
>>>>>>>>>>>>>         return img, r, left, top
>>>>>>>>>>>>>
>>>>>>>>>>>>>     def process(self, element):
>>>>>>>>>>>>>         try:
>>>>>>>>>>>>>             if not isinstance(element, tuple) or len(element)
>>>>>>>>>>>>> != 2:
>>>>>>>>>>>>>                 logger.error(f"Expected (key, value) tuple,
>>>>>>>>>>>>> got {type(element)}")
>>>>>>>>>>>>>                 return
>>>>>>>>>>>>>
>>>>>>>>>>>>>             key, page_info = element
>>>>>>>>>>>>>
>>>>>>>>>>>>>             if not isinstance(page_info, dict):
>>>>>>>>>>>>>                 logger.error(f"Expected dictionary for
>>>>>>>>>>>>> page_info, got {type(page_info)}")
>>>>>>>>>>>>>                 return
>>>>>>>>>>>>>
>>>>>>>>>>>>>             if 'image' not in page_info:
>>>>>>>>>>>>>                 logger.error("Missing 'image' in page_info")
>>>>>>>>>>>>>                 return
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Create a new dictionary to avoid modifying the
>>>>>>>>>>>>> input
>>>>>>>>>>>>>             new_page_info = dict(page_info)
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Apply letterbox resize
>>>>>>>>>>>>>             img = new_page_info['image']
>>>>>>>>>>>>>             lb, r, left, top = self.letterbox(img,
>>>>>>>>>>>>> new_shape=self.size)
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Convert to float32 and normalize to [0,1]
>>>>>>>>>>>>>             x = lb.astype(np.float32) / 255.0
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Convert to CHW format
>>>>>>>>>>>>>             x = np.transpose(x, (2, 0, 1))
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Add batch dimension
>>>>>>>>>>>>>             batched_img = np.expand_dims(x, axis=0)
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Update page info
>>>>>>>>>>>>>             new_page_info['preprocessed_image'] = batched_img
>>>>>>>>>>>>>             new_page_info['letterbox_info'] = (r, left, top)
>>>>>>>>>>>>>
>>>>>>>>>>>>>             yield (key, new_page_info)
>>>>>>>>>>>>>
>>>>>>>>>>>>>         except Exception as e:
>>>>>>>>>>>>>             import traceback
>>>>>>>>>>>>>             logger.error(f"Error preprocessing image:
>>>>>>>>>>>>> {str(e)}")
>>>>>>>>>>>>>             logger.error(traceback.format_exc())
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> class ExtractBoxes(beam.DoFn):
>>>>>>>>>>>>>     """Extract bounding boxes from Triton response."""
>>>>>>>>>>>>>
>>>>>>>>>>>>>     def __init__(self, conf_th=0.25, iou_th=0.7,
>>>>>>>>>>>>> model_size=1024):
>>>>>>>>>>>>>         self.conf_th = conf_th
>>>>>>>>>>>>>         self.iou_th = iou_th
>>>>>>>>>>>>>         self.model_size = model_size
>>>>>>>>>>>>>
>>>>>>>>>>>>>     def _nms(self, boxes, scores, iou_th=0.7):
>>>>>>>>>>>>>         """Non-Maximum Suppression"""
>>>>>>>>>>>>>         if len(boxes) == 0:
>>>>>>>>>>>>>             return []
>>>>>>>>>>>>>
>>>>>>>>>>>>>         boxes = boxes.astype(np.float32)
>>>>>>>>>>>>>         x1, y1, x2, y2 = boxes.T
>>>>>>>>>>>>>         areas = (x2 - x1) * (y2 - y1)
>>>>>>>>>>>>>         order = scores.argsort()[::-1]
>>>>>>>>>>>>>
>>>>>>>>>>>>>         keep = []
>>>>>>>>>>>>>         while order.size > 0:
>>>>>>>>>>>>>             i = order[0]
>>>>>>>>>>>>>             keep.append(i)
>>>>>>>>>>>>>
>>>>>>>>>>>>>             xx1 = np.maximum(x1[i], x1[order[1:]])
>>>>>>>>>>>>>             yy1 = np.maximum(y1[i], y1[order[1:]])
>>>>>>>>>>>>>             xx2 = np.minimum(x2[i], x2[order[1:]])
>>>>>>>>>>>>>             yy2 = np.minimum(y2[i], y2[order[1:]])
>>>>>>>>>>>>>
>>>>>>>>>>>>>             w = np.maximum(0.0, xx2 - xx1)
>>>>>>>>>>>>>             h = np.maximum(0.0, yy2 - yy1)
>>>>>>>>>>>>>             inter = w * h
>>>>>>>>>>>>>
>>>>>>>>>>>>>             iou = inter / (areas[i] + areas[order[1:]] - inter
>>>>>>>>>>>>> + 1e-9)
>>>>>>>>>>>>>             inds = np.where(iou <= iou_th)[0]
>>>>>>>>>>>>>             order = order[inds + 1]
>>>>>>>>>>>>>
>>>>>>>>>>>>>         return keep
>>>>>>>>>>>>>
>>>>>>>>>>>>>     def process(self, page_info):
>>>>>>>>>>>>>         try:
>>>>>>>>>>>>>             triton_response = page_info['triton_response']
>>>>>>>>>>>>>             original_size = page_info['original_size']
>>>>>>>>>>>>>             r, left, top = page_info['letterbox_info']
>>>>>>>>>>>>>
>>>>>>>>>>>>>             if "outputs" not in triton_response or not
>>>>>>>>>>>>> triton_response["outputs"]:
>>>>>>>>>>>>>                 logger.error("Invalid response from Triton
>>>>>>>>>>>>> server")
>>>>>>>>>>>>>                 return []
>>>>>>>>>>>>>
>>>>>>>>>>>>>             out_meta = triton_response["outputs"][0]
>>>>>>>>>>>>>             shape = out_meta["shape"]
>>>>>>>>>>>>>             data = np.array(out_meta["data"],
>>>>>>>>>>>>> dtype=np.float32).reshape(shape)
>>>>>>>>>>>>>
>>>>>>>>>>>>>             logger.info(f"Output shape: {shape}")
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # For YOLO output [B, C, P] where C is channels
>>>>>>>>>>>>> (box coords + objectness + classes)
>>>>>>>>>>>>>             B, C, P = shape
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Assuming 4 box coordinates + class probabilities
>>>>>>>>>>>>> (no objectness)
>>>>>>>>>>>>>             has_objectness = False
>>>>>>>>>>>>>             num_classes = C - 5 if has_objectness else C - 4
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Extract data
>>>>>>>>>>>>>             xywh = data[:, 0:4, :]
>>>>>>>>>>>>>             if has_objectness:
>>>>>>>>>>>>>                 obj = data[:, 4:5, :]
>>>>>>>>>>>>>                 cls = data[:, 5:5 + num_classes, :]
>>>>>>>>>>>>>             else:
>>>>>>>>>>>>>                 obj = None
>>>>>>>>>>>>>                 cls = data[:, 4:4 + num_classes, :]
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Process batch item (we only have one)
>>>>>>>>>>>>>             b = 0
>>>>>>>>>>>>>             h, w = original_size
>>>>>>>>>>>>>
>>>>>>>>>>>>>             xywh_b = xywh[b].T  # (P,4)
>>>>>>>>>>>>>             if obj is not None:
>>>>>>>>>>>>>                 obj_b = obj[b].T.squeeze(1)  # (P,)
>>>>>>>>>>>>>             else:
>>>>>>>>>>>>>                 obj_b = np.ones((P,), dtype=np.float32)
>>>>>>>>>>>>>             cls_b = cls[b].T  # (P,nc)
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Get scores and labels
>>>>>>>>>>>>>             scores_all = (obj_b[:, None] * cls_b) if obj is
>>>>>>>>>>>>> not None else cls_b
>>>>>>>>>>>>>             labels = scores_all.argmax(axis=1)
>>>>>>>>>>>>>             scores = scores_all.max(axis=1)
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Filter by confidence threshold
>>>>>>>>>>>>>             keep = scores >= self.conf_th
>>>>>>>>>>>>>             if not np.any(keep):
>>>>>>>>>>>>>                 logger.info(f"No detections above threshold
>>>>>>>>>>>>> {self.conf_th}")
>>>>>>>>>>>>>                 return []
>>>>>>>>>>>>>
>>>>>>>>>>>>>             xywh_k = xywh_b[keep]
>>>>>>>>>>>>>             scores_k = scores[keep]
>>>>>>>>>>>>>             labels_k = labels[keep]
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # xywh -> xyxy in model space
>>>>>>>>>>>>>             cx, cy, ww, hh = xywh_k.T
>>>>>>>>>>>>>             xyxy_model = np.stack([cx - ww / 2, cy - hh / 2,
>>>>>>>>>>>>> cx + ww / 2, cy + hh / 2], axis=1)
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Apply NMS per class
>>>>>>>>>>>>>             final_boxes = []
>>>>>>>>>>>>>             final_scores = []
>>>>>>>>>>>>>             final_labels = []
>>>>>>>>>>>>>
>>>>>>>>>>>>>             for c in np.unique(labels_k):
>>>>>>>>>>>>>                 idxs = np.where(labels_k == c)[0]
>>>>>>>>>>>>>                 if idxs.size == 0:
>>>>>>>>>>>>>                     continue
>>>>>>>>>>>>>                 keep_idx = self._nms(xyxy_model[idxs],
>>>>>>>>>>>>> scores_k[idxs], iou_th=self.iou_th)
>>>>>>>>>>>>>                 final_boxes.append(xyxy_model[idxs][keep_idx])
>>>>>>>>>>>>>                 final_scores.append(scores_k[idxs][keep_idx])
>>>>>>>>>>>>>                 final_labels.append(np.full(len(keep_idx), c,
>>>>>>>>>>>>> dtype=int))
>>>>>>>>>>>>>
>>>>>>>>>>>>>             if not final_boxes:
>>>>>>>>>>>>>                 logger.info("No detections after NMS")
>>>>>>>>>>>>>                 return []
>>>>>>>>>>>>>
>>>>>>>>>>>>>             xyxy_model = np.vstack(final_boxes)
>>>>>>>>>>>>>             scores_k = np.concatenate(final_scores)
>>>>>>>>>>>>>             labels_k = np.concatenate(final_labels)
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Map boxes from model space to original image
>>>>>>>>>>>>> space
>>>>>>>>>>>>>             xyxy_orig = xyxy_model.copy()
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Remove padding
>>>>>>>>>>>>>             xyxy_orig[:, [0, 2]] -= left
>>>>>>>>>>>>>             xyxy_orig[:, [1, 3]] -= top
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Scale back to original size
>>>>>>>>>>>>>             xyxy_orig /= r
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Clip to image boundaries
>>>>>>>>>>>>>             xyxy_orig[:, 0::2] = np.clip(xyxy_orig[:, 0::2],
>>>>>>>>>>>>> 0, w - 1)
>>>>>>>>>>>>>             xyxy_orig[:, 1::2] = np.clip(xyxy_orig[:, 1::2],
>>>>>>>>>>>>> 0, h - 1)
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Format as requested: x_min, y_min, x_max, y_max,
>>>>>>>>>>>>> class, probability
>>>>>>>>>>>>>             boxes = []
>>>>>>>>>>>>>             for (x1, y1, x2, y2), label, score in
>>>>>>>>>>>>> zip(xyxy_orig, labels_k, scores_k):
>>>>>>>>>>>>>                 class_name = CLASS_ID_TO_NAME.get(int(label))
>>>>>>>>>>>>>                 box_info = {
>>>>>>>>>>>>>                     "page": page_info['page_num'],
>>>>>>>>>>>>>                     "x_min": float(x1),
>>>>>>>>>>>>>                     "y_min": float(y1),
>>>>>>>>>>>>>                     "x_max": float(x2),
>>>>>>>>>>>>>                     "y_max": float(y2),
>>>>>>>>>>>>>                     "class": int(label),
>>>>>>>>>>>>>                     "class_name": class_name,
>>>>>>>>>>>>>                     "probability": float(score),
>>>>>>>>>>>>>                     "filename": page_info['filename'],
>>>>>>>>>>>>>                     "local_path": page_info['local_path'],
>>>>>>>>>>>>>                     "gcs_uri": page_info['gcs_uri']
>>>>>>>>>>>>>                 }
>>>>>>>>>>>>>                 boxes.append(box_info)
>>>>>>>>>>>>>
>>>>>>>>>>>>>             logger.info(f"Extracted {len(boxes)} boxes from
>>>>>>>>>>>>> page {page_info['page_num']}")
>>>>>>>>>>>>>
>>>>>>>>>>>>>             return boxes
>>>>>>>>>>>>>
>>>>>>>>>>>>>         except Exception as e:
>>>>>>>>>>>>>             logger.error(f"Error extracting boxes: {str(e)}")
>>>>>>>>>>>>>             return []
>>>>>>>>>>>>>
>>>>>>>>>>>>> class PrepareForBigQuery(beam.DoFn):
>>>>>>>>>>>>>     """Prepare data for BigQuery insertion."""
>>>>>>>>>>>>>
>>>>>>>>>>>>>     def process(self, box_info):
>>>>>>>>>>>>>         try:
>>>>>>>>>>>>>             # Generate UUIDs for primary keys
>>>>>>>>>>>>>             v_note_id = str(uuid.uuid4())
>>>>>>>>>>>>>             page_ocr_id = str(uuid.uuid4())
>>>>>>>>>>>>>             class_prediction_id = str(uuid.uuid4())
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Create timestamp
>>>>>>>>>>>>>             processing_time =
>>>>>>>>>>>>> datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Create ocr_results row
>>>>>>>>>>>>>             ocr_results_row = {
>>>>>>>>>>>>>                 "v_note_id": v_note_id,
>>>>>>>>>>>>>                 "filename": box_info['filename'],
>>>>>>>>>>>>>                 "file_path": box_info['gcs_uri'],
>>>>>>>>>>>>>                 "processing_time": processing_time,
>>>>>>>>>>>>>                 "file_type": "pdf"
>>>>>>>>>>>>>             }
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Create page_ocr row
>>>>>>>>>>>>>             page_ocr_row = {
>>>>>>>>>>>>>                 "page_ocr_id": page_ocr_id,
>>>>>>>>>>>>>                 "v_note_id": v_note_id,
>>>>>>>>>>>>>                 "page_number": box_info['page']
>>>>>>>>>>>>>             }
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Create class_prediction row
>>>>>>>>>>>>>             class_prediction_row = {
>>>>>>>>>>>>>                 "class_prediction_id": class_prediction_id,
>>>>>>>>>>>>>                 "page_ocr_id": page_ocr_id,
>>>>>>>>>>>>>                 "xmin": box_info['x_min'],
>>>>>>>>>>>>>                 "ymin": box_info['y_min'],
>>>>>>>>>>>>>                 "xmax": box_info['x_max'],
>>>>>>>>>>>>>                 "ymax": box_info['y_max'],
>>>>>>>>>>>>>                 "class": box_info['class_name'] if
>>>>>>>>>>>>> box_info['class_name'] else str(box_info['class']),
>>>>>>>>>>>>>                 "confidence": box_info['probability']
>>>>>>>>>>>>>             }
>>>>>>>>>>>>>
>>>>>>>>>>>>>             # Return all three rows with table names
>>>>>>>>>>>>>             return [
>>>>>>>>>>>>>                 ('ocr_results', ocr_results_row),
>>>>>>>>>>>>>                 ('page_ocr', page_ocr_row),
>>>>>>>>>>>>>                 ('class_prediction', class_prediction_row)
>>>>>>>>>>>>>             ]
>>>>>>>>>>>>>
>>>>>>>>>>>>>         except Exception as e:
>>>>>>>>>>>>>             logger.error(f"Error preparing for BigQuery:
>>>>>>>>>>>>> {str(e)}")
>>>>>>>>>>>>>             return []
>>>>>>>>>>>>>
>>>>>>>>>>>>> model_handler = TensorRTEngineHandlerNumPy(
>>>>>>>>>>>>>   min_batch_size=1,
>>>>>>>>>>>>>   max_batch_size=1,
>>>>>>>>>>>>>   engine_path="gs://temp/yolov11l-doclaynet.engine",
>>>>>>>>>>>>> )
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> with beam.Pipeline(options=options) as pipeline:
>>>>>>>>>>>>>
>>>>>>>>>>>>>         # Create PCollection from input URIs
>>>>>>>>>>>>>         pdf_uris = (
>>>>>>>>>>>>>             pipeline
>>>>>>>>>>>>>             | "Create URIs" >> beam.Create(["tmp.pdf"])
>>>>>>>>>>>>>         )
>>>>>>>>>>>>>
>>>>>>>>>>>>>         # Download PDFs
>>>>>>>>>>>>>         local_pdfs = (
>>>>>>>>>>>>>             pdf_uris
>>>>>>>>>>>>>             | "Download PDFs" >>
>>>>>>>>>>>>> beam.ParDo(DownloadPDFFromGCS())
>>>>>>>>>>>>>         )
>>>>>>>>>>>>>
>>>>>>>>>>>>>          # Load PDF pages
>>>>>>>>>>>>>         pdf_pages = (
>>>>>>>>>>>>>             local_pdfs
>>>>>>>>>>>>>             | "Load PDF Pages" >> beam.ParDo(LoadPDFPages())
>>>>>>>>>>>>>             #| "Flatten Pages" >> beam.FlatMap(lambda x: x)
>>>>>>>>>>>>>         )
>>>>>>>>>>>>>
>>>>>>>>>>>>>         # Preprocess images
>>>>>>>>>>>>>         preprocessed_pages = (
>>>>>>>>>>>>>             pdf_pages
>>>>>>>>>>>>>             | "Preprocess Images" >>
>>>>>>>>>>>>> beam.ParDo(PreprocessImage())
>>>>>>>>>>>>>         )
>>>>>>>>>>>>>         inference_results = (
>>>>>>>>>>>>>             preprocessed_pages
>>>>>>>>>>>>>             | "Run Inference" >>
>>>>>>>>>>>>> RunInference(model_handler=model_handler)
>>>>>>>>>>>>>         )
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, 16 Sept 2025 at 21:23, XQ Hu <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can you share your commands and outputs?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Sep 16, 2025 at 9:02 PM Sai Shashank <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Okay I have changed the docker image but  to now to RUN the
>>>>>>>>>>>>>>> python command but it is still halting without are error or 
>>>>>>>>>>>>>>> warnings or
>>>>>>>>>>>>>>> errors
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, 16 Sept 2025 at 17:38, XQ Hu via dev <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The CMD is not necessary as it will be overridden by the
>>>>>>>>>>>>>>>> ENTRYPOINT just like your comment.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If you ssh to your Docker container like `docker run --rm
>>>>>>>>>>>>>>>> -it --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE`, can you 
>>>>>>>>>>>>>>>> run python and
>>>>>>>>>>>>>>>> some Beam pipelines with a direct runner in the container? 
>>>>>>>>>>>>>>>> This can help
>>>>>>>>>>>>>>>> test the environment works fine.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have one old Dockerfile that used to work with the old
>>>>>>>>>>>>>>>> Beam:
>>>>>>>>>>>>>>>> https://github.com/google/dataflow-ml-starter/blob/main/tensor_rt.Dockerfile
>>>>>>>>>>>>>>>> .
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Sep 16, 2025 at 4:56 PM Sai Shashank <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ---------- Forwarded message ---------
>>>>>>>>>>>>>>>>> From: Sai Shashank <[email protected]>
>>>>>>>>>>>>>>>>> Date: Tue, Sep 16, 2025 at 4:27 PM
>>>>>>>>>>>>>>>>> Subject: TensorRT inference not starting
>>>>>>>>>>>>>>>>> To: <[email protected]>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hey Everyone,
>>>>>>>>>>>>>>>>>                          I was trying to use tensorRT
>>>>>>>>>>>>>>>>> within the apache beam on dataflow but somehow , dataflow 
>>>>>>>>>>>>>>>>> didn't start like
>>>>>>>>>>>>>>>>> it did not even give me Worker logs. Below is the docker file 
>>>>>>>>>>>>>>>>> that , use to
>>>>>>>>>>>>>>>>> create a custom  image, at first I thought it is the version 
>>>>>>>>>>>>>>>>> mismatched but
>>>>>>>>>>>>>>>>> usually it gives me a harness error .
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ARG BUILD_IMAGE=nvcr.io/nvidia/tensorrt:25.08-py3
>>>>>>>>>>>>>>>>> FROM ${BUILD_IMAGE}
>>>>>>>>>>>>>>>>> ENV PATH="/usr/src/tensorrt/bin:${PATH}"
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> WORKDIR /workspace
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> RUN apt-get update -y && apt-get install -y python3-venv
>>>>>>>>>>>>>>>>> RUN pip install --no-cache-dir apache-beam[gcp]==2.67.0
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> COPY --from=apache/beam_python3.10_sdk:2.67.0
>>>>>>>>>>>>>>>>> /opt/apache/beam /opt/apache/beam
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> # Install additional dependencies
>>>>>>>>>>>>>>>>> RUN pip install --upgrade pip \
>>>>>>>>>>>>>>>>>     && pip install torch \
>>>>>>>>>>>>>>>>>     && pip install torchvision \
>>>>>>>>>>>>>>>>>     && pip install pillow>=8.0.0 \
>>>>>>>>>>>>>>>>>     && pip install transformers>=4.18.0 \
>>>>>>>>>>>>>>>>>     && pip install cuda-python \
>>>>>>>>>>>>>>>>>     && pip install opencv-python==4.7.0.72 \
>>>>>>>>>>>>>>>>>     && pip install PyMuPDF==1.22.5 \
>>>>>>>>>>>>>>>>>     && pip install requests==2.31.0
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> # Set the default command to run the inference script
>>>>>>>>>>>>>>>>> # This will be overridden by the Apache Beam boot script
>>>>>>>>>>>>>>>>> CMD ["python", "/workspace/inference.py"]
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> # Use the Apache Beam boot script as the entrypoint
>>>>>>>>>>>>>>>>> ENTRYPOINT ["/opt/apache/beam/boot"]
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>

Reply via email to