[ https://issues.apache.org/jira/browse/FLINK-13245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16888788#comment-16888788 ]
zhijiang commented on FLINK-13245:
----------------------------------

Thanks for these further thoughts, [~azagrebin]; I think the discussion makes things clearer.

I understand your concerns above. I agree that the current semantics of `CancelPartitionRequest`/`CloseRequest` are not very precise, because either message could mean a successful consumption on the consumer side or a consumer task failure / any exception during consumption.

Regarding the concern of when to call `notifySubpartitionConsumed`: the current implementation is based on whether the producer receives a confirmable notification (`CancelPartitionRequest`/`CloseRequest`) from the consumer side. If it receives either message, it calls `notifySubpartitionConsumed` regardless of whether the consumer finished or failed. In the case of a channel exception, the exception happens only locally on the consumer side, so `notifySubpartitionConsumed` is not called. Likewise, when the channel becomes inactive, the producer cannot distinguish whether the consumer closed the connection deliberately or the TM was lost exceptionally, so `notifySubpartitionConsumed` is not called either.

For the first concern, we could introduce messages with clearer semantics that explicitly distinguish successful from failed consumption, instead of the current `Cancel`/`ClosePartition`. For the second concern, we could also work out a proper way of handling messages / exceptions / channel inactivity. But one precondition is that we first agree on the exact semantics of releasing a partition based on consumption within the partition management feature.

Currently there are three strategies which could release a partition:
* partition release based on consumer confirmation via the network
* partition release based on JM notification
* partition release on disconnection between TM and JM

For the first strategy (partition release based on consumer confirmation):
* We could define the network message as `ReleasePartition` instead of the current `Cancel`/`ClosePartition`. Then it would not matter whether the consumer finished or failed during consumption. The precondition for this is a reliable network notification, but we currently have no ack mechanism for such messages in the application layer. And if the consumer task fails before establishing the connection with the producer, we would still have to rely on the JM notification to release the producer's partitions.
* Alternatively, we could leave the semantics unspecified as they are now; the current strategy is then only coupled to the existing mechanism in the network stack.
* Or the semantics could be defined clearly as "release the partition after one successful consumption", which could be implemented via both consumer notification and JM notification.

In general, I think we should first define the different release strategies in terms of their semantics, without caring about implementation while thinking about the strategy; any strategy could be implemented in multiple ways. For example, the semantics might be divided into at-least-once, exactly-once, and at-most-once consumption (a minimal sketch of separating such strategies follows at the end of this comment). Once we agree on the specific semantics, we will know how to refactor the current network stack to implement a certain strategy.

It might be worth re-architecting the partition release strategy further in release-1.10, because the current architecture is difficult to extend with another strategy for the interactive queries feature. In order not to block the current release, I think the existing modifications can solve the file leak issue.
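To make the difference between these strategies concrete, here is a minimal Java sketch. The names (`RefCountedPartition`, `onConsumerConfirmation`, `onJobManagerRelease`) are hypothetical and not part of the actual Flink API; it only illustrates how a partition could be released either by consumer confirmations or by an explicit JM notification.

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch: a partition that is released either when all consumers
 * confirm successful consumption, or when the JobManager explicitly notifies
 * release. Names and structure are illustrative only.
 */
public class RefCountedPartition {

    private final AtomicInteger pendingConsumers;
    private boolean released;

    public RefCountedPartition(int numberOfConsumers) {
        this.pendingConsumers = new AtomicInteger(numberOfConsumers);
    }

    /** Strategy 1: release driven by a confirmable network message from the consumer. */
    public void onConsumerConfirmation() {
        // Only a message that unambiguously means "consumption succeeded" should
        // decrement the count; a dropped connection must not be treated as success.
        if (pendingConsumers.decrementAndGet() == 0) {
            release();
        }
    }

    /** Strategy 2: release driven by an explicit JobManager notification. */
    public void onJobManagerRelease() {
        // The JM notification is authoritative and bypasses the ref count,
        // covering consumers that failed before ever connecting to the producer.
        release();
    }

    private synchronized void release() {
        if (!released) {
            released = true;
            deleteBackingFiles();
        }
    }

    private void deleteBackingFiles() {
        // Placeholder for removing the spilled .channel files of this partition.
    }
}
{code}

With such a separation the semantics become explicit: the files can go away after either a confirmed successful consumption or an authoritative JM decision, while a merely dropped connection never counts as success.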
> Network stack is leaking files
> ------------------------------
>
> Key: FLINK-13245
> URL: https://issues.apache.org/jira/browse/FLINK-13245
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.9.0
> Reporter: Chesnay Schepler
> Assignee: zhijiang
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> There's a file leak in the network stack / shuffle service.
> When running the {{SlotCountExceedingParallelismTest}} on Windows, a large number of {{.channel}} files continue to reside in a {{flink-netty-shuffle-XXX}} directory.
> From what I've gathered so far these files are still being used by a {{BoundedBlockingSubpartition}}. The cleanup logic in this class uses ref-counting to ensure we don't release data while a reader is still present. However, at the end of the job this count has not reached 0, and thus nothing is being released.
> The same issue is also present on the {{ResultPartition}} level; the {{ReleaseOnConsumptionResultPartition}} is also not being released while the ref-count is greater than 0.
> Overall it appears like there's some issue with the notifications for partitions being consumed.
> It is feasible that this issue has recently caused issues on Travis where builds were failing due to a lack of disk space.
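To illustrate the ref-counting cleanup mentioned in the quoted description, here is a minimal, hypothetical sketch (not the actual {{BoundedBlockingSubpartition}} code; class and method names are made up) of how a spilled {{.channel}} file stays on disk when the notification that would decrement the reader count never arrives.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Hypothetical illustration of the leak: the spilled file is only deleted when
 * the reader count drops back to zero, so a lost "consumed" notification keeps
 * the file on disk forever.
 */
public class SpilledSubpartitionSketch {

    private final Path channelFile;
    private int activeReaders;

    public SpilledSubpartitionSketch(Path channelFile) {
        this.channelFile = channelFile;
    }

    public synchronized void createReader() {
        activeReaders++;
    }

    /** Must be called once per reader; if it is skipped, the file leaks. */
    public synchronized void notifyReaderReleased() throws IOException {
        if (--activeReaders == 0) {
            Files.deleteIfExists(channelFile); // cleanup only happens here
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("flink-netty-shuffle-sketch", ".channel");
        SpilledSubpartitionSketch subpartition = new SpilledSubpartitionSketch(file);

        subpartition.createReader();
        // If the consumer's Cancel/CloseRequest is lost (e.g. the channel simply
        // goes inactive), notifyReaderReleased() is never invoked ...
        System.out.println("file still exists: " + Files.exists(file)); // true -> leak
    }
}
{code}

In this simplified model the leak is exactly the symptom reported above: any path that skips the release notification leaves the count above zero, so the delete never runs.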