Re: [DISCUSS] FLIP-224: Blacklist Mechanism
I do share the concern about blurring the lines a bit. That said, I'd prefer to not have any auto-detection and only have an opt-in mechanism to manually block processes/nodes. To me this sounds yet again like one of those magical mechanisms that will rarely work just right. An external system can leverage way more information after all.

Moreover, I'm quite concerned about the complexity of this proposal: tracking on both the RM/JM side; syncing between components; adjustments to the slot and resource protocol. In a way it seems overly complicated.

If we look at it purely from an active resource management perspective, then there isn't really a need to touch the slot protocol at all (or in fact to touch anything in the JobMaster), because there isn't any point in keeping around blocked TMs in the first place. They'd just be idling, potentially being shut down after a while by the RM because of it (unless we _also_ touch that logic). Here the blocking of a process (be it by blocking the process or node) is equivalent to shutting down the blocked process(es). Once the block is lifted we can just spin it back up.

And I do wonder whether we couldn't apply the same line of thinking to standalone resource management. Here being able to stop/restart a process/node manually should be a core requirement for a Flink deployment anyway.

On 02/05/2022 08:49, Martijn Visser wrote: Hi everyone, Thanks for creating this FLIP. I can understand the problem and I see value in the automatic detection and blocklisting. I do have some concerns with the ability to manually specify resources to be blocked. I have two concerns: * Most organizations explicitly have a separation of concerns, meaning that there's a group who's responsible for managing a cluster and there's a user group who uses that cluster. With the introduction of this mechanism, the latter group can now influence the responsibility of the first group. 
So it is possible that someone from the user group blocks something, which causes an outage (which could result in a paging mechanism triggering etc.) that impacts the first group. * How big is the group of people who can go through the process of manually identifying a node that isn't behaving as it should be? I do think this group is relatively limited. Does it then make sense to introduce such a feature, which would only be used by a really small user group of Flink? We still have to maintain, test and support such a feature. I'm +1 for the autodetection features, but I'm leaning towards not exposing this to the user group but having this available strictly for cluster operators. They could then also set up their paging/metrics/logging system to take this into account. Best regards, Martijn Visser https://twitter.com/MartijnVisser82 https://github.com/MartijnVisser

On Fri, 29 Apr 2022 at 09:39, Yangze Guo wrote: Thanks for driving this, Zhu and Lijie. +1 for the overall proposal. Just sharing a few cents here:

- Why do we need to expose cluster.resource-blacklist.item.timeout-check-interval to the user? I think the semantics of `cluster.resource-blacklist.item.timeout` is sufficient for the user. How the timeout is guaranteed is Flink's internal implementation detail. I think it will be very confusing and we do not need to expose it to users.

- ResourceManager can notify the exception of a task manager to `BlacklistHandler` as well. For example, the slot allocation might fail in case the target task manager is busy or experiences network jitter. I don't mean we need to cover this case in this version, but we could also open a `notifyException` in `ResourceManagerBlacklistHandler`.

- Before we sync the blocklist to ResourceManager, will the slots of a blocked task manager continue to be released and allocated?

Best, Yangze Guo

On Thu, Apr 28, 2022 at 3:11 PM Lijie Wang wrote: Hi Konstantin, Thanks for your feedback. 
I will respond to your 4 remarks:

1) Thanks for reminding me of the controversy. I think “BlockList” is good enough, and I will change it in the FLIP.

2) Your suggestion for the REST API is a good idea. Based on the above, I would change the REST API as follows:
POST/GET /blocklist/nodes
POST/GET /blocklist/taskmanagers
DELETE /blocklist/node/
DELETE /blocklist/taskmanager/

3) If a node is blocked/blocklisted, it means that all task managers on this node are blocklisted. All slots on these TMs are not available. This is actually a bit like TM loss, but these TMs are not really lost: they are in an unavailable status, and they are still registered in this Flink cluster. They will become available again once the corresponding blocklist item is removed. This behavior is the same in active/non-active clusters. However, in active clusters, these TMs may be released due to idle timeouts.

4) For the item timeout, I prefer to keep it. The reasons are as follows: a) The timeout will not affect users adding or removing items via the REST API, and users can disable it by co
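To make the node/TM relationship described in point 3 concrete, here is a small illustrative model (class and method names are hypothetical, not the FLIP's actual Java classes): blocking a node implies every task manager on that node is blocked, and removing the blocklist item makes its TMs available again.

```python
# Illustrative model of the blocklist semantics discussed above.
# All names here are hypothetical, not the FLIP's actual classes.

class Blocklist:
    def __init__(self, tms_by_node):
        # tms_by_node: dict mapping node id -> list of TM ids on that node
        self.tms_by_node = tms_by_node
        self.blocked_nodes = set()
        self.blocked_tms = set()

    def block_node(self, node):
        self.blocked_nodes.add(node)

    def unblock_node(self, node):
        self.blocked_nodes.discard(node)

    def is_tm_blocked(self, tm):
        if tm in self.blocked_tms:
            return True
        # a TM is also blocked if the node it runs on is blocked
        return any(tm in self.tms_by_node.get(n, []) for n in self.blocked_nodes)


bl = Blocklist({"node-1": ["tm-1", "tm-2"], "node-2": ["tm-3"]})
bl.block_node("node-1")
print(bl.is_tm_blocked("tm-1"))  # True: its node is blocked
print(bl.is_tm_blocked("tm-3"))  # False: node-2 is not blocked
bl.unblock_node("node-1")
print(bl.is_tm_blocked("tm-1"))  # False again once the item is removed
```

Note that, per point 3, a "blocked" TM in this model stays registered with the cluster; only its availability changes.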
Re: [DISCUSS] FLIP-224: Blacklist Mechanism
Thanks for the good initiative, Lijie and Zhu! If possible, I'd like to participate in the development. I agree with the 3rd point of Konstantin's reply - we should consider somehow moving the information about blocklisted nodes/TMs from the active ResourceManager to non-active ones. Probably storing it inside Zookeeper/a ConfigMap might be helpful here. And I agree with Martijn that a lot of organizations don't want to expose such an API to a cluster user group. But I think it's necessary to have a mechanism for unblocking the nodes/TMs anyway, to guard against incorrect automatic behaviour. And one more small suggestion - I think it would be better to extend the *BlocklistedItem* class with an *endTimestamp* field and fill it at item creation. This simple addition would allow us to: - Provide users the ability to set the exact end time of a blocklist entry through the REST API - Not be tied to a single value of *cluster.resource-blacklist.item.timeout* On Mon, 2 May 2022 at 14:17, Chesnay Schepler wrote: > I do share the concern between blurring the lines a bit. > > That said, I'd prefer to not have any auto-detection and only have an > opt-in mechanism > to manually block processes/nodes. To me this sounds yet again like one > of those > magical mechanisms that will rarely work just right. > An external system can leverage way more information after all. > > Moreover, I'm quite concerned about the complexity of this proposal. > Tracking on both the RM/JM side; syncing between components; adjustments > to the > slot and resource protocol. > > In a way it seems overly complicated. > > If we look at it purely from an active resource management perspective, > then there > isn't really a need to touch the slot protocol at all (or in fact to > anything in the JobMaster), > because there isn't any point in keeping around blocked TMs in the first > place. > They'd just be idling, potentially shutting down after a while by the RM > because of > it (unless we _also_ touch that logic). 
> Here the blocking of a process (be it by blocking the process or node) is > equivalent with shutting down the blocked process(es). > Once the block is lifted we can just spin it back up. > > And I do wonder whether we couldn't apply the same line of thinking to > standalone resource management. > Here being able to stop/restart a process/node manually should be a core > requirement for a Flink deployment anyway. > > > On 02/05/2022 08:49, Martijn Visser wrote: > > Hi everyone, > > > > Thanks for creating this FLIP. I can understand the problem and I see > value > > in the automatic detection and blocklisting. I do have some concerns with > > the ability to manually specify to be blocked resources. I have two > > concerns; > > > > * Most organizations explicitly have a separation of concerns, meaning > that > > there's a group who's responsible for managing a cluster and there's a > user > > group who uses that cluster. With the introduction of this mechanism, the > > latter group now can influence the responsibility of the first group. So > it > > can be possible that someone from the user group blocks something, which > > causes an outage (which could result in paging mechanism triggering etc) > > which impacts the first group. > > * How big is the group of people who can go through the process of > manually > > identifying a node that isn't behaving as it should be? I do think this > > group is relatively limited. Does it then make sense to introduce such a > > feature, which would only be used by a really small user group of Flink? > We > > still have to maintain, test and support such a feature. > > > > I'm +1 for the autodetection features, but I'm leaning towards not > exposing > > this to the user group but having this available strictly for cluster > > operators. They could then also set up their paging/metrics/logging > system > > to take this into account. 
> > > > Best regards, > > > > Martijn Visser > > https://twitter.com/MartijnVisser82 > > https://github.com/MartijnVisser > > > > > > On Fri, 29 Apr 2022 at 09:39, Yangze Guo wrote: > > > >> Thanks for driving this, Zhu and Lijie. > >> > >> +1 for the overall proposal. Just share some cents here: > >> > >> - Why do we need to expose > >> cluster.resource-blacklist.item.timeout-check-interval to the user? > >> I think the semantics of `cluster.resource-blacklist.item.timeout` is > >> sufficient for the user. How to guarantee the timeout mechanism is > >> Flink's internal implementation. I think it will be very confusing and > >> we do not need to expose it to users. > >> > >> - ResourceManager can notify the exception of a task manager to > >> `BlacklistHandler` as well. > >> For example, the slot allocation might fail in case the target task > >> manager is busy or has a network jitter. I don't mean we need to cover > >> this case in this version, but we can also open a `notifyException` in > >> `ResourceManagerBlacklistHandler`. > >> > >> - Before we sync the blo
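Roman's endTimestamp suggestion from the message above can be sketched roughly as follows. This is an illustrative Python model, not the FLIP's actual Java BlocklistedItem: the end time is filled at creation, either from a user-supplied value (e.g. via the REST API) or from the single configured default timeout.

```python
import time

# Hypothetical model of a blocklist item carrying an explicit end_timestamp,
# as suggested above. Names and defaults are illustrative only.

DEFAULT_TIMEOUT_SECONDS = 3600  # stand-in for cluster.resource-blacklist.item.timeout

class BlocklistedItem:
    def __init__(self, identifier, end_timestamp=None, now=None):
        now = time.time() if now is None else now
        self.identifier = identifier
        # an explicit end time wins; otherwise fall back to the default timeout
        self.end_timestamp = (end_timestamp if end_timestamp is not None
                              else now + DEFAULT_TIMEOUT_SECONDS)

    def is_expired(self, now=None):
        now = time.time() if now is None else now
        return now >= self.end_timestamp


item = BlocklistedItem("tm-1", end_timestamp=1000.0)
print(item.is_expired(now=999.0))   # False: block still active
print(item.is_expired(now=1000.0))  # True: expired at its own end time
```

This decouples each item's lifetime from the single cluster-wide timeout value, which is exactly the flexibility the suggestion is after.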
Re: [DISCUSS] FLIP-224: Blacklist Mechanism
Thanks for the proposal, Lijie. This is an interesting feature and discussion, and somewhat related to the design principle about how people should operate Flink. I think there are three things involved in this FLIP. a) Detect and report the unstable node. b) Collect the information of the unstable node and form a blocklist. c) Take the action to block nodes. My two cents:

1. It looks like people all agree that Flink should have c). It is not only useful for cases of node failures, but also handy for some planned maintenance.

2. People have different opinions on b), i.e. who should be the brain that makes the decision to block a node. I think this largely depends on who we talk to. Different users would probably give different answers. For people who do have a centralized node health management service, letting Flink just do a) and c) would be preferred. So essentially Flink would be one of the sources that may detect unstable nodes, report them to that service, and then take the command from that service to block the problematic nodes. On the other hand, for users who do not have such a service, simply letting Flink be clever by itself and block the suspicious nodes might be desired to ensure the jobs are running smoothly. So that indicates a) and b) here should be pluggable / optional.

In light of this, maybe it would make sense to have something pluggable like an UnstableNodeReporter which exposes unstable nodes actively. (A more general interface would be a JobInfoReporter which can be used to report any information of type . But I'll just keep the scope relevant to this FLIP here). Personally speaking, I think it is OK to have a default implementation of a reporter which just tells Flink to take action to block problematic nodes and also unblocks them after a timeout.

Thanks, Jiangjie (Becket) Qin

On Mon, May 2, 2022 at 3:27 PM Роман Бойко wrote: > Thanks for good initiative, Lijie and Zhu! > > If it's possible I'd like to participate in development. 
> > I agree with 3rd point of Konstantin's reply - we should consider to move > somehow the information of blocklisted nodes/TMs from active > ResourceManager to non-active ones. Probably storing inside > Zookeeper/Configmap might be helpful here. > > And I agree with Martijn that a lot of organizations don't want to expose > such API for a cluster user group. But I think it's necessary to have the > mechanism for unblocking the nodes/TMs anyway for avoiding incorrect > automatic behaviour. > > And another one small suggestion - I think it would be better to extend the > *BlocklistedItem* class with the *endTimestamp* field and fill it at the > item creation. This simple addition will allow to: > >- > >Provide the ability to users to setup the exact time of blocklist end >through RestAPI >- > >Not being tied to a single value of >*cluster.resource-blacklist.item.timeout* > > > On Mon, 2 May 2022 at 14:17, Chesnay Schepler wrote: > > > I do share the concern between blurring the lines a bit. > > > > That said, I'd prefer to not have any auto-detection and only have an > > opt-in mechanism > > to manually block processes/nodes. To me this sounds yet again like one > > of those > > magical mechanisms that will rarely work just right. > > An external system can leverage way more information after all. > > > > Moreover, I'm quite concerned about the complexity of this proposal. > > Tracking on both the RM/JM side; syncing between components; adjustments > > to the > > slot and resource protocol. > > > > In a way it seems overly complicated. > > > > If we look at it purely from an active resource management perspective, > > then there > > isn't really a need to touch the slot protocol at all (or in fact to > > anything in the JobMaster), > > because there isn't any point in keeping around blocked TMs in the first > > place. > > They'd just be idling, potentially shutting down after a while by the RM > > because of > > it (unless we _also_ touch that logic). 
> > Here the blocking of a process (be it by blocking the process or node) is > > equivalent with shutting down the blocked process(es). > > Once the block is lifted we can just spin it back up. > > > > And I do wonder whether we couldn't apply the same line of thinking to > > standalone resource management. > > Here being able to stop/restart a process/node manually should be a core > > requirement for a Flink deployment anyway. > > > > > > On 02/05/2022 08:49, Martijn Visser wrote: > > > Hi everyone, > > > > > > Thanks for creating this FLIP. I can understand the problem and I see > > value > > > in the automatic detection and blocklisting. I do have some concerns > with > > > the ability to manually specify to be blocked resources. I have two > > > concerns; > > > > > > * Most organizations explicitly have a separation of concerns, meaning > > that > > > there's a group who's responsible for managing a cluster and there's a > > user > > > group who uses that clust
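Becket's pluggable-reporter idea above — detection stays in Flink, the decision-making "brain" is pluggable, and Flink executes the block — could look roughly like this. The interface and class names below are hypothetical sketches, not anything the FLIP actually defines:

```python
from abc import ABC, abstractmethod

# Hypothetical sketch of a pluggable "brain" for blocklist decisions.
# a) Flink detects and reports; b) the reporter decides; c) Flink blocks.

class UnstableNodeReporter(ABC):
    @abstractmethod
    def report_unstable_node(self, node_id, now):
        """Called by Flink when it detects an unstable node."""

class DefaultBlockingReporter(UnstableNodeReporter):
    """Default brain: block the node in Flink itself, unblock after a timeout."""

    def __init__(self, timeout):
        self.timeout = timeout
        self.blocked_until = {}  # node_id -> time at which the block lifts

    def report_unstable_node(self, node_id, now):
        self.blocked_until[node_id] = now + self.timeout

    def is_blocked(self, node_id, now):
        return self.blocked_until.get(node_id, float("-inf")) > now

class ExternalServiceReporter(UnstableNodeReporter):
    """Alternative brain: forward detections to a central health service,
    which then decides what to block."""

    def __init__(self):
        self.forwarded = []

    def report_unstable_node(self, node_id, now):
        self.forwarded.append(node_id)


r = DefaultBlockingReporter(timeout=60)
r.report_unstable_node("node-1", now=0)
print(r.is_blocked("node-1", now=30))  # True: still within the timeout
print(r.is_blocked("node-1", now=90))  # False: automatically unblocked
```

Users with a centralized health service would plug in something like ExternalServiceReporter; everyone else gets the self-contained default.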
[jira] [Created] (FLINK-27467) Remove CliFrontendTestBase
Chesnay Schepler created FLINK-27467: Summary: Remove CliFrontendTestBase Key: FLINK-27467 URL: https://issues.apache.org/jira/browse/FLINK-27467 Project: Flink Issue Type: Technical Debt Components: Command Line Client, Tests Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.16.0 The class isn't really providing any value. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27468) Observing JobManager deployment. Previous status: MISSING
Matyas Orhidi created FLINK-27468: - Summary: Observing JobManager deployment. Previous status: MISSING Key: FLINK-27468 URL: https://issues.apache.org/jira/browse/FLINK-27468 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-0.1.0 Reporter: Matyas Orhidi The operator keeps looping if the K8s deployment gets deleted (and probably also when the job is in a terminal Flink state such as FAILED). We need to agree on how to handle such cases and fix them. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27469) Remove CliFrontendRunWithYarnTest
Chesnay Schepler created FLINK-27469: Summary: Remove CliFrontendRunWithYarnTest Key: FLINK-27469 URL: https://issues.apache.org/jira/browse/FLINK-27469 Project: Flink Issue Type: Technical Debt Components: Deployment / YARN, Tests Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.16.0 This test doesn't actually test any behavior of the yarn cli. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27470) [JUnit5 Migration] Module: flink-statebackend-heap-spillable
Chesnay Schepler created FLINK-27470: Summary: [JUnit5 Migration] Module: flink-statebackend-heap-spillable Key: FLINK-27470 URL: https://issues.apache.org/jira/browse/FLINK-27470 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends, Tests Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27471) Add ARRAY_DISTINCT supported in SQL & Table API
Sergey Nuyanzin created FLINK-27471: --- Summary: Add ARRAY_DISTINCT supported in SQL & Table API Key: FLINK-27471 URL: https://issues.apache.org/jira/browse/FLINK-27471 Project: Flink Issue Type: Sub-task Reporter: Sergey Nuyanzin Removes duplicate values from the array. Syntax: array_distinct(array) Arguments: array: An ARRAY to be handled. Returns: An ARRAY. If value is NULL, the result is NULL. Keeps order of elements. Examples: {code:sql} SELECT array_distinct(ARRAY[1, 2, 3, 2, 1]); -- [1, 2, 3] SELECT array_distinct(ARRAY[1, NULL, 1]); -- [1, NULL] {code} See also https://spark.apache.org/docs/latest/api/sql/index.html#array_distinct -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27472) Setup flink-connector-redis repository
Martijn Visser created FLINK-27472: -- Summary: Setup flink-connector-redis repository Key: FLINK-27472 URL: https://issues.apache.org/jira/browse/FLINK-27472 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Martijn Visser Assignee: Martijn Visser -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink-connector-redis] MartijnVisser merged pull request #1: [FLINK-27472][Connector][Redis] Setup Redis connector repository
MartijnVisser merged PR #1: URL: https://github.com/apache/flink-connector-redis/pull/1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-27473) Capture time that a job spends on initializing tasks
Jun Qin created FLINK-27473: --- Summary: Capture time that a job spends on initializing tasks Key: FLINK-27473 URL: https://issues.apache.org/jira/browse/FLINK-27473 Project: Flink Issue Type: Improvement Components: Runtime / Coordination, Runtime / Metrics Reporter: Jun Qin Similar to https://issues.apache.org/jira/browse/FLINK-25888, we should have it also for initializing tasks. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27474) Use Hugo build information to derive current branch for externally hosted documentation
Martijn Visser created FLINK-27474: -- Summary: Use Hugo build information to derive current branch for externally hosted documentation Key: FLINK-27474 URL: https://issues.apache.org/jira/browse/FLINK-27474 Project: Flink Issue Type: Technical Debt Reporter: Martijn Visser Follow-up ticket from https://github.com/apache/flink/pull/19571#discussion_r862819931 to use {{docs/config.toml#branch}} to determine what's the current branch. This information is needed to pull the correct externally hosted documentation (from connectors) in for building all the documentation -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27475) Clean-up unused Maven settings from flink-connector-elasticsearch
Martijn Visser created FLINK-27475: -- Summary: Clean-up unused Maven settings from flink-connector-elasticsearch Key: FLINK-27475 URL: https://issues.apache.org/jira/browse/FLINK-27475 Project: Flink Issue Type: Technical Debt Reporter: Martijn Visser Assignee: Martijn Visser https://github.com/apache/flink-connector-elasticsearch currently has quite some not used Maven configuration in its POM file. We should clean this up. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27476) Build new import option that only focus on maven main classes
Jing Ge created FLINK-27476: --- Summary: Build new import option that only focus on maven main classes Key: FLINK-27476 URL: https://issues.apache.org/jira/browse/FLINK-27476 Project: Flink Issue Type: Improvement Reporter: Jing Ge The ImportOption.DoNotIncludeTests.class used currently has some issues when running tests with Testcontainers. It would be good to define the target class path precisely. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27477) Drop flink-yarn test-jar
Chesnay Schepler created FLINK-27477: Summary: Drop flink-yarn test-jar Key: FLINK-27477 URL: https://issues.apache.org/jira/browse/FLINK-27477 Project: Flink Issue Type: Technical Debt Components: Build System, Deployment / YARN, Tests Reporter: Chesnay Schepler Fix For: 1.16.0 We could do just fine without this test-jar. -- This message was sent by Atlassian Jira (v8.20.7#820007)
Re: [DISCUSS] FLIP-227: Support overdraft buffer
Hi, +1 for the general proposal from my side. It would be a nice workaround for the flatMap, WindowOperator and large-record issues with unaligned checkpoints.

> The first task is about ignoring max buffers per channel. This means if > we request a memory segment from LocalBufferPool and the > maxBuffersPerChannel is reached for this channel, we just ignore that > and continue to allocate buffer while LocalBufferPool has it(it is > actually not a overdraft).

Do you mean to ignore it while processing records, but keep using `maxBuffersPerChannel` when calculating the availability of the output?

> The second task is about the real overdraft. I am pretty convinced now > that we, unfortunately, need configuration for limitation of overdraft > number(because it is not ok if one subtask allocates all buffers of one > TaskManager considering that several different jobs can be submitted on > this TaskManager). So idea is to have > maxOverdraftBuffersPerPartition(technically to say per LocalBufferPool). > In this case, when a limit of buffers in LocalBufferPool is reached, > LocalBufferPool can request additionally from NetworkBufferPool up to > maxOverdraftBuffersPerPartition buffers.

+1 for just having this as a separate configuration. Is it a big problem that legacy sources would be ignoring it? Note that we already have effectively hardcoded a single overdraft buffer. `LocalBufferPool#checkAvailability` checks if there is a single buffer available and this works the same for all tasks (including legacy source tasks). Would it be a big issue if we changed it to check if at least "overdraft number of buffers are available", where "overdraft number" is configurable, instead of the currently hardcoded value of "1"?

Best, Piotrek

Fri., 29 Apr 2022 at 17:04 rui fan <1996fan...@gmail.com> wrote: > Let me add some information about the LegacySource. > > If we want to disable the overdraft buffer for LegacySource. > Could we add the enableOverdraft in LocalBufferPool? 
> The default value is false. If the getAvailableFuture is called, > change enableOverdraft=true. It indicates whether there are > checks isAvailable elsewhere. > > I don't think it is elegant, but it's safe. Please correct me if I'm wrong. > > Thanks > fanrui > > On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com> wrote: > > > Hi, > > > > Thanks for your quick response. > > > > For question 1/2/3, we think they are clear. We just need to discuss the > > default value in PR. > > > > For the legacy source, you are right. It's difficult for general > > implementation. > > Currently, we implement ensureRecordWriterIsAvailable() in > > SourceFunction.SourceContext. And call it in our common LegacySource, > > e.g: FlinkKafkaConsumer. Over 90% of our Flink jobs consume kafka, so > > fixing FlinkKafkaConsumer solved most of our problems. > > > > Core code: > > ``` > > public void ensureRecordWriterIsAvailable() { > > if (recordWriter == null > > || > > !configuration.getBoolean(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, > > false) > > || recordWriter.isAvailable()) { > > return; > > } > > > > CompletableFuture resumeFuture = > recordWriter.getAvailableFuture(); > > try { > > resumeFuture.get(); > > } catch (Throwable ignored) { > > } > > } > > ``` > > > > LegacySource calls sourceContext.ensureRecordWriterIsAvailable() > > before synchronized (checkpointLock) and collects records. > > Please let me know if there is a better solution. > > > > Thanks > > fanrui > > > > On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov > > wrote: > > > >> Hi. > >> > >> -- 1. Do you mean split this into two JIRAs or two PRs or two commits > in a > >> PR? > >> > >> Perhaps, the separated ticket will be better since this task has fewer > >> questions but we should find a solution for LegacySource first. > >> > >> -- 2. For the first task, if the flink user disables the Unaligned > >> Checkpoint, do we ignore max buffers per channel? 
Because the > >> overdraft > >> isn't useful for the Aligned Checkpoint, it still needs to wait for > >> downstream Task to consume. > >> > >> I think that the logic should be the same for AC and UC. As I > understand, > >> the overdraft maybe is not really helpful for AC but it doesn't make it > >> worse as well. > >> > >> 3. For the second task > >> -- - The default value of maxOverdraftBuffersPerPartition may also > >> need > >>to be discussed. > >> > >> I think it should be a pretty small value or even 0 since it kind of > >> optimization and user should understand what they do(especially if we > >> implement the first task). > >> > >> -- - If the user disables the Unaligned Checkpoint, can we set the > >>maxOverdraftBuffersPerPartition=0? Because the overdraft isn't > >> useful for > >>the Aligned Checkpoint. > >> > >> The same answer that above, if the overdraft doesn't make degradation > for > >> the Align
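Piotr's observation in the message above — that `LocalBufferPool#checkAvailability` effectively hardcodes a threshold of one free buffer, which could instead be configurable — can be illustrated with a toy availability check. This is a simplified model for illustration only, not Flink's actual pool logic:

```python
# Toy model of the availability check discussed above. Today the pool is
# "available" while at least 1 buffer is free; the proposal is to make that
# threshold (the "overdraft number") configurable instead of hardcoding 1.

def is_available(used_buffers, pool_size, overdraft=1):
    # pool reports available while at least `overdraft` buffers remain free
    return pool_size - used_buffers >= overdraft

# hardcoded behavior today: threshold 1
print(is_available(used_buffers=9, pool_size=10))               # True
print(is_available(used_buffers=10, pool_size=10))              # False
# with a configurable overdraft of 3 the pool turns unavailable earlier,
# leaving headroom for a task to finish processing its current record
print(is_available(used_buffers=8, pool_size=10, overdraft=3))  # False
print(is_available(used_buffers=7, pool_size=10, overdraft=3))  # True
```

The point of the larger threshold is that a task which starts processing a record while the pool is "available" still has enough free buffers to emit a multi-buffer record without blocking mid-record.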
Re: [RESULT] [VOTE] Release 1.15.0, release candidate #4
Hi, Thank you for working on the 1.15.0 release. Is there an update wrt finalizing the release? Thanks, Thomas On Mon, Apr 25, 2022 at 9:17 PM Yun Gao wrote: > I'm happy to announce that we have unanimously approved this release. > > There are 6 explicit approving votes, 3 of which are binding: > > * Dawid Wysakowicz (binding) > * Xingbo Huang > * Matthias Pohl > * Yang Wang > * Zhu Zhu (binding) > * Guowei Ma (binding) > > There are no disapproving votes. > > Thanks everyone! > > Best, > Yun Gao
Re: [RESULT] [VOTE] Release 1.15.0, release candidate #4
Hi Thomas, Sorry for the long delay, currently we are very nearly done for finalizing the release and we should be able to finish all works and announce the release on Thursday. Best, Yun Gao -- From:Thomas Weise Send Time:2022 May 3 (Tue.) 09:22 To:dev ; Yun Gao Subject:Re: [RESULT] [VOTE] Release 1.15.0, release candidate #4 Hi, Thank you for working on the 1.15.0 release. Is there an update wrt finalizing the release? Thanks, Thomas On Mon, Apr 25, 2022 at 9:17 PM Yun Gao wrote: > I'm happy to announce that we have unanimously approved this release. > > There are 6 explicit approving votes, 3 of which are binding: > > * Dawid Wysakowicz (binding) > * Xingbo Huang > * Matthias Pohl > * Yang Wang > * Zhu Zhu (binding) > * Guowei Ma (binding) > > There are no disapproving votes. > > Thanks everyone! > > Best, > Yun Gao
Re: [RESULT] [VOTE] Release 1.15.0, release candidate #4
On Tue, May 03, 2022 at 10:23:33AM +0800, Yun Gao wrote: > Hi Thomas, > > Sorry for the long delay, currently we are very nearly done > for finalizing the release and we should be able to finish all > works and announce the release on Thursday. > > Best, > Yun Gao > > > > > -- > From:Thomas Weise > Send Time:2022 May 3 (Tue.) 09:22 > To:dev ; Yun Gao > Subject:Re: [RESULT] [VOTE] Release 1.15.0, release candidate #4 > > Hi, > > Thank you for working on the 1.15.0 release. Is there an update wrt > finalizing the release? > > Thanks, > Thomas I am confused. There is already a 1.15.0 tag in GitHub in [0], is this not the 1.15.0 release? [0] https://github.com/apache/flink/releases/tag/release-1.15.0 -- ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org http://czchen.info/ Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B signature.asc Description: PGP signature
Re: [RESULT] [VOTE] Release 1.15.0, release candidate #4
Hi ChangZhuo, Yes, it is indeed the tag for 1.15.0, we have finished affairs like uploading artifacts, uploading images and creating tags. Best, Yun Gao -- From:ChangZhuo Chen (陳昌倬) Send Time:2022 May 3 (Tue.) 10:34 To:dev ; Yun Gao Subject:Re: [RESULT] [VOTE] Release 1.15.0, release candidate #4 On Tue, May 03, 2022 at 10:23:33AM +0800, Yun Gao wrote: > Hi Thomas, > > Sorry for the long delay, currently we are very nearly done > for finalizing the release and we should be able to finish all > works and announce the release on Thursday. > > Best, > Yun Gao > > > > > -- > From:Thomas Weise > Send Time:2022 May 3 (Tue.) 09:22 > To:dev ; Yun Gao > Subject:Re: [RESULT] [VOTE] Release 1.15.0, release candidate #4 > > Hi, > > Thank you for working on the 1.15.0 release. Is there an update wrt > finalizing the release? > > Thanks, > Thomas I am confused. There is already a 1.15.0 tag in GitHub in [0], is this not the 1.15.0 release? [0] https://github.com/apache/flink/releases/tag/release-1.15.0 -- ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org http://czchen.info/ Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B
[jira] [Created] (FLINK-27478) failed job by uncertain cause
teeguo created FLINK-27478: -- Summary: failed job by uncertain cause Key: FLINK-27478 URL: https://issues.apache.org/jira/browse/FLINK-27478 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.14.4 Reporter: teeguo For the following job:
{code:python}
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer


def state_access_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    deserialization_schema = JsonRowDeserializationSchema.builder().type_info(
        type_info=Types.ROW_NAMED(
            ["r0", "r1", "r2"],
            [Types.STRING(), Types.STRING(), Types.STRING()])).build()
    kafka_consumer = FlinkKafkaConsumer(
        topics='topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092',
                    'group.id': 'test-consumer-group'})
    ds = env.add_source(kafka_consumer)
    ds.print()
    env.execute('state_access_demo')


if __name__ == '__main__':
    state_access_demo()
{code}
It failed with the following exception, which doesn't contain any useful information:
{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
	at akka.dispatch.OnComplete.internal(Future.scala:300)
	at akka.dispatch.OnComplete.internal(Future.scala:297)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
{code}
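As the report notes, the wrapper exception above stops at "Job execution failed."; the actionable error is usually nested deeper in the Java cause chain (or in the TaskManager logs). A minimal, hypothetical helper for surfacing that chain is sketched below — `cause_chain` is an illustrative name, and it assumes only the standard `Throwable.getCause()` contract that py4j exposes on `Py4JJavaError.java_exception`:

```python
def cause_chain(java_throwable):
    """Collect the string form of a Throwable and of every nested cause.

    Works on any object exposing getCause(), such as the py4j-wrapped Java
    Throwable found on Py4JJavaError.java_exception. The last entry in the
    returned list is the root cause.
    """
    chain = []
    seen = set()
    t = java_throwable
    while t is not None and id(t) not in seen:  # guard against cyclic causes
        seen.add(id(t))
        chain.append(str(t))
        t = t.getCause()
    return chain
```

In the job above, one could wrap `env.execute(...)` in `try/except Py4JJavaError as e` and print `cause_chain(e.java_exception)` to see past the generic `JobExecutionException`.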
Re: [DISCUSS] FLIP-227: Support overdraft buffer
Hi Piotrek,

> Do you mean to ignore it while processing records, but keep using
> `maxBuffersPerChannel` when calculating the availability of the output?

I think yes, and I'd like Anton Kalashnikov to help double-check.

> +1 for just having this as a separate configuration. Is it a big problem
> that legacy sources would be ignoring it? Note that we already have
> effectively hardcoded a single overdraft buffer.
> `LocalBufferPool#checkAvailability` checks if there is a single buffer
> available and this works the same for all tasks (including legacy source
> tasks). Would it be a big issue if we changed it to check if at least
> "overdraft number of buffers are available", where "overdraft number" is
> configurable, instead of the currently hardcoded value of "1"?

Do you mean we don't add any extra buffers and just use (exclusive buffers *
parallelism + floating buffers)? That is, the LocalBufferPool would be
available when (usedBuffers + overdraftBuffers <= exclusiveBuffers *
parallelism + floatingBuffers) and no subpartition has reached
maxBuffersPerChannel, right?

If yes, I think it can solve the problem of the legacy source, but there may
be some impact: if overdraftBuffers is large and only one buffer is needed to
process a single record, the full (exclusive buffers * parallelism + floating
buffers) cannot be used; only (exclusive buffers * parallelism + floating
buffers - overdraft buffers + 1) of them can be. So for throughput, if the
user turns up the overdraft buffers, they also need to turn up the exclusive
or floating buffers. And it also affects the InputChannel.

If not, I don't think it can solve the problem of the legacy source. The
legacy source doesn't check isAvailable, so if there are extra buffers, it
will use them up until it blocks in requestMemory.

Thanks,
fanrui

On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski wrote:

> Hi,
>
> +1 for the general proposal from my side. It would be a nice workaround for
> flatMaps, WindowOperators and large records issues with unaligned
> checkpoints.
> > The first task is about ignoring max buffers per channel. This means if
> > we request a memory segment from LocalBufferPool and the
> > maxBuffersPerChannel is reached for this channel, we just ignore that
> > and continue to allocate buffers while LocalBufferPool has them (it is
> > actually not an overdraft).
>
> Do you mean to ignore it while processing records, but keep using
> `maxBuffersPerChannel` when calculating the availability of the output?
>
> > The second task is about the real overdraft. I am pretty convinced now
> > that we, unfortunately, need configuration for limiting the overdraft
> > number (because it is not ok if one subtask allocates all buffers of one
> > TaskManager, considering that several different jobs can be submitted on
> > this TaskManager). So the idea is to have
> > maxOverdraftBuffersPerPartition (technically, per LocalBufferPool).
> > In this case, when the limit of buffers in LocalBufferPool is reached,
> > LocalBufferPool can additionally request from NetworkBufferPool up to
> > maxOverdraftBuffersPerPartition buffers.
>
> +1 for just having this as a separate configuration. Is it a big problem
> that legacy sources would be ignoring it? Note that we already have
> effectively hardcoded a single overdraft buffer.
> `LocalBufferPool#checkAvailability` checks if there is a single buffer
> available and this works the same for all tasks (including legacy source
> tasks). Would it be a big issue if we changed it to check if at least
> "overdraft number of buffers are available", where "overdraft number" is
> configurable, instead of the currently hardcoded value of "1"?
>
> Best,
> Piotrek
>
> pt., 29 kwi 2022 o 17:04 rui fan <1996fan...@gmail.com> napisał(a):
>
> > Let me add some information about the LegacySource.
> >
> > If we want to disable the overdraft buffer for LegacySource,
> > could we add an enableOverdraft flag in LocalBufferPool?
> > The default value would be false.
> > If getAvailableFuture is called, we change enableOverdraft=true. It
> > indicates whether isAvailable is checked elsewhere.
> >
> > I don't think it is elegant, but it's safe. Please correct me if I'm
> > wrong.
> >
> > Thanks,
> > fanrui
> >
> > On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > Thanks for your quick response.
> > >
> > > For questions 1/2/3, we think they are clear. We just need to discuss
> > > the default values in the PR.
> > >
> > > For the legacy source, you are right. It's difficult for a general
> > > implementation. Currently, we implement ensureRecordWriterIsAvailable()
> > > in SourceFunction.SourceContext and call it in our common LegacySource,
> > > e.g. FlinkKafkaConsumer. Over 90% of our Flink jobs consume Kafka, so
> > > fixing FlinkKafkaConsumer solved most of our problems.
> > >
> > > Core code:
> > > ```
> > > public void ensureRecordWriterIsAvailable() {
> > >     if (recordWriter == null
> > >         ||
> > > !configuration.getBoolean(Ex
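Stepping outside the quoted thread: the availability condition fanrui proposes above (the pool is available while usedBuffers + overdraftBuffers <= exclusiveBuffers * parallelism + floatingBuffers) can be sketched as a toy model. This is not Flink's actual LocalBufferPool code; all names and numbers are illustrative:

```python
def is_available(used_buffers, overdraft_buffers,
                 exclusive_per_channel, parallelism, floating_buffers):
    """Toy model of the proposed check: the pool reports 'available' only
    while a request could be served without eating into the overdraft
    budget."""
    ordinary_quota = exclusive_per_channel * parallelism + floating_buffers
    return used_buffers + overdraft_buffers <= ordinary_quota


# Example: 2 exclusive buffers * 4 channels + 8 floating = 16 buffers.
# With an overdraft of 4, the pool turns unavailable once 13 buffers are in
# use, i.e. only (quota - overdraft + 1) buffers are reachable through the
# normal availability path -- the throughput concern raised in the thread.
```

This also makes the trade-off concrete: raising the overdraft shrinks the number of buffers usable for ordinary processing, so exclusive or floating buffers may need to be raised along with it.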