pnowojski commented on code in PR #23550:
URL: https://github.com/apache/flink/pull/23550#discussion_r1371854774
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java:
##########
@@ -368,7 +373,8 @@ public void setStreamOperator(StreamOperator<?> operator) {
public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
if (factory != null) {
- toBeSerializedConfigObjects.put(SERIALIZEDUDF, factory);
+ toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
+ config.setString(SERIALIZED_UDF_CLASS_NAME,
factory.getClass().getName());
Review Comment:
Maybe solution 3 would be also fine. If it fails due to class loader issues,
it would err on the safe side. 🤷
Solution 1 wouldn't be fragile in a sense, that it wouldn't allow for silent
errors. As I mentioned, if you moved the class name equality check into
`StreamConfig`'s method `boolean
StreamConfig#isSinkWriterOperatorFactory(Class<...> ...)`, then both the
equality check upon deserialization and the `checkState` during serialization
would reside "close to one another" in the `StreamConfig` class. `checkState`
would prevent from any silent issues.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java:
##########
@@ -368,7 +373,8 @@ public void setStreamOperator(StreamOperator<?> operator) {
public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
if (factory != null) {
- toBeSerializedConfigObjects.put(SERIALIZEDUDF, factory);
+ toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
+ config.setString(SERIALIZED_UDF_CLASS_NAME,
factory.getClass().getName());
Review Comment:
Maybe solution 3 would be also fine. If it fails due to class loader issues,
it would err on the safe side. 🤷
Solution 1 wouldn't be fragile in a sense, that it wouldn't allow for silent
errors. As I mentioned, if you moved the class name equality check into
`StreamConfig`'s method `boolean
StreamConfig#isSinkWriterOperatorFactory(Class<...> ...)`, then both the
equality check upon deserialization and the `checkState` during serialization
would reside "close to one another" in the `StreamConfig` class. `checkState`
would prevent from any silent issues.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]