So, can I create an issue and raise a PR for the updated TensorRT handler?
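For context on the API change behind this: the handler in apache_beam/ml/inference/tensorrt_inference.py iterates over engine.num_bindings (see the traceback quoted below), and TensorRT 10 removed that attribute in favor of the named-tensor API (num_io_tensors / get_tensor_name) pointed out below. A minimal sketch of what the updated introspection loop could look like, assuming TensorRT >= 10 — this is illustrative, not the existing Beam handler code:

    import tensorrt as trt

    def get_io_tensors(engine):
        """Collect I/O tensor metadata with the TensorRT >= 10 API, which
        replaces engine.num_bindings / binding_is_input with named tensors."""
        inputs, outputs = [], []
        for i in range(engine.num_io_tensors):
            name = engine.get_tensor_name(i)
            shape = engine.get_tensor_shape(name)
            dtype = engine.get_tensor_dtype(name)
            if engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT:
                inputs.append((name, shape, dtype))
            else:
                outputs.append((name, shape, dtype))
        return inputs, outputs

An updated handler would also need the matching execution-side calls (for example context.set_tensor_address and execute_async_v3 instead of a positional bindings list).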
On Sat, 20 Sept 2025 at 08:46, XQ Hu <[email protected]> wrote:

> This is what Beam tests use:
> https://github.com/apache/beam/blob/master/sdks/python/test-suites/containers/tensorrt_runinference/tensor_rt.dockerfile#L17
> nvcr.io/nvidia/tensorrt:23.05-py3
>
> From the latest doc,
> https://docs.nvidia.com/deeplearning/tensorrt/latest/_static/python-api/infer/Core/Engine.html#icudaengine,
> and https://github.com/NVIDIA/TensorRT/issues/4216, num_io_tensors should
> be used now.
>
> You can either use the TensorRT version Beam supports now, or you can
> define your own TensorRT engine model handler with the new TensorRT.
>
> On Sat, Sep 20, 2025 at 1:10 AM Sai Shashank <[email protected]> wrote:
>
>> So finally, I was able to resolve the Docker image issue, but now I see
>> this error:
>>
>>   File "/usr/local/lib/python3.12/dist-packages/apache_beam/ml/inference/tensorrt_inference.py",
>> line 132, in __init__
>>     for i in range(self.engine.num_bindings):
>> AttributeError: 'tensorrt.tensorrt.ICudaEngine' object has no attribute 'num_bindings'
>>
>> Traceback (most recent call last):
>>   File "apache_beam/runners/common.py", line 1562, in
>> apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
>>   File "apache_beam/runners/common.py", line 602, in
>> apache_beam.runners.common.DoFnInvoker.invoke_setup
>>   File "/usr/local/lib/python3.12/dist-packages/apache_beam/ml/inference/base.py",
>> line 1882, in setup
>>     self._model = self._load_model()
>>   File "/usr/local/lib/python3.12/dist-packages/apache_beam/ml/inference/base.py",
>> line 1848, in _load_model
>>     model = self._shared_model_handle.acquire(load, tag=self._cur_tag)
>>
>> I have seen this error where the TensorRT version is older than 10.x. Is
>> there an updated TensorRT handler, or am I doing something wrong?
>>
>> On Fri, 19 Sept 2025 at 09:15, XQ Hu <[email protected]> wrote:
>>
>>> GPU driver: DRIVER_VERSION=535.261.03
>>> From the log, the driver was installed correctly (make sure this can be
>>> used for your TensorRT).
>>>
>>> "Error syncing pod, skipping" err="failed to \"StartContainer\" for
>>> \"sdk-0-0\" with ErrImagePull: \"failed to pull and unpack image
>>> \\\"us-east4-docker.pkg.dev/anbc-dev-suspecting/suspecting-docker/tensorrt_ss:latest\\\":
>>> failed to extract layer
>>> sha256:a848022a4558c435b349317630b139960b44ae09f218ab7f93f764ba4661607d:
>>> write
>>> /var/lib/containerd/io.containerd.snapshotter.v1.gcfs/snapshotter/snapshots/55/fs/usr/local/cuda-13.0/targets/x86_64-linux/lib/libcusparseLt.so.0.8.0.4:
>>> no space left on device: unknown\""
>>> pod="default/df-inference-pipeline-4-09182012-1a3n-harness-rvxc"
>>> podUID="3a9b74645db23e77932d981439f1d3cc"
>>>
>>> The Dataflow worker cannot unpack the image: no space left on device.
>>>
>>> Try
>>> https://cloud.google.com/dataflow/docs/guides/configure-worker-vm#disk-size
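For the "no space left on device" failure above, the worker boot-disk size is an ordinary pipeline option, and the same options object is also where the SDK container image (without any leading whitespace) and the disable_image_streaming experiment mentioned later in the thread would go. A minimal sketch with placeholder values — project, bucket, image URL, and disk size are illustrative, not the values from this job:

    from apache_beam.options.pipeline_options import PipelineOptions

    # All values below are placeholders; substitute your own project, bucket,
    # region and Artifact Registry image.
    options = PipelineOptions(
        runner="DataflowRunner",
        project="my-project",
        region="us-east4",
        temp_location="gs://my-bucket/temp",
        # Larger boot disk so the big TensorRT/CUDA image can be unpacked.
        disk_size_gb=100,
        # strip() guards against leading/trailing whitespace when the value
        # comes from a config file or environment variable.
        sdk_container_image="us-east4-docker.pkg.dev/my-project/my-repo/tensorrt_ss:latest".strip(),
        # Optional, as suggested by the Dataflow log message quoted later in the thread.
        experiments=["disable_image_streaming"],
    )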
>>>
>>> On Thu, Sep 18, 2025 at 11:27 PM Sai Shashank <[email protected]> wrote:
>>>
>>>> So, I was able to start the Dataflow job
>>>> (2025-09-18_20_12_41-10973298801093076892), but now it is hitting this
>>>> error: "2025-09-18 23:21:25.401 EDT SDK harnesses are not healthy after
>>>> 5 minutes, status: Waiting for 4 of 4 SDK Harnesses to register." I have
>>>> noticed this error when there is a mismatch of environments. As advised
>>>> by you, I tried running the pipeline with the DirectRunner inside the
>>>> Docker image, and it ran perfectly. Are there any tips you would give me
>>>> to correct this error?
>>>>
>>>> On Wed, 17 Sept 2025 at 09:42, XQ Hu <[email protected]> wrote:
>>>>
>>>>> From the worker log:
>>>>>
>>>>> "Failed to read pods from URL" err="invalid pod:
>>>>> [spec.containers[3].image: Invalid value: \"
>>>>> us-east4-docker.pkg.dev/anbc-dev-suspecting/suspecting-docker/tensorrt_ss:latest\":
>>>>> must not have leading or trailing whitespace]"
>>>>>
>>>>> 2025-09-16_17_52_06-10817935125972705087
>>>>>
>>>>> It looks like you specified the image URL with a leading whitespace.
>>>>> Remove it and give it a try.
>>>>>
>>>>> And if you have any further questions about GPUs, I highly recommend
>>>>> you start a VM with an L4 GPU, pull your image, SSH into it, and run
>>>>> your pipeline locally with DirectRunner. That can make sure all your
>>>>> code works.
>>>>>
>>>>> On Wed, Sep 17, 2025 at 9:25 AM XQ Hu <[email protected]> wrote:
>>>>>
>>>>>> I saw it. Let me follow up internally.
>>>>>>
>>>>>> On Tue, Sep 16, 2025 at 10:10 PM Sai Shashank <[email protected]> wrote:
>>>>>>
>>>>>>> I already have one open, but I am opening one more: the case number
>>>>>>> is 63121285.
>>>>>>>
>>>>>>> On Tue, Sep 16, 2025 at 10:05 PM XQ Hu <[email protected]> wrote:
>>>>>>>
>>>>>>>> Can you open a cloud support ticket? That can give us the
>>>>>>>> permission to access your job.
>>>>>>>>
>>>>>>>> On Tue, Sep 16, 2025, 9:57 PM Sai Shashank <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hey, can we connect on my office mail? I could share more there,
>>>>>>>>> such as the pipeline options and other details, and since I work at
>>>>>>>>> CVS, that way it would be under compliance too.
>>>>>>>>>
>>>>>>>>> On Tue, 16 Sept 2025 at 21:54, Sai Shashank <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> The code works without the custom image of TensorRT. I will only
>>>>>>>>>> get this:
>>>>>>>>>>
>>>>>>>>>> ...ject to benefit from faster worker startup and autoscaling. If you
>>>>>>>>>> experience container-startup related issues, pass the
>>>>>>>>>> "disable_image_streaming" experiment to disable image streaming for
>>>>>>>>>> the job.
>>>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:10.327Z:
>>>>>>>>>> JOB_MESSAGE_BASIC: Worker configuration: g2-standard-4 in us-east4-c.
>>>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.480Z:
>>>>>>>>>> JOB_MESSAGE_BASIC: Executing operation [10]: Create URIs/Impulse+[10]:
>>>>>>>>>> Create URIs/FlatMap(<lambda at core.py:3994>)+[10]: Create
>>>>>>>>>> URIs/Map(decode)+[10]: Download PDFs+[10]: Load PDF Pages+[10]:
>>>>>>>>>> Preprocess Images+[10]: Run
>>>>>>>>>> Inference/BatchElements/ParDo(_GlobalWindowsBatchingDoFn)+[10]: Run
>>>>>>>>>> Inference/BeamML_RunInference
>>>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.543Z:
>>>>>>>>>> JOB_MESSAGE_BASIC: Starting 1 workers in us-east4...
>>>>>>>>>> INFO:apache_beam.runners.dataflow.dataflow_runner:Job
>>>>>>>>>> 2025-09-16_17_52_06-10817935125972705087 is in state JOB_STATE_RUNNING
>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support
>>>>>>>>>> per-request timeout. Set the timeout when constructing the
>>>>>>>>>> httplib2.Http instance.
>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support
>>>>>>>>>> per-request timeout. Set the timeout when constructing the
>>>>>>>>>> httplib2.Http instance.
>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support
>>>>>>>>>> per-request timeout. Set the timeout when constructing the
>>>>>>>>>> httplib2.Http instance.
>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support
>>>>>>>>>> per-request timeout. Set the timeout when constructing the
>>>>>>>>>> httplib2.Http instance.
>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support
>>>>>>>>>> per-request timeout. Set the timeout when constructing the
>>>>>>>>>> httplib2.Http instance.
>>>>>>>>>>
>>>>>>>>>> WARNING:google_auth_httplib2:httplib2 transport does not support
>>>>>>>>>> per-request timeout. Set the timeout when constructing the
>>>>>>>>>> httplib2.Http instance.
>>>>>>>>>>
>>>>>>>>>> This just keeps recurring, and after an hour this message pops up:
>>>>>>>>>> "Workflow failed. Causes: The Dataflow job appears to be stuck
>>>>>>>>>> because no worker activity has been seen in the last 1h. For more
>>>>>>>>>> information, see
>>>>>>>>>> https://cloud.google.com/dataflow/docs/guides/common-errors#error-syncing-pod.
>>>>>>>>>> You can also get help with Cloud Dataflow at
>>>>>>>>>> https://cloud.google.com/dataflow/support."
>>>>>>>>>>
>>>>>>>>>> On Tue, 16 Sept 2025 at 21:35, XQ Hu <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Does the code work without using TensorRT? Any logs?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Sep 16, 2025 at 9:28 PM Sai Shashank <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> import apache_beam as beam
>>>>>>>>>>>> from apache_beam.ml.inference.tensorrt_inference import TensorRTEngineHandlerNumPy
>>>>>>>>>>>> from apache_beam.ml.inference.base import RunInference
>>>>>>>>>>>>
>>>>>>>>>>>> #!/usr/bin/env python3
>>>>>>>>>>>> """
>>>>>>>>>>>> Apache Beam pipeline for processing PDFs with Triton server and
>>>>>>>>>>>> saving results to BigQuery.
>>>>>>>>>>>> This pipeline combines functionality from test_triton_document.py,
>>>>>>>>>>>> create_bigquery_tables.py, and save_to_bigquery.py into a single workflow.
>>>>>>>>>>>> """ >>>>>>>>>>>> >>>>>>>>>>>> import os >>>>>>>>>>>> import sys >>>>>>>>>>>> import json >>>>>>>>>>>> import uuid >>>>>>>>>>>> import argparse >>>>>>>>>>>> import logging >>>>>>>>>>>> import tempfile >>>>>>>>>>>> import datetime >>>>>>>>>>>> import requests >>>>>>>>>>>> import numpy as np >>>>>>>>>>>> import cv2 >>>>>>>>>>>> from PIL import Image >>>>>>>>>>>> import fitz # PyMuPDF >>>>>>>>>>>> from pathlib import Path >>>>>>>>>>>> from typing import Dict, List, Tuple, Any, Optional, Iterator >>>>>>>>>>>> >>>>>>>>>>>> # Apache Beam imports >>>>>>>>>>>> import apache_beam as beam >>>>>>>>>>>> from apache_beam.options.pipeline_options import >>>>>>>>>>>> PipelineOptions, SetupOptions >>>>>>>>>>>> from apache_beam.ml.inference.base import RemoteModelHandler, >>>>>>>>>>>> PredictionResult >>>>>>>>>>>> from apache_beam.ml.inference.utils import _convert_to_result >>>>>>>>>>>> from apache_beam.ml.inference.base import RunInference >>>>>>>>>>>> from apache_beam.io.gcp.bigquery import WriteToBigQuery >>>>>>>>>>>> from apache_beam.io.filesystems import FileSystems >>>>>>>>>>>> from apache_beam.io.gcp.gcsio import GcsIO >>>>>>>>>>>> >>>>>>>>>>>> # Google Cloud imports >>>>>>>>>>>> from google.cloud import storage >>>>>>>>>>>> from google.cloud import bigquery >>>>>>>>>>>> >>>>>>>>>>>> # Set up logging >>>>>>>>>>>> logging.basicConfig(level=logging.INFO) >>>>>>>>>>>> logger = logging.getLogger(__name__) >>>>>>>>>>>> >>>>>>>>>>>> # DocLayNet classes >>>>>>>>>>>> CLASS_ID_TO_NAME = { >>>>>>>>>>>> 0: 'Caption', >>>>>>>>>>>> 1: 'Footnote', >>>>>>>>>>>> 2: 'Formula', >>>>>>>>>>>> 3: 'List-item', >>>>>>>>>>>> 4: 'Page-footer', >>>>>>>>>>>> 5: 'Page-header', >>>>>>>>>>>> 6: 'Picture', >>>>>>>>>>>> 7: 'Section-header', >>>>>>>>>>>> 8: 'Table', >>>>>>>>>>>> 9: 'Text', >>>>>>>>>>>> 10: 'Title' >>>>>>>>>>>> } >>>>>>>>>>>> class DownloadPDFFromGCS(beam.DoFn): >>>>>>>>>>>> """Download a PDF from Google Cloud Storage.""" >>>>>>>>>>>> >>>>>>>>>>>> def __init__(self, temp_dir=None): >>>>>>>>>>>> self.temp_dir = temp_dir or tempfile.gettempdir() >>>>>>>>>>>> >>>>>>>>>>>> def process(self, gcs_uri): >>>>>>>>>>>> try: >>>>>>>>>>>> # Parse GCS URI >>>>>>>>>>>> if not gcs_uri.startswith("gs://"): >>>>>>>>>>>> raise ValueError(f"Invalid GCS URI: {gcs_uri}") >>>>>>>>>>>> >>>>>>>>>>>> # Remove gs:// prefix and split into bucket and >>>>>>>>>>>> blob path >>>>>>>>>>>> path_parts = gcs_uri[5:].split("/", 1) >>>>>>>>>>>> bucket_name = path_parts[0] >>>>>>>>>>>> blob_path = path_parts[1] >>>>>>>>>>>> >>>>>>>>>>>> # Get filename from blob path >>>>>>>>>>>> filename = os.path.basename(blob_path) >>>>>>>>>>>> local_path = os.path.join(self.temp_dir, filename) >>>>>>>>>>>> >>>>>>>>>>>> # Create temp directory if it doesn't exist >>>>>>>>>>>> os.makedirs(self.temp_dir, exist_ok=True) >>>>>>>>>>>> >>>>>>>>>>>> try: >>>>>>>>>>>> # Download using Beam's GcsIO >>>>>>>>>>>> with FileSystems.open(gcs_uri, 'rb') as >>>>>>>>>>>> gcs_file: >>>>>>>>>>>> with open(local_path, 'wb') as local_file: >>>>>>>>>>>> local_file.write(gcs_file.read()) >>>>>>>>>>>> >>>>>>>>>>>> logger.info(f"Downloaded {gcs_uri} to >>>>>>>>>>>> {local_path}") >>>>>>>>>>>> >>>>>>>>>>>> # Return a dictionary with the local path and >>>>>>>>>>>> original URI >>>>>>>>>>>> yield { >>>>>>>>>>>> 'local_path': local_path, >>>>>>>>>>>> 'gcs_uri': gcs_uri, >>>>>>>>>>>> 'filename': filename >>>>>>>>>>>> } >>>>>>>>>>>> except Exception as e: >>>>>>>>>>>> logger.error(f"Error reading from GCS: >>>>>>>>>>>> {str(e)}") >>>>>>>>>>>> # Try alternative download method 
>>>>>>>>>>>> logger.info(f"Trying alternative download >>>>>>>>>>>> method for {gcs_uri}") >>>>>>>>>>>> >>>>>>>>>>>> # For testing with local files >>>>>>>>>>>> if os.path.exists(gcs_uri.replace("gs://", "")): >>>>>>>>>>>> local_path = gcs_uri.replace("gs://", "") >>>>>>>>>>>> logger.info(f"Using local file: >>>>>>>>>>>> {local_path}") >>>>>>>>>>>> yield { >>>>>>>>>>>> 'local_path': local_path, >>>>>>>>>>>> 'gcs_uri': gcs_uri, >>>>>>>>>>>> 'filename': os.path.basename(local_path) >>>>>>>>>>>> } >>>>>>>>>>>> else: >>>>>>>>>>>> # Try using gsutil command >>>>>>>>>>>> import subprocess >>>>>>>>>>>> try: >>>>>>>>>>>> subprocess.run(["gsutil", "cp", >>>>>>>>>>>> gcs_uri, local_path], check=True) >>>>>>>>>>>> logger.info(f"Downloaded {gcs_uri} to >>>>>>>>>>>> {local_path} using gsutil") >>>>>>>>>>>> yield { >>>>>>>>>>>> 'local_path': local_path, >>>>>>>>>>>> 'gcs_uri': gcs_uri, >>>>>>>>>>>> 'filename': filename >>>>>>>>>>>> } >>>>>>>>>>>> except Exception as e2: >>>>>>>>>>>> logger.error(f"Failed to download using >>>>>>>>>>>> gsutil: {str(e2)}") >>>>>>>>>>>> >>>>>>>>>>>> except Exception as e: >>>>>>>>>>>> logger.error(f"Error downloading {gcs_uri}: >>>>>>>>>>>> {str(e)}") >>>>>>>>>>>> class LoadPDFPages(beam.DoFn): >>>>>>>>>>>> """Load PDF pages as images.""" >>>>>>>>>>>> >>>>>>>>>>>> def __init__(self, dpi=200): >>>>>>>>>>>> self.dpi = dpi >>>>>>>>>>>> >>>>>>>>>>>> def process(self, element): >>>>>>>>>>>> doc = None >>>>>>>>>>>> try: >>>>>>>>>>>> # Make sure we have all required fields >>>>>>>>>>>> if not isinstance(element, dict): >>>>>>>>>>>> logger.error(f"Expected dictionary, got >>>>>>>>>>>> {type(element)}") >>>>>>>>>>>> return >>>>>>>>>>>> >>>>>>>>>>>> if 'local_path' not in element: >>>>>>>>>>>> logger.error("Missing 'local_path' in element") >>>>>>>>>>>> return >>>>>>>>>>>> >>>>>>>>>>>> local_path = element['local_path'] >>>>>>>>>>>> gcs_uri = element.get('gcs_uri', '') >>>>>>>>>>>> >>>>>>>>>>>> # Extract filename from local_path if not provided >>>>>>>>>>>> filename = element.get('filename', >>>>>>>>>>>> os.path.basename(local_path)) >>>>>>>>>>>> >>>>>>>>>>>> logger.info(f"Loading PDF: {local_path}, filename: >>>>>>>>>>>> {filename}") >>>>>>>>>>>> >>>>>>>>>>>> # Check if file exists and is accessible >>>>>>>>>>>> if not os.path.exists(local_path): >>>>>>>>>>>> logger.error(f"File not found: {local_path}") >>>>>>>>>>>> return >>>>>>>>>>>> >>>>>>>>>>>> if not os.access(local_path, os.R_OK): >>>>>>>>>>>> logger.error(f"File not readable: {local_path}") >>>>>>>>>>>> return >>>>>>>>>>>> >>>>>>>>>>>> # Open the PDF >>>>>>>>>>>> try: >>>>>>>>>>>> doc = fitz.open(local_path) >>>>>>>>>>>> if doc.is_closed: >>>>>>>>>>>> logger.error(f"Failed to open PDF: >>>>>>>>>>>> {local_path}") >>>>>>>>>>>> return >>>>>>>>>>>> except Exception as e: >>>>>>>>>>>> logger.error(f"Error opening PDF {local_path}: >>>>>>>>>>>> {str(e)}") >>>>>>>>>>>> return >>>>>>>>>>>> >>>>>>>>>>>> # Process each page >>>>>>>>>>>> page_count = len(doc) >>>>>>>>>>>> logger.info(f"Processing {page_count} pages from >>>>>>>>>>>> {local_path}") >>>>>>>>>>>> >>>>>>>>>>>> for i in range(page_count): >>>>>>>>>>>> try: >>>>>>>>>>>> if doc.is_closed: >>>>>>>>>>>> logger.error(f"Document was closed >>>>>>>>>>>> unexpectedly while processing page {i}") >>>>>>>>>>>> break >>>>>>>>>>>> >>>>>>>>>>>> page = doc[i] >>>>>>>>>>>> if page is None: >>>>>>>>>>>> logger.error(f"Failed to get page {i} >>>>>>>>>>>> from document") >>>>>>>>>>>> continue >>>>>>>>>>>> >>>>>>>>>>>> # Use a higher resolution for better quality >>>>>>>>>>>> scale = 
self.dpi / 72.0 >>>>>>>>>>>> mat = fitz.Matrix(scale, scale) >>>>>>>>>>>> >>>>>>>>>>>> try: >>>>>>>>>>>> pix = page.get_pixmap(matrix=mat, >>>>>>>>>>>> alpha=False) >>>>>>>>>>>> except Exception as e: >>>>>>>>>>>> logger.error(f"Error getting pixmap for >>>>>>>>>>>> page {i}: {str(e)}") >>>>>>>>>>>> continue >>>>>>>>>>>> >>>>>>>>>>>> # Check pixmap dimensions >>>>>>>>>>>> if pix.height <= 0 or pix.width <= 0 or >>>>>>>>>>>> pix.n <= 0: >>>>>>>>>>>> logger.error(f"Invalid pixmap >>>>>>>>>>>> dimensions: {pix.width}x{pix.height}x{pix.n}") >>>>>>>>>>>> continue >>>>>>>>>>>> >>>>>>>>>>>> # Convert to numpy array >>>>>>>>>>>> try: >>>>>>>>>>>> arr = np.frombuffer(pix.samples, >>>>>>>>>>>> dtype=np.uint8).reshape(pix.height, pix.width, pix.n) >>>>>>>>>>>> except Exception as e: >>>>>>>>>>>> logger.error(f"Error converting pixmap >>>>>>>>>>>> to numpy array: {str(e)}") >>>>>>>>>>>> continue >>>>>>>>>>>> >>>>>>>>>>>> # Convert BGR to RGB if needed >>>>>>>>>>>> if pix.n == 3: # RGB >>>>>>>>>>>> try: >>>>>>>>>>>> arr = cv2.cvtColor(arr, >>>>>>>>>>>> cv2.COLOR_BGR2RGB) >>>>>>>>>>>> except Exception as e: >>>>>>>>>>>> logger.error(f"Error converting BGR >>>>>>>>>>>> to RGB: {str(e)}") >>>>>>>>>>>> continue >>>>>>>>>>>> >>>>>>>>>>>> # Store original size for later use >>>>>>>>>>>> original_size = (arr.shape[0], arr.shape[1]) >>>>>>>>>>>> >>>>>>>>>>>> # Create page info >>>>>>>>>>>> page_info = { >>>>>>>>>>>> 'page_num': i, >>>>>>>>>>>> 'image': arr, >>>>>>>>>>>> 'original_size': original_size, >>>>>>>>>>>> 'local_path': local_path, >>>>>>>>>>>> 'gcs_uri': gcs_uri, >>>>>>>>>>>> 'filename': filename >>>>>>>>>>>> } >>>>>>>>>>>> >>>>>>>>>>>> # Use document ID and page number as key >>>>>>>>>>>> doc_id = os.path.splitext(filename)[0] >>>>>>>>>>>> key = f"{doc_id}_{i}" >>>>>>>>>>>> >>>>>>>>>>>> yield (key, page_info) >>>>>>>>>>>> except Exception as e: >>>>>>>>>>>> import traceback >>>>>>>>>>>> logger.error(f"Error processing page {i}: >>>>>>>>>>>> {str(e)}") >>>>>>>>>>>> logger.error(traceback.format_exc()) >>>>>>>>>>>> >>>>>>>>>>>> logger.info(f"Loaded {len(doc)} pages from >>>>>>>>>>>> {local_path}") >>>>>>>>>>>> >>>>>>>>>>>> except Exception as e: >>>>>>>>>>>> import traceback >>>>>>>>>>>> logger.error(f"Error loading PDF: {str(e)}") >>>>>>>>>>>> logger.error(traceback.format_exc()) >>>>>>>>>>>> finally: >>>>>>>>>>>> # Make sure to close the document only if it was >>>>>>>>>>>> successfully opened >>>>>>>>>>>> if doc is not None: >>>>>>>>>>>> try: >>>>>>>>>>>> if not doc.is_closed: >>>>>>>>>>>> doc.close() >>>>>>>>>>>> except Exception as e: >>>>>>>>>>>> logger.debug(f"Error closing document: >>>>>>>>>>>> {str(e)}") >>>>>>>>>>>> >>>>>>>>>>>> class PreprocessImage(beam.DoFn): >>>>>>>>>>>> """Preprocess image for Triton server.""" >>>>>>>>>>>> >>>>>>>>>>>> def __init__(self, size=1024): >>>>>>>>>>>> self.size = size >>>>>>>>>>>> >>>>>>>>>>>> def letterbox(self, img, new_shape=1024, >>>>>>>>>>>> color=(114,114,114)): >>>>>>>>>>>> """Resize and pad image to target size.""" >>>>>>>>>>>> h, w = img.shape[:2] >>>>>>>>>>>> r = min(new_shape / h, new_shape / w) >>>>>>>>>>>> nh, nw = int(round(h * r)), int(round(w * r)) >>>>>>>>>>>> pad_h, pad_w = new_shape - nh, new_shape - nw >>>>>>>>>>>> top = pad_h // 2 >>>>>>>>>>>> bottom = pad_h - top >>>>>>>>>>>> left = pad_w // 2 >>>>>>>>>>>> right = pad_w - left >>>>>>>>>>>> img = cv2.resize(img, (nw, nh), >>>>>>>>>>>> interpolation=cv2.INTER_LINEAR) >>>>>>>>>>>> img = cv2.copyMakeBorder(img, top, bottom, left, right, >>>>>>>>>>>> cv2.BORDER_CONSTANT, value=color) 
>>>>>>>>>>>> return img, r, left, top >>>>>>>>>>>> >>>>>>>>>>>> def process(self, element): >>>>>>>>>>>> try: >>>>>>>>>>>> if not isinstance(element, tuple) or len(element) >>>>>>>>>>>> != 2: >>>>>>>>>>>> logger.error(f"Expected (key, value) tuple, got >>>>>>>>>>>> {type(element)}") >>>>>>>>>>>> return >>>>>>>>>>>> >>>>>>>>>>>> key, page_info = element >>>>>>>>>>>> >>>>>>>>>>>> if not isinstance(page_info, dict): >>>>>>>>>>>> logger.error(f"Expected dictionary for >>>>>>>>>>>> page_info, got {type(page_info)}") >>>>>>>>>>>> return >>>>>>>>>>>> >>>>>>>>>>>> if 'image' not in page_info: >>>>>>>>>>>> logger.error("Missing 'image' in page_info") >>>>>>>>>>>> return >>>>>>>>>>>> >>>>>>>>>>>> # Create a new dictionary to avoid modifying the >>>>>>>>>>>> input >>>>>>>>>>>> new_page_info = dict(page_info) >>>>>>>>>>>> >>>>>>>>>>>> # Apply letterbox resize >>>>>>>>>>>> img = new_page_info['image'] >>>>>>>>>>>> lb, r, left, top = self.letterbox(img, >>>>>>>>>>>> new_shape=self.size) >>>>>>>>>>>> >>>>>>>>>>>> # Convert to float32 and normalize to [0,1] >>>>>>>>>>>> x = lb.astype(np.float32) / 255.0 >>>>>>>>>>>> >>>>>>>>>>>> # Convert to CHW format >>>>>>>>>>>> x = np.transpose(x, (2, 0, 1)) >>>>>>>>>>>> >>>>>>>>>>>> # Add batch dimension >>>>>>>>>>>> batched_img = np.expand_dims(x, axis=0) >>>>>>>>>>>> >>>>>>>>>>>> # Update page info >>>>>>>>>>>> new_page_info['preprocessed_image'] = batched_img >>>>>>>>>>>> new_page_info['letterbox_info'] = (r, left, top) >>>>>>>>>>>> >>>>>>>>>>>> yield (key, new_page_info) >>>>>>>>>>>> >>>>>>>>>>>> except Exception as e: >>>>>>>>>>>> import traceback >>>>>>>>>>>> logger.error(f"Error preprocessing image: {str(e)}") >>>>>>>>>>>> logger.error(traceback.format_exc()) >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> class ExtractBoxes(beam.DoFn): >>>>>>>>>>>> """Extract bounding boxes from Triton response.""" >>>>>>>>>>>> >>>>>>>>>>>> def __init__(self, conf_th=0.25, iou_th=0.7, >>>>>>>>>>>> model_size=1024): >>>>>>>>>>>> self.conf_th = conf_th >>>>>>>>>>>> self.iou_th = iou_th >>>>>>>>>>>> self.model_size = model_size >>>>>>>>>>>> >>>>>>>>>>>> def _nms(self, boxes, scores, iou_th=0.7): >>>>>>>>>>>> """Non-Maximum Suppression""" >>>>>>>>>>>> if len(boxes) == 0: >>>>>>>>>>>> return [] >>>>>>>>>>>> >>>>>>>>>>>> boxes = boxes.astype(np.float32) >>>>>>>>>>>> x1, y1, x2, y2 = boxes.T >>>>>>>>>>>> areas = (x2 - x1) * (y2 - y1) >>>>>>>>>>>> order = scores.argsort()[::-1] >>>>>>>>>>>> >>>>>>>>>>>> keep = [] >>>>>>>>>>>> while order.size > 0: >>>>>>>>>>>> i = order[0] >>>>>>>>>>>> keep.append(i) >>>>>>>>>>>> >>>>>>>>>>>> xx1 = np.maximum(x1[i], x1[order[1:]]) >>>>>>>>>>>> yy1 = np.maximum(y1[i], y1[order[1:]]) >>>>>>>>>>>> xx2 = np.minimum(x2[i], x2[order[1:]]) >>>>>>>>>>>> yy2 = np.minimum(y2[i], y2[order[1:]]) >>>>>>>>>>>> >>>>>>>>>>>> w = np.maximum(0.0, xx2 - xx1) >>>>>>>>>>>> h = np.maximum(0.0, yy2 - yy1) >>>>>>>>>>>> inter = w * h >>>>>>>>>>>> >>>>>>>>>>>> iou = inter / (areas[i] + areas[order[1:]] - inter >>>>>>>>>>>> + 1e-9) >>>>>>>>>>>> inds = np.where(iou <= iou_th)[0] >>>>>>>>>>>> order = order[inds + 1] >>>>>>>>>>>> >>>>>>>>>>>> return keep >>>>>>>>>>>> >>>>>>>>>>>> def process(self, page_info): >>>>>>>>>>>> try: >>>>>>>>>>>> triton_response = page_info['triton_response'] >>>>>>>>>>>> original_size = page_info['original_size'] >>>>>>>>>>>> r, left, top = page_info['letterbox_info'] >>>>>>>>>>>> >>>>>>>>>>>> if "outputs" not in triton_response or not >>>>>>>>>>>> triton_response["outputs"]: >>>>>>>>>>>> logger.error("Invalid response from Triton >>>>>>>>>>>> 
server") >>>>>>>>>>>> return [] >>>>>>>>>>>> >>>>>>>>>>>> out_meta = triton_response["outputs"][0] >>>>>>>>>>>> shape = out_meta["shape"] >>>>>>>>>>>> data = np.array(out_meta["data"], >>>>>>>>>>>> dtype=np.float32).reshape(shape) >>>>>>>>>>>> >>>>>>>>>>>> logger.info(f"Output shape: {shape}") >>>>>>>>>>>> >>>>>>>>>>>> # For YOLO output [B, C, P] where C is channels >>>>>>>>>>>> (box coords + objectness + classes) >>>>>>>>>>>> B, C, P = shape >>>>>>>>>>>> >>>>>>>>>>>> # Assuming 4 box coordinates + class probabilities >>>>>>>>>>>> (no objectness) >>>>>>>>>>>> has_objectness = False >>>>>>>>>>>> num_classes = C - 5 if has_objectness else C - 4 >>>>>>>>>>>> >>>>>>>>>>>> # Extract data >>>>>>>>>>>> xywh = data[:, 0:4, :] >>>>>>>>>>>> if has_objectness: >>>>>>>>>>>> obj = data[:, 4:5, :] >>>>>>>>>>>> cls = data[:, 5:5 + num_classes, :] >>>>>>>>>>>> else: >>>>>>>>>>>> obj = None >>>>>>>>>>>> cls = data[:, 4:4 + num_classes, :] >>>>>>>>>>>> >>>>>>>>>>>> # Process batch item (we only have one) >>>>>>>>>>>> b = 0 >>>>>>>>>>>> h, w = original_size >>>>>>>>>>>> >>>>>>>>>>>> xywh_b = xywh[b].T # (P,4) >>>>>>>>>>>> if obj is not None: >>>>>>>>>>>> obj_b = obj[b].T.squeeze(1) # (P,) >>>>>>>>>>>> else: >>>>>>>>>>>> obj_b = np.ones((P,), dtype=np.float32) >>>>>>>>>>>> cls_b = cls[b].T # (P,nc) >>>>>>>>>>>> >>>>>>>>>>>> # Get scores and labels >>>>>>>>>>>> scores_all = (obj_b[:, None] * cls_b) if obj is not >>>>>>>>>>>> None else cls_b >>>>>>>>>>>> labels = scores_all.argmax(axis=1) >>>>>>>>>>>> scores = scores_all.max(axis=1) >>>>>>>>>>>> >>>>>>>>>>>> # Filter by confidence threshold >>>>>>>>>>>> keep = scores >= self.conf_th >>>>>>>>>>>> if not np.any(keep): >>>>>>>>>>>> logger.info(f"No detections above threshold >>>>>>>>>>>> {self.conf_th}") >>>>>>>>>>>> return [] >>>>>>>>>>>> >>>>>>>>>>>> xywh_k = xywh_b[keep] >>>>>>>>>>>> scores_k = scores[keep] >>>>>>>>>>>> labels_k = labels[keep] >>>>>>>>>>>> >>>>>>>>>>>> # xywh -> xyxy in model space >>>>>>>>>>>> cx, cy, ww, hh = xywh_k.T >>>>>>>>>>>> xyxy_model = np.stack([cx - ww / 2, cy - hh / 2, cx >>>>>>>>>>>> + ww / 2, cy + hh / 2], axis=1) >>>>>>>>>>>> >>>>>>>>>>>> # Apply NMS per class >>>>>>>>>>>> final_boxes = [] >>>>>>>>>>>> final_scores = [] >>>>>>>>>>>> final_labels = [] >>>>>>>>>>>> >>>>>>>>>>>> for c in np.unique(labels_k): >>>>>>>>>>>> idxs = np.where(labels_k == c)[0] >>>>>>>>>>>> if idxs.size == 0: >>>>>>>>>>>> continue >>>>>>>>>>>> keep_idx = self._nms(xyxy_model[idxs], >>>>>>>>>>>> scores_k[idxs], iou_th=self.iou_th) >>>>>>>>>>>> final_boxes.append(xyxy_model[idxs][keep_idx]) >>>>>>>>>>>> final_scores.append(scores_k[idxs][keep_idx]) >>>>>>>>>>>> final_labels.append(np.full(len(keep_idx), c, >>>>>>>>>>>> dtype=int)) >>>>>>>>>>>> >>>>>>>>>>>> if not final_boxes: >>>>>>>>>>>> logger.info("No detections after NMS") >>>>>>>>>>>> return [] >>>>>>>>>>>> >>>>>>>>>>>> xyxy_model = np.vstack(final_boxes) >>>>>>>>>>>> scores_k = np.concatenate(final_scores) >>>>>>>>>>>> labels_k = np.concatenate(final_labels) >>>>>>>>>>>> >>>>>>>>>>>> # Map boxes from model space to original image space >>>>>>>>>>>> xyxy_orig = xyxy_model.copy() >>>>>>>>>>>> >>>>>>>>>>>> # Remove padding >>>>>>>>>>>> xyxy_orig[:, [0, 2]] -= left >>>>>>>>>>>> xyxy_orig[:, [1, 3]] -= top >>>>>>>>>>>> >>>>>>>>>>>> # Scale back to original size >>>>>>>>>>>> xyxy_orig /= r >>>>>>>>>>>> >>>>>>>>>>>> # Clip to image boundaries >>>>>>>>>>>> xyxy_orig[:, 0::2] = np.clip(xyxy_orig[:, 0::2], 0, >>>>>>>>>>>> w - 1) >>>>>>>>>>>> xyxy_orig[:, 1::2] = np.clip(xyxy_orig[:, 1::2], 0, >>>>>>>>>>>> h - 1) 
>>>>>>>>>>>> >>>>>>>>>>>> # Format as requested: x_min, y_min, x_max, y_max, >>>>>>>>>>>> class, probability >>>>>>>>>>>> boxes = [] >>>>>>>>>>>> for (x1, y1, x2, y2), label, score in >>>>>>>>>>>> zip(xyxy_orig, labels_k, scores_k): >>>>>>>>>>>> class_name = CLASS_ID_TO_NAME.get(int(label)) >>>>>>>>>>>> box_info = { >>>>>>>>>>>> "page": page_info['page_num'], >>>>>>>>>>>> "x_min": float(x1), >>>>>>>>>>>> "y_min": float(y1), >>>>>>>>>>>> "x_max": float(x2), >>>>>>>>>>>> "y_max": float(y2), >>>>>>>>>>>> "class": int(label), >>>>>>>>>>>> "class_name": class_name, >>>>>>>>>>>> "probability": float(score), >>>>>>>>>>>> "filename": page_info['filename'], >>>>>>>>>>>> "local_path": page_info['local_path'], >>>>>>>>>>>> "gcs_uri": page_info['gcs_uri'] >>>>>>>>>>>> } >>>>>>>>>>>> boxes.append(box_info) >>>>>>>>>>>> >>>>>>>>>>>> logger.info(f"Extracted {len(boxes)} boxes from >>>>>>>>>>>> page {page_info['page_num']}") >>>>>>>>>>>> >>>>>>>>>>>> return boxes >>>>>>>>>>>> >>>>>>>>>>>> except Exception as e: >>>>>>>>>>>> logger.error(f"Error extracting boxes: {str(e)}") >>>>>>>>>>>> return [] >>>>>>>>>>>> >>>>>>>>>>>> class PrepareForBigQuery(beam.DoFn): >>>>>>>>>>>> """Prepare data for BigQuery insertion.""" >>>>>>>>>>>> >>>>>>>>>>>> def process(self, box_info): >>>>>>>>>>>> try: >>>>>>>>>>>> # Generate UUIDs for primary keys >>>>>>>>>>>> v_note_id = str(uuid.uuid4()) >>>>>>>>>>>> page_ocr_id = str(uuid.uuid4()) >>>>>>>>>>>> class_prediction_id = str(uuid.uuid4()) >>>>>>>>>>>> >>>>>>>>>>>> # Create timestamp >>>>>>>>>>>> processing_time = >>>>>>>>>>>> datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") >>>>>>>>>>>> >>>>>>>>>>>> # Create ocr_results row >>>>>>>>>>>> ocr_results_row = { >>>>>>>>>>>> "v_note_id": v_note_id, >>>>>>>>>>>> "filename": box_info['filename'], >>>>>>>>>>>> "file_path": box_info['gcs_uri'], >>>>>>>>>>>> "processing_time": processing_time, >>>>>>>>>>>> "file_type": "pdf" >>>>>>>>>>>> } >>>>>>>>>>>> >>>>>>>>>>>> # Create page_ocr row >>>>>>>>>>>> page_ocr_row = { >>>>>>>>>>>> "page_ocr_id": page_ocr_id, >>>>>>>>>>>> "v_note_id": v_note_id, >>>>>>>>>>>> "page_number": box_info['page'] >>>>>>>>>>>> } >>>>>>>>>>>> >>>>>>>>>>>> # Create class_prediction row >>>>>>>>>>>> class_prediction_row = { >>>>>>>>>>>> "class_prediction_id": class_prediction_id, >>>>>>>>>>>> "page_ocr_id": page_ocr_id, >>>>>>>>>>>> "xmin": box_info['x_min'], >>>>>>>>>>>> "ymin": box_info['y_min'], >>>>>>>>>>>> "xmax": box_info['x_max'], >>>>>>>>>>>> "ymax": box_info['y_max'], >>>>>>>>>>>> "class": box_info['class_name'] if >>>>>>>>>>>> box_info['class_name'] else str(box_info['class']), >>>>>>>>>>>> "confidence": box_info['probability'] >>>>>>>>>>>> } >>>>>>>>>>>> >>>>>>>>>>>> # Return all three rows with table names >>>>>>>>>>>> return [ >>>>>>>>>>>> ('ocr_results', ocr_results_row), >>>>>>>>>>>> ('page_ocr', page_ocr_row), >>>>>>>>>>>> ('class_prediction', class_prediction_row) >>>>>>>>>>>> ] >>>>>>>>>>>> >>>>>>>>>>>> except Exception as e: >>>>>>>>>>>> logger.error(f"Error preparing for BigQuery: >>>>>>>>>>>> {str(e)}") >>>>>>>>>>>> return [] >>>>>>>>>>>> >>>>>>>>>>>> model_handler = TensorRTEngineHandlerNumPy( >>>>>>>>>>>> min_batch_size=1, >>>>>>>>>>>> max_batch_size=1, >>>>>>>>>>>> engine_path="gs://temp/yolov11l-doclaynet.engine", >>>>>>>>>>>> ) >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> with beam.Pipeline(options=options) as pipeline: >>>>>>>>>>>> >>>>>>>>>>>> # Create PCollection from input URIs >>>>>>>>>>>> pdf_uris = ( >>>>>>>>>>>> pipeline >>>>>>>>>>>> | "Create URIs" >> beam.Create(["tmp.pdf"]) 
>>>>>>>>>>>>     )
>>>>>>>>>>>>
>>>>>>>>>>>>     # Download PDFs
>>>>>>>>>>>>     local_pdfs = (
>>>>>>>>>>>>         pdf_uris
>>>>>>>>>>>>         | "Download PDFs" >> beam.ParDo(DownloadPDFFromGCS())
>>>>>>>>>>>>     )
>>>>>>>>>>>>
>>>>>>>>>>>>     # Load PDF pages
>>>>>>>>>>>>     pdf_pages = (
>>>>>>>>>>>>         local_pdfs
>>>>>>>>>>>>         | "Load PDF Pages" >> beam.ParDo(LoadPDFPages())
>>>>>>>>>>>>         # | "Flatten Pages" >> beam.FlatMap(lambda x: x)
>>>>>>>>>>>>     )
>>>>>>>>>>>>
>>>>>>>>>>>>     # Preprocess images
>>>>>>>>>>>>     preprocessed_pages = (
>>>>>>>>>>>>         pdf_pages
>>>>>>>>>>>>         | "Preprocess Images" >> beam.ParDo(PreprocessImage())
>>>>>>>>>>>>     )
>>>>>>>>>>>>
>>>>>>>>>>>>     inference_results = (
>>>>>>>>>>>>         preprocessed_pages
>>>>>>>>>>>>         | "Run Inference" >> RunInference(model_handler=model_handler)
>>>>>>>>>>>>     )
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, 16 Sept 2025 at 21:23, XQ Hu <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Can you share your commands and outputs?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Sep 16, 2025 at 9:02 PM Sai Shashank <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Okay, I have changed the Docker image so that it now RUNs the
>>>>>>>>>>>>>> python command, but it is still halting without any errors or
>>>>>>>>>>>>>> warnings.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, 16 Sept 2025 at 17:38, XQ Hu via dev <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The CMD is not necessary, as it will be overridden by the
>>>>>>>>>>>>>>> ENTRYPOINT, just like your comment says.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If you ssh into your Docker container like `docker run --rm
>>>>>>>>>>>>>>> -it --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE`, can you
>>>>>>>>>>>>>>> run python and some Beam pipelines with a direct runner in the
>>>>>>>>>>>>>>> container? This can help test that the environment works fine.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have one old Dockerfile that used to work with the old Beam:
>>>>>>>>>>>>>>> https://github.com/google/dataflow-ml-starter/blob/main/tensor_rt.Dockerfile
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Sep 16, 2025 at 4:56 PM Sai Shashank <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ---------- Forwarded message ---------
>>>>>>>>>>>>>>>> From: Sai Shashank <[email protected]>
>>>>>>>>>>>>>>>> Date: Tue, Sep 16, 2025 at 4:27 PM
>>>>>>>>>>>>>>>> Subject: TensorRT inference not starting
>>>>>>>>>>>>>>>> To: <[email protected]>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hey Everyone,
>>>>>>>>>>>>>>>> I was trying to use TensorRT within Apache Beam on Dataflow,
>>>>>>>>>>>>>>>> but somehow Dataflow didn't start; it did not even give me
>>>>>>>>>>>>>>>> worker logs. Below is the Dockerfile that I use to create a
>>>>>>>>>>>>>>>> custom image. At first I thought it was a version mismatch,
>>>>>>>>>>>>>>>> but that usually gives me a harness error.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ARG BUILD_IMAGE=nvcr.io/nvidia/tensorrt:25.08-py3
>>>>>>>>>>>>>>>> FROM ${BUILD_IMAGE}
>>>>>>>>>>>>>>>> ENV PATH="/usr/src/tensorrt/bin:${PATH}"
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> WORKDIR /workspace
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> RUN apt-get update -y && apt-get install -y python3-venv
>>>>>>>>>>>>>>>> RUN pip install --no-cache-dir apache-beam[gcp]==2.67.0
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> COPY --from=apache/beam_python3.10_sdk:2.67.0 /opt/apache/beam /opt/apache/beam
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> # Install additional dependencies
>>>>>>>>>>>>>>>> RUN pip install --upgrade pip \
>>>>>>>>>>>>>>>>     && pip install torch \
>>>>>>>>>>>>>>>>     && pip install torchvision \
>>>>>>>>>>>>>>>>     && pip install "pillow>=8.0.0" \
>>>>>>>>>>>>>>>>     && pip install "transformers>=4.18.0" \
>>>>>>>>>>>>>>>>     && pip install cuda-python \
>>>>>>>>>>>>>>>>     && pip install opencv-python==4.7.0.72 \
>>>>>>>>>>>>>>>>     && pip install PyMuPDF==1.22.5 \
>>>>>>>>>>>>>>>>     && pip install requests==2.31.0
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> # Set the default command to run the inference script
>>>>>>>>>>>>>>>> # This will be overridden by the Apache Beam boot script
>>>>>>>>>>>>>>>> CMD ["python", "/workspace/inference.py"]
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> # Use the Apache Beam boot script as the entrypoint
>>>>>>>>>>>>>>>> ENTRYPOINT ["/opt/apache/beam/boot"]
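Following the suggestion earlier in the thread to drop into the container (docker run --rm -it --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE) and run a pipeline with the DirectRunner, a minimal smoke test could look like the sketch below. The file name is illustrative; it only verifies that the Python environment and the Beam install in the image work, not the TensorRT handler itself.

    # smoke_test.py -- run inside the container with: python smoke_test.py
    import logging

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions


    def main():
        logging.getLogger().setLevel(logging.INFO)
        # DirectRunner keeps everything local, so this only exercises the
        # Python environment baked into the custom image.
        with beam.Pipeline(options=PipelineOptions(runner="DirectRunner")) as p:
            (
                p
                | "Create" >> beam.Create([1, 2, 3])
                | "Square" >> beam.Map(lambda x: x * x)
                | "Print" >> beam.Map(print)
            )


    if __name__ == "__main__":
        main()

If that passes, the next step would be running the actual PDF pipeline above with the DirectRunner inside the same container before submitting it to Dataflow.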
