So I was finally able to resolve the Docker image issue, but now I am seeing this error:
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1562, in
apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
  File "apache_beam/runners/common.py", line 602, in
apache_beam.runners.common.DoFnInvoker.invoke_setup
  File "/usr/local/lib/python3.12/dist-packages/apache_beam/ml/inference/base.py", line 1882, in setup
    self._model = self._load_model()
                  ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/apache_beam/ml/inference/base.py", line 1848, in _load_model
    model = self._shared_model_handle.acquire(load, tag=self._cur_tag)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/apache_beam/ml/inference/tensorrt_inference.py", line 132, in __init__
    for i in range(self.engine.num_bindings):
                   ^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'tensorrt.tensorrt.ICudaEngine' object has no attribute 'num_bindings'
I have seen this error where the TensorRT version is older than 10.x. Is
there an updated TensorRT handler, or am I doing something wrong?
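
For reference, TensorRT 10.x removed the old binding APIs (num_bindings, get_binding_*), so handler code written against the pre-10.x API raises exactly this AttributeError on a 10.x install. A rough sketch of how the loop looks with the named-tensor API (not the actual Beam handler code, just the idea):

import tensorrt as trt  # assumes TensorRT 10.x Python bindings

def describe_io_tensors(engine):
    """Yield (name, is_input, shape, dtype) for each I/O tensor of a deserialized engine."""
    for i in range(engine.num_io_tensors):
        name = engine.get_tensor_name(i)
        is_input = engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT
        yield name, is_input, engine.get_tensor_shape(name), engine.get_tensor_dtype(name)

If the shipped tensorrt_inference.py still uses num_bindings, a custom ModelHandler built around this API might be the workaround.
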
On Fri, 19 Sept 2025 at 09:15, XQ Hu <[email protected]> wrote:
> GPU driver: DRIVER_VERSION=535.261.03
> From the log, the driver was installed correctly (make sure this version can
> be used with your TensorRT).
>
> "Error syncing pod, skipping" err="failed to \"StartContainer\" for
> \"sdk-0-0\" with ErrImagePull: \"failed to pull and unpack image \\\"
> us-east4-docker.pkg.dev/anbc-dev-suspecting/suspecting-docker/tensorrt_ss:latest\\\":
> failed to extract layer
> sha256:a848022a4558c435b349317630b139960b44ae09f218ab7f93f764ba4661607d:
> write
> /var/lib/containerd/io.containerd.snapshotter.v1.gcfs/snapshotter/snapshots/55/fs/usr/local/cuda-13.0/targets/x86_64-linux/lib/libcusparseLt.so.0.8.0.4:
> no space left on device: unknown\""
> pod="default/df-inference-pipeline-4-09182012-1a3n-harness-rvxc"
> podUID="3a9b74645db23e77932d981439f1d3cc"
>
> The Dataflow worker cannot unpack the image: no space left on device
>
> Try
> https://cloud.google.com/dataflow/docs/guides/configure-worker-vm#disk-size
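>
> For example, something along these lines (placeholder values; keep your existing
> options and just add disk_size_gb) gives the workers a bigger boot disk:
>
> from apache_beam.options.pipeline_options import PipelineOptions
>
> options = PipelineOptions([
>     "--runner=DataflowRunner",
>     "--project=YOUR_PROJECT",
>     "--region=us-east4",
>     "--disk_size_gb=100",  # room to unpack the large TensorRT image
> ])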
>
>
>
>
> On Thu, Sep 18, 2025 at 11:27 PM Sai Shashank <[email protected]>
> wrote:
>
>> So, I was able to start the Dataflow job
>> 2025-09-18_20_12_41-10973298801093076892,
>> but now it is hitting this error: 2025-09-18 23:21:25.401 EDT
>> SDK harnesses are not healthy after 5 minutes, status: Waiting for 4 of 4
>> SDK Harnesses to register. I have noticed this error when there is a
>> mismatch of environments. As advised by you, I tried running the DirectRunner
>> in the Docker image and it ran perfectly. Are there any tips you
>> could give me to correct this error?
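>>
>> One quick check for a mismatch is to print the SDK and Python versions inside
>> the container with the container's own interpreter, e.g.:
>>
>> import sys
>> import apache_beam
>>
>> # Compare these against the versions used to build and submit the job.
>> print("python:", sys.version)
>> print("beam:", apache_beam.__version__)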
>>
>>
>> On Wed, 17 Sept 2025 at 09:42, XQ Hu <[email protected]> wrote:
>>
>>> From the worker log,
>>>
>>> "Failed to read pods from URL" err="invalid pod:
>>> [spec.containers[3].image: Invalid value: \"
>>> us-east4-docker.pkg.dev/anbc-dev-suspecting/suspecting-docker/tensorrt_ss:latest\":
>>> must not have leading or trailing whitespace]"
>>>
>>> 2025-09-16_17_52_06-10817935125972705087
>>>
>>> Looks like you specified the image URL with a leading whitespace. Remove
>>> it and give it a try.
>>>
>>> And if you have any further questions about GPUs, I highly recommend starting
>>> a VM with an L4 GPU, pulling your image, SSHing into it, and running your
>>> pipeline locally with DirectRunner. That can confirm that all your code works.
>>>
>>>
>>> On Wed, Sep 17, 2025 at 9:25 AM XQ Hu <[email protected]> wrote:
>>>
>>>> I saw it. Let me follow up internally.
>>>>
>>>> On Tue, Sep 16, 2025 at 10:10 PM Sai Shashank <[email protected]>
>>>> wrote:
>>>>
>>>>> I have already opened one, but I am opening one more: the case number is
>>>>> 63121285
>>>>>
>>>>> On Tue, Sep 16, 2025 at 10:05 PM XQ Hu <[email protected]> wrote:
>>>>>
>>>>>> Can you open a cloud support ticket? That can give us permission
>>>>>> to access your job.
>>>>>>
>>>>>> On Tue, Sep 16, 2025, 9:57 PM Sai Shashank <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey, can we connect on my office mail? I could share more there, like
>>>>>>> pipeline options and other details, and since I work at CVS
>>>>>>> it would stay under compliance too.
>>>>>>>
>>>>>>> On Tue, 16 Sept 2025 at 21:54, Sai Shashank <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> The code works without the custom TensorRT image. With the custom image, I
>>>>>>>> only get this:
>>>>>>>>
>>>>>>>> ject to benefit from faster worker startup and autoscaling. If you
>>>>>>>> experience container-startup related issues, pass the
>>>>>>>> "disable_image_streaming" experiment to disable image streaming for
>>>>>>>> the job.
>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:10.327Z:
>>>>>>>> JOB_MESSAGE_BASIC: Worker configuration: g2-standard-4 in us-east4-c.
>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.480Z:
>>>>>>>> JOB_MESSAGE_BASIC: Executing operation [10]: Create
>>>>>>>> URIs/Impulse+[10]: Create URIs/FlatMap(<lambda at core.py:3994>)+[10]:
>>>>>>>> Create URIs/Map(decode)+[10]: Download PDFs+[10]: Load PDF Pages+[10]:
>>>>>>>> Preprocess Images+[10]: Run
>>>>>>>> Inference/BatchElements/ParDo(_GlobalWindowsBatchingDoFn)+[10]: Run
>>>>>>>> Inference/BeamML_RunInference
>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.543Z:
>>>>>>>> JOB_MESSAGE_BASIC: Starting 1 workers in us-east4...
>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:Job
>>>>>>>> 2025-09-16_17_52_06-10817935125972705087 is in state JOB_STATE_RUNNING
>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support
>>>>>>>> per-request timeout. Set the timeout when constructing the
>>>>>>>> httplib2.Http instance.
>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support
>>>>>>>> per-request timeout. Set the timeout when constructing the
>>>>>>>> httplib2.Http instance.
>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support
>>>>>>>> per-request timeout. Set the timeout when constructing the
>>>>>>>> httplib2.Http instance.
>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support
>>>>>>>> per-request timeout. Set the timeout when constructing the
>>>>>>>> httplib2.Http instance.
>>>>>>>>
>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support
>>>>>>>> per-request timeout. Set the timeout when constructing the
>>>>>>>> httplib2.Http
>>>>>>>> instance.
>>>>>>>>
>>>>>>>> It just keeps recurring, and after an hour this message pops up: Workflow
>>>>>>>> failed. Causes: The Dataflow job appears to be stuck because no worker
>>>>>>>> activity has been seen in the last 1h. For more information, see
>>>>>>>> https://cloud.google.com/dataflow/docs/guides/common-errors#error-syncing-pod.
>>>>>>>> You can also get help with Cloud Dataflow at
>>>>>>>> https://cloud.google.com/dataflow/support.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, 16 Sept 2025 at 21:35, XQ Hu <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Does the code work without using TensorRT? Any logs?
>>>>>>>>>
>>>>>>>>> On Tue, Sep 16, 2025 at 9:28 PM Sai Shashank <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> #!/usr/bin/env python3
>>>>>>>>>> """
>>>>>>>>>> Apache Beam pipeline for processing PDFs with Triton server and
>>>>>>>>>> saving results to BigQuery.
>>>>>>>>>> This pipeline combines functionality from test_triton_document.py,
>>>>>>>>>> create_bigquery_tables.py, and save_to_bigquery.py into a single workflow.
>>>>>>>>>> """
>>>>>>>>>>
>>>>>>>>>> import os
>>>>>>>>>> import sys
>>>>>>>>>> import json
>>>>>>>>>> import uuid
>>>>>>>>>> import argparse
>>>>>>>>>> import logging
>>>>>>>>>> import tempfile
>>>>>>>>>> import datetime
>>>>>>>>>> import requests
>>>>>>>>>> import numpy as np
>>>>>>>>>> import cv2
>>>>>>>>>> from PIL import Image
>>>>>>>>>> import fitz  # PyMuPDF
>>>>>>>>>> from pathlib import Path
>>>>>>>>>> from typing import Dict, List, Tuple, Any, Optional, Iterator
>>>>>>>>>>
>>>>>>>>>> # Apache Beam imports
>>>>>>>>>> import apache_beam as beam
>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
>>>>>>>>>> from apache_beam.ml.inference.base import RemoteModelHandler, PredictionResult, RunInference
>>>>>>>>>> from apache_beam.ml.inference.tensorrt_inference import TensorRTEngineHandlerNumPy
>>>>>>>>>> from apache_beam.ml.inference.utils import _convert_to_result
>>>>>>>>>> from apache_beam.io.gcp.bigquery import WriteToBigQuery
>>>>>>>>>> from apache_beam.io.filesystems import FileSystems
>>>>>>>>>> from apache_beam.io.gcp.gcsio import GcsIO
>>>>>>>>>>
>>>>>>>>>> # Google Cloud imports
>>>>>>>>>> from google.cloud import storage
>>>>>>>>>> from google.cloud import bigquery
>>>>>>>>>>
>>>>>>>>>> # Set up logging
>>>>>>>>>> logging.basicConfig(level=logging.INFO)
>>>>>>>>>> logger = logging.getLogger(__name__)
>>>>>>>>>>
>>>>>>>>>> # DocLayNet classes
>>>>>>>>>> CLASS_ID_TO_NAME = {
>>>>>>>>>> 0: 'Caption',
>>>>>>>>>> 1: 'Footnote',
>>>>>>>>>> 2: 'Formula',
>>>>>>>>>> 3: 'List-item',
>>>>>>>>>> 4: 'Page-footer',
>>>>>>>>>> 5: 'Page-header',
>>>>>>>>>> 6: 'Picture',
>>>>>>>>>> 7: 'Section-header',
>>>>>>>>>> 8: 'Table',
>>>>>>>>>> 9: 'Text',
>>>>>>>>>> 10: 'Title'
>>>>>>>>>> }
>>>>>>>>>> class DownloadPDFFromGCS(beam.DoFn):
>>>>>>>>>> """Download a PDF from Google Cloud Storage."""
>>>>>>>>>>
>>>>>>>>>> def __init__(self, temp_dir=None):
>>>>>>>>>> self.temp_dir = temp_dir or tempfile.gettempdir()
>>>>>>>>>>
>>>>>>>>>> def process(self, gcs_uri):
>>>>>>>>>> try:
>>>>>>>>>> # Parse GCS URI
>>>>>>>>>> if not gcs_uri.startswith("gs://"):
>>>>>>>>>> raise ValueError(f"Invalid GCS URI: {gcs_uri}")
>>>>>>>>>>
>>>>>>>>>> # Remove gs:// prefix and split into bucket and blob
>>>>>>>>>> path
>>>>>>>>>> path_parts = gcs_uri[5:].split("/", 1)
>>>>>>>>>> bucket_name = path_parts[0]
>>>>>>>>>> blob_path = path_parts[1]
>>>>>>>>>>
>>>>>>>>>> # Get filename from blob path
>>>>>>>>>> filename = os.path.basename(blob_path)
>>>>>>>>>> local_path = os.path.join(self.temp_dir, filename)
>>>>>>>>>>
>>>>>>>>>> # Create temp directory if it doesn't exist
>>>>>>>>>> os.makedirs(self.temp_dir, exist_ok=True)
>>>>>>>>>>
>>>>>>>>>> try:
>>>>>>>>>> # Download using Beam's GcsIO
>>>>>>>>>> with FileSystems.open(gcs_uri, 'rb') as gcs_file:
>>>>>>>>>> with open(local_path, 'wb') as local_file:
>>>>>>>>>> local_file.write(gcs_file.read())
>>>>>>>>>>
>>>>>>>>>> logger.info(f"Downloaded {gcs_uri} to
>>>>>>>>>> {local_path}")
>>>>>>>>>>
>>>>>>>>>> # Return a dictionary with the local path and
>>>>>>>>>> original URI
>>>>>>>>>> yield {
>>>>>>>>>> 'local_path': local_path,
>>>>>>>>>> 'gcs_uri': gcs_uri,
>>>>>>>>>> 'filename': filename
>>>>>>>>>> }
>>>>>>>>>> except Exception as e:
>>>>>>>>>> logger.error(f"Error reading from GCS: {str(e)}")
>>>>>>>>>> # Try alternative download method
>>>>>>>>>> logger.info(f"Trying alternative download method
>>>>>>>>>> for {gcs_uri}")
>>>>>>>>>>
>>>>>>>>>> # For testing with local files
>>>>>>>>>> if os.path.exists(gcs_uri.replace("gs://", "")):
>>>>>>>>>> local_path = gcs_uri.replace("gs://", "")
>>>>>>>>>> logger.info(f"Using local file:
>>>>>>>>>> {local_path}")
>>>>>>>>>> yield {
>>>>>>>>>> 'local_path': local_path,
>>>>>>>>>> 'gcs_uri': gcs_uri,
>>>>>>>>>> 'filename': os.path.basename(local_path)
>>>>>>>>>> }
>>>>>>>>>> else:
>>>>>>>>>> # Try using gsutil command
>>>>>>>>>> import subprocess
>>>>>>>>>> try:
>>>>>>>>>> subprocess.run(["gsutil", "cp", gcs_uri,
>>>>>>>>>> local_path], check=True)
>>>>>>>>>> logger.info(f"Downloaded {gcs_uri} to
>>>>>>>>>> {local_path} using gsutil")
>>>>>>>>>> yield {
>>>>>>>>>> 'local_path': local_path,
>>>>>>>>>> 'gcs_uri': gcs_uri,
>>>>>>>>>> 'filename': filename
>>>>>>>>>> }
>>>>>>>>>> except Exception as e2:
>>>>>>>>>> logger.error(f"Failed to download using
>>>>>>>>>> gsutil: {str(e2)}")
>>>>>>>>>>
>>>>>>>>>> except Exception as e:
>>>>>>>>>> logger.error(f"Error downloading {gcs_uri}: {str(e)}")
>>>>>>>>>> class LoadPDFPages(beam.DoFn):
>>>>>>>>>> """Load PDF pages as images."""
>>>>>>>>>>
>>>>>>>>>> def __init__(self, dpi=200):
>>>>>>>>>> self.dpi = dpi
>>>>>>>>>>
>>>>>>>>>> def process(self, element):
>>>>>>>>>> doc = None
>>>>>>>>>> try:
>>>>>>>>>> # Make sure we have all required fields
>>>>>>>>>> if not isinstance(element, dict):
>>>>>>>>>> logger.error(f"Expected dictionary, got
>>>>>>>>>> {type(element)}")
>>>>>>>>>> return
>>>>>>>>>>
>>>>>>>>>> if 'local_path' not in element:
>>>>>>>>>> logger.error("Missing 'local_path' in element")
>>>>>>>>>> return
>>>>>>>>>>
>>>>>>>>>> local_path = element['local_path']
>>>>>>>>>> gcs_uri = element.get('gcs_uri', '')
>>>>>>>>>>
>>>>>>>>>> # Extract filename from local_path if not provided
>>>>>>>>>> filename = element.get('filename',
>>>>>>>>>> os.path.basename(local_path))
>>>>>>>>>>
>>>>>>>>>> logger.info(f"Loading PDF: {local_path}, filename:
>>>>>>>>>> {filename}")
>>>>>>>>>>
>>>>>>>>>> # Check if file exists and is accessible
>>>>>>>>>> if not os.path.exists(local_path):
>>>>>>>>>> logger.error(f"File not found: {local_path}")
>>>>>>>>>> return
>>>>>>>>>>
>>>>>>>>>> if not os.access(local_path, os.R_OK):
>>>>>>>>>> logger.error(f"File not readable: {local_path}")
>>>>>>>>>> return
>>>>>>>>>>
>>>>>>>>>> # Open the PDF
>>>>>>>>>> try:
>>>>>>>>>> doc = fitz.open(local_path)
>>>>>>>>>> if doc.is_closed:
>>>>>>>>>> logger.error(f"Failed to open PDF:
>>>>>>>>>> {local_path}")
>>>>>>>>>> return
>>>>>>>>>> except Exception as e:
>>>>>>>>>> logger.error(f"Error opening PDF {local_path}:
>>>>>>>>>> {str(e)}")
>>>>>>>>>> return
>>>>>>>>>>
>>>>>>>>>> # Process each page
>>>>>>>>>> page_count = len(doc)
>>>>>>>>>> logger.info(f"Processing {page_count} pages from
>>>>>>>>>> {local_path}")
>>>>>>>>>>
>>>>>>>>>> for i in range(page_count):
>>>>>>>>>> try:
>>>>>>>>>> if doc.is_closed:
>>>>>>>>>> logger.error(f"Document was closed
>>>>>>>>>> unexpectedly while processing page {i}")
>>>>>>>>>> break
>>>>>>>>>>
>>>>>>>>>> page = doc[i]
>>>>>>>>>> if page is None:
>>>>>>>>>> logger.error(f"Failed to get page {i}
>>>>>>>>>> from document")
>>>>>>>>>> continue
>>>>>>>>>>
>>>>>>>>>> # Use a higher resolution for better quality
>>>>>>>>>> scale = self.dpi / 72.0
>>>>>>>>>> mat = fitz.Matrix(scale, scale)
>>>>>>>>>>
>>>>>>>>>> try:
>>>>>>>>>> pix = page.get_pixmap(matrix=mat,
>>>>>>>>>> alpha=False)
>>>>>>>>>> except Exception as e:
>>>>>>>>>> logger.error(f"Error getting pixmap for
>>>>>>>>>> page {i}: {str(e)}")
>>>>>>>>>> continue
>>>>>>>>>>
>>>>>>>>>> # Check pixmap dimensions
>>>>>>>>>> if pix.height <= 0 or pix.width <= 0 or pix.n
>>>>>>>>>> <= 0:
>>>>>>>>>> logger.error(f"Invalid pixmap dimensions:
>>>>>>>>>> {pix.width}x{pix.height}x{pix.n}")
>>>>>>>>>> continue
>>>>>>>>>>
>>>>>>>>>> # Convert to numpy array
>>>>>>>>>> try:
>>>>>>>>>> arr = np.frombuffer(pix.samples,
>>>>>>>>>> dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
>>>>>>>>>> except Exception as e:
>>>>>>>>>> logger.error(f"Error converting pixmap to
>>>>>>>>>> numpy array: {str(e)}")
>>>>>>>>>> continue
>>>>>>>>>>
>>>>>>>>>> # Convert BGR to RGB if needed
>>>>>>>>>> if pix.n == 3: # RGB
>>>>>>>>>> try:
>>>>>>>>>> arr = cv2.cvtColor(arr,
>>>>>>>>>> cv2.COLOR_BGR2RGB)
>>>>>>>>>> except Exception as e:
>>>>>>>>>> logger.error(f"Error converting BGR
>>>>>>>>>> to RGB: {str(e)}")
>>>>>>>>>> continue
>>>>>>>>>>
>>>>>>>>>> # Store original size for later use
>>>>>>>>>> original_size = (arr.shape[0], arr.shape[1])
>>>>>>>>>>
>>>>>>>>>> # Create page info
>>>>>>>>>> page_info = {
>>>>>>>>>> 'page_num': i,
>>>>>>>>>> 'image': arr,
>>>>>>>>>> 'original_size': original_size,
>>>>>>>>>> 'local_path': local_path,
>>>>>>>>>> 'gcs_uri': gcs_uri,
>>>>>>>>>> 'filename': filename
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> # Use document ID and page number as key
>>>>>>>>>> doc_id = os.path.splitext(filename)[0]
>>>>>>>>>> key = f"{doc_id}_{i}"
>>>>>>>>>>
>>>>>>>>>> yield (key, page_info)
>>>>>>>>>> except Exception as e:
>>>>>>>>>> import traceback
>>>>>>>>>> logger.error(f"Error processing page {i}:
>>>>>>>>>> {str(e)}")
>>>>>>>>>> logger.error(traceback.format_exc())
>>>>>>>>>>
>>>>>>>>>> logger.info(f"Loaded {len(doc)} pages from
>>>>>>>>>> {local_path}")
>>>>>>>>>>
>>>>>>>>>> except Exception as e:
>>>>>>>>>> import traceback
>>>>>>>>>> logger.error(f"Error loading PDF: {str(e)}")
>>>>>>>>>> logger.error(traceback.format_exc())
>>>>>>>>>> finally:
>>>>>>>>>> # Make sure to close the document only if it was
>>>>>>>>>> successfully opened
>>>>>>>>>> if doc is not None:
>>>>>>>>>> try:
>>>>>>>>>> if not doc.is_closed:
>>>>>>>>>> doc.close()
>>>>>>>>>> except Exception as e:
>>>>>>>>>> logger.debug(f"Error closing document:
>>>>>>>>>> {str(e)}")
>>>>>>>>>>
>>>>>>>>>> class PreprocessImage(beam.DoFn):
>>>>>>>>>> """Preprocess image for Triton server."""
>>>>>>>>>>
>>>>>>>>>> def __init__(self, size=1024):
>>>>>>>>>> self.size = size
>>>>>>>>>>
>>>>>>>>>> def letterbox(self, img, new_shape=1024, color=(114,114,114)):
>>>>>>>>>> """Resize and pad image to target size."""
>>>>>>>>>> h, w = img.shape[:2]
>>>>>>>>>> r = min(new_shape / h, new_shape / w)
>>>>>>>>>> nh, nw = int(round(h * r)), int(round(w * r))
>>>>>>>>>> pad_h, pad_w = new_shape - nh, new_shape - nw
>>>>>>>>>> top = pad_h // 2
>>>>>>>>>> bottom = pad_h - top
>>>>>>>>>> left = pad_w // 2
>>>>>>>>>> right = pad_w - left
>>>>>>>>>> img = cv2.resize(img, (nw, nh),
>>>>>>>>>> interpolation=cv2.INTER_LINEAR)
>>>>>>>>>> img = cv2.copyMakeBorder(img, top, bottom, left, right,
>>>>>>>>>> cv2.BORDER_CONSTANT, value=color)
>>>>>>>>>> return img, r, left, top
>>>>>>>>>>
>>>>>>>>>> def process(self, element):
>>>>>>>>>> try:
>>>>>>>>>> if not isinstance(element, tuple) or len(element) !=
>>>>>>>>>> 2:
>>>>>>>>>> logger.error(f"Expected (key, value) tuple, got
>>>>>>>>>> {type(element)}")
>>>>>>>>>> return
>>>>>>>>>>
>>>>>>>>>> key, page_info = element
>>>>>>>>>>
>>>>>>>>>> if not isinstance(page_info, dict):
>>>>>>>>>> logger.error(f"Expected dictionary for page_info,
>>>>>>>>>> got {type(page_info)}")
>>>>>>>>>> return
>>>>>>>>>>
>>>>>>>>>> if 'image' not in page_info:
>>>>>>>>>> logger.error("Missing 'image' in page_info")
>>>>>>>>>> return
>>>>>>>>>>
>>>>>>>>>> # Create a new dictionary to avoid modifying the input
>>>>>>>>>> new_page_info = dict(page_info)
>>>>>>>>>>
>>>>>>>>>> # Apply letterbox resize
>>>>>>>>>> img = new_page_info['image']
>>>>>>>>>> lb, r, left, top = self.letterbox(img,
>>>>>>>>>> new_shape=self.size)
>>>>>>>>>>
>>>>>>>>>> # Convert to float32 and normalize to [0,1]
>>>>>>>>>> x = lb.astype(np.float32) / 255.0
>>>>>>>>>>
>>>>>>>>>> # Convert to CHW format
>>>>>>>>>> x = np.transpose(x, (2, 0, 1))
>>>>>>>>>>
>>>>>>>>>> # Add batch dimension
>>>>>>>>>> batched_img = np.expand_dims(x, axis=0)
>>>>>>>>>>
>>>>>>>>>> # Update page info
>>>>>>>>>> new_page_info['preprocessed_image'] = batched_img
>>>>>>>>>> new_page_info['letterbox_info'] = (r, left, top)
>>>>>>>>>>
>>>>>>>>>> yield (key, new_page_info)
>>>>>>>>>>
>>>>>>>>>> except Exception as e:
>>>>>>>>>> import traceback
>>>>>>>>>> logger.error(f"Error preprocessing image: {str(e)}")
>>>>>>>>>> logger.error(traceback.format_exc())
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> class ExtractBoxes(beam.DoFn):
>>>>>>>>>> """Extract bounding boxes from Triton response."""
>>>>>>>>>>
>>>>>>>>>> def __init__(self, conf_th=0.25, iou_th=0.7, model_size=1024):
>>>>>>>>>> self.conf_th = conf_th
>>>>>>>>>> self.iou_th = iou_th
>>>>>>>>>> self.model_size = model_size
>>>>>>>>>>
>>>>>>>>>> def _nms(self, boxes, scores, iou_th=0.7):
>>>>>>>>>> """Non-Maximum Suppression"""
>>>>>>>>>> if len(boxes) == 0:
>>>>>>>>>> return []
>>>>>>>>>>
>>>>>>>>>> boxes = boxes.astype(np.float32)
>>>>>>>>>> x1, y1, x2, y2 = boxes.T
>>>>>>>>>> areas = (x2 - x1) * (y2 - y1)
>>>>>>>>>> order = scores.argsort()[::-1]
>>>>>>>>>>
>>>>>>>>>> keep = []
>>>>>>>>>> while order.size > 0:
>>>>>>>>>> i = order[0]
>>>>>>>>>> keep.append(i)
>>>>>>>>>>
>>>>>>>>>> xx1 = np.maximum(x1[i], x1[order[1:]])
>>>>>>>>>> yy1 = np.maximum(y1[i], y1[order[1:]])
>>>>>>>>>> xx2 = np.minimum(x2[i], x2[order[1:]])
>>>>>>>>>> yy2 = np.minimum(y2[i], y2[order[1:]])
>>>>>>>>>>
>>>>>>>>>> w = np.maximum(0.0, xx2 - xx1)
>>>>>>>>>> h = np.maximum(0.0, yy2 - yy1)
>>>>>>>>>> inter = w * h
>>>>>>>>>>
>>>>>>>>>> iou = inter / (areas[i] + areas[order[1:]] - inter +
>>>>>>>>>> 1e-9)
>>>>>>>>>> inds = np.where(iou <= iou_th)[0]
>>>>>>>>>> order = order[inds + 1]
>>>>>>>>>>
>>>>>>>>>> return keep
>>>>>>>>>>
>>>>>>>>>> def process(self, page_info):
>>>>>>>>>> try:
>>>>>>>>>> triton_response = page_info['triton_response']
>>>>>>>>>> original_size = page_info['original_size']
>>>>>>>>>> r, left, top = page_info['letterbox_info']
>>>>>>>>>>
>>>>>>>>>> if "outputs" not in triton_response or not
>>>>>>>>>> triton_response["outputs"]:
>>>>>>>>>> logger.error("Invalid response from Triton
>>>>>>>>>> server")
>>>>>>>>>> return []
>>>>>>>>>>
>>>>>>>>>> out_meta = triton_response["outputs"][0]
>>>>>>>>>> shape = out_meta["shape"]
>>>>>>>>>> data = np.array(out_meta["data"],
>>>>>>>>>> dtype=np.float32).reshape(shape)
>>>>>>>>>>
>>>>>>>>>> logger.info(f"Output shape: {shape}")
>>>>>>>>>>
>>>>>>>>>> # For YOLO output [B, C, P] where C is channels (box
>>>>>>>>>> coords + objectness + classes)
>>>>>>>>>> B, C, P = shape
>>>>>>>>>>
>>>>>>>>>> # Assuming 4 box coordinates + class probabilities
>>>>>>>>>> (no objectness)
>>>>>>>>>> has_objectness = False
>>>>>>>>>> num_classes = C - 5 if has_objectness else C - 4
>>>>>>>>>>
>>>>>>>>>> # Extract data
>>>>>>>>>> xywh = data[:, 0:4, :]
>>>>>>>>>> if has_objectness:
>>>>>>>>>> obj = data[:, 4:5, :]
>>>>>>>>>> cls = data[:, 5:5 + num_classes, :]
>>>>>>>>>> else:
>>>>>>>>>> obj = None
>>>>>>>>>> cls = data[:, 4:4 + num_classes, :]
>>>>>>>>>>
>>>>>>>>>> # Process batch item (we only have one)
>>>>>>>>>> b = 0
>>>>>>>>>> h, w = original_size
>>>>>>>>>>
>>>>>>>>>> xywh_b = xywh[b].T # (P,4)
>>>>>>>>>> if obj is not None:
>>>>>>>>>> obj_b = obj[b].T.squeeze(1) # (P,)
>>>>>>>>>> else:
>>>>>>>>>> obj_b = np.ones((P,), dtype=np.float32)
>>>>>>>>>> cls_b = cls[b].T # (P,nc)
>>>>>>>>>>
>>>>>>>>>> # Get scores and labels
>>>>>>>>>> scores_all = (obj_b[:, None] * cls_b) if obj is not
>>>>>>>>>> None else cls_b
>>>>>>>>>> labels = scores_all.argmax(axis=1)
>>>>>>>>>> scores = scores_all.max(axis=1)
>>>>>>>>>>
>>>>>>>>>> # Filter by confidence threshold
>>>>>>>>>> keep = scores >= self.conf_th
>>>>>>>>>> if not np.any(keep):
>>>>>>>>>> logger.info(f"No detections above threshold
>>>>>>>>>> {self.conf_th}")
>>>>>>>>>> return []
>>>>>>>>>>
>>>>>>>>>> xywh_k = xywh_b[keep]
>>>>>>>>>> scores_k = scores[keep]
>>>>>>>>>> labels_k = labels[keep]
>>>>>>>>>>
>>>>>>>>>> # xywh -> xyxy in model space
>>>>>>>>>> cx, cy, ww, hh = xywh_k.T
>>>>>>>>>> xyxy_model = np.stack([cx - ww / 2, cy - hh / 2, cx +
>>>>>>>>>> ww / 2, cy + hh / 2], axis=1)
>>>>>>>>>>
>>>>>>>>>> # Apply NMS per class
>>>>>>>>>> final_boxes = []
>>>>>>>>>> final_scores = []
>>>>>>>>>> final_labels = []
>>>>>>>>>>
>>>>>>>>>> for c in np.unique(labels_k):
>>>>>>>>>> idxs = np.where(labels_k == c)[0]
>>>>>>>>>> if idxs.size == 0:
>>>>>>>>>> continue
>>>>>>>>>> keep_idx = self._nms(xyxy_model[idxs],
>>>>>>>>>> scores_k[idxs], iou_th=self.iou_th)
>>>>>>>>>> final_boxes.append(xyxy_model[idxs][keep_idx])
>>>>>>>>>> final_scores.append(scores_k[idxs][keep_idx])
>>>>>>>>>> final_labels.append(np.full(len(keep_idx), c,
>>>>>>>>>> dtype=int))
>>>>>>>>>>
>>>>>>>>>> if not final_boxes:
>>>>>>>>>> logger.info("No detections after NMS")
>>>>>>>>>> return []
>>>>>>>>>>
>>>>>>>>>> xyxy_model = np.vstack(final_boxes)
>>>>>>>>>> scores_k = np.concatenate(final_scores)
>>>>>>>>>> labels_k = np.concatenate(final_labels)
>>>>>>>>>>
>>>>>>>>>> # Map boxes from model space to original image space
>>>>>>>>>> xyxy_orig = xyxy_model.copy()
>>>>>>>>>>
>>>>>>>>>> # Remove padding
>>>>>>>>>> xyxy_orig[:, [0, 2]] -= left
>>>>>>>>>> xyxy_orig[:, [1, 3]] -= top
>>>>>>>>>>
>>>>>>>>>> # Scale back to original size
>>>>>>>>>> xyxy_orig /= r
>>>>>>>>>>
>>>>>>>>>> # Clip to image boundaries
>>>>>>>>>> xyxy_orig[:, 0::2] = np.clip(xyxy_orig[:, 0::2], 0, w
>>>>>>>>>> - 1)
>>>>>>>>>> xyxy_orig[:, 1::2] = np.clip(xyxy_orig[:, 1::2], 0, h
>>>>>>>>>> - 1)
>>>>>>>>>>
>>>>>>>>>> # Format as requested: x_min, y_min, x_max, y_max,
>>>>>>>>>> class, probability
>>>>>>>>>> boxes = []
>>>>>>>>>> for (x1, y1, x2, y2), label, score in zip(xyxy_orig,
>>>>>>>>>> labels_k, scores_k):
>>>>>>>>>> class_name = CLASS_ID_TO_NAME.get(int(label))
>>>>>>>>>> box_info = {
>>>>>>>>>> "page": page_info['page_num'],
>>>>>>>>>> "x_min": float(x1),
>>>>>>>>>> "y_min": float(y1),
>>>>>>>>>> "x_max": float(x2),
>>>>>>>>>> "y_max": float(y2),
>>>>>>>>>> "class": int(label),
>>>>>>>>>> "class_name": class_name,
>>>>>>>>>> "probability": float(score),
>>>>>>>>>> "filename": page_info['filename'],
>>>>>>>>>> "local_path": page_info['local_path'],
>>>>>>>>>> "gcs_uri": page_info['gcs_uri']
>>>>>>>>>> }
>>>>>>>>>> boxes.append(box_info)
>>>>>>>>>>
>>>>>>>>>> logger.info(f"Extracted {len(boxes)} boxes from page
>>>>>>>>>> {page_info['page_num']}")
>>>>>>>>>>
>>>>>>>>>> return boxes
>>>>>>>>>>
>>>>>>>>>> except Exception as e:
>>>>>>>>>> logger.error(f"Error extracting boxes: {str(e)}")
>>>>>>>>>> return []
>>>>>>>>>>
>>>>>>>>>> class PrepareForBigQuery(beam.DoFn):
>>>>>>>>>> """Prepare data for BigQuery insertion."""
>>>>>>>>>>
>>>>>>>>>> def process(self, box_info):
>>>>>>>>>> try:
>>>>>>>>>> # Generate UUIDs for primary keys
>>>>>>>>>> v_note_id = str(uuid.uuid4())
>>>>>>>>>> page_ocr_id = str(uuid.uuid4())
>>>>>>>>>> class_prediction_id = str(uuid.uuid4())
>>>>>>>>>>
>>>>>>>>>> # Create timestamp
>>>>>>>>>> processing_time =
>>>>>>>>>> datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
>>>>>>>>>>
>>>>>>>>>> # Create ocr_results row
>>>>>>>>>> ocr_results_row = {
>>>>>>>>>> "v_note_id": v_note_id,
>>>>>>>>>> "filename": box_info['filename'],
>>>>>>>>>> "file_path": box_info['gcs_uri'],
>>>>>>>>>> "processing_time": processing_time,
>>>>>>>>>> "file_type": "pdf"
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> # Create page_ocr row
>>>>>>>>>> page_ocr_row = {
>>>>>>>>>> "page_ocr_id": page_ocr_id,
>>>>>>>>>> "v_note_id": v_note_id,
>>>>>>>>>> "page_number": box_info['page']
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> # Create class_prediction row
>>>>>>>>>> class_prediction_row = {
>>>>>>>>>> "class_prediction_id": class_prediction_id,
>>>>>>>>>> "page_ocr_id": page_ocr_id,
>>>>>>>>>> "xmin": box_info['x_min'],
>>>>>>>>>> "ymin": box_info['y_min'],
>>>>>>>>>> "xmax": box_info['x_max'],
>>>>>>>>>> "ymax": box_info['y_max'],
>>>>>>>>>> "class": box_info['class_name'] if
>>>>>>>>>> box_info['class_name'] else str(box_info['class']),
>>>>>>>>>> "confidence": box_info['probability']
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> # Return all three rows with table names
>>>>>>>>>> return [
>>>>>>>>>> ('ocr_results', ocr_results_row),
>>>>>>>>>> ('page_ocr', page_ocr_row),
>>>>>>>>>> ('class_prediction', class_prediction_row)
>>>>>>>>>> ]
>>>>>>>>>>
>>>>>>>>>> except Exception as e:
>>>>>>>>>> logger.error(f"Error preparing for BigQuery:
>>>>>>>>>> {str(e)}")
>>>>>>>>>> return []
>>>>>>>>>>
>>>>>>>>>> model_handler = TensorRTEngineHandlerNumPy(
>>>>>>>>>> min_batch_size=1,
>>>>>>>>>> max_batch_size=1,
>>>>>>>>>> engine_path="gs://temp/yolov11l-doclaynet.engine",
>>>>>>>>>> )
>>>>>>>>>>
>>>>>>>>>>
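>>>>>>>>>> # `options` is not defined in this snippet; a placeholder sketch of the kind of
>>>>>>>>>> # Dataflow options a job like this would use (all values below are assumptions):
>>>>>>>>>> options = PipelineOptions([
>>>>>>>>>>     "--runner=DataflowRunner",
>>>>>>>>>>     "--project=YOUR_PROJECT",
>>>>>>>>>>     "--region=us-east4",
>>>>>>>>>>     "--temp_location=gs://YOUR_BUCKET/tmp",
>>>>>>>>>>     "--machine_type=g2-standard-4",
>>>>>>>>>>     "--sdk_container_image=us-east4-docker.pkg.dev/anbc-dev-suspecting/suspecting-docker/tensorrt_ss:latest",
>>>>>>>>>>     "--dataflow_service_options=worker_accelerator=type:nvidia-l4;count:1;install-nvidia-driver",
>>>>>>>>>> ])
>>>>>>>>>>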
>>>>>>>>>> with beam.Pipeline(options=options) as pipeline:
>>>>>>>>>>
>>>>>>>>>> # Create PCollection from input URIs
>>>>>>>>>> pdf_uris = (
>>>>>>>>>> pipeline
>>>>>>>>>> | "Create URIs" >> beam.Create(["tmp.pdf"])
>>>>>>>>>> )
>>>>>>>>>>
>>>>>>>>>> # Download PDFs
>>>>>>>>>> local_pdfs = (
>>>>>>>>>> pdf_uris
>>>>>>>>>> | "Download PDFs" >> beam.ParDo(DownloadPDFFromGCS())
>>>>>>>>>> )
>>>>>>>>>>
>>>>>>>>>> # Load PDF pages
>>>>>>>>>> pdf_pages = (
>>>>>>>>>> local_pdfs
>>>>>>>>>> | "Load PDF Pages" >> beam.ParDo(LoadPDFPages())
>>>>>>>>>> #| "Flatten Pages" >> beam.FlatMap(lambda x: x)
>>>>>>>>>> )
>>>>>>>>>>
>>>>>>>>>> # Preprocess images
>>>>>>>>>> preprocessed_pages = (
>>>>>>>>>> pdf_pages
>>>>>>>>>> | "Preprocess Images" >> beam.ParDo(PreprocessImage())
>>>>>>>>>> )
>>>>>>>>>>
>>>>>>>>>> inference_results = (
>>>>>>>>>> preprocessed_pages
>>>>>>>>>> | "Run Inference" >> RunInference(model_handler=model_handler)
>>>>>>>>>> )
>>>>>>>>>>
>>>>>>>>>> On Tue, 16 Sept 2025 at 21:23, XQ Hu <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can you share your commands and outputs?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Sep 16, 2025 at 9:02 PM Sai Shashank <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Okay, I have changed the Docker image to now RUN the python command,
>>>>>>>>>>>> but it is still halting without any errors or
>>>>>>>>>>>> warnings.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, 16 Sept 2025 at 17:38, XQ Hu via dev <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> The CMD is not necessary, as it will be overridden by the ENTRYPOINT,
>>>>>>>>>>>>> just like your comment says.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you SSH into your Docker container with `docker run --rm -it
>>>>>>>>>>>>> --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE`, can you run python and
>>>>>>>>>>>>> some Beam pipelines with a direct runner in the container? This can help
>>>>>>>>>>>>> verify that the environment works fine.
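>>>>>>>>>>>>>
>>>>>>>>>>>>> For example, a tiny smoke test like this (nothing specific to your job,
>>>>>>>>>>>>> just to prove the SDK runs inside the image):
>>>>>>>>>>>>>
>>>>>>>>>>>>> import apache_beam as beam
>>>>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>>>>>
>>>>>>>>>>>>> # Runs entirely in-process with the DirectRunner.
>>>>>>>>>>>>> with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner"])) as p:
>>>>>>>>>>>>>     _ = (
>>>>>>>>>>>>>         p
>>>>>>>>>>>>>         | beam.Create([1, 2, 3])
>>>>>>>>>>>>>         | beam.Map(lambda x: x * 2)
>>>>>>>>>>>>>         | beam.Map(print)
>>>>>>>>>>>>>     )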
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have one old Dockerfile that used to work with the old Beam:
>>>>>>>>>>>>> https://github.com/google/dataflow-ml-starter/blob/main/tensor_rt.Dockerfile
>>>>>>>>>>>>> .
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Sep 16, 2025 at 4:56 PM Sai Shashank <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ---------- Forwarded message ---------
>>>>>>>>>>>>>> From: Sai Shashank <[email protected]>
>>>>>>>>>>>>>> Date: Tue, Sep 16, 2025 at 4:27 PM
>>>>>>>>>>>>>> Subject: TensorRT inference not starting
>>>>>>>>>>>>>> To: <[email protected]>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey everyone,
>>>>>>>>>>>>>> I was trying to use TensorRT with Apache Beam on Dataflow, but somehow
>>>>>>>>>>>>>> Dataflow didn't start; it did not even give me worker logs. Below is the
>>>>>>>>>>>>>> Dockerfile that I use to create a custom image. At first I thought it was
>>>>>>>>>>>>>> a version mismatch, but that usually gives me a harness error.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ARG BUILD_IMAGE=nvcr.io/nvidia/tensorrt:25.08-py3
>>>>>>>>>>>>>> FROM ${BUILD_IMAGE}
>>>>>>>>>>>>>> ENV PATH="/usr/src/tensorrt/bin:${PATH}"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> WORKDIR /workspace
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> RUN apt-get update -y && apt-get install -y python3-venv
>>>>>>>>>>>>>> RUN pip install --no-cache-dir apache-beam[gcp]==2.67.0
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> COPY --from=apache/beam_python3.10_sdk:2.67.0
>>>>>>>>>>>>>> /opt/apache/beam /opt/apache/beam
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> # Install additional dependencies
>>>>>>>>>>>>>> RUN pip install --upgrade pip \
>>>>>>>>>>>>>> && pip install torch \
>>>>>>>>>>>>>> && pip install torchvision \
>>>>>>>>>>>>>> && pip install "pillow>=8.0.0" \
>>>>>>>>>>>>>> && pip install "transformers>=4.18.0" \
>>>>>>>>>>>>>> && pip install cuda-python \
>>>>>>>>>>>>>> && pip install opencv-python==4.7.0.72 \
>>>>>>>>>>>>>> && pip install PyMuPDF==1.22.5 \
>>>>>>>>>>>>>> && pip install requests==2.31.0
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> # Set the default command to run the inference script
>>>>>>>>>>>>>> # This will be overridden by the Apache Beam boot script
>>>>>>>>>>>>>> CMD ["python", "/workspace/inference.py"]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> # Use the Apache Beam boot script as the entrypoint
>>>>>>>>>>>>>> ENTRYPOINT ["/opt/apache/beam/boot"]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>