Re: [DISCUSS] FLIP-224: Blacklist Mechanism

2022-05-02 Thread Chesnay Schepler

I do share the concern about blurring the lines a bit.

That said, I'd prefer to not have any auto-detection and only have an
opt-in mechanism to manually block processes/nodes. To me this sounds
yet again like one of those magical mechanisms that will rarely work
just right.
An external system can leverage way more information after all.

Moreover, I'm quite concerned about the complexity of this proposal.
Tracking on both the RM/JM side; syncing between components; adjustments
to the slot and resource protocol.

In a way it seems overly complicated.

If we look at it purely from an active resource management perspective,
then there isn't really a need to touch the slot protocol at all (or in
fact anything in the JobMaster), because there isn't any point in
keeping around blocked TMs in the first place.
They'd just be idling, potentially being shut down after a while by the
RM because of it (unless we _also_ touch that logic).
Here the blocking of a process (be it by blocking the process or node)
is equivalent to shutting down the blocked process(es).
Once the block is lifted we can just spin it back up.

And I do wonder whether we couldn't apply the same line of thinking to 
standalone resource management.
Here being able to stop/restart a process/node manually should be a core 
requirement for a Flink deployment anyway.



On 02/05/2022 08:49, Martijn Visser wrote:

Hi everyone,

Thanks for creating this FLIP. I can understand the problem and I see value
in the automatic detection and blocklisting. I do have some concerns with
the ability to manually specify resources to be blocked. I have two
concerns:

* Most organizations explicitly have a separation of concerns, meaning that
there's a group who's responsible for managing a cluster and a user group
who uses that cluster. With the introduction of this mechanism, the latter
group can now influence the responsibility of the first group. So it's
possible that someone from the user group blocks something, which causes
an outage (which could trigger paging mechanisms etc.) that impacts the
first group.
* How big is the group of people who can go through the process of manually
identifying a node that isn't behaving as it should be? I do think this
group is relatively limited. Does it then make sense to introduce such a
feature, which would only be used by a really small user group of Flink? We
still have to maintain, test and support such a feature.

I'm +1 for the autodetection features, but I'm leaning towards not exposing
this to the user group but having this available strictly for cluster
operators. They could then also set up their paging/metrics/logging system
to take this into account.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Fri, 29 Apr 2022 at 09:39, Yangze Guo  wrote:


Thanks for driving this, Zhu and Lijie.

+1 for the overall proposal. Just sharing my two cents here:

- Why do we need to expose
cluster.resource-blacklist.item.timeout-check-interval to the user?
I think the semantics of `cluster.resource-blacklist.item.timeout` is
sufficient for the user. How the timeout is enforced is Flink's internal
implementation detail. I think it will be very confusing and we do not
need to expose it to users.

- The ResourceManager can notify the `BlacklistHandler` of task manager
exceptions as well.
For example, slot allocation might fail because the target task manager
is busy or has network jitter. I don't mean we need to cover this case
in this version, but we could also open a `notifyException` in
`ResourceManagerBlacklistHandler`.

- Before we sync the blocklist to the ResourceManager, will the slots of
a blocked task manager continue to be released and allocated?
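Yangze's first point can be illustrated with a small sketch: a tracker that only needs the item timeout and evicts expired entries lazily when queried, so no separate check-interval option ever needs to be exposed. Class and method names below are illustrative, not the FLIP's actual API.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: entries expire after a configurable timeout,
// while the "check interval" stays an internal detail (lazy eviction).
public class BlocklistTracker {
    private final Map<String, Long> blockedUntil = new ConcurrentHashMap<>();
    private final Duration itemTimeout; // cluster.resource-blacklist.item.timeout

    public BlocklistTracker(Duration itemTimeout) {
        this.itemTimeout = itemTimeout;
    }

    public void block(String resourceId, long nowMillis) {
        blockedUntil.put(resourceId, nowMillis + itemTimeout.toMillis());
    }

    // Lazily evicts expired entries; no user-visible check interval needed.
    public boolean isBlocked(String resourceId, long nowMillis) {
        Long deadline = blockedUntil.get(resourceId);
        if (deadline == null) {
            return false;
        }
        if (nowMillis >= deadline) {
            blockedUntil.remove(resourceId);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        BlocklistTracker tracker = new BlocklistTracker(Duration.ofMinutes(10));
        tracker.block("tm-1", 0L);
        System.out.println(tracker.isBlocked("tm-1", 5 * 60 * 1000L));  // still blocked
        System.out.println(tracker.isBlocked("tm-1", 11 * 60 * 1000L)); // timed out
    }
}
```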

Best,
Yangze Guo

On Thu, Apr 28, 2022 at 3:11 PM Lijie Wang 
wrote:

Hi Konstantin,

Thanks for your feedback. I will respond to your 4 remarks:


1) Thanks for reminding me of the controversy. I think “BlockList” is
good enough, and I will change it in the FLIP.


2) Your suggestion for the REST API is a good idea. Based on the above, I
would change the REST API as follows:

POST/GET /blocklist/nodes

POST/GET /blocklist/taskmanagers

DELETE /blocklist/node/

DELETE /blocklist/taskmanager/


3) If a node is blocked/blocklisted, it means that all task managers on
this node are blocklisted, and all slots on these TMs are unavailable.
This is actually a bit like the TMs being lost, but these TMs are not
really lost; they are in an unavailable status and still registered in
this Flink cluster. They will be available again once the corresponding
blocklist item is removed. This behavior is the same in active/non-active
clusters. However, in active clusters, these TMs may be released due to
idle timeouts.


4) For the item timeout, I prefer to keep it. The reasons are as follows:

a) The timeout will not affect users adding or removing items via the
REST API, and users can disable it by co

Re: [DISCUSS] FLIP-224: Blacklist Mechanism

2022-05-02 Thread Роман Бойко
Thanks for the good initiative, Lijie and Zhu!

If it's possible I'd like to participate in development.

I agree with the 3rd point of Konstantin's reply - we should consider
somehow moving the information about blocklisted nodes/TMs from the
active ResourceManager to non-active ones. Probably storing it inside
Zookeeper/ConfigMap might be helpful here.

And I agree with Martijn that a lot of organizations don't want to expose
such an API to the cluster user group. But I think a mechanism for
unblocking nodes/TMs is necessary anyway, to avoid incorrect automatic
behaviour.

And another small suggestion - I think it would be better to extend the
*BlocklistedItem* class with an *endTimestamp* field and fill it at item
creation. This simple addition would allow us to:

   - Provide users the ability to set the exact end time of a blocklist
   entry through the REST API
   - Not be tied to a single value of
   *cluster.resource-blacklist.item.timeout*
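The suggested endTimestamp extension could look roughly like this; the class is a simplified stand-in for the FLIP's actual BlocklistedItem, with illustrative names.

```java
import java.time.Instant;

// Illustrative sketch: a blocklist item carrying an explicit end timestamp,
// set at creation (possibly from a REST request), instead of relying only
// on the single global cluster.resource-blacklist.item.timeout value.
public class BlocklistedItem {
    private final String resourceId;
    private final Instant endTimestamp;

    public BlocklistedItem(String resourceId, Instant endTimestamp) {
        this.resourceId = resourceId;
        this.endTimestamp = endTimestamp;
    }

    public boolean isExpired(Instant now) {
        return !now.isBefore(endTimestamp);
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2022-05-02T00:00:00Z");
        // A user-supplied end time overrides any global default timeout.
        BlocklistedItem item = new BlocklistedItem("node-1", t0.plusSeconds(3600));
        System.out.println(item.isExpired(t0.plusSeconds(1800))); // before the end time
        System.out.println(item.isExpired(t0.plusSeconds(3600))); // at the end time
    }
}
```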



Re: [DISCUSS] FLIP-224: Blacklist Mechanism

2022-05-02 Thread Becket Qin
Thanks for the proposal, Lijie.

This is an interesting feature and discussion, and somewhat related to the
design principle about how people should operate Flink.

I think there are three things involved in this FLIP.
 a) Detect and report the unstable node.
 b) Collect the information of the unstable node and form a blocklist.
 c) Take the action to block nodes.

My two cents:

1. It looks like people all agree that Flink should have c). It is not only
useful for cases of node failures, but also handy for some planned
maintenance.

2. People have different opinions on b), i.e. who should be the brain that
makes the decision to block a node. I think this largely depends on who we
talk to; different users would probably give different answers. For people
who do have a centralized node health management service, letting Flink do
just a) and c) would be preferred. So essentially Flink would be one of the
sources that may detect unstable nodes, report them to that service, and
then take the command from that service to block the problematic nodes. On
the other hand, for users who do not have such a service, simply letting
Flink be clever by itself and block the suspicious nodes might be desired
to ensure the jobs run smoothly.

So that indicates a) and b) here should be pluggable / optional.

In light of this, maybe it would make sense to have something pluggable
like an UnstableNodeReporter which exposes unstable nodes actively. (A more
general interface could be a JobInfoReporter which can be used to report
any information of type <T>. But I'll just keep the scope relevant to this
FLIP here). Personally speaking, I think it is OK to have a default
implementation of a reporter which just tells Flink to take action to block
problematic nodes and also unblocks them after a timeout.
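The pluggable split described above (detection reporting to a swappable decision-maker, which then drives the blocking action) could look roughly like this; all names are hypothetical, not a proposed Flink interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: a) detection reports unstable nodes through this
// interface; b) the decision-maker behind it is pluggable.
interface UnstableNodeReporter {
    void reportUnstableNode(String nodeId);
}

// Default implementation: simply tells Flink itself to take action c)
// and block the node. Users with an external health-management service
// would instead plug in a reporter that forwards to that service.
class SelfBlockingReporter implements UnstableNodeReporter {
    private final Consumer<String> blockAction;

    SelfBlockingReporter(Consumer<String> blockAction) {
        this.blockAction = blockAction;
    }

    @Override
    public void reportUnstableNode(String nodeId) {
        blockAction.accept(nodeId);
    }
}

public class ReporterDemo {
    public static void main(String[] args) {
        List<String> blocked = new ArrayList<>();
        UnstableNodeReporter reporter = new SelfBlockingReporter(blocked::add);
        reporter.reportUnstableNode("node-7");
        System.out.println(blocked);
    }
}
```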

Thanks,

Jiangjie (Becket) Qin



[jira] [Created] (FLINK-27467) Remove CliFrontendTestBase

2022-05-02 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27467:


 Summary: Remove CliFrontendTestBase
 Key: FLINK-27467
 URL: https://issues.apache.org/jira/browse/FLINK-27467
 Project: Flink
  Issue Type: Technical Debt
  Components: Command Line Client, Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0


The class isn't really providing any value.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27468) Observing JobManager deployment. Previous status: MISSING

2022-05-02 Thread Matyas Orhidi (Jira)
Matyas Orhidi created FLINK-27468:
-

 Summary: Observing JobManager deployment. Previous status: MISSING
 Key: FLINK-27468
 URL: https://issues.apache.org/jira/browse/FLINK-27468
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-0.1.0
Reporter: Matyas Orhidi


The operator keeps looping if the K8s deployment gets deleted (and probably 
when the job is in a terminal Flink state such as FAILED). We need to agree on 
how to handle such cases and fix it.





[jira] [Created] (FLINK-27469) Remove CliFrontendRunWithYarnTest

2022-05-02 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27469:


 Summary: Remove CliFrontendRunWithYarnTest
 Key: FLINK-27469
 URL: https://issues.apache.org/jira/browse/FLINK-27469
 Project: Flink
  Issue Type: Technical Debt
  Components: Deployment / YARN, Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0


This test doesn't actually test any behavior of the yarn cli.





[jira] [Created] (FLINK-27470) [JUnit5 Migration] Module: flink-statebackend-heap-spillable

2022-05-02 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27470:


 Summary: [JUnit5 Migration] Module: 
flink-statebackend-heap-spillable
 Key: FLINK-27470
 URL: https://issues.apache.org/jira/browse/FLINK-27470
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends, Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0








[jira] [Created] (FLINK-27471) Add ARRAY_DISTINCT supported in SQL & Table API

2022-05-02 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-27471:
---

 Summary: Add ARRAY_DISTINCT supported in SQL & Table API
 Key: FLINK-27471
 URL: https://issues.apache.org/jira/browse/FLINK-27471
 Project: Flink
  Issue Type: Sub-task
Reporter: Sergey Nuyanzin


Removes duplicate values from the array.

Syntax:
array_distinct(array) 

Arguments:
array: An ARRAY to be handled.

Returns:

An ARRAY. If the input is NULL, the result is NULL. The order of elements is preserved.
Examples:
{code:sql}
SELECT array_distinct(ARRAY[1, 2, 3, 2, 1]);
-- [1, 2, 3]
SELECT array_distinct(ARRAY[1, NULL, 1]);
-- [1, NULL]
{code}

See also 
https://spark.apache.org/docs/latest/api/sql/index.html#array_distinct
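A minimal sketch of the semantics described above (order preserved, NULL input yields NULL, NULL elements kept once); this is illustrative only, not the actual Flink runtime implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

public class ArrayDistinctDemo {
    // Removes duplicates while preserving first-seen order; a null input
    // yields null, and a null element is kept (once).
    static List<Integer> arrayDistinct(List<Integer> array) {
        if (array == null) {
            return null; // NULL input -> NULL result
        }
        // LinkedHashSet keeps insertion order and tolerates a null element.
        return new ArrayList<>(new LinkedHashSet<>(array));
    }

    public static void main(String[] args) {
        System.out.println(arrayDistinct(Arrays.asList(1, 2, 3, 2, 1)));
        System.out.println(arrayDistinct(Arrays.asList(1, null, 1)));
        System.out.println(arrayDistinct(null));
    }
}
```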








[jira] [Created] (FLINK-27472) Setup flink-connector-redis repository

2022-05-02 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-27472:
--

 Summary: Setup flink-connector-redis repository
 Key: FLINK-27472
 URL: https://issues.apache.org/jira/browse/FLINK-27472
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Martijn Visser
Assignee: Martijn Visser








[GitHub] [flink-connector-redis] MartijnVisser merged pull request #1: [FLINK-27472][Connector][Redis] Setup Redis connector repository

2022-05-02 Thread GitBox


MartijnVisser merged PR #1:
URL: https://github.com/apache/flink-connector-redis/pull/1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-27473) Capture time that a job spends on initializing tasks

2022-05-02 Thread Jun Qin (Jira)
Jun Qin created FLINK-27473:
---

 Summary: Capture time that a job spends on initializing tasks
 Key: FLINK-27473
 URL: https://issues.apache.org/jira/browse/FLINK-27473
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / Metrics
Reporter: Jun Qin


Similar to https://issues.apache.org/jira/browse/FLINK-25888, we should have it 
also for initializing tasks.





[jira] [Created] (FLINK-27474) Use Hugo build information to derive current branch for externally hosted documentation

2022-05-02 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-27474:
--

 Summary: Use Hugo build information to derive current branch for 
externally hosted documentation
 Key: FLINK-27474
 URL: https://issues.apache.org/jira/browse/FLINK-27474
 Project: Flink
  Issue Type: Technical Debt
Reporter: Martijn Visser


Follow-up ticket from 
https://github.com/apache/flink/pull/19571#discussion_r862819931 to use 
{{docs/config.toml#branch}} to determine the current branch. This 
information is needed to pull in the correct externally hosted 
documentation (from connectors) when building all the documentation.





[jira] [Created] (FLINK-27475) Clean-up unused Maven settings from flink-connector-elasticsearch

2022-05-02 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-27475:
--

 Summary: Clean-up unused Maven settings from 
flink-connector-elasticsearch
 Key: FLINK-27475
 URL: https://issues.apache.org/jira/browse/FLINK-27475
 Project: Flink
  Issue Type: Technical Debt
Reporter: Martijn Visser
Assignee: Martijn Visser


https://github.com/apache/flink-connector-elasticsearch currently has quite 
a lot of unused Maven configuration in its POM file. We should clean this up.





[jira] [Created] (FLINK-27476) Build new import option that only focus on maven main classes

2022-05-02 Thread Jing Ge (Jira)
Jing Ge created FLINK-27476:
---

 Summary: Build new import option that only focus on  maven main 
classes
 Key: FLINK-27476
 URL: https://issues.apache.org/jira/browse/FLINK-27476
 Project: Flink
  Issue Type: Improvement
Reporter: Jing Ge


The currently used ImportOption.DoNotIncludeTests.class has some issues when 
running tests with Testcontainers. It would be good to define the target 
classpath precisely.





[jira] [Created] (FLINK-27477) Drop flink-yarn test-jar

2022-05-02 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27477:


 Summary: Drop flink-yarn test-jar
 Key: FLINK-27477
 URL: https://issues.apache.org/jira/browse/FLINK-27477
 Project: Flink
  Issue Type: Technical Debt
  Components: Build System, Deployment / YARN, Tests
Reporter: Chesnay Schepler
 Fix For: 1.16.0


We could do just fine without this test-jar.





Re: [DISCUSS] FLIP-227: Support overdraft buffer

2022-05-02 Thread Piotr Nowojski
Hi,

+1 for the general proposal from my side. It would be a nice workaround
for flatMaps, WindowOperators, and large-record issues with unaligned
checkpoints.

> The first task is about ignoring max buffers per channel. This means if
> we request a memory segment from LocalBufferPool and the
> maxBuffersPerChannel is reached for this channel, we just ignore that
> and continue to allocate buffers while the LocalBufferPool has them (it
> is actually not an overdraft).

Do you mean to ignore it while processing records, but keep using
`maxBuffersPerChannel` when calculating the availability of the output?

> The second task is about the real overdraft. I am pretty convinced now
> that we, unfortunately, need a configuration to limit the overdraft
> number (because it is not ok if one subtask allocates all buffers of one
> TaskManager, considering that several different jobs can be submitted on
> this TaskManager). So the idea is to have
> maxOverdraftBuffersPerPartition (technically to say per LocalBufferPool).
> In this case, when the limit of buffers in the LocalBufferPool is reached,
> the LocalBufferPool can request up to maxOverdraftBuffersPerPartition
> additional buffers from the NetworkBufferPool.

+1 for just having this as a separate configuration. Is it a big problem
that legacy sources would be ignoring it? Note that we already have
effectively hardcoded a single overdraft buffer.
`LocalBufferPool#checkAvailability` checks if there is a single buffer
available and this works the same for all tasks (including legacy source
tasks). Would it be a big issue if we changed it to check if at least
"overdraft number of buffers are available", where "overdraft number" is
configurable, instead of the currently hardcoded value of "1"?
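The suggested change to the availability check can be sketched as follows; the method is a simplified stand-in, not Flink's actual `LocalBufferPool#checkAvailability`, and the parameter names are illustrative.

```java
public class OverdraftAvailabilityDemo {
    // Previously the check was effectively "at least 1 buffer available"
    // (a single hardcoded overdraft buffer); here the threshold becomes a
    // configurable "overdraft number", falling back to 1 as today.
    static boolean isAvailable(int freeBuffers, int overdraftBuffers) {
        return freeBuffers >= Math.max(1, overdraftBuffers);
    }

    public static void main(String[] args) {
        System.out.println(isAvailable(1, 1)); // matches today's hardcoded "1"
        System.out.println(isAvailable(2, 3)); // not enough headroom for overdraft
        System.out.println(isAvailable(3, 3));
    }
}
```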

Best,
Piotrek

pt., 29 kwi 2022 o 17:04 rui fan <1996fan...@gmail.com> napisał(a):

> Let me add some information about the LegacySource.
>
> If we want to disable the overdraft buffer for LegacySource.
> Could we add the enableOverdraft in LocalBufferPool?
> The default value is false. If the getAvailableFuture is called,
> change enableOverdraft=true. It indicates whether there are
> checks isAvailable elsewhere.
>
> I don't think it is elegant, but it's safe. Please correct me if I'm wrong.
>
> Thanks
> fanrui
>
> On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com> wrote:
>
> > Hi,
> >
> > Thanks for your quick response.
> >
> > For question 1/2/3, we think they are clear. We just need to discuss the
> > default value in PR.
> >
> > For the legacy source, you are right. It's difficult for general
> > implementation.
> > Currently, we implement ensureRecordWriterIsAvailable() in
> > SourceFunction.SourceContext. And call it in our common LegacySource,
> > e.g: FlinkKafkaConsumer. Over 90% of our Flink jobs consume kafka, so
> > fixing FlinkKafkaConsumer solved most of our problems.
> >
> > Core code:
> > ```
> > public void ensureRecordWriterIsAvailable() {
> >     if (recordWriter == null
> >             || !configuration.getBoolean(
> >                     ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
> >             || recordWriter.isAvailable()) {
> >         return;
> >     }
> >
> >     CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
> >     try {
> >         resumeFuture.get();
> >     } catch (Throwable ignored) {
> >     }
> > }
> > ```
> >
> > LegacySource calls sourceContext.ensureRecordWriterIsAvailable()
> > before synchronized (checkpointLock) and collects records.
> > Please let me know if there is a better solution.
> >
> > Thanks
> > fanrui
> >
> > On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov 
> > wrote:
> >
> >> Hi.
> >>
> >> -- 1. Do you mean split this into two JIRAs or two PRs or two commits
> in a
> >> PR?
> >>
> >> Perhaps, the separated ticket will be better since this task has fewer
> >> questions but we should find a solution for LegacySource first.
> >>
> >> --  2. For the first task, if the flink user disables the Unaligned
> >> Checkpoint, do we ignore max buffers per channel? Because the
> >> overdraft
> >> isn't useful for the Aligned Checkpoint, it still needs to wait for
> >> downstream Task to consume.
> >>
> >> I think that the logic should be the same for AC and UC. As I
> understand,
> >> the overdraft maybe is not really helpful for AC but it doesn't make it
> >> worse as well.
> >>
> >>   3. For the second task
> >> --  - The default value of maxOverdraftBuffersPerPartition may also
> >> need
> >>to be discussed.
> >>
> >> I think it should be a pretty small value or even 0, since it is a kind
> >> of optimization and users should understand what they are doing
> >> (especially if we implement the first task).
> >>
> >> --  - If the user disables the Unaligned Checkpoint, can we set the
> >>maxOverdraftBuffersPerPartition=0? Because the overdraft isn't
> >> useful for
> >>the Aligned Checkpoint.
> >>
> >> The same answer that above, if the overdraft doesn't make degradation
> for
> >> the Align

Re: [RESULT] [VOTE] Release 1.15.0, release candidate #4

2022-05-02 Thread Thomas Weise
Hi,

Thank you for working on the 1.15.0 release. Is there an update wrt
finalizing the release?

Thanks,
Thomas


On Mon, Apr 25, 2022 at 9:17 PM Yun Gao 
wrote:

> I'm happy to announce that we have unanimously approved this release.
>
> There are 6 explicit approving votes, 3 of which are binding:
>
>  * Dawid Wysakowicz (binding)
> * Xingbo Huang
> * Matthias Pohl
> * Yang Wang
> * Zhu Zhu (binding)
> * Guowei Ma (binding)
>
> There are no disapproving votes.
>
> Thanks everyone!
>
> Best,
> Yun Gao


Re: [RESULT] [VOTE] Release 1.15.0, release candidate #4

2022-05-02 Thread Yun Gao
Hi Thomas,

Sorry for the long delay. Currently we are very nearly done
finalizing the release, and we should be able to finish all the
work and announce the release on Thursday.

Best,
Yun Gao







Re: [RESULT] [VOTE] Release 1.15.0, release candidate #4

2022-05-02 Thread 陳昌倬

I am confused. There is already a 1.15.0 tag in GitHub in [0], is this
not the 1.15.0 release?


[0] https://github.com/apache/flink/releases/tag/release-1.15.0

-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B




Re: [RESULT] [VOTE] Release 1.15.0, release candidate #4

2022-05-02 Thread Yun Gao
Hi ChangZhuo,

Yes, it is indeed the tag for 1.15.0,
we have finished affairs like uploading
artifacts, uploading images and creating
tags. 

Best,
Yun Gao





[jira] [Created] (FLINK-27478) failed job by uncertain cause

2022-05-02 Thread teeguo (Jira)
teeguo created FLINK-27478:
--

 Summary: failed job by uncertain cause 
 Key: FLINK-27478
 URL: https://issues.apache.org/jira/browse/FLINK-27478
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.14.4
Reporter: teeguo


For the following job:
{code:python}
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer


def state_access_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    deserialization_schema = JsonRowDeserializationSchema.builder().type_info(
        type_info=Types.ROW_NAMED(
            ["r0", "r1", "r2"],
            [Types.STRING(), Types.STRING(), Types.STRING()])).build()
    kafka_consumer = FlinkKafkaConsumer(
        topics='topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092',
                    'group.id': 'test-consumer-group'})
    ds = env.add_source(kafka_consumer)

    ds.print()
    env.execute('state_access_demo')


if __name__ == '__main__':
    state_access_demo()
{code}
It failed with the following exception, which doesn't contain any useful
information:
{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
        at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
        at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
        at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258)
        at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
        at 
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
        at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
        at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
        at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
        at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
        at akka.dispatch.OnComplete.internal(Future.scala:300)
        at akka.dispatch.OnComplete.internal(Future.scala:297)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
        at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
        at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
        at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
        at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
        at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
        at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
 

Re: [DISCUSS] FLIP-227: Support overdraft buffer

2022-05-02 Thread rui fan
Hi Piotrek

> Do you mean to ignore it while processing records, but keep using
> `maxBuffersPerChannel` when calculating the availability of the output?

I think yes; please ask Anton Kalashnikov to help double-check.

> +1 for just having this as a separate configuration. Is it a big problem
> that legacy sources would be ignoring it? Note that we already have
> effectively hardcoded a single overdraft buffer.
> `LocalBufferPool#checkAvailability` checks if there is a single buffer
> available and this works the same for all tasks (including legacy source
> tasks). Would it be a big issue if we changed it to check if at least
> "overdraft number of buffers are available", where "overdraft number" is
> configurable, instead of the currently hardcoded value of "1"?

Do you mean we don't add the extra buffers, and just use (exclusive buffers *
parallelism + floating buffers)? So the LocalBufferPool will be available when
(usedBuffers + overdraftBuffers <=
exclusiveBuffers * parallelism + floatingBuffers)
and no subpartition has reached maxBuffersPerChannel, right?

If yes, I think it can solve the problem of the legacy source, but there may
be some impact. If overdraftBuffers is large and only one buffer is needed to
process a single record, the full (exclusive buffers * parallelism + floating
buffers) can no longer be used; only (exclusive buffers * parallelism +
floating buffers - overdraft buffers + 1) buffers would be usable. So for
throughput, if a user turns up the overdraft buffers, they also need to turn
up the exclusive or floating buffers. And it also affects the InputChannel.

If not, I don't think it can solve the problem of the legacy source. The
legacy source doesn't check isAvailable; if there are extra buffers, the
legacy source will use them up until it blocks in requestMemory.
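To make the trade-off above concrete, here is a toy Python model of the
availability condition being discussed (illustrative only — this is not
Flink's actual Java LocalBufferPool, and all names are made up):

```python
class ToyBufferPool:
    """Toy model of a buffer pool with a configurable overdraft.

    pool_size corresponds to exclusiveBuffers * parallelism + floatingBuffers
    in the discussion above; 'overdraft' is the configurable reserve.
    """

    def __init__(self, exclusive_per_channel, parallelism, floating, overdraft):
        self.pool_size = exclusive_per_channel * parallelism + floating
        self.overdraft = overdraft
        self.used = 0

    def is_available(self):
        # The pool reports "available" only while the overdraft reserve
        # is untouched: usedBuffers + overdraftBuffers <= pool size.
        return self.used + self.overdraft <= self.pool_size

    def request_buffer(self):
        # Mid-record processing may still draw buffers past the
        # availability threshold, up to the hard pool limit.
        if self.used >= self.pool_size:
            raise RuntimeError("would block in requestMemory")
        self.used += 1


# pool_size = 2 * 1 + 2 = 4, with 2 of those reserved as overdraft
pool = ToyBufferPool(exclusive_per_channel=2, parallelism=1, floating=2,
                     overdraft=2)

pool.request_buffer()
pool.request_buffer()
assert pool.is_available()      # used=2, and 2 + 2 <= 4

pool.request_buffer()           # overdraft: still allowed mid-record...
assert not pool.is_available()  # ...but the pool is no longer "available"
```

This also shows the throughput concern above: a task that checks
is_available() before each record can start a record with at most
(pool_size - overdraft) buffers in use, while a legacy source that never
checks availability keeps calling request_buffer() until it hits the hard
limit and blocks.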


Thanks
fanrui

On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski  wrote:

> Hi,
>
> +1 for the general proposal from my side. It would be a nice workaround
> flatMaps, WindowOperators and large records issues with unaligned
> checkpoints.
>
> > The first task is about ignoring max buffers per channel. This means if
> > we request a memory segment from LocalBufferPool and the
> > maxBuffersPerChannel is reached for this channel, we just ignore that
> > and continue to allocate buffer while LocalBufferPool has it(it is
> > actually not a overdraft).
>
> Do you mean to ignore it while processing records, but keep using
> `maxBuffersPerChannel` when calculating the availability of the output?
>
> > The second task is about the real overdraft. I am pretty convinced now
> > that we, unfortunately, need configuration for limitation of overdraft
> > number(because it is not ok if one subtask allocates all buffers of one
> > TaskManager considering that several different jobs can be submitted on
> > this TaskManager). So idea is to have
> > maxOverdraftBuffersPerPartition(technically to say per LocalBufferPool).
> > In this case, when a limit of buffers in LocalBufferPool is reached,
> > LocalBufferPool can request additionally from NetworkBufferPool up to
> > maxOverdraftBuffersPerPartition buffers.
>
> +1 for just having this as a separate configuration. Is it a big problem
> that legacy sources would be ignoring it? Note that we already have
> effectively hardcoded a single overdraft buffer.
> `LocalBufferPool#checkAvailability` checks if there is a single buffer
> available and this works the same for all tasks (including legacy source
> tasks). Would it be a big issue if we changed it to check if at least
> "overdraft number of buffers are available", where "overdraft number" is
> configurable, instead of the currently hardcoded value of "1"?
>
> Best,
> Piotrek
>
> pt., 29 kwi 2022 o 17:04 rui fan <1996fan...@gmail.com> napisał(a):
>
> > Let me add some information about the LegacySource.
> >
> > If we want to disable the overdraft buffer for LegacySource,
> > could we add an enableOverdraft flag in LocalBufferPool?
> > The default value would be false. If getAvailableFuture is called,
> > we change enableOverdraft=true. This indicates whether
> > isAvailable is checked elsewhere.
> >
> > I don't think it is elegant, but it's safe. Please correct me if I'm
> > wrong.
> >
> > Thanks
> > fanrui
> >
> > On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > Thanks for your quick response.
> > >
> > > For question 1/2/3, we think they are clear. We just need to discuss
> the
> > > default value in PR.
> > >
> > > For the legacy source, you are right. It's difficult to implement
> > > in a general way.
> > > Currently, we implement ensureRecordWriterIsAvailable() in
> > > SourceFunction.SourceContext and call it in our common LegacySource,
> > > e.g. FlinkKafkaConsumer. Over 90% of our Flink jobs consume Kafka, so
> > > fixing FlinkKafkaConsumer solved most of our problems.
> > >
> > > Core code:
> > > ```
> > > public void ensureRecordWriterIsAvailable() {
> > >  if (recordWriter == null
> > >   ||
> > >
> !configuration.getBoolean(Ex