We are following this through the GCP support case now.

On Tue, Sep 16, 2025 at 9:54 PM Sai Shashank <[email protected]>
wrote:

> The code works without the custom image of TensorRT, I will only get this:
>
> ject to benefit from faster worker startup and autoscaling. If you experience 
> container-startup related issues, pass the "disable_image_streaming" 
> experiment to disable image streaming for the job.
> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:10.327Z: 
> JOB_MESSAGE_BASIC: Worker configuration: g2-standard-4 in us-east4-c.
> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.480Z: 
> JOB_MESSAGE_BASIC: Executing operation [10]: Create URIs/Impulse+[10]: Create 
> URIs/FlatMap(<lambda at core.py:3994>)+[10]: Create URIs/Map(decode)+[10]: 
> Download PDFs+[10]: Load PDF Pages+[10]: Preprocess Images+[10]: Run 
> Inference/BatchElements/ParDo(_GlobalWindowsBatchingDoFn)+[10]: Run 
> Inference/BeamML_RunInference
> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.543Z: 
> JOB_MESSAGE_BASIC: Starting 1 workers in us-east4...
> INFO:apache_beam.runners.dataflow.dataflow_runner:Job 
> 2025-09-16_17_52_06-10817935125972705087 is in state JOB_STATE_RUNNING
> WARNING:google_auth_httplib2:httplib2 transport does not support per-request 
> timeout. Set the timeout when constructing the httplib2.Http instance.
> WARNING:google_auth_httplib2:httplib2 transport does not support per-request 
> timeout. Set the timeout when constructing the httplib2.Http instance.
> WARNING:google_auth_httplib2:httplib2 transport does not support per-request 
> timeout. Set the timeout when constructing the httplib2.Http instance.
> WARNING:google_auth_httplib2:httplib2 transport does not support per-request 
> timeout. Set the timeout when constructing the httplib2.Http instance.
>
> WARNING:google_auth_httplib2:httplib2 transport does not support
> per-request timeout. Set the timeout when constructing the httplib2.Http
> instance.
>
> just recurring and after an hour this message pops up  Workflow failed.
> Causes: The Dataflow job appears to be stuck because no worker activity has
> been seen in the last 1h. For more information, see
> https://cloud.google.com/dataflow/docs/guides/common-errors#error-syncing-pod.
> You can also get help with Cloud Dataflow at
> https://cloud.google.com/dataflow/support.
>
>
> On Tue, 16 Sept 2025 at 21:35, XQ Hu <[email protected]> wrote:
>
>> Does the code work without using  TensorRT? Any logs?
>>
>> On Tue, Sep 16, 2025 at 9:28 PM Sai Shashank <[email protected]>
>> wrote:
>>
>>> import apache_beam as beam
>>> from apache_beam.ml.inference.tensorrt_inference import
>>> TensorRTEngineHandlerNumPy
>>> from apache_beam.ml.inference.base import RunInference
>>>
>>> #!/usr/bin/env python3
>>> """
>>> Apache Beam pipeline for processing PDFs with Triton server and saving
>>> results to BigQuery.
>>> This pipeline combines functionality from test_triton_document.py,
>>> create_bigquery_tables.py,
>>> and save_to_bigquery.py into a single workflow.
>>> """
>>>
>>> import os
>>> import sys
>>> import json
>>> import uuid
>>> import argparse
>>> import logging
>>> import tempfile
>>> import datetime
>>> import requests
>>> import numpy as np
>>> import cv2
>>> from PIL import Image
>>> import fitz  # PyMuPDF
>>> from pathlib import Path
>>> from typing import Dict, List, Tuple, Any, Optional, Iterator
>>>
>>> # Apache Beam imports
>>> import apache_beam as beam
>>> from apache_beam.options.pipeline_options import PipelineOptions,
>>> SetupOptions
>>> from apache_beam.ml.inference.base import RemoteModelHandler,
>>> PredictionResult
>>> from apache_beam.ml.inference.utils import _convert_to_result
>>> from apache_beam.ml.inference.base import RunInference
>>> from apache_beam.io.gcp.bigquery import WriteToBigQuery
>>> from apache_beam.io.filesystems import FileSystems
>>> from apache_beam.io.gcp.gcsio import GcsIO
>>>
>>> # Google Cloud imports
>>> from google.cloud import storage
>>> from google.cloud import bigquery
>>>
>>> # Set up logging
>>> logging.basicConfig(level=logging.INFO)
>>> logger = logging.getLogger(__name__)
>>>
>>> # DocLayNet classes
>>> CLASS_ID_TO_NAME = {
>>>     0: 'Caption',
>>>     1: 'Footnote',
>>>     2: 'Formula',
>>>     3: 'List-item',
>>>     4: 'Page-footer',
>>>     5: 'Page-header',
>>>     6: 'Picture',
>>>     7: 'Section-header',
>>>     8: 'Table',
>>>     9: 'Text',
>>>     10: 'Title'
>>> }
>>> class DownloadPDFFromGCS(beam.DoFn):
>>>     """Download a PDF from Google Cloud Storage."""
>>>
>>>     def __init__(self, temp_dir=None):
>>>         self.temp_dir = temp_dir or tempfile.gettempdir()
>>>
>>>     def process(self, gcs_uri):
>>>         try:
>>>             # Parse GCS URI
>>>             if not gcs_uri.startswith("gs://"):
>>>                 raise ValueError(f"Invalid GCS URI: {gcs_uri}")
>>>
>>>             # Remove gs:// prefix and split into bucket and blob path
>>>             path_parts = gcs_uri[5:].split("/", 1)
>>>             bucket_name = path_parts[0]
>>>             blob_path = path_parts[1]
>>>
>>>             # Get filename from blob path
>>>             filename = os.path.basename(blob_path)
>>>             local_path = os.path.join(self.temp_dir, filename)
>>>
>>>             # Create temp directory if it doesn't exist
>>>             os.makedirs(self.temp_dir, exist_ok=True)
>>>
>>>             try:
>>>                 # Download using Beam's GcsIO
>>>                 with FileSystems.open(gcs_uri, 'rb') as gcs_file:
>>>                     with open(local_path, 'wb') as local_file:
>>>                         local_file.write(gcs_file.read())
>>>
>>>                 logger.info(f"Downloaded {gcs_uri} to {local_path}")
>>>
>>>                 # Return a dictionary with the local path and original
>>> URI
>>>                 yield {
>>>                     'local_path': local_path,
>>>                     'gcs_uri': gcs_uri,
>>>                     'filename': filename
>>>                 }
>>>             except Exception as e:
>>>                 logger.error(f"Error reading from GCS: {str(e)}")
>>>                 # Try alternative download method
>>>                 logger.info(f"Trying alternative download method for
>>> {gcs_uri}")
>>>
>>>                 # For testing with local files
>>>                 if os.path.exists(gcs_uri.replace("gs://", "")):
>>>                     local_path = gcs_uri.replace("gs://", "")
>>>                     logger.info(f"Using local file: {local_path}")
>>>                     yield {
>>>                         'local_path': local_path,
>>>                         'gcs_uri': gcs_uri,
>>>                         'filename': os.path.basename(local_path)
>>>                     }
>>>                 else:
>>>                     # Try using gsutil command
>>>                     import subprocess
>>>                     try:
>>>                         subprocess.run(["gsutil", "cp", gcs_uri,
>>> local_path], check=True)
>>>                         logger.info(f"Downloaded {gcs_uri} to
>>> {local_path} using gsutil")
>>>                         yield {
>>>                             'local_path': local_path,
>>>                             'gcs_uri': gcs_uri,
>>>                             'filename': filename
>>>                         }
>>>                     except Exception as e2:
>>>                         logger.error(f"Failed to download using gsutil:
>>> {str(e2)}")
>>>
>>>         except Exception as e:
>>>             logger.error(f"Error downloading {gcs_uri}: {str(e)}")
>>> class LoadPDFPages(beam.DoFn):
>>>     """Load PDF pages as images."""
>>>
>>>     def __init__(self, dpi=200):
>>>         self.dpi = dpi
>>>
>>>     def process(self, element):
>>>         doc = None
>>>         try:
>>>             # Make sure we have all required fields
>>>             if not isinstance(element, dict):
>>>                 logger.error(f"Expected dictionary, got {type(element)}")
>>>                 return
>>>
>>>             if 'local_path' not in element:
>>>                 logger.error("Missing 'local_path' in element")
>>>                 return
>>>
>>>             local_path = element['local_path']
>>>             gcs_uri = element.get('gcs_uri', '')
>>>
>>>             # Extract filename from local_path if not provided
>>>             filename = element.get('filename',
>>> os.path.basename(local_path))
>>>
>>>             logger.info(f"Loading PDF: {local_path}, filename:
>>> {filename}")
>>>
>>>             # Check if file exists and is accessible
>>>             if not os.path.exists(local_path):
>>>                 logger.error(f"File not found: {local_path}")
>>>                 return
>>>
>>>             if not os.access(local_path, os.R_OK):
>>>                 logger.error(f"File not readable: {local_path}")
>>>                 return
>>>
>>>             # Open the PDF
>>>             try:
>>>                 doc = fitz.open(local_path)
>>>                 if doc.is_closed:
>>>                     logger.error(f"Failed to open PDF: {local_path}")
>>>                     return
>>>             except Exception as e:
>>>                 logger.error(f"Error opening PDF {local_path}: {str(e)}")
>>>                 return
>>>
>>>             # Process each page
>>>             page_count = len(doc)
>>>             logger.info(f"Processing {page_count} pages from
>>> {local_path}")
>>>
>>>             for i in range(page_count):
>>>                 try:
>>>                     if doc.is_closed:
>>>                         logger.error(f"Document was closed unexpectedly
>>> while processing page {i}")
>>>                         break
>>>
>>>                     page = doc[i]
>>>                     if page is None:
>>>                         logger.error(f"Failed to get page {i} from
>>> document")
>>>                         continue
>>>
>>>                     # Use a higher resolution for better quality
>>>                     scale = self.dpi / 72.0
>>>                     mat = fitz.Matrix(scale, scale)
>>>
>>>                     try:
>>>                         pix = page.get_pixmap(matrix=mat, alpha=False)
>>>                     except Exception as e:
>>>                         logger.error(f"Error getting pixmap for page
>>> {i}: {str(e)}")
>>>                         continue
>>>
>>>                     # Check pixmap dimensions
>>>                     if pix.height <= 0 or pix.width <= 0 or pix.n <= 0:
>>>                         logger.error(f"Invalid pixmap dimensions:
>>> {pix.width}x{pix.height}x{pix.n}")
>>>                         continue
>>>
>>>                     # Convert to numpy array
>>>                     try:
>>>                         arr = np.frombuffer(pix.samples,
>>> dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
>>>                     except Exception as e:
>>>                         logger.error(f"Error converting pixmap to numpy
>>> array: {str(e)}")
>>>                         continue
>>>
>>>                     # Convert BGR to RGB if needed
>>>                     if pix.n == 3:  # RGB
>>>                         try:
>>>                             arr = cv2.cvtColor(arr, cv2.COLOR_BGR2RGB)
>>>                         except Exception as e:
>>>                             logger.error(f"Error converting BGR to RGB:
>>> {str(e)}")
>>>                             continue
>>>
>>>                     # Store original size for later use
>>>                     original_size = (arr.shape[0], arr.shape[1])
>>>
>>>                     # Create page info
>>>                     page_info = {
>>>                         'page_num': i,
>>>                         'image': arr,
>>>                         'original_size': original_size,
>>>                         'local_path': local_path,
>>>                         'gcs_uri': gcs_uri,
>>>                         'filename': filename
>>>                     }
>>>
>>>                     # Use document ID and page number as key
>>>                     doc_id = os.path.splitext(filename)[0]
>>>                     key = f"{doc_id}_{i}"
>>>
>>>                     yield (key, page_info)
>>>                 except Exception as e:
>>>                     import traceback
>>>                     logger.error(f"Error processing page {i}: {str(e)}")
>>>                     logger.error(traceback.format_exc())
>>>
>>>             logger.info(f"Loaded {len(doc)} pages from {local_path}")
>>>
>>>         except Exception as e:
>>>             import traceback
>>>             logger.error(f"Error loading PDF: {str(e)}")
>>>             logger.error(traceback.format_exc())
>>>         finally:
>>>             # Make sure to close the document only if it was
>>> successfully opened
>>>             if doc is not None:
>>>                 try:
>>>                     if not doc.is_closed:
>>>                         doc.close()
>>>                 except Exception as e:
>>>                     logger.debug(f"Error closing document: {str(e)}")
>>>
>>> class PreprocessImage(beam.DoFn):
>>>     """Preprocess image for Triton server."""
>>>
>>>     def __init__(self, size=1024):
>>>         self.size = size
>>>
>>>     def letterbox(self, img, new_shape=1024, color=(114,114,114)):
>>>         """Resize and pad image to target size."""
>>>         h, w = img.shape[:2]
>>>         r = min(new_shape / h, new_shape / w)
>>>         nh, nw = int(round(h * r)), int(round(w * r))
>>>         pad_h, pad_w = new_shape - nh, new_shape - nw
>>>         top = pad_h // 2
>>>         bottom = pad_h - top
>>>         left = pad_w // 2
>>>         right = pad_w - left
>>>         img = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_LINEAR)
>>>         img = cv2.copyMakeBorder(img, top, bottom, left, right,
>>> cv2.BORDER_CONSTANT, value=color)
>>>         return img, r, left, top
>>>
>>>     def process(self, element):
>>>         try:
>>>             if not isinstance(element, tuple) or len(element) != 2:
>>>                 logger.error(f"Expected (key, value) tuple, got
>>> {type(element)}")
>>>                 return
>>>
>>>             key, page_info = element
>>>
>>>             if not isinstance(page_info, dict):
>>>                 logger.error(f"Expected dictionary for page_info, got
>>> {type(page_info)}")
>>>                 return
>>>
>>>             if 'image' not in page_info:
>>>                 logger.error("Missing 'image' in page_info")
>>>                 return
>>>
>>>             # Create a new dictionary to avoid modifying the input
>>>             new_page_info = dict(page_info)
>>>
>>>             # Apply letterbox resize
>>>             img = new_page_info['image']
>>>             lb, r, left, top = self.letterbox(img, new_shape=self.size)
>>>
>>>             # Convert to float32 and normalize to [0,1]
>>>             x = lb.astype(np.float32) / 255.0
>>>
>>>             # Convert to CHW format
>>>             x = np.transpose(x, (2, 0, 1))
>>>
>>>             # Add batch dimension
>>>             batched_img = np.expand_dims(x, axis=0)
>>>
>>>             # Update page info
>>>             new_page_info['preprocessed_image'] = batched_img
>>>             new_page_info['letterbox_info'] = (r, left, top)
>>>
>>>             yield (key, new_page_info)
>>>
>>>         except Exception as e:
>>>             import traceback
>>>             logger.error(f"Error preprocessing image: {str(e)}")
>>>             logger.error(traceback.format_exc())
>>>
>>>
>>>
>>> class ExtractBoxes(beam.DoFn):
>>>     """Extract bounding boxes from Triton response."""
>>>
>>>     def __init__(self, conf_th=0.25, iou_th=0.7, model_size=1024):
>>>         self.conf_th = conf_th
>>>         self.iou_th = iou_th
>>>         self.model_size = model_size
>>>
>>>     def _nms(self, boxes, scores, iou_th=0.7):
>>>         """Non-Maximum Suppression"""
>>>         if len(boxes) == 0:
>>>             return []
>>>
>>>         boxes = boxes.astype(np.float32)
>>>         x1, y1, x2, y2 = boxes.T
>>>         areas = (x2 - x1) * (y2 - y1)
>>>         order = scores.argsort()[::-1]
>>>
>>>         keep = []
>>>         while order.size > 0:
>>>             i = order[0]
>>>             keep.append(i)
>>>
>>>             xx1 = np.maximum(x1[i], x1[order[1:]])
>>>             yy1 = np.maximum(y1[i], y1[order[1:]])
>>>             xx2 = np.minimum(x2[i], x2[order[1:]])
>>>             yy2 = np.minimum(y2[i], y2[order[1:]])
>>>
>>>             w = np.maximum(0.0, xx2 - xx1)
>>>             h = np.maximum(0.0, yy2 - yy1)
>>>             inter = w * h
>>>
>>>             iou = inter / (areas[i] + areas[order[1:]] - inter + 1e-9)
>>>             inds = np.where(iou <= iou_th)[0]
>>>             order = order[inds + 1]
>>>
>>>         return keep
>>>
>>>     def process(self, page_info):
>>>         try:
>>>             triton_response = page_info['triton_response']
>>>             original_size = page_info['original_size']
>>>             r, left, top = page_info['letterbox_info']
>>>
>>>             if "outputs" not in triton_response or not
>>> triton_response["outputs"]:
>>>                 logger.error("Invalid response from Triton server")
>>>                 return []
>>>
>>>             out_meta = triton_response["outputs"][0]
>>>             shape = out_meta["shape"]
>>>             data = np.array(out_meta["data"],
>>> dtype=np.float32).reshape(shape)
>>>
>>>             logger.info(f"Output shape: {shape}")
>>>
>>>             # For YOLO output [B, C, P] where C is channels (box coords
>>> + objectness + classes)
>>>             B, C, P = shape
>>>
>>>             # Assuming 4 box coordinates + class probabilities (no
>>> objectness)
>>>             has_objectness = False
>>>             num_classes = C - 5 if has_objectness else C - 4
>>>
>>>             # Extract data
>>>             xywh = data[:, 0:4, :]
>>>             if has_objectness:
>>>                 obj = data[:, 4:5, :]
>>>                 cls = data[:, 5:5 + num_classes, :]
>>>             else:
>>>                 obj = None
>>>                 cls = data[:, 4:4 + num_classes, :]
>>>
>>>             # Process batch item (we only have one)
>>>             b = 0
>>>             h, w = original_size
>>>
>>>             xywh_b = xywh[b].T  # (P,4)
>>>             if obj is not None:
>>>                 obj_b = obj[b].T.squeeze(1)  # (P,)
>>>             else:
>>>                 obj_b = np.ones((P,), dtype=np.float32)
>>>             cls_b = cls[b].T  # (P,nc)
>>>
>>>             # Get scores and labels
>>>             scores_all = (obj_b[:, None] * cls_b) if obj is not None
>>> else cls_b
>>>             labels = scores_all.argmax(axis=1)
>>>             scores = scores_all.max(axis=1)
>>>
>>>             # Filter by confidence threshold
>>>             keep = scores >= self.conf_th
>>>             if not np.any(keep):
>>>                 logger.info(f"No detections above threshold
>>> {self.conf_th}")
>>>                 return []
>>>
>>>             xywh_k = xywh_b[keep]
>>>             scores_k = scores[keep]
>>>             labels_k = labels[keep]
>>>
>>>             # xywh -> xyxy in model space
>>>             cx, cy, ww, hh = xywh_k.T
>>>             xyxy_model = np.stack([cx - ww / 2, cy - hh / 2, cx + ww /
>>> 2, cy + hh / 2], axis=1)
>>>
>>>             # Apply NMS per class
>>>             final_boxes = []
>>>             final_scores = []
>>>             final_labels = []
>>>
>>>             for c in np.unique(labels_k):
>>>                 idxs = np.where(labels_k == c)[0]
>>>                 if idxs.size == 0:
>>>                     continue
>>>                 keep_idx = self._nms(xyxy_model[idxs], scores_k[idxs],
>>> iou_th=self.iou_th)
>>>                 final_boxes.append(xyxy_model[idxs][keep_idx])
>>>                 final_scores.append(scores_k[idxs][keep_idx])
>>>                 final_labels.append(np.full(len(keep_idx), c, dtype=int))
>>>
>>>             if not final_boxes:
>>>                 logger.info("No detections after NMS")
>>>                 return []
>>>
>>>             xyxy_model = np.vstack(final_boxes)
>>>             scores_k = np.concatenate(final_scores)
>>>             labels_k = np.concatenate(final_labels)
>>>
>>>             # Map boxes from model space to original image space
>>>             xyxy_orig = xyxy_model.copy()
>>>
>>>             # Remove padding
>>>             xyxy_orig[:, [0, 2]] -= left
>>>             xyxy_orig[:, [1, 3]] -= top
>>>
>>>             # Scale back to original size
>>>             xyxy_orig /= r
>>>
>>>             # Clip to image boundaries
>>>             xyxy_orig[:, 0::2] = np.clip(xyxy_orig[:, 0::2], 0, w - 1)
>>>             xyxy_orig[:, 1::2] = np.clip(xyxy_orig[:, 1::2], 0, h - 1)
>>>
>>>             # Format as requested: x_min, y_min, x_max, y_max, class,
>>> probability
>>>             boxes = []
>>>             for (x1, y1, x2, y2), label, score in zip(xyxy_orig,
>>> labels_k, scores_k):
>>>                 class_name = CLASS_ID_TO_NAME.get(int(label))
>>>                 box_info = {
>>>                     "page": page_info['page_num'],
>>>                     "x_min": float(x1),
>>>                     "y_min": float(y1),
>>>                     "x_max": float(x2),
>>>                     "y_max": float(y2),
>>>                     "class": int(label),
>>>                     "class_name": class_name,
>>>                     "probability": float(score),
>>>                     "filename": page_info['filename'],
>>>                     "local_path": page_info['local_path'],
>>>                     "gcs_uri": page_info['gcs_uri']
>>>                 }
>>>                 boxes.append(box_info)
>>>
>>>             logger.info(f"Extracted {len(boxes)} boxes from page
>>> {page_info['page_num']}")
>>>
>>>             return boxes
>>>
>>>         except Exception as e:
>>>             logger.error(f"Error extracting boxes: {str(e)}")
>>>             return []
>>>
>>> class PrepareForBigQuery(beam.DoFn):
>>>     """Prepare data for BigQuery insertion."""
>>>
>>>     def process(self, box_info):
>>>         try:
>>>             # Generate UUIDs for primary keys
>>>             v_note_id = str(uuid.uuid4())
>>>             page_ocr_id = str(uuid.uuid4())
>>>             class_prediction_id = str(uuid.uuid4())
>>>
>>>             # Create timestamp
>>>             processing_time = datetime.datetime.now().strftime("%Y-%m-%d
>>> %H:%M:%S")
>>>
>>>             # Create ocr_results row
>>>             ocr_results_row = {
>>>                 "v_note_id": v_note_id,
>>>                 "filename": box_info['filename'],
>>>                 "file_path": box_info['gcs_uri'],
>>>                 "processing_time": processing_time,
>>>                 "file_type": "pdf"
>>>             }
>>>
>>>             # Create page_ocr row
>>>             page_ocr_row = {
>>>                 "page_ocr_id": page_ocr_id,
>>>                 "v_note_id": v_note_id,
>>>                 "page_number": box_info['page']
>>>             }
>>>
>>>             # Create class_prediction row
>>>             class_prediction_row = {
>>>                 "class_prediction_id": class_prediction_id,
>>>                 "page_ocr_id": page_ocr_id,
>>>                 "xmin": box_info['x_min'],
>>>                 "ymin": box_info['y_min'],
>>>                 "xmax": box_info['x_max'],
>>>                 "ymax": box_info['y_max'],
>>>                 "class": box_info['class_name'] if
>>> box_info['class_name'] else str(box_info['class']),
>>>                 "confidence": box_info['probability']
>>>             }
>>>
>>>             # Return all three rows with table names
>>>             return [
>>>                 ('ocr_results', ocr_results_row),
>>>                 ('page_ocr', page_ocr_row),
>>>                 ('class_prediction', class_prediction_row)
>>>             ]
>>>
>>>         except Exception as e:
>>>             logger.error(f"Error preparing for BigQuery: {str(e)}")
>>>             return []
>>>
>>> model_handler = TensorRTEngineHandlerNumPy(
>>>   min_batch_size=1,
>>>   max_batch_size=1,
>>>   engine_path="gs://temp/yolov11l-doclaynet.engine",
>>> )
>>>
>>>
>>> with beam.Pipeline(options=options) as pipeline:
>>>
>>>         # Create PCollection from input URIs
>>>         pdf_uris = (
>>>             pipeline
>>>             | "Create URIs" >> beam.Create(["tmp.pdf"])
>>>         )
>>>
>>>         # Download PDFs
>>>         local_pdfs = (
>>>             pdf_uris
>>>             | "Download PDFs" >> beam.ParDo(DownloadPDFFromGCS())
>>>         )
>>>
>>>          # Load PDF pages
>>>         pdf_pages = (
>>>             local_pdfs
>>>             | "Load PDF Pages" >> beam.ParDo(LoadPDFPages())
>>>             #| "Flatten Pages" >> beam.FlatMap(lambda x: x)
>>>         )
>>>
>>>         # Preprocess images
>>>         preprocessed_pages = (
>>>             pdf_pages
>>>             | "Preprocess Images" >> beam.ParDo(PreprocessImage())
>>>         )
>>>         inference_results = (
>>>             preprocessed_pages
>>>             | "Run Inference" >>
>>> RunInference(model_handler=model_handler)
>>>         )
>>>
>>> On Tue, 16 Sept 2025 at 21:23, XQ Hu <[email protected]> wrote:
>>>
>>>> Can you share your commands and outputs?
>>>>
>>>> On Tue, Sep 16, 2025 at 9:02 PM Sai Shashank <[email protected]>
>>>> wrote:
>>>>
>>>>> Okay I have changed the docker image but  to now to RUN the python
>>>>> command but it is still halting without are error or warnings or errors
>>>>>
>>>>> On Tue, 16 Sept 2025 at 17:38, XQ Hu via dev <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> The CMD is not necessary as it will be overridden by the ENTRYPOINT
>>>>>> just like your comment.
>>>>>>
>>>>>> If you ssh to your Docker container like `docker run --rm -it
>>>>>> --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE`, can you run python and
>>>>>> some Beam pipelines with a direct runner in the container? This can help
>>>>>> test the environment works fine.
>>>>>>
>>>>>> I have one old Dockerfile that used to work with the old Beam:
>>>>>> https://github.com/google/dataflow-ml-starter/blob/main/tensor_rt.Dockerfile
>>>>>> .
>>>>>>
>>>>>> On Tue, Sep 16, 2025 at 4:56 PM Sai Shashank <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> ---------- Forwarded message ---------
>>>>>>> From: Sai Shashank <[email protected]>
>>>>>>> Date: Tue, Sep 16, 2025 at 4:27 PM
>>>>>>> Subject: TensorRT inference not starting
>>>>>>> To: <[email protected]>
>>>>>>>
>>>>>>>
>>>>>>> Hey Everyone,
>>>>>>>                          I was trying to use tensorRT within the
>>>>>>> apache beam on dataflow but somehow , dataflow didn't start like it did 
>>>>>>> not
>>>>>>> even give me Worker logs. Below is the docker file that , use to create 
>>>>>>> a
>>>>>>> custom  image, at first I thought it is the version mismatched but 
>>>>>>> usually
>>>>>>> it gives me a harness error .
>>>>>>>
>>>>>>> ARG BUILD_IMAGE=nvcr.io/nvidia/tensorrt:25.08-py3
>>>>>>> FROM ${BUILD_IMAGE}
>>>>>>> ENV PATH="/usr/src/tensorrt/bin:${PATH}"
>>>>>>>
>>>>>>> WORKDIR /workspace
>>>>>>>
>>>>>>> RUN apt-get update -y && apt-get install -y python3-venv
>>>>>>> RUN pip install --no-cache-dir apache-beam[gcp]==2.67.0
>>>>>>>
>>>>>>> COPY --from=apache/beam_python3.10_sdk:2.67.0 /opt/apache/beam
>>>>>>> /opt/apache/beam
>>>>>>>
>>>>>>> # Install additional dependencies
>>>>>>> RUN pip install --upgrade pip \
>>>>>>>     && pip install torch \
>>>>>>>     && pip install torchvision \
>>>>>>>     && pip install pillow>=8.0.0 \
>>>>>>>     && pip install transformers>=4.18.0 \
>>>>>>>     && pip install cuda-python \
>>>>>>>     && pip install opencv-python==4.7.0.72 \
>>>>>>>     && pip install PyMuPDF==1.22.5 \
>>>>>>>     && pip install requests==2.31.0
>>>>>>>
>>>>>>> # Set the default command to run the inference script
>>>>>>> # This will be overridden by the Apache Beam boot script
>>>>>>> CMD ["python", "/workspace/inference.py"]
>>>>>>>
>>>>>>> # Use the Apache Beam boot script as the entrypoint
>>>>>>> ENTRYPOINT ["/opt/apache/beam/boot"]
>>>>>>>
>>>>>>>
>>>>>>>

Reply via email to