Hi all
i have an ADK agent running on CloudRun and wanted to invoke it via
dataflow
I have the following pipeline and DoFn but i am getting this exception
Anyone could advise?
Kind regards
Marco
EOF when reading a line [while running 'ClouodagentRun-ptransform-32']
Traceback (most recent call last): File "apache_beam/runners/common.py",
line 1498, in apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 685, in
apache_beam.runners.common.SimpleInvoker.invoke_process File
"/template/shareloader/modules/obb_utils.py", line 726, in process return
runner.run(self.amain(element)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
"/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run return
self._loop.run_until_complete(task) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/asyncio/base_events.py", line 654, in
run_until_complete return future.result() ^^^^^^^^^^^^^^^ File
"/template/shareloader/modules/obb_utils.py", line 713, in amain await
self.chat(client, self.SESSION_ID) File
"/template/shareloader/modules/obb_utils.py", line 690, in chat raise e
File "/template/shareloader/modules/obb_utils.py", line 683, in chat
user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: ")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
"/usr/local/lib/python3.11/asyncio/threads.py", line 25, in to_thread
return await loop.run_in_executor(None, func_call)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
"/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ EOFError: EOF when reading a line
def run_gcloud_agent(pipeline, debugSink):
from shareloader.modules.obb_utils import AsyncCloudRunAgent
(pipeline | 'Sourcinig prompt' >> beam.Create(["Run a technical
analysis for today's stock picks and give me your recommendations"])
| 'ClouodagentRun' >> beam.ParDo(AsyncCloudRunAgent())
| debugSink
)
class AsyncCloudRunAgent(AsyncProcess):
def __init__(self):
# --- Configuration (Dynamic) ---
self.APP_URL =
"https://stock-agent-service-682143946483.us-central1.run.app"
self.USER_ID = "user_123"
# Generate a single session ID for the entire conversation loop
self.SESSION_ID = f"session_{datetime.now().strftime('%Y%m%d%H%M%S')}"
self.APP_NAME = "stock_agent"
# --- Authentication Function (ASYNC) ---
async def get_auth_token(self) -> str:
"""
Programmatically fetches an ID token for the Cloud Run service audience.
'target_audience' should be the URL of your Cloud Run service.
"""
# Run the synchronous google-auth call in a thread to keep it
async-friendly
loop = asyncio.get_event_loop()
def fetch_token():
auth_req = google.auth.transport.requests.Request()
# This automatically uses the Dataflow Worker Service Account
return id_token.fetch_id_token(auth_req, self.APP_URL)
try:
token = await loop.run_in_executor(None, fetch_token)
return token
except Exception as e:
raise RuntimeError(f"Failed to fetch ID token: {e}")
# --- API Interaction Functions (ASYNC) ---
async def make_request(self, client: httpx.AsyncClient, method:
str, endpoint: str, data: Dict[str, Any] = None) -> httpx.Response:
"""Helper function for authenticated asynchronous requests
using httpx."""
token = await self.get_auth_token()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
url = f"{self.APP_URL}{endpoint}"
try:
if method.upper() == 'POST':
response = await client.post(url, headers=headers, json=data)
elif method.upper() == 'DELETE':
response = await client.delete(url, headers=headers)
else:
raise ValueError(f"Unsupported method: {method}")
response.raise_for_status()
return response
except httpx.HTTPStatusError as errh:
print(f"\nā **HTTP ERROR:** Status {response.status_code}
for {url}")
print(f"ā **Server Response (Raw):**\n{response.text}")
raise
except httpx.RequestError as err:
print(f"\nā An unexpected request error occurred: {err}")
raise
async def run_agent_request(self, client: httpx.AsyncClient,
session_id: str, message: str):
"""Executes a single POST request to the /run_sse endpoint."""
print(f"\n[User] -> Sending message: '{message}'")
run_data = {
"app_name": self.APP_NAME,
"user_id": self.USER_ID,
"session_id": session_id,
"new_message": {"role": "user", "parts": [{"text": message}]},
"streaming": False
}
try:
response = await self.make_request(client, "POST",
"/run_sse", data=run_data)
current_status = response.status_code
# print(f"**Request Status Code:** {current_status}")
raw_text = response.text.strip()
# Multi-line SSE parsing logic
data_lines = [
line.strip()
for line in raw_text.split('\n')
if line.strip().startswith("data:")
]
if not data_lines:
raise json.JSONDecodeError("No 'data:' lines found in
200 response.", raw_text, 0)
last_data_line = data_lines[-1]
json_payload = last_data_line[len("data:"):].strip()
agent_response = json.loads(json_payload)
# Extract the final text
final_text = agent_response.get('content',
{}).get('parts', [{}])[0].get('text', 'Agent response structure not
recognized.')
print(f"[Agent] -> {final_text}")
except json.JSONDecodeError as e:
print(f"\nšØ **JSON PARSING FAILED**!")
print(f" Error: {e}")
print(" --- RAW SERVER CONTENT ---")
print(raw_text)
print(" --------------------------")
except Exception as e:
print(f"ā Agent request failed: {e}")
# --- Interactive Chat Loop ---
async def chat(self, client: httpx.AsyncClient, session_id: str):
"""Runs the main conversation loop, handling user input
asynchronously."""
print("--- š¬ Start Chatting ---")
try:
# Use asyncio.to_thread to run blocking input() without
freezing the event loop
user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: ")
# Send the message to the agent
await self.run_agent_request(client, session_id, user_input)
except Exception as e:
print(f"An unexpected error occurred in the loop: {e}")
raise e
# --- Main Logic (ASYNC) ---
async def amain(self, element):
"""Main asynchronous function to set up the session and start
the loop."""
print(f"\nš¤ Starting Interactive Client with Session ID:
**{self.SESSION_ID}**")
session_data = {"state": {"preferred_language": "English",
"visit_count": 5}}
current_session_endpoint =
f"/apps/{self.APP_NAME}/users/{self.USER_ID}/sessions/{self.SESSION_ID}"
# httpx.AsyncClient is used as a context manager to manage connections
async with httpx.AsyncClient(timeout=30.0) as client:
# 1. Create Session
print("\n## 1. Creating Session")
try:
await self.make_request(client, "POST",
current_session_endpoint, data=session_data)
print(f"ā
Session created successfully. Status 200.")
except Exception as e:
print(f"ā Could not start session: {e}")
return
# 2. Start the Interactive Loop
await self.chat(client, self.SESSION_ID)
# 3. Cleanup: Delete Session (Best Practice)
print(f"\n## 3. Deleting Session: {self.SESSION_ID}")
try:
await self.make_request(client, "DELETE",
current_session_endpoint)
print("ā
Session deleted successfully.")
except Exception as e:
print(f"ā ļø Warning: Failed to delete session. {e}")
def process(self, element: str):
logging.info(f'Input elements:{element}')
with asyncio.Runner() as runner:
return runner.run(self.amain(element))