Hi all
 i have an ADK agent running on CloudRun and wanted to invoke it via
dataflow
I have the following pipeline and DoFn but i am getting this exception

Anyone could advise?

Kind regards
Marco

EOF when reading a line [while running 'ClouodagentRun-ptransform-32']
Traceback (most recent call last): File "apache_beam/runners/common.py",
line 1498, in apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 685, in
apache_beam.runners.common.SimpleInvoker.invoke_process File
"/template/shareloader/modules/obb_utils.py", line 726, in process return
runner.run(self.amain(element)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
"/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run return
self._loop.run_until_complete(task) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/asyncio/base_events.py", line 654, in
run_until_complete return future.result() ^^^^^^^^^^^^^^^ File
"/template/shareloader/modules/obb_utils.py", line 713, in amain await
self.chat(client, self.SESSION_ID) File
"/template/shareloader/modules/obb_utils.py", line 690, in chat raise e
File "/template/shareloader/modules/obb_utils.py", line 683, in chat
user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: ")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
"/usr/local/lib/python3.11/asyncio/threads.py", line 25, in to_thread
return await loop.run_in_executor(None, func_call)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
"/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ EOFError: EOF when reading a line

def run_gcloud_agent(pipeline, debugSink):
    from shareloader.modules.obb_utils import AsyncCloudRunAgent
    (pipeline | 'Sourcinig prompt' >> beam.Create(["Run a technical
analysis for today's stock picks and give me your recommendations"])
            | 'ClouodagentRun' >> beam.ParDo(AsyncCloudRunAgent())
             |  debugSink
            )


class AsyncCloudRunAgent(AsyncProcess):

    def __init__(self):
        # --- Configuration (Dynamic) ---
        self.APP_URL =
"https://stock-agent-service-682143946483.us-central1.run.app";
        self.USER_ID = "user_123"
        # Generate a single session ID for the entire conversation loop
        self.SESSION_ID = f"session_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        self.APP_NAME = "stock_agent"



    # --- Authentication Function (ASYNC) ---

    async def get_auth_token(self) -> str:
        """
        Programmatically fetches an ID token for the Cloud Run service audience.
        'target_audience' should be the URL of your Cloud Run service.
        """
        # Run the synchronous google-auth call in a thread to keep it
async-friendly
        loop = asyncio.get_event_loop()

        def fetch_token():
            auth_req = google.auth.transport.requests.Request()
            # This automatically uses the Dataflow Worker Service Account
            return id_token.fetch_id_token(auth_req, self.APP_URL)

        try:
            token = await loop.run_in_executor(None, fetch_token)
            return token
        except Exception as e:
            raise RuntimeError(f"Failed to fetch ID token: {e}")
    # --- API Interaction Functions (ASYNC) ---

    async def make_request(self, client: httpx.AsyncClient, method:
str, endpoint: str, data: Dict[str, Any] = None) -> httpx.Response:
        """Helper function for authenticated asynchronous requests
using httpx."""
        token = await self.get_auth_token()
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }
        url = f"{self.APP_URL}{endpoint}"

        try:
            if method.upper() == 'POST':
                response = await client.post(url, headers=headers, json=data)
            elif method.upper() == 'DELETE':
                response = await client.delete(url, headers=headers)
            else:
                raise ValueError(f"Unsupported method: {method}")

            response.raise_for_status()
            return response
        except httpx.HTTPStatusError as errh:
            print(f"\nāŒ **HTTP ERROR:** Status {response.status_code}
for {url}")
            print(f"āŒ **Server Response (Raw):**\n{response.text}")
            raise
        except httpx.RequestError as err:
            print(f"\nāŒ An unexpected request error occurred: {err}")
            raise

    async def run_agent_request(self, client: httpx.AsyncClient,
session_id: str, message: str):
        """Executes a single POST request to the /run_sse endpoint."""

        print(f"\n[User] -> Sending message: '{message}'")

        run_data = {
            "app_name": self.APP_NAME,
            "user_id": self.USER_ID,
            "session_id": session_id,
            "new_message": {"role": "user", "parts": [{"text": message}]},
            "streaming": False
        }

        try:
            response = await self.make_request(client, "POST",
"/run_sse", data=run_data)
            current_status = response.status_code
            # print(f"**Request Status Code:** {current_status}")

            raw_text = response.text.strip()

            # Multi-line SSE parsing logic
            data_lines = [
                line.strip()
                for line in raw_text.split('\n')
                if line.strip().startswith("data:")
            ]

            if not data_lines:
                raise json.JSONDecodeError("No 'data:' lines found in
200 response.", raw_text, 0)

            last_data_line = data_lines[-1]
            json_payload = last_data_line[len("data:"):].strip()
            agent_response = json.loads(json_payload)

            # Extract the final text
            final_text = agent_response.get('content',
{}).get('parts', [{}])[0].get('text', 'Agent response structure not
recognized.')

            print(f"[Agent] -> {final_text}")

        except json.JSONDecodeError as e:
            print(f"\n🚨 **JSON PARSING FAILED**!")
            print(f"   Error: {e}")
            print("   --- RAW SERVER CONTENT ---")
            print(raw_text)
            print("   --------------------------")

        except Exception as e:
            print(f"āŒ Agent request failed: {e}")

    # --- Interactive Chat Loop ---

    async def chat(self, client: httpx.AsyncClient, session_id: str):
        """Runs the main conversation loop, handling user input
asynchronously."""
        print("--- šŸ’¬ Start Chatting ---")

        try:
            # Use asyncio.to_thread to run blocking input() without
freezing the event loop
            user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: ")

            # Send the message to the agent
            await self.run_agent_request(client, session_id, user_input)

        except Exception as e:
            print(f"An unexpected error occurred in the loop: {e}")
            raise e

    # --- Main Logic (ASYNC) ---

    async def amain(self, element):
        """Main asynchronous function to set up the session and start
the loop."""
        print(f"\nšŸ¤– Starting Interactive Client with Session ID:
**{self.SESSION_ID}**")
        session_data = {"state": {"preferred_language": "English",
"visit_count": 5}}
        current_session_endpoint =
f"/apps/{self.APP_NAME}/users/{self.USER_ID}/sessions/{self.SESSION_ID}"

        # httpx.AsyncClient is used as a context manager to manage connections
        async with httpx.AsyncClient(timeout=30.0) as client:

            # 1. Create Session
            print("\n## 1. Creating Session")
            try:
                await self.make_request(client, "POST",
current_session_endpoint, data=session_data)
                print(f"āœ… Session created successfully. Status 200.")
            except Exception as e:
                print(f"āŒ Could not start session: {e}")
                return

            # 2. Start the Interactive Loop
            await self.chat(client, self.SESSION_ID)

            # 3. Cleanup: Delete Session (Best Practice)
            print(f"\n## 3. Deleting Session: {self.SESSION_ID}")
            try:
                await self.make_request(client, "DELETE",
current_session_endpoint)
                print("āœ… Session deleted successfully.")
            except Exception as e:
                print(f"āš ļø Warning: Failed to delete session. {e}")

    def process(self, element: str):
        logging.info(f'Input elements:{element}')
        with asyncio.Runner() as runner:
            return runner.run(self.amain(element))

Reply via email to