Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5447#discussion_r167633643 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -591,16 +591,28 @@ public void collect(StreamRecord<T> record) { operator.setKeyContextElement1(copy); operator.processElement(copy); } catch (ClassCastException e) { - // Enrich error message - ClassCastException replace = new ClassCastException( - String.format( - "%s. Failed to push OutputTag with id '%s' to operator. " + - "This can occur when multiple OutputTags with different types " + - "but identical names are being used.", - e.getMessage(), - outputTag.getId())); - - throw new ExceptionInChainedOperatorException(replace); + ClassCastException replace; + if (outputTag != null) { + // Enrich error message + replace = new ClassCastException( + String.format( + "%s. Failed to push OutputTag with id '%s' to operator. " + + "This can occur when multiple OutputTags with different types " + + "but identical names are being used.", + e.getMessage(), + outputTag.getId())); + + throw new ExceptionInChainedOperatorException(replace); + } else { + replace = new ClassCastException( + String.format( + "%s. Failed to push OutputTag with id '%s' to operator. " + --- End diff -- Please properly read my comments. Just replace the else block with `throw e`. If the OutputTag is null there's no point in modifying the error message.
---