[ 
https://issues.apache.org/jira/browse/FLINK-28747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816124#comment-17816124
 ] 

Nathan Taylor Armstrong Lewis commented on FLINK-28747:
-------------------------------------------------------

Protobuf 3 has an `optional` label that enables explicit field presence. With 
it, an unset field can be distinguished from a field that was explicitly set 
to its default value.

Would adding {{optional}} to the field at 
https://github.com/apache/flink-statefun/blob/accd75ea0109845c4b4c0ddd74021147af1439d4/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto#L28
 be enough to give the SDKs a way to distinguish a valid empty-string key from 
an invalid unset key? I'm guessing other changes would be needed elsewhere, 
since that file covers the egress and I don't see an equivalent protobuf file 
for Kafka ingress messages.
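
To make the distinction concrete, here is a minimal Python sketch. It does not 
use the real protobuf or StateFun APIs: {{KeyedRecord}}, {{has_key}}, 
{{validate_truthy}} and {{validate_presence}} are hypothetical names that only 
model the presence semantics an {{optional}} string field would provide, with 
{{None}} standing in for "unset". Whether the SDKs actually validate the key 
with a truthiness check is an assumption on my part.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class KeyedRecord:
    """Hypothetical stand-in for a message with an `optional string key`."""

    # None models "field not set"; "" models "explicitly set to empty".
    key: Optional[str] = None

    def has_key(self) -> bool:
        # Models the presence check that explicit field presence enables.
        return self.key is not None


def validate_truthy(record: KeyedRecord) -> bool:
    # Truthiness check: an unset key and an empty-string key are both rejected.
    return bool(record.key)


def validate_presence(record: KeyedRecord) -> bool:
    # Presence check: only a genuinely unset key is rejected.
    return record.has_key()
```

With a truthiness check, {{KeyedRecord(key="")}} and {{KeyedRecord()}} are both 
rejected. With a presence check, only the latter is, which is the behaviour I 
am asking about.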

> "target_id can not be missing" in HTTP statefun request
> -------------------------------------------------------
>
>                 Key: FLINK-28747
>                 URL: https://issues.apache.org/jira/browse/FLINK-28747
>             Project: Flink
>          Issue Type: Bug
>          Components: Stateful Functions
>    Affects Versions: statefun-3.0.0, statefun-3.2.0, statefun-3.1.1
>            Reporter: Stephan Weinwurm
>            Priority: Major
>
> Hi all,
> We've suddenly started to see the following exception in our HTTP statefun 
> function endpoints:
> {code}
> Traceback (most recent call last):
>   File "/src/.venv/lib/python3.9/site-packages/uvicorn/protocols/http/h11_impl.py", line 403, in run_asgi
>     result = await app(self.scope, self.receive, self.send)
>   File "/src/.venv/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
>     return await self.app(scope, receive, send)
>   File "/src/worker/baseplate_asgi/asgi/baseplate_asgi_middleware.py", line 37, in __call__
>     await span_processor.execute()
>   File "/src/worker/baseplate_asgi/asgi/asgi_http_span_processor.py", line 61, in execute
>     raise e
>   File "/src/worker/baseplate_asgi/asgi/asgi_http_span_processor.py", line 57, in execute
>     await self.app(self.scope, self.receive, self.send)
>   File "/src/.venv/lib/python3.9/site-packages/starlette/applications.py", line 124, in __call__
>     await self.middleware_stack(scope, receive, send)
>   File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/errors.py", line 184, in __call__
>     raise exc
>   File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/errors.py", line 162, in __call__
>     await self.app(scope, receive, _send)
>   File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
>     raise exc
>   File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
>     await self.app(scope, receive, sender)
>   File "/src/.venv/lib/python3.9/site-packages/starlette/routing.py", line 680, in __call__
>     await route.handle(scope, receive, send)
>   File "/src/.venv/lib/python3.9/site-packages/starlette/routing.py", line 275, in handle
>     await self.app(scope, receive, send)
>   File "/src/.venv/lib/python3.9/site-packages/starlette/routing.py", line 65, in app
>     response = await func(request)
>   File "/src/worker/baseplate_statefun/server/asgi/make_statefun_handler.py", line 25, in statefun_handler
>     result = await handler.handle_async(request_body)
>   File "/src/.venv/lib/python3.9/site-packages/statefun/request_reply_v3.py", line 262, in handle_async
>     msg = Message(target_typename=sdk_address.typename, target_id=sdk_address.id,
>   File "/src/.venv/lib/python3.9/site-packages/statefun/messages.py", line 42, in __init__
>     raise ValueError("target_id can not be missing")
> {code}
> Interestingly, this started happening in three separate Flink deployments 
> at the very same time. The only thing the three deployments have in common 
> is that they consume the same Kafka topics.
> No deployments took place when the issue started, which was on July 28th at 
> 3:05 PM. We have been seeing the error continuously since then.
> We were also able to extract the request that Flink sends to the HTTP 
> statefun endpoint:
> {code}
> {'invocation': {'target': {'namespace': 'com.x.dummy', 'type': 'dummy'},
>                 'invocations': [{'argument': {'typename': 'type.googleapis.com/v2_event.Event',
>                                               'has_value': True,
>                                               'value': '-redacted-'}}]}}
> {code}
> As you can see, the `invocation.target` object has no `id` field, which 
> means the `target_id` was either missing or an empty string.
>  
> This is our module.yaml from one of the Flink deployments:
>  
> {code}
> version: "3.0"
> module:
>   meta:
>     type: remote
>   spec:
>     endpoints:
>       - endpoint:
>           meta:
>             kind: io.statefun.endpoints.v1/http
>           spec:
>             functions: com.x.dummy/dummy
>             urlPathTemplate: http://x-worker-dummy.x-functions:9090/statefun
>             timeouts:
>               call: 2 min
>               read: 2 min
>               write: 2 min
>             maxNumBatchRequests: 100
>     ingresses:
>       - ingress:
>           meta:
>             type: io.statefun.kafka/ingress
>             id: com.x/ingress
>           spec:
>             address: x-kafka-0.x.ue1.x.net:9092
>             consumerGroupId: x-worker-dummy
>             topics:
>               - topic: v2_post_events
>                 valueType: type.googleapis.com/v2_event.Event
>                 targets:
>                   - com.x.dummy/dummy
>             startupPosition:
>               type: group-offsets
>             autoOffsetResetPosition: earliest
> {code}
>  
> Can you please help us investigate as this is critically impacting our prod 
> setup?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)