1996fanrui commented on code in PR #23597:
URL: https://github.com/apache/flink/pull/23597#discussion_r1372830355
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java:
##########
@@ -641,16 +641,26 @@ private Map<StreamConfig.SourceInputConfig, ChainedSource> createChainedSources(
     @Nullable
     private Counter getOperatorRecordsOutCounter(
             StreamTask<?, ?> containingTask, StreamConfig operatorConfig) {
-        String streamOperatorFactoryClassName = operatorConfig.getStreamOperatorFactoryClassName();
+        ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
+        Class<StreamOperatorFactory<?>> streamOperatorFactoryClass =
+                operatorConfig.getStreamOperatorFactoryClass(userCodeClassloader);
+
         // Do not use the numRecordsOut counter on output if this operator is SinkWriterOperator.
         //
         // Metric "numRecordsOut" is defined as the total number of records written to the
         // external system in FLIP-33, but this metric is occupied in AbstractStreamOperator as the
         // number of records sent to downstream operators, which is number of Committable batches
         // sent to SinkCommitter. So we skip registering this metric on output and leave this metric
         // to sink writer implementations to report.
-        if (SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName)) {
-            return null;
+        try {
+            Class<?> sinkWriterFactoryClass =
+                    userCodeClassloader.loadClass(SinkWriterOperatorFactory.class.getName());
+            if (sinkWriterFactoryClass.isAssignableFrom(streamOperatorFactoryClass)) {
Review Comment:
> Thanks for the update! Have you finally decided to go with the solution 3 instead of solution 1?

Hi @pnowojski, as we discussed in this comment!

I didn't use solution 3 before because I was worried that `SinkWriterOperatorFactory` and `streamOperatorFactoryClass` might be loaded by different classloaders. If they were, `sinkWriterFactoryClass.isAssignableFrom(streamOperatorFactoryClass)` could not reliably determine whether `streamOperatorFactoryClass` is a subclass of `sinkWriterFactoryClass`.

However, after double-checking, I found that both are loaded by the same classloader (`containingTask.getUserCodeClassLoader()`), so my worry was unnecessary and solution 3 works.

`configuration.getStreamOperatorFactory(userCodeClassloader);` in OperatorChain [1] also uses the `userCodeClassloader`.

That is why solution 3 works.
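
To illustrate the classloader concern, here is a minimal standalone sketch. It is not Flink code: `Factory`, `SubFactory`, and `IsolatingLoader` are hypothetical stand-ins, and it assumes the compiled `.class` files are reachable as resources through the parent loader. It shows that `isAssignableFrom` only reports a subclass relationship when both `Class` objects come from the same classloader.

```java
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderIdentityDemo {

    /** Hypothetical stand-in for SinkWriterOperatorFactory. */
    public static class Factory {}

    /** Hypothetical subclass of the stand-in factory. */
    public static class SubFactory extends Factory {}

    /** Child-first loader: defines this demo's nested classes itself instead of delegating. */
    static class IsolatingLoader extends ClassLoader {
        IsolatingLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                // Everything except the demo's nested classes goes to the parent as usual.
                if (!name.startsWith(ClassLoaderIdentityDemo.class.getName() + "$")) {
                    return super.loadClass(name, resolve);
                }
                Class<?> loaded = findLoadedClass(name);
                if (loaded == null) {
                    String path = name.replace('.', '/') + ".class";
                    try (InputStream in = getParent().getResourceAsStream(path)) {
                        if (in == null) {
                            throw new ClassNotFoundException(name);
                        }
                        byte[] bytes = in.readAllBytes();
                        loaded = defineClass(name, bytes, 0, bytes.length);
                    } catch (IOException e) {
                        throw new ClassNotFoundException(name, e);
                    }
                }
                if (resolve) {
                    resolveClass(loaded);
                }
                return loaded;
            }
        }
    }

    private static Class<?> load(ClassLoader loader, Class<?> original) {
        try {
            return loader.loadClass(original.getName());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Both classes come from the same isolating loader. */
    public static boolean sameLoaderCheck() {
        ClassLoader loader = new IsolatingLoader(Factory.class.getClassLoader());
        return load(loader, Factory.class).isAssignableFrom(load(loader, SubFactory.class));
    }

    /** The base class comes from the default loader, the subclass from the isolating loader. */
    public static boolean crossLoaderCheck() {
        ClassLoader loader = new IsolatingLoader(Factory.class.getClassLoader());
        return Factory.class.isAssignableFrom(load(loader, SubFactory.class));
    }

    public static void main(String[] args) {
        System.out.println("same loader:  " + sameLoaderCheck());
        System.out.println("cross loader: " + crossLoaderCheck());
    }
}
```

In the cross-loader case the subclass's superclass is the isolating loader's own copy of `Factory`, which is a different runtime class from `Factory.class` in the default loader, so the check fails. Loading both sides through the same `userCodeClassloader`, as the diff does, avoids this.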
-----------------------------------------
The differences between solutions 1 and 3 are:

- Solution 3 keeps working when a new subclass of `SinkWriterOperatorFactory` is added, because `sinkWriterFactoryClass.isAssignableFrom(streamOperatorFactoryClass)` also matches subclasses.
- With solution 1, a developer who wants to add a subclass of `SinkWriterOperatorFactory` must handle it explicitly.
- We would also need to explain the background in the code comments: why an exception is thrown when a subclass of `SinkWriterOperatorFactory` is passed.

BTW, if I change this line from `if (sinkWriterFactoryClass.isAssignableFrom(streamOperatorFactoryClass)) {` to `if (sinkWriterFactoryClass.equals(streamOperatorFactoryClass)) {`, it becomes solution 1.
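
As a rough illustration of that one-line difference, here is a standalone sketch, not Flink code: `SinkFactory`, `CustomSinkFactory`, and `OtherFactory` are hypothetical stand-ins. The exact `equals` check misses a subclass, while `isAssignableFrom` matches it.

```java
public class FactoryMatchDemo {

    /** Hypothetical stand-in for SinkWriterOperatorFactory. */
    public static class SinkFactory {}

    /** Hypothetical subclass that someone might add later. */
    public static class CustomSinkFactory extends SinkFactory {}

    /** Hypothetical unrelated operator factory. */
    public static class OtherFactory {}

    /** Solution 1 style: matches only the exact class. */
    public static boolean matchesExactly(Class<?> candidate) {
        return SinkFactory.class.equals(candidate);
    }

    /** Solution 3 style: matches the class and any of its subclasses. */
    public static boolean matchesWithSubclasses(Class<?> candidate) {
        return SinkFactory.class.isAssignableFrom(candidate);
    }

    public static void main(String[] args) {
        System.out.println(matchesExactly(SinkFactory.class));               // true
        System.out.println(matchesExactly(CustomSinkFactory.class));         // false
        System.out.println(matchesWithSubclasses(CustomSinkFactory.class));  // true
        System.out.println(matchesWithSubclasses(OtherFactory.class));       // false
    }
}
```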

Based on these differences, I think solution 3 is easier to understand and maintain. WDYT?

Please correct me if my understanding is wrong, thanks~

[1]
https://github.com/apache/flink/blob/3ff225c5f993282d6dfc7726fc08cc00058d9a7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L169