pnowojski commented on code in PR #23550:
URL: https://github.com/apache/flink/pull/23550#discussion_r1369745726
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java:
##########
@@ -368,7 +373,8 @@ public void setStreamOperator(StreamOperator<?> operator) {
public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
if (factory != null) {
- toBeSerializedConfigObjects.put(SERIALIZEDUDF, factory);
+ toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
+ config.setString(SERIALIZED_UDF_CLASS_NAME, factory.getClass().getName());
Review Comment:
I missed that it's a final class, thanks for pointing this out :) In that
case, my only issue with this code is, as you said, that it's fragile.
If someone makes `SinkWriterOperatorFactory` non-final and implements a subclass
in one part of the system, a completely different part of the code can
suddenly fail, maybe even silently.
However, this could be guarded by a `checkState()` in
`setStreamOperatorFactory` that asserts the factory is not a subclass
of `SinkWriterOperatorFactory`.
As this would be a bit hacky, I would move:
```
SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName)
```
check, into the `boolean StreamConfig#isSinkWriterOperatorFactory(Class<...>
...)` method. It doesn't fit there very well, BUT at least it would justify why
we have the `checkState` in the `StreamConfig#setStreamOperatorFactory`.
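A rough sketch of how the guard and the name check could sit together. All types here (`OperatorFactory`, `SinkWriterFactory`, `GuardedConfig`, the local `checkState` helper, and the key string) are hypothetical stand-ins so the snippet compiles on its own; they are not the real Flink classes, and the real `SinkWriterOperatorFactory` is final, so the subclass below only exists to show what the guard would catch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the Flink types (assumptions, not the real API).
interface OperatorFactory {}

// Imagine someone removed `final` from the sink writer factory...
class SinkWriterFactory implements OperatorFactory {}

// ...and added a subclass somewhere else in the system.
class CustomSinkWriterFactory extends SinkWriterFactory {}

class MapFactory implements OperatorFactory {}

class GuardedConfig {
    // Illustrative key; the real constant and its value live in StreamConfig.
    static final String SERIALIZED_UDF_CLASS_NAME = "serializedUdfClass";

    private final Map<String, String> config = new HashMap<>();

    // Local helper mimicking a precondition check.
    private static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    void setStreamOperatorFactory(OperatorFactory factory) {
        if (factory == null) {
            return;
        }
        boolean isStrictSubclass =
                factory instanceof SinkWriterFactory
                        && !factory.getClass().equals(SinkWriterFactory.class);
        // Fail loudly at write time instead of silently misclassifying at read time.
        checkState(
                !isStrictSubclass,
                "Subclasses of SinkWriterFactory are not supported: "
                        + factory.getClass().getName());
        config.put(SERIALIZED_UDF_CLASS_NAME, factory.getClass().getName());
    }

    // The exact-name comparison lives next to the guard that makes it safe.
    boolean isSinkWriterOperatorFactory() {
        return SinkWriterFactory.class.getName().equals(config.get(SERIALIZED_UDF_CLASS_NAME));
    }
}
```

With this shape, the exact class-name comparison stays correct because any subclass is rejected before its name is ever stored.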
Or maybe:
```
toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
config.setBoolean(IS_INSTANCE_OF_SinkWriterOperatorFactory,
        factory instanceof SinkWriterOperatorFactory);
```
would be better/cleaner. Either one is fine for me.
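For comparison, a minimal sketch of the boolean-flag variant. Again, every name here (`StreamFactory`, `WriterFactory`, `FlagConfig`, the key string) is a made-up stand-in rather than Flink code. The point it illustrates is that `instanceof` is evaluated while the factory object is still at hand, so a subclass is classified correctly and no extra guard is needed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the Flink types (assumptions, not the real API).
interface StreamFactory {}

class WriterFactory implements StreamFactory {}

// A subclass that would break an exact class-name comparison.
class SubWriterFactory extends WriterFactory {}

class OtherFactory implements StreamFactory {}

class FlagConfig {
    // Illustrative key name only.
    static final String IS_SINK_WRITER = "isSinkWriterOperatorFactory";

    private final Map<String, Boolean> config = new HashMap<>();

    void setStreamOperatorFactory(StreamFactory factory) {
        if (factory != null) {
            // Record the answer now; readers never need to deserialize the factory.
            config.put(IS_SINK_WRITER, factory instanceof WriterFactory);
        }
    }

    boolean isSinkWriterOperatorFactory() {
        // Defaults to false when no factory has been set.
        return config.getOrDefault(IS_SINK_WRITER, false);
    }
}
```

The trade-off is one dedicated config entry per question asked, versus a general class-name entry that needs the subclass guard to stay trustworthy.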