[ https://issues.apache.org/jira/browse/FLINK-37545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Grzegorz Liter updated FLINK-37545:
-----------------------------------
    Description: 
The problem occurs when using a MetricGroup to expose custom metrics in a watermark generator.

Example:
```
WatermarkStrategy
                .forGenerator(ctx ->
                        new CustomWatermarkStrategy(ctx.getMetricGroup()))
                .withTimestampAssigner((event, timestamp) -> ...);
```

With this setup, a StackOverflowError is thrown.

This is caused by the changes made in https://issues.apache.org/jira/browse/FLINK-35886

Previously, the code in `org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator#open` looked like this:
```
        watermarkGenerator =
                emitProgressiveWatermarks
                        ? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
                        : new NoWatermarksGenerator<>();
```

Here, `this` in `this::getMetricGroup` refers to the enclosing operator instance.

Now it looks like this:
```
watermarkGenerator =
                emitProgressiveWatermarks
                        ? watermarkStrategy.createWatermarkGenerator(
                                new WatermarkGeneratorSupplier.Context() {
                                    @Override
                                    public MetricGroup getMetricGroup() {
                                        return this.getMetricGroup();
                                    }

                                    @Override
                                    public RelativeClock getInputActivityClock() {
                                        return inputActivityClock;
                                    }
                                })
                        : new NoWatermarksGenerator<>();
```

Here, `this` refers to the anonymous class instance, so `getMetricGroup()` calls itself and recurses until the stack overflows.
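
The same pitfall can be reproduced without Flink. The following is only a minimal standalone sketch: `ThisCaptureDemo`, `Operator`, and the simplified `Context` interface are hypothetical stand-ins (a `String` plays the role of the `MetricGroup`), and the qualified `Operator.this` shown here is one possible way to reach the enclosing instance, not necessarily the fix that will be applied in Flink.

```java
public class ThisCaptureDemo {

    /** Hypothetical stand-in for WatermarkGeneratorSupplier.Context. */
    public interface Context {
        String getMetricGroup();
    }

    /** Hypothetical stand-in for the operator; the String plays the role of a MetricGroup. */
    public static class Operator {
        public String getMetricGroup() {
            return "operator-metrics";
        }

        /** Mirrors the regression: inside the anonymous class, `this` is the Context itself. */
        public Context brokenContext() {
            return new Context() {
                @Override
                public String getMetricGroup() {
                    // Calls this same method again and never reaches the operator.
                    return this.getMetricGroup();
                }
            };
        }

        /** One possible fix: qualify `this` with the enclosing class name. */
        public Context fixedContext() {
            return new Context() {
                @Override
                public String getMetricGroup() {
                    // Delegates to the enclosing Operator instance.
                    return Operator.this.getMetricGroup();
                }
            };
        }
    }

    /** Returns true if calling getMetricGroup() on the context overflows the stack. */
    public static boolean overflows(Context ctx) {
        try {
            ctx.getMetricGroup();
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator();
        System.out.println("broken overflows: " + overflows(op.brokenContext()));
        System.out.println("fixed returns: " + op.fixedContext().getMetricGroup());
    }
}
```

A method reference such as `this::getMetricGroup` written directly in the operator does not have this problem, because there `this` is still the operator.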


> StackOverflowError when using MetricGroup in custom WatermarkStrategy
> ---------------------------------------------------------------------
>
>                 Key: FLINK-37545
>                 URL: https://issues.apache.org/jira/browse/FLINK-37545
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.19.2, 1.20.1
>            Reporter: Grzegorz Liter
>            Priority: Critical



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
