[ https://issues.apache.org/jira/browse/FLINK-30131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lu updated FLINK-30131: ----------------------- Attachment: image-2022-11-24-17-10-45-651.png > flink iterate will suspend when record is a bit large > ----------------------------------------------------- > > Key: FLINK-30131 > URL: https://issues.apache.org/jira/browse/FLINK-30131 > Project: Flink > Issue Type: Bug > Components: API / DataStream > Affects Versions: 1.15.2 > Reporter: Lu > Priority: Major > Attachments: image-2022-11-22-14-59-08-272.png, > image-2022-11-24-17-10-45-651.png > > > > {code:java} > //代码占位符 > Configuration configuration = new Configuration(); > configuration.setInteger(RestOptions.PORT, 8082); > configuration.setInteger(NETWORK_MAX_BUFFERS_PER_CHANNEL, 10000000); > configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, > MemorySize.parse("4g")); > configuration.setInteger("taskmanager.network.memory.buffers-per-channel", > 10000000); > configuration.setInteger("taskmanager.network.memory.floating-buffers-per-gate", > 10000000); > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(configuration); > env.setParallelism(1); > List<Integer> list = new ArrayList<>(10); > for (int i = 1; i < 10000; i++) { > list.add(i); > } > DataStreamSource<Integer> integerDataStreamSource = env.fromCollection(list); > DataStream<byte[]> map = integerDataStreamSource.map(i -> new > byte[10000000]).setParallelism(1).name("map to byte[]").shuffle(); > IterativeStream<byte[]> iterate = map.iterate(); > DataStream<byte[]> map1 = iterate.process(new ProcessFunction<byte[], > byte[]>() { > @Override > public void processElement(byte[] value, ProcessFunction<byte[], > byte[]>.Context ctx, Collector<byte[]> out) throws Exception { > out.collect(value); > } > }).name("multi collect"); > DataStream<byte[]> filter = map1.filter(i -> true > ).setParallelism(1).name("feedback"); > iterate.closeWith(filter); > map1.map(bytes -> bytes.length).name("map to length").print(); > env.execute(); {code} > my code is above. > > when i use iterate with big record , the iterate will suspend at a random > place. when i saw the stack, it has a suspicious thread > !image-2022-11-22-14-59-08-272.png|width=751,height=328! > it seems like a network related problem. so i increse the network buffer > memory and num. but it only delay the suspend point, it will still suspend > after iterate a little more times than before. > i want to know if this is a bug or i have some error in my code or > configuration. > looking forward to your reply. thanks in advance. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)