Hi,
I'm sending this out to get some feedback on proposed fixes for
https://github.com/apache/beam/issues/34705, since the fix should be
consistent across SDKs and would affect the Beam FnApi.

Currently StateResponse (code
<https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L819>)
contains an error string field to communicate errors populating a
response.  When a state response contains an error, the error is generally
raised as an exception that stops bundle processing and is logged.  Java
sdk code
<https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java#L191>
Python sdk code
<http://google3/third_party/py/apache_beam/runners/worker/sdk_worker.py>

However, there are cases where such errors are expected if initiated by the
runner. For example, runners may be performing load-balancing or hedging
and wish to cancel processing that is no longer valid or necessary.  In
such cases the current logging is excessive and concerning to users.

*Proposal 1:*
Add a typed exception ProcessingCancelledException to SDKs instead of using
generic exception types, i.e. IllegalStateException/RuntimeException.  This
exception can be handled and logged differently than other exceptions
encountered during processing. Prior art is the Cloud Dataflow runner use
of KeyTokenInvalidException
<https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/KeyTokenInvalidException.java>.
If we add explicit cancellation of processing in the fnapi in the future we
could reuse the same exception.
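
To make Proposal 1 concrete, here is a rough sketch of how an SDK harness
might treat the typed exception differently from other failures.  Only the
exception name comes from the proposal; run_bundle, the logger name and the
log levels are hypothetical choices for illustration, not existing Beam
code.

```python
import logging

# Hypothetical logger name, used only for this sketch.
logger = logging.getLogger("sdk_worker_sketch")


class ProcessingCancelledException(Exception):
    """Typed exception from Proposal 1: the runner cancelled this work."""


def run_bundle(process_fn):
    """Runs process_fn and returns a status string (hypothetical helper).

    Cancellation is logged quietly; any other failure keeps the loud
    logging that exists today.
    """
    try:
        process_fn()
        return "ok"
    except ProcessingCancelledException as e:
        # Expected when the runner cancels work, so avoid alarming users.
        logger.debug("Bundle processing cancelled by runner: %s", e)
        return "cancelled"
    except Exception:
        # Anything else is unexpected and is logged with a stack trace.
        logger.exception("Bundle processing failed")
        return "failed"
```

The same handler could cover explicit cancellation later, since callers
only depend on the exception type and not on where it was raised.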

This still doesn't distinguish between a state response error caused by an
SDK bug (such as an illegal request) and one returned because the runner
wants to cancel processing.  To address this we could modify the FnApi
StateResponse to include additional information that tells the two apart.

*Proposal 2:*
Add a boolean to StateResponse indicating whether the error was initiated
by the runner.  This is nice in that old SDKs can safely treat the new
field as unknown and ignore it.

message StateResponse {
   string error;
   // If true, error should be non-empty but is expected due to the runner.
   // SDKs may choose to skip logging the error or reduce its severity.
   bool cancelled;
   ...
}

Instead of a boolean we could use an enum which might be more extensible
but I'm unsure what additional states we would want to add in the future so
it seems premature.
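
As a sketch of how an SDK could consume the proposed flag, the snippet
below maps a state response to an exception type.  StateResponseSketch is a
plain stand-in for the proto message, not the generated Beam class, and
raise_for_error is a hypothetical helper; only the error and cancelled
field names come from the message above.

```python
from dataclasses import dataclass


class ProcessingCancelledException(Exception):
    """Typed exception from Proposal 1 for runner-initiated cancellation."""


@dataclass
class StateResponseSketch:
    """Stand-in for the proto message; not the generated Beam class."""
    error: str = ""
    # Proposed field. Defaults to False, matching a runner that never sets it.
    cancelled: bool = False


def raise_for_error(response):
    """Raises if the response carries an error, picking the type by `cancelled`."""
    if not response.error:
        return
    if response.cancelled:
        # Expected, runner-initiated: callers can log this quietly.
        raise ProcessingCancelledException(response.error)
    # Unexpected (e.g. an SDK bug): keep the existing generic failure path.
    raise RuntimeError(response.error)
```

Because the flag defaults to false, responses from a runner that does not
know about the field would still take the existing error path.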

I'd appreciate any feedback or concerns.

Thanks,
Sam
