[ 
https://issues.apache.org/jira/browse/FLINK-37545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grzegorz Liter updated FLINK-37545:
-----------------------------------
    Description: 
When using MetricGroup to expose custom metrics in Watermark Generator

Example:
{code:java}
WatermarkStrategy
                .forGenerator(ctx ->
                        new CustomWatermarkStrategy(ctx.getMetricGroup()))
                .withTimestampAssigner((event, timestamp) -> ...);
{code}
StackOverflowError will be thrown.

This is due to changes done in https://issues.apache.org/jira/browse/FLINK-35886

Previously code in 
`org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator#open`
 look like this:
{code:java}
        watermarkGenerator =
                emitProgressiveWatermarks
                        ? 
watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
                        : new NoWatermarksGenerator<>();
{code}
here in the `this::getMetricGroup` `this` refers to parent class instance

now it looks like this
{code:java}
watermarkGenerator =
                emitProgressiveWatermarks
                        ? watermarkStrategy.createWatermarkGenerator(
                                new WatermarkGeneratorSupplier.Context() {
                                    @Override
                                    public MetricGroup getMetricGroup() {
                                        return this.getMetricGroup();
                                    }

                                    @Override
                                    public RelativeClock 
getInputActivityClock() {
                                        return inputActivityClock;
                                    }
                                })
                        : new NoWatermarksGenerator<>();
{code}
where `this` reference to anonymous class instance and causes infinite 
recursive loop

  was:
When using MetricGroup to expose custom metrics in Watermark Generator

Example:

{code:java}
WatermarkStrategy
                .forGenerator(ctx ->
                        new CustomWatermarkStrategy(ctx.getMetricGroup()))
                .withTimestampAssigner((event, timestamp) -> ...);
{code}

StackOverflowError will be thrown.

This is due to changes done in https://issues.apache.org/jira/browse/FLINK-35886

Previously code in 
`org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator#open`
 look like this:
{code:java}
        watermarkGenerator =
                emitProgressiveWatermarks
                        ? 
watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
                        : new NoWatermarksGenerator<>();
{code}

here in the `this::getMetricGroup` `this` refers to parent class instance

now it looks like this
{code:java}
watermarkGenerator =
                emitProgressiveWatermarks
                        ? watermarkStrategy.createWatermarkGenerator(
                                new WatermarkGeneratorSupplier.Context() {
                                    @Override
                                    public MetricGroup getMetricGroup() {
                                        return this.getMetricGroup();
                                    }

                                    @Override
                                    public RelativeClock 
getInputActivityClock() {
                                        return inputActivityClock;
                                    }
                                })
                        : new NoWatermarksGenerator<>();
{code}

where `this` reference to anonymous class instance and causes infinite 
recursive loop


> StackOverflowError when using MetricGroup in custom WatermarkStrategy
> ---------------------------------------------------------------------
>
>                 Key: FLINK-37545
>                 URL: https://issues.apache.org/jira/browse/FLINK-37545
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.19.2, 1.20.1
>            Reporter: Grzegorz Liter
>            Priority: Critical
>
> When using MetricGroup to expose custom metrics in Watermark Generator
> Example:
> {code:java}
> WatermarkStrategy
>                 .forGenerator(ctx ->
>                         new CustomWatermarkStrategy(ctx.getMetricGroup()))
>                 .withTimestampAssigner((event, timestamp) -> ...);
> {code}
> StackOverflowError will be thrown.
> This is due to changes done in 
> https://issues.apache.org/jira/browse/FLINK-35886
> Previously code in 
> `org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator#open`
>  look like this:
> {code:java}
>         watermarkGenerator =
>                 emitProgressiveWatermarks
>                         ? 
> watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
>                         : new NoWatermarksGenerator<>();
> {code}
> here in the `this::getMetricGroup` `this` refers to parent class instance
> now it looks like this
> {code:java}
> watermarkGenerator =
>                 emitProgressiveWatermarks
>                         ? watermarkStrategy.createWatermarkGenerator(
>                                 new WatermarkGeneratorSupplier.Context() {
>                                     @Override
>                                     public MetricGroup getMetricGroup() {
>                                         return this.getMetricGroup();
>                                     }
>                                     @Override
>                                     public RelativeClock 
> getInputActivityClock() {
>                                         return inputActivityClock;
>                                     }
>                                 })
>                         : new NoWatermarksGenerator<>();
> {code}
> where `this` reference to anonymous class instance and causes infinite 
> recursive loop



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to