We are following this through the GCP support case now.

On Tue, Sep 16, 2025 at 9:54 PM Sai Shashank <[email protected]> wrote:
> The code works without the custom image of TensorRT; I only get this:
>
> ject to benefit from faster worker startup and autoscaling. If you experience container-startup related issues, pass the "disable_image_streaming" experiment to disable image streaming for the job.
> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:10.327Z: JOB_MESSAGE_BASIC: Worker configuration: g2-standard-4 in us-east4-c.
> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.480Z: JOB_MESSAGE_BASIC: Executing operation [10]: Create URIs/Impulse+[10]: Create URIs/FlatMap(<lambda at core.py:3994>)+[10]: Create URIs/Map(decode)+[10]: Download PDFs+[10]: Load PDF Pages+[10]: Preprocess Images+[10]: Run Inference/BatchElements/ParDo(_GlobalWindowsBatchingDoFn)+[10]: Run Inference/BeamML_RunInference
> INFO:apache_beam.runners.dataflow.dataflow_runner:2025-09-17T00:52:11.543Z: JOB_MESSAGE_BASIC: Starting 1 workers in us-east4...
> INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2025-09-16_17_52_06-10817935125972705087 is in state JOB_STATE_RUNNING
> WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
> WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
> WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
> WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
> WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
>
> The httplib2 warning just keeps recurring, and after an hour this message pops up:
>
> Workflow failed. Causes: The Dataflow job appears to be stuck because no worker activity has been seen in the last 1h. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors#error-syncing-pod. You can also get help with Cloud Dataflow at https://cloud.google.com/dataflow/support.
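
As a side note on the image-streaming hint at the top of that log: the "disable_image_streaming" experiment it mentions is passed at submission time through Beam's standard --experiments pipeline option. A minimal sketch, assuming the job is launched from Python; the project, region, and bucket below are placeholders, not values from this thread:

    from apache_beam.options.pipeline_options import PipelineOptions

    # Placeholder project/region/bucket; only the experiments entry comes from
    # the log hint quoted above.
    options = PipelineOptions(
        runner="DataflowRunner",
        project="my-project",
        region="us-east4",
        temp_location="gs://my-bucket/tmp",
        experiments=["disable_image_streaming"],
    )
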
>>> """ >>> >>> import os >>> import sys >>> import json >>> import uuid >>> import argparse >>> import logging >>> import tempfile >>> import datetime >>> import requests >>> import numpy as np >>> import cv2 >>> from PIL import Image >>> import fitz # PyMuPDF >>> from pathlib import Path >>> from typing import Dict, List, Tuple, Any, Optional, Iterator >>> >>> # Apache Beam imports >>> import apache_beam as beam >>> from apache_beam.options.pipeline_options import PipelineOptions, >>> SetupOptions >>> from apache_beam.ml.inference.base import RemoteModelHandler, >>> PredictionResult >>> from apache_beam.ml.inference.utils import _convert_to_result >>> from apache_beam.ml.inference.base import RunInference >>> from apache_beam.io.gcp.bigquery import WriteToBigQuery >>> from apache_beam.io.filesystems import FileSystems >>> from apache_beam.io.gcp.gcsio import GcsIO >>> >>> # Google Cloud imports >>> from google.cloud import storage >>> from google.cloud import bigquery >>> >>> # Set up logging >>> logging.basicConfig(level=logging.INFO) >>> logger = logging.getLogger(__name__) >>> >>> # DocLayNet classes >>> CLASS_ID_TO_NAME = { >>> 0: 'Caption', >>> 1: 'Footnote', >>> 2: 'Formula', >>> 3: 'List-item', >>> 4: 'Page-footer', >>> 5: 'Page-header', >>> 6: 'Picture', >>> 7: 'Section-header', >>> 8: 'Table', >>> 9: 'Text', >>> 10: 'Title' >>> } >>> class DownloadPDFFromGCS(beam.DoFn): >>> """Download a PDF from Google Cloud Storage.""" >>> >>> def __init__(self, temp_dir=None): >>> self.temp_dir = temp_dir or tempfile.gettempdir() >>> >>> def process(self, gcs_uri): >>> try: >>> # Parse GCS URI >>> if not gcs_uri.startswith("gs://"): >>> raise ValueError(f"Invalid GCS URI: {gcs_uri}") >>> >>> # Remove gs:// prefix and split into bucket and blob path >>> path_parts = gcs_uri[5:].split("/", 1) >>> bucket_name = path_parts[0] >>> blob_path = path_parts[1] >>> >>> # Get filename from blob path >>> filename = os.path.basename(blob_path) >>> local_path = os.path.join(self.temp_dir, filename) >>> >>> # Create temp directory if it doesn't exist >>> os.makedirs(self.temp_dir, exist_ok=True) >>> >>> try: >>> # Download using Beam's GcsIO >>> with FileSystems.open(gcs_uri, 'rb') as gcs_file: >>> with open(local_path, 'wb') as local_file: >>> local_file.write(gcs_file.read()) >>> >>> logger.info(f"Downloaded {gcs_uri} to {local_path}") >>> >>> # Return a dictionary with the local path and original >>> URI >>> yield { >>> 'local_path': local_path, >>> 'gcs_uri': gcs_uri, >>> 'filename': filename >>> } >>> except Exception as e: >>> logger.error(f"Error reading from GCS: {str(e)}") >>> # Try alternative download method >>> logger.info(f"Trying alternative download method for >>> {gcs_uri}") >>> >>> # For testing with local files >>> if os.path.exists(gcs_uri.replace("gs://", "")): >>> local_path = gcs_uri.replace("gs://", "") >>> logger.info(f"Using local file: {local_path}") >>> yield { >>> 'local_path': local_path, >>> 'gcs_uri': gcs_uri, >>> 'filename': os.path.basename(local_path) >>> } >>> else: >>> # Try using gsutil command >>> import subprocess >>> try: >>> subprocess.run(["gsutil", "cp", gcs_uri, >>> local_path], check=True) >>> logger.info(f"Downloaded {gcs_uri} to >>> {local_path} using gsutil") >>> yield { >>> 'local_path': local_path, >>> 'gcs_uri': gcs_uri, >>> 'filename': filename >>> } >>> except Exception as e2: >>> logger.error(f"Failed to download using gsutil: >>> {str(e2)}") >>> >>> except Exception as e: >>> logger.error(f"Error downloading {gcs_uri}: {str(e)}") >>> class 
>>>
>>>
>>> class LoadPDFPages(beam.DoFn):
>>>     """Load PDF pages as images."""
>>>
>>>     def __init__(self, dpi=200):
>>>         self.dpi = dpi
>>>
>>>     def process(self, element):
>>>         doc = None
>>>         try:
>>>             # Make sure we have all required fields
>>>             if not isinstance(element, dict):
>>>                 logger.error(f"Expected dictionary, got {type(element)}")
>>>                 return
>>>
>>>             if 'local_path' not in element:
>>>                 logger.error("Missing 'local_path' in element")
>>>                 return
>>>
>>>             local_path = element['local_path']
>>>             gcs_uri = element.get('gcs_uri', '')
>>>
>>>             # Extract filename from local_path if not provided
>>>             filename = element.get('filename', os.path.basename(local_path))
>>>
>>>             logger.info(f"Loading PDF: {local_path}, filename: {filename}")
>>>
>>>             # Check if file exists and is accessible
>>>             if not os.path.exists(local_path):
>>>                 logger.error(f"File not found: {local_path}")
>>>                 return
>>>
>>>             if not os.access(local_path, os.R_OK):
>>>                 logger.error(f"File not readable: {local_path}")
>>>                 return
>>>
>>>             # Open the PDF
>>>             try:
>>>                 doc = fitz.open(local_path)
>>>                 if doc.is_closed:
>>>                     logger.error(f"Failed to open PDF: {local_path}")
>>>                     return
>>>             except Exception as e:
>>>                 logger.error(f"Error opening PDF {local_path}: {str(e)}")
>>>                 return
>>>
>>>             # Process each page
>>>             page_count = len(doc)
>>>             logger.info(f"Processing {page_count} pages from {local_path}")
>>>
>>>             for i in range(page_count):
>>>                 try:
>>>                     if doc.is_closed:
>>>                         logger.error(f"Document was closed unexpectedly while processing page {i}")
>>>                         break
>>>
>>>                     page = doc[i]
>>>                     if page is None:
>>>                         logger.error(f"Failed to get page {i} from document")
>>>                         continue
>>>
>>>                     # Use a higher resolution for better quality
>>>                     scale = self.dpi / 72.0
>>>                     mat = fitz.Matrix(scale, scale)
>>>
>>>                     try:
>>>                         pix = page.get_pixmap(matrix=mat, alpha=False)
>>>                     except Exception as e:
>>>                         logger.error(f"Error getting pixmap for page {i}: {str(e)}")
>>>                         continue
>>>
>>>                     # Check pixmap dimensions
>>>                     if pix.height <= 0 or pix.width <= 0 or pix.n <= 0:
>>>                         logger.error(f"Invalid pixmap dimensions: {pix.width}x{pix.height}x{pix.n}")
>>>                         continue
>>>
>>>                     # Convert to numpy array
>>>                     try:
>>>                         arr = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
>>>                     except Exception as e:
>>>                         logger.error(f"Error converting pixmap to numpy array: {str(e)}")
>>>                         continue
>>>
>>>                     # Convert BGR to RGB if needed
>>>                     if pix.n == 3:  # RGB
>>>                         try:
>>>                             arr = cv2.cvtColor(arr, cv2.COLOR_BGR2RGB)
>>>                         except Exception as e:
>>>                             logger.error(f"Error converting BGR to RGB: {str(e)}")
>>>                             continue
>>>
>>>                     # Store original size for later use
>>>                     original_size = (arr.shape[0], arr.shape[1])
>>>
>>>                     # Create page info
>>>                     page_info = {
>>>                         'page_num': i,
>>>                         'image': arr,
>>>                         'original_size': original_size,
>>>                         'local_path': local_path,
>>>                         'gcs_uri': gcs_uri,
>>>                         'filename': filename
>>>                     }
>>>
>>>                     # Use document ID and page number as key
>>>                     doc_id = os.path.splitext(filename)[0]
>>>                     key = f"{doc_id}_{i}"
>>>
>>>                     yield (key, page_info)
>>>                 except Exception as e:
>>>                     import traceback
>>>                     logger.error(f"Error processing page {i}: {str(e)}")
>>>                     logger.error(traceback.format_exc())
>>>
>>>             logger.info(f"Loaded {len(doc)} pages from {local_path}")
>>>
>>>         except Exception as e:
>>>             import traceback
>>>             logger.error(f"Error loading PDF: {str(e)}")
>>>             logger.error(traceback.format_exc())
>>>         finally:
>>>             # Make sure to close the document only if it was successfully opened
>>>             if doc is not None:
>>>                 try:
>>>                     if not doc.is_closed:
>>>                         doc.close()
>>>                 except Exception as e:
>>>                     logger.debug(f"Error closing document: {str(e)}")
>>>
>>>
>>> class PreprocessImage(beam.DoFn):
>>>     """Preprocess image for Triton server."""
>>>
>>>     def __init__(self, size=1024):
>>>         self.size = size
>>>
>>>     def letterbox(self, img, new_shape=1024, color=(114, 114, 114)):
>>>         """Resize and pad image to target size."""
>>>         h, w = img.shape[:2]
>>>         r = min(new_shape / h, new_shape / w)
>>>         nh, nw = int(round(h * r)), int(round(w * r))
>>>         pad_h, pad_w = new_shape - nh, new_shape - nw
>>>         top = pad_h // 2
>>>         bottom = pad_h - top
>>>         left = pad_w // 2
>>>         right = pad_w - left
>>>         img = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_LINEAR)
>>>         img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)
>>>         return img, r, left, top
>>>
>>>     def process(self, element):
>>>         try:
>>>             if not isinstance(element, tuple) or len(element) != 2:
>>>                 logger.error(f"Expected (key, value) tuple, got {type(element)}")
>>>                 return
>>>
>>>             key, page_info = element
>>>
>>>             if not isinstance(page_info, dict):
>>>                 logger.error(f"Expected dictionary for page_info, got {type(page_info)}")
>>>                 return
>>>
>>>             if 'image' not in page_info:
>>>                 logger.error("Missing 'image' in page_info")
>>>                 return
>>>
>>>             # Create a new dictionary to avoid modifying the input
>>>             new_page_info = dict(page_info)
>>>
>>>             # Apply letterbox resize
>>>             img = new_page_info['image']
>>>             lb, r, left, top = self.letterbox(img, new_shape=self.size)
>>>
>>>             # Convert to float32 and normalize to [0,1]
>>>             x = lb.astype(np.float32) / 255.0
>>>
>>>             # Convert to CHW format
>>>             x = np.transpose(x, (2, 0, 1))
>>>
>>>             # Add batch dimension
>>>             batched_img = np.expand_dims(x, axis=0)
>>>
>>>             # Update page info
>>>             new_page_info['preprocessed_image'] = batched_img
>>>             new_page_info['letterbox_info'] = (r, left, top)
>>>
>>>             yield (key, new_page_info)
>>>
>>>         except Exception as e:
>>>             import traceback
>>>             logger.error(f"Error preprocessing image: {str(e)}")
>>>             logger.error(traceback.format_exc())
>>>
>>>
>>> class ExtractBoxes(beam.DoFn):
>>>     """Extract bounding boxes from Triton response."""
>>>
>>>     def __init__(self, conf_th=0.25, iou_th=0.7, model_size=1024):
>>>         self.conf_th = conf_th
>>>         self.iou_th = iou_th
>>>         self.model_size = model_size
>>>
>>>     def _nms(self, boxes, scores, iou_th=0.7):
>>>         """Non-Maximum Suppression"""
>>>         if len(boxes) == 0:
>>>             return []
>>>
>>>         boxes = boxes.astype(np.float32)
>>>         x1, y1, x2, y2 = boxes.T
>>>         areas = (x2 - x1) * (y2 - y1)
>>>         order = scores.argsort()[::-1]
>>>
>>>         keep = []
>>>         while order.size > 0:
>>>             i = order[0]
>>>             keep.append(i)
>>>
>>>             xx1 = np.maximum(x1[i], x1[order[1:]])
>>>             yy1 = np.maximum(y1[i], y1[order[1:]])
>>>             xx2 = np.minimum(x2[i], x2[order[1:]])
>>>             yy2 = np.minimum(y2[i], y2[order[1:]])
>>>
>>>             w = np.maximum(0.0, xx2 - xx1)
>>>             h = np.maximum(0.0, yy2 - yy1)
>>>             inter = w * h
>>>
>>>             iou = inter / (areas[i] + areas[order[1:]] - inter + 1e-9)
>>>             inds = np.where(iou <= iou_th)[0]
>>>             order = order[inds + 1]
>>>
>>>         return keep
>>>
>>>     def process(self, page_info):
>>>         try:
>>>             triton_response = page_info['triton_response']
>>>             original_size = page_info['original_size']
>>>             r, left, top = page_info['letterbox_info']
>>>
>>>             if "outputs" not in triton_response or not triton_response["outputs"]:
>>>                 logger.error("Invalid response from Triton server")
>>>                 return []
>>>
>>>             out_meta = triton_response["outputs"][0]
>>>             shape = out_meta["shape"]
>>>             data = np.array(out_meta["data"], dtype=np.float32).reshape(shape)
>>>
>>>             logger.info(f"Output shape: {shape}")
>>>
>>>             # For YOLO output [B, C, P] where C is channels (box coords + objectness + classes)
>>>             B, C, P = shape
>>>
>>>             # Assuming 4 box coordinates + class probabilities (no objectness)
>>>             has_objectness = False
>>>             num_classes = C - 5 if has_objectness else C - 4
>>>
>>>             # Extract data
>>>             xywh = data[:, 0:4, :]
>>>             if has_objectness:
>>>                 obj = data[:, 4:5, :]
>>>                 cls = data[:, 5:5 + num_classes, :]
>>>             else:
>>>                 obj = None
>>>                 cls = data[:, 4:4 + num_classes, :]
>>>
>>>             # Process batch item (we only have one)
>>>             b = 0
>>>             h, w = original_size
>>>
>>>             xywh_b = xywh[b].T  # (P,4)
>>>             if obj is not None:
>>>                 obj_b = obj[b].T.squeeze(1)  # (P,)
>>>             else:
>>>                 obj_b = np.ones((P,), dtype=np.float32)
>>>             cls_b = cls[b].T  # (P,nc)
>>>
>>>             # Get scores and labels
>>>             scores_all = (obj_b[:, None] * cls_b) if obj is not None else cls_b
>>>             labels = scores_all.argmax(axis=1)
>>>             scores = scores_all.max(axis=1)
>>>
>>>             # Filter by confidence threshold
>>>             keep = scores >= self.conf_th
>>>             if not np.any(keep):
>>>                 logger.info(f"No detections above threshold {self.conf_th}")
>>>                 return []
>>>
>>>             xywh_k = xywh_b[keep]
>>>             scores_k = scores[keep]
>>>             labels_k = labels[keep]
>>>
>>>             # xywh -> xyxy in model space
>>>             cx, cy, ww, hh = xywh_k.T
>>>             xyxy_model = np.stack([cx - ww / 2, cy - hh / 2, cx + ww / 2, cy + hh / 2], axis=1)
>>>
>>>             # Apply NMS per class
>>>             final_boxes = []
>>>             final_scores = []
>>>             final_labels = []
>>>
>>>             for c in np.unique(labels_k):
>>>                 idxs = np.where(labels_k == c)[0]
>>>                 if idxs.size == 0:
>>>                     continue
>>>                 keep_idx = self._nms(xyxy_model[idxs], scores_k[idxs], iou_th=self.iou_th)
>>>                 final_boxes.append(xyxy_model[idxs][keep_idx])
>>>                 final_scores.append(scores_k[idxs][keep_idx])
>>>                 final_labels.append(np.full(len(keep_idx), c, dtype=int))
>>>
>>>             if not final_boxes:
>>>                 logger.info("No detections after NMS")
>>>                 return []
>>>
>>>             xyxy_model = np.vstack(final_boxes)
>>>             scores_k = np.concatenate(final_scores)
>>>             labels_k = np.concatenate(final_labels)
>>>
>>>             # Map boxes from model space to original image space
>>>             xyxy_orig = xyxy_model.copy()
>>>
>>>             # Remove padding
>>>             xyxy_orig[:, [0, 2]] -= left
>>>             xyxy_orig[:, [1, 3]] -= top
>>>
>>>             # Scale back to original size
>>>             xyxy_orig /= r
>>>
>>>             # Clip to image boundaries
>>>             xyxy_orig[:, 0::2] = np.clip(xyxy_orig[:, 0::2], 0, w - 1)
>>>             xyxy_orig[:, 1::2] = np.clip(xyxy_orig[:, 1::2], 0, h - 1)
>>>
>>>             # Format as requested: x_min, y_min, x_max, y_max, class, probability
>>>             boxes = []
>>>             for (x1, y1, x2, y2), label, score in zip(xyxy_orig, labels_k, scores_k):
>>>                 class_name = CLASS_ID_TO_NAME.get(int(label))
>>>                 box_info = {
>>>                     "page": page_info['page_num'],
>>>                     "x_min": float(x1),
>>>                     "y_min": float(y1),
>>>                     "x_max": float(x2),
>>>                     "y_max": float(y2),
>>>                     "class": int(label),
>>>                     "class_name": class_name,
>>>                     "probability": float(score),
>>>                     "filename": page_info['filename'],
>>>                     "local_path": page_info['local_path'],
>>>                     "gcs_uri": page_info['gcs_uri']
>>>                 }
>>>                 boxes.append(box_info)
>>>
>>>             logger.info(f"Extracted {len(boxes)} boxes from page {page_info['page_num']}")
>>>
>>>             return boxes
>>>
>>>         except Exception as e:
>>>             logger.error(f"Error extracting boxes: {str(e)}")
>>>             return []
>>>
>>>
>>> class PrepareForBigQuery(beam.DoFn):
>>>     """Prepare data for BigQuery insertion."""
>>>
>>>     def process(self, box_info):
>>>         try:
>>>             # Generate UUIDs for primary keys
>>>             v_note_id = str(uuid.uuid4())
>>>             page_ocr_id = str(uuid.uuid4())
>>>             class_prediction_id = str(uuid.uuid4())
>>>
>>>             # Create timestamp
>>>             processing_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
>>>
>>>             # Create ocr_results row
>>>             ocr_results_row = {
>>>                 "v_note_id": v_note_id,
>>>                 "filename": box_info['filename'],
>>>                 "file_path": box_info['gcs_uri'],
>>>                 "processing_time": processing_time,
>>>                 "file_type": "pdf"
>>>             }
>>>
>>>             # Create page_ocr row
>>>             page_ocr_row = {
>>>                 "page_ocr_id": page_ocr_id,
>>>                 "v_note_id": v_note_id,
>>>                 "page_number": box_info['page']
>>>             }
>>>
>>>             # Create class_prediction row
>>>             class_prediction_row = {
>>>                 "class_prediction_id": class_prediction_id,
>>>                 "page_ocr_id": page_ocr_id,
>>>                 "xmin": box_info['x_min'],
>>>                 "ymin": box_info['y_min'],
>>>                 "xmax": box_info['x_max'],
>>>                 "ymax": box_info['y_max'],
>>>                 "class": box_info['class_name'] if box_info['class_name'] else str(box_info['class']),
>>>                 "confidence": box_info['probability']
>>>             }
>>>
>>>             # Return all three rows with table names
>>>             return [
>>>                 ('ocr_results', ocr_results_row),
>>>                 ('page_ocr', page_ocr_row),
>>>                 ('class_prediction', class_prediction_row)
>>>             ]
>>>
>>>         except Exception as e:
>>>             logger.error(f"Error preparing for BigQuery: {str(e)}")
>>>             return []
>>>
>>>
>>> model_handler = TensorRTEngineHandlerNumPy(
>>>     min_batch_size=1,
>>>     max_batch_size=1,
>>>     engine_path="gs://temp/yolov11l-doclaynet.engine",
>>> )
>>>
>>>
>>> with beam.Pipeline(options=options) as pipeline:
>>>
>>>     # Create PCollection from input URIs
>>>     pdf_uris = (
>>>         pipeline
>>>         | "Create URIs" >> beam.Create(["tmp.pdf"])
>>>     )
>>>
>>>     # Download PDFs
>>>     local_pdfs = (
>>>         pdf_uris
>>>         | "Download PDFs" >> beam.ParDo(DownloadPDFFromGCS())
>>>     )
>>>
>>>     # Load PDF pages
>>>     pdf_pages = (
>>>         local_pdfs
>>>         | "Load PDF Pages" >> beam.ParDo(LoadPDFPages())
>>>         # | "Flatten Pages" >> beam.FlatMap(lambda x: x)
>>>     )
>>>
>>>     # Preprocess images
>>>     preprocessed_pages = (
>>>         pdf_pages
>>>         | "Preprocess Images" >> beam.ParDo(PreprocessImage())
>>>     )
>>>
>>>     inference_results = (
>>>         preprocessed_pages
>>>         | "Run Inference" >> RunInference(model_handler=model_handler)
>>>     )
>>>
>>> On Tue, 16 Sept 2025 at 21:23, XQ Hu <[email protected]> wrote:
>>>
>>>> Can you share your commands and outputs?
>>>>
>>>> On Tue, Sep 16, 2025 at 9:02 PM Sai Shashank <[email protected]> wrote:
>>>>
>>>>> Okay, I have changed the docker image to now RUN the python command, but it is still halting without any warnings or errors.
>>>>>
>>>>> On Tue, 16 Sept 2025 at 17:38, XQ Hu via dev <[email protected]> wrote:
>>>>>
>>>>>> The CMD is not necessary, as it will be overridden by the ENTRYPOINT, just like your comment says.
>>>>>>
>>>>>> If you ssh into your Docker container like `docker run --rm -it --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE`, can you run python and some Beam pipelines with a direct runner in the container? This can help test that the environment works fine.
>>>>>>
>>>>>> I have one old Dockerfile that used to work with the old Beam:
>>>>>> https://github.com/google/dataflow-ml-starter/blob/main/tensor_rt.Dockerfile
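
To illustrate that suggestion, a minimal in-container smoke test with the direct runner could look like the sketch below. This is only a sketch: the `import tensorrt` is there to confirm that the image's Python bindings load, which is an assumption about what the custom TensorRT image should provide.

    import apache_beam as beam


    def check_env(x):
        # Fail fast inside the container if the TensorRT Python bindings are missing.
        import tensorrt  # noqa: F401
        return x * x


    # No runner is specified, so Beam falls back to the direct runner.
    with beam.Pipeline() as p:
        (p
         | "Create" >> beam.Create([1, 2, 3])
         | "CheckEnv" >> beam.Map(check_env)
         | "Print" >> beam.Map(print))

Saved as, say, smoke_test.py and run with `python smoke_test.py` inside `docker run --rm -it --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE`, this exercises both the Beam SDK install and the GPU stack's Python bindings before going anywhere near Dataflow.
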
>>>>>>
>>>>>> On Tue, Sep 16, 2025 at 4:56 PM Sai Shashank <[email protected]> wrote:
>>>>>>
>>>>>>> ---------- Forwarded message ---------
>>>>>>> From: Sai Shashank <[email protected]>
>>>>>>> Date: Tue, Sep 16, 2025 at 4:27 PM
>>>>>>> Subject: TensorRT inference not starting
>>>>>>> To: <[email protected]>
>>>>>>>
>>>>>>> Hey everyone,
>>>>>>> I was trying to use TensorRT with Apache Beam on Dataflow, but the Dataflow job didn't start; it did not even give me worker logs. Below is the Dockerfile I use to create the custom image. At first I thought it was a version mismatch, but that usually gives me a harness error.
>>>>>>>
>>>>>>> ARG BUILD_IMAGE=nvcr.io/nvidia/tensorrt:25.08-py3
>>>>>>> FROM ${BUILD_IMAGE}
>>>>>>> ENV PATH="/usr/src/tensorrt/bin:${PATH}"
>>>>>>>
>>>>>>> WORKDIR /workspace
>>>>>>>
>>>>>>> RUN apt-get update -y && apt-get install -y python3-venv
>>>>>>> RUN pip install --no-cache-dir apache-beam[gcp]==2.67.0
>>>>>>>
>>>>>>> COPY --from=apache/beam_python3.10_sdk:2.67.0 /opt/apache/beam /opt/apache/beam
>>>>>>>
>>>>>>> # Install additional dependencies
>>>>>>> RUN pip install --upgrade pip \
>>>>>>>     && pip install torch \
>>>>>>>     && pip install torchvision \
>>>>>>>     && pip install "pillow>=8.0.0" \
>>>>>>>     && pip install "transformers>=4.18.0" \
>>>>>>>     && pip install cuda-python \
>>>>>>>     && pip install opencv-python==4.7.0.72 \
>>>>>>>     && pip install PyMuPDF==1.22.5 \
>>>>>>>     && pip install requests==2.31.0
>>>>>>>
>>>>>>> # Set the default command to run the inference script
>>>>>>> # This will be overridden by the Apache Beam boot script
>>>>>>> CMD ["python", "/workspace/inference.py"]
>>>>>>>
>>>>>>> # Use the Apache Beam boot script as the entrypoint
>>>>>>> ENTRYPOINT ["/opt/apache/beam/boot"]
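
For completeness, here is a hedged sketch of how a custom image like the one above is typically wired into the job submission for a GPU worker. The machine type and accelerator string follow the worker configuration seen earlier in the thread (g2-standard-4, which carries an NVIDIA L4); the project, bucket, and image path are placeholders and may need adjusting:

    from apache_beam.options.pipeline_options import PipelineOptions

    # Placeholder identifiers; the worker_accelerator string follows the
    # documented Dataflow service-option format for attaching GPUs and
    # installing the NVIDIA driver on the workers.
    options = PipelineOptions(
        runner="DataflowRunner",
        project="my-project",
        region="us-east4",
        temp_location="gs://my-bucket/tmp",
        machine_type="g2-standard-4",
        sdk_container_image="us-docker.pkg.dev/my-project/my-repo/beam-tensorrt:2.67.0",
        dataflow_service_options=[
            "worker_accelerator=type:nvidia-l4;count:1;install-nvidia-driver"
        ],
    )
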
