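This looks like it is mainly caused by the state on the heap growing too large. I'd suggest switching to the RocksDB state backend (RocksDBStateBackend).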
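
A minimal sketch of switching to the RocksDB state backend follows. It assumes Flink 1.13 or later with the flink-statebackend-rocksdb dependency on the classpath (on older versions the corresponding class is RocksDBStateBackend). The class name, job name, checkpoint interval, checkpoint path, and the tiny placeholder pipeline are illustrative assumptions, not values taken from your job.

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSketch {

    // Hypothetical helper: applies the state backend settings to an existing environment.
    static void useRocksDb(StreamExecutionEnvironment env) {
        // Keep keyed state in RocksDB instead of on the JVM heap;
        // "true" enables incremental checkpoints.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // Assumed values: checkpoint every 60 s to a placeholder local path.
        env.enableCheckpointing(60_000L);
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        useRocksDb(env);

        // Placeholder pipeline so the sketch runs; replace it with the real job graph.
        env.fromElements(1, 2, 3).print();
        env.execute("rocksdb-backend-sketch");
    }
}
```

The same switch can also be made cluster-wide with `state.backend: rocksdb` in flink-conf.yaml. Note that RocksDB uses managed (off-heap) memory, so the default TaskManager memory settings may still need adjusting.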

yj h <hyjcry...@gmail.com> wrote on Sun, Mar 27, 2022 at 17:07:

> I have a question about a TaskManager OOM. I am computing daily UV and use ContinuousEventTimeTrigger to fire every 3 minutes.
>
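> Configuration:
> Two machines with 2 cores each; slots are also set to 2 per machine, parallelism is 4, and the JobManager and TaskManager memory settings are left at the defaults.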
>
> Troubleshooting steps so far:
> 1. At first I only called .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3))), and the job hit OOM very quickly.
> 2. I added an evictor:
>                 .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3)))
>                 .evictor(TimeEvictor.of(Time.seconds(0),true))
>
> 3. I switched to .trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3))))
>
> Approaches 2 and 3 also end in OOM. jstat shows full GCs happening continuously, and the checkpoint size keeps growing, up to roughly 300 MB.
>
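> Analyzing the heap dump with MAT, the largest memory consumers are
> org.apache.flink.runtime.state.heap.CopyOnWriteStateMap$StateMapEntry and
> org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
>
> Even with PurgingTrigger the job still runs out of memory. How should I go about troubleshooting this? Any help would be appreciated.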
>
> Code:
> // ShoppingRecords and UserClickModel are both plain bean objects
> inputStream
>                 .filter(data -> "pv".equals(data.getBehavior()))
>                 .keyBy(new KeySelector<ShoppingRecords, Tuple2<LocalDate, Long>>() {
>                     @Override
>                     public Tuple2<LocalDate, Long> getKey(ShoppingRecords value) throws Exception {
>                         Instant instant = Instant.ofEpochMilli(value.getTs());
>                         return Tuple2.of(
>                                 LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Shanghai")).toLocalDate(),
>                                 value.getItemId()
>                         );
>                     }
>                 })
>                 .window(TumblingEventTimeWindows.of(Time.days(1)))
> //                .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3)))
> //                .evictor(TimeEvictor.of(Time.seconds(0),true))
>                 .trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3))))
>                 .process(new ProcessWindowFunctionBitMap())
> //                .addSink(new RedisSink<>(conf, new UvRedisSink()));
>                 .addSink(new PrintSinkFunction());
>
>     public static class ProcessWindowFunctionBitMap
>             extends ProcessWindowFunction<ShoppingRecords, UserClickModel, Tuple2<LocalDate, Long>, TimeWindow> {
>
>         private transient ValueState<Integer> pvState;
>         private transient ValueState<Roaring64NavigableMap> bitMapState;
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             super.open(parameters);
>             ValueStateDescriptor<Integer> pvStateDescriptor =
>                     new ValueStateDescriptor<>("pv", Integer.class);
>             ValueStateDescriptor<Roaring64NavigableMap> bitMapStateDescriptor =
>                     new ValueStateDescriptor<>("bitMap",
>                             TypeInformation.of(new TypeHint<Roaring64NavigableMap>() {}));
> //            BlockingQueue
>             // Clear expired state
>             StateTtlConfig stateTtlConfig = StateTtlConfig
>                     .newBuilder(org.apache.flink.api.common.time.Time.days(1))
>                     .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>                     .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>                     .build();
>             // Enable TTL
>             pvStateDescriptor.enableTimeToLive(stateTtlConfig);
>             bitMapStateDescriptor.enableTimeToLive(stateTtlConfig);
>
>             pvState = this.getRuntimeContext().getState(pvStateDescriptor);
>             bitMapState = this.getRuntimeContext().getState(bitMapStateDescriptor);
>
>         }
>
>         @Override
>         public void process(Tuple2<LocalDate, Long> key,
>                             ProcessWindowFunction<ShoppingRecords, UserClickModel, Tuple2<LocalDate, Long>, TimeWindow>.Context context,
>                             Iterable<ShoppingRecords> elements,
>                             Collector<UserClickModel> out) throws Exception {
>             // Current pv and uv held in state
>             Integer pv = pvState.value();
>             Roaring64NavigableMap bitMap = bitMapState.value();
>             if (bitMap == null) {
>                 bitMap = new Roaring64NavigableMap();
>                 pv = 0;
>             }
>
>             Iterator<ShoppingRecords> iterator = elements.iterator();
>             while (iterator.hasNext()) {
>                 pv = pv + 1;
>                 long uid = iterator.next().getUser_id();
>                 // Works as long as the userId can be converted to a long
>                 bitMap.add(uid);
>             }
>
>             // Update pv
>             pvState.update(pv);
>
>             UserClickModel UserClickModel = new UserClickModel();
>
>             UserClickModel.setDate(key.f0.toString());
>             UserClickModel.setProduct(key.f1);
>             UserClickModel.setPv(pv);
>             UserClickModel.setUv(bitMap.getIntCardinality());
>
>             out.collect(UserClickModel);
>         }
>     }
>
