[jira] [Created] (FLINK-27484) Reduce ArchUnit violations in the project

2022-05-04 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-27484:
---

 Summary: Reduce ArchUnit violations in the project
 Key: FLINK-27484
 URL: https://issues.apache.org/jira/browse/FLINK-27484
 Project: Flink
  Issue Type: Improvement
  Components: API / Core, Connectors / Common, Runtime / Configuration
Reporter: Fabian Paul


When ArchUnit was introduced we deliberately ignored the existing violations.
This is the umbrella ticket to track the efforts to reduce them.

In the long run, this gives our users clarity about which parts of the
codebase they should or should not use.
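
For context, a minimal, hypothetical sketch of how ArchUnit's freeze mechanism is typically used to ignore pre-existing violations while still failing the build on new ones (the rule, packages, and class names below are illustrative, not Flink's actual rule set):

```
import com.tngtech.archunit.junit.AnalyzeClasses;
import com.tngtech.archunit.junit.ArchTest;
import com.tngtech.archunit.lang.ArchRule;
import com.tngtech.archunit.library.freeze.FreezingArchRule;

import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.noClasses;

// Freezing a rule stores the currently known violations and only reports new
// ones, which is what "deliberately ignored the existing violations" refers to.
@AnalyzeClasses(packages = "org.apache.flink")
public class FrozenArchitectureTest {

    @ArchTest
    public static final ArchRule API_DOES_NOT_DEPEND_ON_RUNTIME =
            FreezingArchRule.freeze(
                    noClasses()
                            .that().resideInAPackage("..api..")        // illustrative source packages
                            .should().dependOnClassesThat()
                            .resideInAPackage("..runtime.."));         // illustrative forbidden target
}
```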





Re: [DISCUSS] DockerHub repository maintainers

2022-05-04 Thread Chesnay Schepler
One advantage is that the images are periodically rebuilt to get 
security fixes.


The operator is a different story anyway because it is AFAIK only
supposed to be used via docker (i.e., no standalone mode), which alleviates
concerns about keeping the logic within the image to a minimum (which bit us
in the past on the Flink side).

On 03/05/2022 16:09, Yang Wang wrote:

The flink-kubernetes-operator project is only published
via apache/flink-kubernetes-operator on Docker Hub and GitHub Packages.
We do not see obvious advantages in using Docker Hub official images.

Best,
Yang

On Thu, Apr 28, 2022 at 19:27, Xintong Song wrote:


I agree with you that doing QA for the image after the release has been
finalized doesn't feel right. IIUR, that is mostly because the official image
PR needs 1) the binary release to be deployed and propagated and 2) the
corresponding git commit to be specified. I'm not completely sure about
this. Maybe we can improve the process by investigating the feasibility of
pre-verifying an official image PR before finalizing the release. It's
definitely a good thing to do if possible.

I also agree that QA from DockerHub folks is valuable to us.

I'm not against publishing official-images, and I'm not against working
closely with the DockerHub folks to improve the process of delivering the
official image. However, I don't think these should become reasons that we
don't release our own apache/flink images.

Taking 1.12.0 as an example, admittedly it would be nice for us to
comply with the DockerHub folks' standards and not have a
just-for-kubernetes command in our entrypoint. However, this is IMO far
less important than delivering the image to our users in a timely manner. I
guess that's where the DockerHub folks and we have different
priorities, and that's why I think we should have a path that is fully
controlled by this community to deliver images. We could take their
valuable input and improve afterwards. Actually, that's what we did for
1.12.0 by starting to release to apache/flink.

Thank you~

Xintong Song



On Thu, Apr 28, 2022 at 6:30 PM Chesnay Schepler 
wrote:


I still think that's mostly a process issue.
Of course we can be blind-sided if we do the QA for a release artifact
after the release has been finalized.
But that's a clearly broken process from the get-go.

At the very least we should already open a PR when the RC is created to
get earlier feedback.

Moreover, nowadays the docker images are way slimmer and we are much
more careful about what is actually added to the scripts.

Finally, the problems they found did show that their QA is very valuable
to us. And side-stepping that for such an essential piece of a release
isn't a good idea imo.

On 28/04/2022 11:31, Xintong Song wrote:

I'm overall against only releasing to official-images.

We started releasing to apache/flink, in addition to the official-image, in
1.12.0. That was because releasing the official-image needs approval from
the DockerHub folks, which is not under control of the Flink community. For
1.12.0 there were unfortunately some divergences between us and the
DockerHub folks, and it ended up taking us nearly 2 months to get that
official-image PR merged [1][2]. Many users, especially those who need
Flink's K8s & Native-K8s deployment modes, were asking for the image after
1.12.0 was announced.

One could argue that what happened for 1.12.0 is not a regular case.
However, I'd like to point out that the docker images are not something
nice-to-have, but a practically necessary piece of the release for the k8s
/ native-k8s deployments to work. I'm strongly against a release process
where such an important piece depends on the approval of a 3rd party.

Thank you~

Xintong Song


[1] https://issues.apache.org/jira/browse/FLINK-20650

[2] https://github.com/docker-library/official-images/pull/9249



On Thu, Apr 28, 2022 at 2:43 PM Chesnay Schepler wrote:

We could just stop releasing to apache/flink and only go for the
official-images route.

On 28/04/2022 07:43, Xintong Song wrote:

Forgot to mention that, we have also proposed to use one shared account and
limit its access to the PMC members, like what we do with the PyPI account.
Unfortunately, INFRA rejected this proposal [1].


Thank you~

Xintong Song


[1] https://issues.apache.org/jira/browse/INFRA-23208

On Thu, Apr 28, 2022 at 1:39 PM Xintong Song wrote:

Hi devs,

I'd like to start a discussion about maintainers for DockerHub
repositories under the *apache* namespace [1].

Currently, the Flink community maintains various repositories (flink,
flink-statefun, flink-statefun-playground, and flink-kubernetes-operator)
on DockerHub under the *apache* namespace. There's a limitation on how many
members the *apache* namespace can add, and recently INFRA is complaining
about Flink taking too many places [2][3]. They would like us to reduce our
maintainers from 20 now to 5.

Jingsong and I would like to volunteer as two of 

[jira] [Created] (FLINK-27485) Documentation build pipeline is broken

2022-05-04 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-27485:
--

 Summary: Documentation build pipeline is broken
 Key: FLINK-27485
 URL: https://issues.apache.org/jira/browse/FLINK-27485
 Project: Flink
  Issue Type: Bug
  Components: Build System, Documentation
Reporter: Martijn Visser
Assignee: Martijn Visser


The current documentation build pipeline is broken due to two failures:
- It uses git command {{git branch --show-current}} which isn't supported by 
the installed Git version on the Docker image. We can switch to {{git rev-parse 
--abbrev-ref HEAD}} as an alternative
- The manual Hugo download and installation is outdated and doesn't add Hugo to 
the PATH





[DISCUSS] Backport scalafmt to 1.15 branch

2022-05-04 Thread Timo Walther

Hi everyone,

The 1.15 release is almost out. We should still discuss whether we want 
to backport scalafmt to the release-1.15 branch. Currently, it is quite 
cumbersome to backport fixes in the table planner.


It seems scalafmt has stabilized in master. Are you ok with backporting 
the changes now and applying the reformatting? The only downside could be 
that exception/error messages might use slightly different line numbers 
for Scala code.


Any other downsides that I forgot? Or other opinions on this topic?


Thanks,

Timo


[jira] [Created] (FLINK-27486) Reduce ArchUnit violations in connector base module

2022-05-04 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-27486:
---

 Summary: Reduce ArchUnit violations in connector base module
 Key: FLINK-27486
 URL: https://issues.apache.org/jira/browse/FLINK-27486
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common
Affects Versions: 1.16.0
Reporter: Fabian Paul








Re: [DISCUSS] FLIP-217 Support watermark alignment of source splits

2022-05-04 Thread Piotr Nowojski
Hi Becket,

Is this feature really non-optional? If so, adding those methods with a
default implementation just defeats that purpose. On the other hand, if a
Source doesn't support pausing splits, the system would still work; arguably
watermark alignment would not, and maybe that would deserve logging a
warning if the user configured the alignment but is using sources that do
not support it. Also, adding non-default methods would break the Public
interface, so we need to make this feature optional.

> When implementing the base interface, users do not need to implement a
method with default implementation.

So why should we spam users with such methods that they do not need?

> Can you articulate a bit more on which part you think makes users harder
to understand?

Imagine you are an inexperienced user, first time dealing with a system.
You want to implement a source interface. You look at it, and in the two
worlds you see two different things:

1. (default methods) You see an interface with 50 methods, with various
"supportX"/"supportY" methods that look strange to you, and tens of other
methods. You don't understand any of this, because you are struggling with
even the basic Flink concepts. And somewhere among those there are only 5
methods that you actually need to implement, but you are not sure which
ones those are. Remember that you mostly have no idea what you are doing.
2. (decorative interfaces) You see an interface with the 5 most important
methods. You still struggle to implement those, but at least you don't have
to wonder about tens of optional features at the beginning. (A minimal
sketch of this style follows below.)
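
To make option 2 concrete, here is a small illustrative sketch of the decorative-interface style; the names are hypothetical and not Flink's actual SourceReader API:

```
import java.util.Collection;

// The base interface stays small: only the methods every reader must implement.
interface MySourceReader<T> {
    void start();
    T poll();
    void close();
}

// The optional capability lives in its own ("decorative") interface.
interface SupportsSplitPausing {
    void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume);
}

// A reader opts in by also implementing the extra interface;
// the framework can detect support with an instanceof check.
final class PausableReader implements MySourceReader<String>, SupportsSplitPausing {
    @Override public void start() {}
    @Override public String poll() { return null; }
    @Override public void close() {}
    @Override public void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        // bookkeeping of paused splits would go here
    }
}
```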

> There is another benefit of the decorative interfaces which is not
> mentioned, but might be worth considering here. Usually the decorative
> interfaces give slightly better backwards compatibility than the new
> default method in the interfaces. (...)
> I think in Flink we in general do not guarantee custom components compiled
> with an older version can run with a newer version of Flink

Hmm, good point. Don't we actually want this kind of compatibility? There
were discussions about that [1]:

> Flink jobs or ecosystems like external connectors/formats built with
older Flink version X like 1.14 can be running on a newer Flink version Y
like 1.15 with no issue.

Best,
Piotrek

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-207%3A+Flink+backward+and+forward+compatibility

On Tue, 3 May 2022 at 14:31, Becket Qin wrote:

> Hi Piotr,
>
> Thanks for the comment.
>
> Just to clarify, I am not against the decorative interfaces, but I do think
> we should use them with caution. The main argument for adding the methods
> to the SourceReader is that these methods are effectively NON-OPTIONAL to
> SourceReader impl, i.e. starting from this FLIP, all the SourceReaders impl
> are expected to support this method, although some old implementations may
> not have implemented this feature. I think we should distinguish the new
> features from the optional features. While the public decorative interface
> is a solution to the optional features. We should not use it for the
> features that are non-optional.
>
> That said, this feature is optional for SplitReaders. Arguably we can have
> a decorative interface for that, but for simplicity and symmetry of the
> interface, personally I prefer just adding a new method.
>
> Regarding the advantages you mentioned about the decorative interfaces,
> they would make sense if:
> 1. The feature is optional.
> 2. There is only one decorative interface involved for a feature. Otherwise
> the argument that all the methods are grouped together will not stand.
>
> Compared with that, I think the current solution works fine in all cases,
> i.e. "having supportXXX() method in Source, and default methods /
> decorative interfaces in base interfaces.".
>
> The advantages are:
> > - clean and easy to implement base interface
>
> In the current approach, the Java doc of the SupportXXX() method in the
> Source would be the single source of truth regarding how to implement this
> feature. It lists the method that has to be implemented to support this
> feature, regardless of how many classes / interfaces are involved.
>
> When implementing the base interface, users do not need to implement a
> method with default implementation. If they are curious what the method is
> for, the java doc of that method simply points users to the SupportXXX()
> method in the Source. It seems not adding work to the users compared with
> decorative interfaces, but gives much better discoverability.
>
> - all of the methods from a single feature are grouped in a single
> > decorator interface, together with their dedicated java doc. It's also
> > easier to google search for help using the decorator name
>
> - if an optional feature requires two methods to be implemented at once,
> > decorator can guarantee that
>
> These two points are not true when multiple components and classes are
> involved collaboratively to provide a featu

Re: [DISCUSS] FLIP-222: Support full query lifecycle statements in SQL client

2022-05-04 Thread Paul Lam
Hi Shengkai,

Thanks a lot for your input!

> I just wonder how the users can get the web ui in the application mode.
Therefore, it's better we can list the Web UI using the SHOW statement.
WDYT?

I think it's a valid approach. I'm adding it to the FLIP.

> After the investigation, I am fine with the QUERY but the keyword JOB is
also okay to me.

In addition, CockroachDB has both SHOW QUERIES [1] and SHOW JOBS [2],
where the former shows the actively running queries and the latter shows
background tasks like schema changes. FYI.

WRT the questions:

> 1. Could you add some details about the behaviour with the different
execution.target, e.g. session, application mode?

IMHO, the difference between the various `execution.target` values is mostly
about cluster startup, which has little relation to the proposed statements.
These statements rely on the current ClusterClient/JobClient API,
which is deployment mode agnostic. Canceling a job in an application
cluster is the same as in a session cluster.
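
As a rough illustration of that point, a sketch only, assuming Flink's JobClient interface with getJobStatus() and cancel() (this is not the proposed gateway implementation):

```
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;

// Whatever cluster the job runs on (session or application), the lifecycle
// operations below go through the same deployment-agnostic JobClient calls.
final class JobLifecycleOps {

    static JobStatus describe(JobClient client) throws Exception {
        return client.getJobStatus().get();   // e.g. RUNNING, FINISHED, CANCELED
    }

    static void stop(JobClient client) throws Exception {
        client.cancel().get();                // cancellation request is cluster-agnostic
    }
}
```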

BTW, application mode support is still under development ATM [3].

> 2. Considering the SQL Client/Gateway is not limited to submitting the job
to the specified cluster, is it able to list jobs in the other clusters?

I think multi-cluster support in SQL Client/Gateway should be aligned with
CLI, at least at the early phase. We may use SET  to set a cluster id for a
session, then we have access to the cluster. However,  every SHOW
statement would only involve one cluster.

Best,
Paul Lam

[1] https://www.cockroachlabs.com/docs/stable/show-statements.html
[2] https://www.cockroachlabs.com/docs/v21.2/show-jobs
[3] https://issues.apache.org/jira/browse/FLINK-26541

On Fri, Apr 29, 2022 at 15:36, Shengkai Fang wrote:

> Hi.
>
> Thanks for Paul's update.
>
> > It's better we can also get the infos about the cluster where the job is
> > running through the DESCRIBE statement.
>
> I just wonder how the users can get the web ui in the application mode.
> Therefore, it's better we can list the Web UI using the SHOW statement.
> WDYT?
>
>
> > QUERY or other keywords.
>
> I list the statement to manage the lifecycle of the query/dml in other
> systems:
>
> Mysql[1] allows users to SHOW [FULL] PROCESSLIST and use the KILL command
> to kill the query.
>
> ```
> mysql> SHOW PROCESSLIST;
>
> mysql> KILL 27;
> ```
>
>
> Postgres uses the following statements to kill the queries.
>
> ```
> SELECT pg_cancel_backend()
>
> SELECT pg_terminate_backend()
> ```
>
> KSQL uses the following commands to control the query lifecycle[4].
>
> ```
> SHOW QUERIES;
>
> TERMINATE ;
>
> ```
>
> [1] https://dev.mysql.com/doc/refman/8.0/en/show-processlist.html
> [2] https://scaledynamix.com/blog/how-to-kill-mysql-queries/
> [3]
>
> https://stackoverflow.com/questions/35319597/how-to-stop-kill-a-query-in-postgresql
> [4]
>
> https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/show-queries/
> [5]
>
> https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/terminate/
>
> After the investigation, I am fine with the QUERY but the keyword JOB is
> also okay to me.
>
> We also have two questions here.
>
> 1. Could you add some details about the behaviour with the different
> execution.target, e.g. session, application mode?
>
> 2. Considering the SQL Client/Gateway is not limited to submitting the job
> to the specified cluster, is it able to list jobs in the other clusters?
>
>
> Best,
> Shengkai
>
> On Thu, Apr 28, 2022 at 17:17, Paul Lam wrote:
>
> > Hi Martijn,
> >
> > Thanks a lot for your reply! I agree that the scope may be a bit
> confusing,
> > please let me clarify.
> >
> > The FLIP aims to add new SQL statements that are supported only in
> > sql-client, similar to
> > jar statements [1]. Jar statements can be parsed into jar operations,
> which
> > are used only in
> > CliClient in sql-client module and cannot be executed by TableEnvironment
> > (not available in
> > Table API program that contains SQL that you mentioned).
> >
> > WRT the unchanged CLI client, I mean CliClient instead of the sql-client
> > module, which
> > currently contains the gateway codes (e.g. Executor). The FLIP mainly
> > extends
> > the gateway part, and barely touches CliClient and REST server (REST
> > endpoint in FLIP-91).
> >
> > WRT the syntax, I don't have much experience with SQL standards, and I'd
> > like to hear
> > more opinions from the community. I prefer Hive-style syntax because I
> > think many users
> > are familiar with Hive, and there're on-going efforts to improve
> Flink-Hive
> > integration [2][3].
> > But my preference is not strong, I'm okay with other options too. Do you
> > think JOB/Task is
> > a good choice, or do you have other preferred keywords?
> >
> > [1]
> >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/jar/
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-152%3A+Hive+Query+Syntax+Compatibility
> > [3]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-223%3A+Support+

Re: [DISCUSS] FLIP-227: Support overdraft buffer

2022-05-04 Thread Piotr Nowojski
Hi,

Thanks for the answers.

> we may still need to discuss whether the
> overdraft/reserve/spare should use extra buffers or buffers
> in (exclusive + floating buffers)?

and

> These things resolve the different problems (at least as I see that).
> The current hardcoded "1"  says that we switch "availability" to
> "unavailability" when one more buffer is left(actually a little less
> than one buffer since we write the last piece of data to this last
> buffer). The overdraft feature doesn't change this logic we still want
> to switch to "unavailability" in such a way but if we are already in
> "unavailability" and we want more buffers then we can take "overdraft
> number" more. So we can not avoid this hardcoded "1" since we need to
> understand when we should switch to "unavailability"

Ok, I see. So it seems to me that both of you have in mind to keep the
buffer pools as they are right now, but if we are in the middle of
processing a record, we can request extra overdraft buffers on top of
those? This is another way to implement the overdraft compared to what I
was thinking. I was thinking about something like keeping the "overdraft",
or more precisely a buffer "reserve", in the buffer pool. I think my version
would be easier to implement, because it is just fiddling with the min/max
buffers calculation and a slightly modified `checkAvailability()` logic.
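
For illustration, a deliberately simplified, hypothetical sketch of the "reserve in the pool" variant (this is not the actual LocalBufferPool code; the class, fields, and methods are invented):

```
// The pool reports itself as available only while more than `reserveBuffers`
// buffers remain, so a record that started while "available" can still draw
// on the reserve without the availability check having to change elsewhere.
final class ReservingBufferPool {

    private final int maxBuffers;
    private final int reserveBuffers;   // the configurable reserve/"overdraft" size
    private int requestedBuffers;

    ReservingBufferPool(int maxBuffers, int reserveBuffers) {
        this.maxBuffers = maxBuffers;
        this.reserveBuffers = reserveBuffers;
    }

    /** Replaces the hardcoded "at least 1 buffer left" availability check. */
    synchronized boolean checkAvailability() {
        return maxBuffers - requestedBuffers > reserveBuffers;
    }

    /** Mid-record requests may still succeed out of the reserve. */
    synchronized boolean tryRequestBuffer() {
        if (requestedBuffers < maxBuffers) {
            requestedBuffers++;
            return true;
        }
        return false;
    }

    synchronized void recycleBuffer() {
        requestedBuffers--;
    }
}
```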

On the other hand, what you have in mind would better utilise the available
memory, right? It would require more code changes (how would we know when
we are allowed to request the overdraft?). However, in this case, I would
be tempted to set the number of overdraft buffers by default to
`Integer.MAX_VALUE`, and let the system request as many buffers as
necessary. The only downside that I can think of (apart from the higher
complexity) would be a higher chance of hitting a known/unsolved deadlock [1]
in a scenario:
- downstream task hasn't yet started
- upstream task requests overdraft and uses all available memory segments
from the global pool
- upstream task is blocked, because downstream task hasn't started yet and
can not consume any data
- downstream task tries to start, but can not, as there are no available
buffers

> BTW, for watermark, the number of buffers it needs is
> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
> the watermark won't block in requestMemory.

and

> the best overdraft size will be equal to parallelism.

That's a lot of buffers. I don't think we need that many for broadcasting
watermarks. Watermarks are small, and remember that every subpartition has
some partially filled/empty WIP buffer, so the vast majority of
subpartitions will not need to request a new buffer.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-13203

On Tue, 3 May 2022 at 17:15, Anton Kalashnikov wrote:

> Hi,
>
>
>  >> Do you mean to ignore it while processing records, but keep using
> `maxBuffersPerChannel` when calculating the availability of the output?
>
>
> Yes, it is correct.
>
>
>  >> Would it be a big issue if we changed it to check if at least
> "overdraft number of buffers are available", where "overdraft number" is
> configurable, instead of the currently hardcoded value of "1"?
>
>
> These things resolve the different problems (at least as I see that).
> The current hardcoded "1"  says that we switch "availability" to
> "unavailability" when one more buffer is left(actually a little less
> than one buffer since we write the last piece of data to this last
> buffer). The overdraft feature doesn't change this logic we still want
> to switch to "unavailability" in such a way but if we are already in
> "unavailability" and we want more buffers then we can take "overdraft
> number" more. So we can not avoid this hardcoded "1" since we need to
> understand when we should switch to "unavailability"
>
>
> -- About "reserve" vs "overdraft"
>
> As Fanrui mentioned above, perhaps the best overdraft size will be
> equal to parallelism. Also, the user can set any value he wants. So even
> if parallelism is small (~5) but the user's flatmap produces a lot of
> data, the user can set 10 or even more, which almost doubles the max
> buffers and makes it impossible to reserve. At least we need to figure
> out how to protect from such cases (a limit for the overdraft?). So
> actually it looks even more difficult than increasing the maximum buffers.
>
> I want to emphasize that overdraft buffers are a soft configuration, which
> means it takes as many buffers as the global buffer pool has
> available (maybe zero) but less than this configured value. It is also
> important to notice that perhaps not many subtasks in a TaskManager will
> be using this feature, so we don't actually need a lot of available
> buffers for every subtask (here, I mean that if we have only one
> window/flatmap operator and many other operators, then one TaskManager
> will have many ordinary subtasks which don't actually need overdraft and
> several subtasks that needs this featu

Re: [DISCUSS] FLIP-227: Support overdraft buffer

2022-05-04 Thread Dawid Wysakowicz

Hey all,

I have not replied in the thread yet, but I was following the discussion.

Personally, I like Fanrui's and Anton's idea. As far as I understand it, 
the idea to distinguish between inside flatMap & outside would be fairly 
simple, but maybe slightly indirect. The checkAvailability would remain 
unchanged and it is always checked between separate invocations of the 
UDF. Therefore the overdraft buffers would not apply there. However, once 
the pool says it is available, it means it has at least an initial 
buffer. So any additional request without checking for availability can 
be considered to be inside of processing a single record. The only 
exception is the LegacySource, as I don't think it actually checks for 
the availability of buffers in the LocalBufferPool.


In the offline chat with Anton, we also discussed if we need a limit of 
the number of buffers we could overdraft (or in other words if the limit 
should be equal to Integer.MAX_VALUE), but personally I'd prefer to stay 
on the safe side and have it limited. The pool of network buffers is 
shared for the entire TaskManager, so it means it can be shared even 
across tasks of separate jobs. However, I might be just unnecessarily 
cautious here.


Best,

Dawid

On 04/05/2022 10:54, Piotr Nowojski wrote:

Hi,

Thanks for the answers.


we may still need to discuss whether the
overdraft/reserve/spare should use extra buffers or buffers
in (exclusive + floating buffers)?

and


These things resolve the different problems (at least as I see that).
The current hardcoded "1"  says that we switch "availability" to
"unavailability" when one more buffer is left(actually a little less
than one buffer since we write the last piece of data to this last
buffer). The overdraft feature doesn't change this logic we still want
to switch to "unavailability" in such a way but if we are already in
"unavailability" and we want more buffers then we can take "overdraft
number" more. So we can not avoid this hardcoded "1" since we need to
understand when we should switch to "unavailability"

Ok, I see. So it seems to me that both of you have in mind to keep the
buffer pools as they are right now, but if we are in the middle of
processing a record, we can request extra overdraft buffers on top of
those? This is another way to implement the overdraft to what I was
thinking. I was thinking about something like keeping the "overdraft" or
more precisely buffer "reserve" in the buffer pool. I think my version
would be easier to implement, because it is just fiddling with min/max
buffers calculation and slightly modified `checkAvailability()` logic.

On the other hand  what you have in mind would better utilise the available
memory, right? It would require more code changes (how would we know when
we are allowed to request the overdraft?). However, in this case, I would
be tempted to set the number of overdraft buffers by default to
`Integer.MAX_VALUE`, and let the system request as many buffers as
necessary. The only downside that I can think of (apart of higher
complexity) would be higher chance of hitting a known/unsolved deadlock [1]
in a scenario:
- downstream task hasn't yet started
- upstream task requests overdraft and uses all available memory segments
from the global pool
- upstream task is blocked, because downstream task hasn't started yet and
can not consume any data
- downstream task tries to start, but can not, as there are no available
buffers


BTW, for watermark, the number of buffers it needs is
numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
the watermark won't block in requestMemory.

and


the best overdraft size will be equal to parallelism.

That's a lot of buffers. I don't think we need that many for broadcasting
watermarks. Watermarks are small, and remember that every subpartition has
some partially filled/empty WIP buffer, so the vast majority of
subpartitions will not need to request a new buffer.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-13203

On Tue, 3 May 2022 at 17:15, Anton Kalashnikov wrote:


Hi,


  >> Do you mean to ignore it while processing records, but keep using
`maxBuffersPerChannel` when calculating the availability of the output?


Yes, it is correct.


  >> Would it be a big issue if we changed it to check if at least
"overdraft number of buffers are available", where "overdraft number" is
configurable, instead of the currently hardcoded value of "1"?


These things resolve the different problems (at least as I see that).
The current hardcoded "1"  says that we switch "availability" to
"unavailability" when one more buffer is left(actually a little less
than one buffer since we write the last piece of data to this last
buffer). The overdraft feature doesn't change this logic we still want
to switch to "unavailability" in such a way but if we are already in
"unavailability" and we want more buffers then we can take "overdraft
number" more. So we can not avoid this hardcoded 

[jira] [Created] (FLINK-27487) KafkaMetricWrappers do incorrect cast

2022-05-04 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27487:


 Summary: KafkaMetricWrappers do incorrect cast
 Key: FLINK-27487
 URL: https://issues.apache.org/jira/browse/FLINK-27487
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Runtime / Metrics
Affects Versions: 1.15.0
Reporter: Chesnay Schepler
 Fix For: 1.16.0, 1.15.1


In FLINK-24765 the kafka metric wrappers that bridge kafka metrics into our 
metric system were migrated from the deprecated {{KafkaMetric#value}} to 
{{#metricValue}}.

This migration was done incorrectly. It was assumed that #metricValue behaves 
similarly to #value in that it always returns a double, but this is not the 
case, as it may also return longs or strings.
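
A hedged sketch of the kind of defensive conversion the fix needs (illustrative only, not the actual Flink wrapper code):

```
import org.apache.kafka.common.metrics.KafkaMetric;

// KafkaMetric#metricValue() returns Object, which may be a Double, a Long,
// or even a String, so a blind cast to Double can fail at runtime.
final class KafkaMetricValues {

    static double asDouble(KafkaMetric metric) {
        Object value = metric.metricValue();
        if (value instanceof Number) {
            return ((Number) value).doubleValue();
        }
        // Non-numeric metrics (e.g. string-valued ones) cannot be bridged as numeric gauges.
        return Double.NaN;
    }
}
```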





Re: [DISCUSS] Backport scalafmt to 1.15 branch

2022-05-04 Thread Chesnay Schepler

+1 to backport it now; the release is already being finalized after all.

On 04/05/2022 10:13, Timo Walther wrote:

Hi everyone,

The 1.15 release is almost out. We should still discuss whether we 
want to backport scalafmt to the release-1.15 branch. Currently, it is 
quite cumbersome to backport fixes in the table planner.


It seems scalafmt has stabilized in master. Are you ok with 
backporting the changes now and apply the reformatting? The only 
downside could be that exception/error messages might use a slightly 
different line number for Scala code.


Any other downsides that I forgot? Or other opinions on this topic?


Thanks,

Timo





Re: [DISCUSS] Backport scalafmt to 1.15 branch

2022-05-04 Thread Martijn Visser
+1

On Wed, 4 May 2022 at 13:58, Chesnay Schepler  wrote:

> +1 to backport it now; the release is already being finalized after all.
>
> On 04/05/2022 10:13, Timo Walther wrote:
> > Hi everyone,
> >
> > The 1.15 release is almost out. We should still discuss whether we
> > want to backport scalafmt to the release-1.15 branch. Currently, it is
> > quite cumbersome to backport fixes in the table planner.
> >
> > It seems scalafmt has stabilized in master. Are you ok with
> > backporting the changes now and apply the reformatting? The only
> > downside could be that exception/error messages might use a slightly
> > different line number for Scala code.
> >
> > Any other downsides that I forgot? Or other opinions on this topic?
> >
> >
> > Thanks,
> >
> > Timo
> >
>
>


[VOTE] FLIP-225: Implement standalone mode support in the kubernetes operator

2022-05-04 Thread Jassat, Usamah
Hi everyone,

Thanks for the feedback for FLIP-225: Implement standalone mode support in the 
kubernetes operator [1] on the discussion thread [2]

I’d like to start a vote for it. The vote will be open for at least 72 hours 
unless there is an objection or not enough votes.

[1] 
(https://cwiki.apache.org/confluence/display/FLINK/FLIP-225%3A+Implement+standalone+mode+support+in+the+kubernetes+operator)

[2] (https://lists.apache.org/thread/rv964g6rq5bkc8kwx36y80nwfqcgn2s4)


Re: [VOTE] FLIP-225: Implement standalone mode support in the kubernetes operator

2022-05-04 Thread Gyula Fóra
+1

Gyula

On Wed, May 4, 2022 at 2:32 PM Jassat, Usamah 
wrote:

> Hi everyone,
>
> Thanks for the feedback for FLIP-225: Implement standalone mode support in
> the kubernetes operator [1] on the discussion thread [2]
>
> I’d like to start a vote for it. The vote will be open for at-least 72
> hours unless there is an objection or not enough votes.
>
> [1] (
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-225%3A+Implement+standalone+mode+support+in+the+kubernetes+operator
> )
>
> [2] (https://lists.apache.org/thread/rv964g6rq5bkc8kwx36y80nwfqcgn2s4)
>


Re: [VOTE] FLIP-225: Implement standalone mode support in the kubernetes operator

2022-05-04 Thread Danny Cranmer
+1 (binding)

Thanks,
Danny

On Wed, May 4, 2022 at 1:34 PM Gyula Fóra  wrote:

> +1
>
> Gyula
>
> On Wed, May 4, 2022 at 2:32 PM Jassat, Usamah 
> wrote:
>
> > Hi everyone,
> >
> > Thanks for the feedback for FLIP-225: Implement standalone mode support
> in
> > the kubernetes operator [1] on the discussion thread [2]
> >
> > I’d like to start a vote for it. The vote will be open for at-least 72
> > hours unless there is an objection or not enough votes.
> >
> > [1] (
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-225%3A+Implement+standalone+mode+support+in+the+kubernetes+operator
> > )
> >
> > [2] (https://lists.apache.org/thread/rv964g6rq5bkc8kwx36y80nwfqcgn2s4)
> >
>


Re: [DISCUSS] FLIP-217 Support watermark alignment of source splits

2022-05-04 Thread Becket Qin
Thanks for the reply and patient discussion, Piotr and Dawid.

Is there any reason for making pausing the reading of a split an optional
feature, other than that it was not included in the original interface?

To be honest I am really worried about the complexity of the user story
here. Optional features like this have a high overhead. Imagine this
feature is optional, and now a user enables watermark alignment and defines a
few watermark groups. Would it work? Hmm, that depends on whether the
involved Source has implemented this feature. If the Sources are well
documented, good luck. Otherwise end users may have to look into the code
of the Source to see whether the feature is supported, which is something
they shouldn't have to do.

I think it would be way simpler and clearer to just let end users and Flink
assume all the connectors will implement this feature. After all, the
watermark group is not optional to the end users. If in some rare cases
the feature cannot be supported, a clear UnsupportedOperationException will
be thrown to tell users to explicitly remove this Source from the watermark
group. I don't think we should have a warning message here, as warnings tend
to be ignored in many cases. If we do this, we don't even need the supportXXX
method in the Source for this feature. In fact, this is exactly how many
interfaces work today. For example, SplitEnumerator#addSplitsBack() is not
supported by the Pravega source because it does not support partial failover.
In that case, it simply throws an exception to trigger a global recovery.

The reason we add a default implementation in this case would just be for
the sake of backwards compatibility, so old sources can still compile. Sure,
in the short term, this feature might not be supported by many existing
sources. That is OK, and it is quite visible to the source developers that
they did not override the default impl which throws an
UnsupportedOperationException.
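
Roughly, the shape being argued for looks like the following (illustrative only; these are not the final FLIP-217 signatures):

```
import java.util.Collection;

// A new method is added to the existing reader interface with a default
// implementation that throws, so old implementations keep compiling while it
// stays obvious to their developers which feature they have not implemented.
interface SketchSourceReader<SplitT> {

    default void pauseOrResumeSplits(
            Collection<SplitT> splitsToPause, Collection<SplitT> splitsToResume) {
        throw new UnsupportedOperationException(
                "This reader does not support pausing splits; "
                        + "remove it from the watermark alignment group or implement this method.");
    }
}
```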

@Dawid,

the Java doc of the SupportXXX() method in the Source would be the single
>> source of truth regarding how to implement this feature.
>
>

I also don't find it entirely true. Half of the classes are theoretically
> optional and are utility classes from the point of view how the interfaces
> are organized. Theoretically users do not need to use any of
> SourceReaderBase & SplitReader. Would be weird to list their methods in the
> Source interface.

I think the ultimate goal of java docs is to guide users to implement the
Source. If SourceReaderBase is the preferred way to implement a
SourceReader, it seems worth mentioning that. Even the Java language
documentation for interfaces lists the known implementations [1] so people
can leverage them. But for this particular case, if we make the feature
non-optional, we don't even need the supportXXX() method for now.

Thanks,

Jiangjie (Becket) Qin



On Wed, May 4, 2022 at 4:37 PM Dawid Wysakowicz 
wrote:

> Hey Piotr and Becket,
>
> First of all, let me say I am happy with whichever option is agreed in the
> discussion.
>
> I wanted to clarify a few points from the discussion though:
>
> @Becket:
>
> The main argument for adding the methods to the SourceReader is that these
> methods are effectively NON-OPTIONAL to SourceReader impl, i.e. starting
> from this FLIP, all the SourceReaders impl are expected to support this
> method, although some old implementations may not have implemented this
> feature. I think we should distinguish the new features from the optional
> features. While the public decorative interface is a solution to the
> optional features. We should not use it for the features that are
> non-optional.
>
> I don't think that this feature is NON-OPTIONAL. Even though preferred, I
> still think it can be simply optional.
>
> the Java doc of the SupportXXX() method in the Source would be the single
> source of truth regarding how to implement this feature.
>
> I also don't find it entirely true. Half of the classes are theoretically
> optional and are utility classes from the point of view how the interfaces
> are organized. Theoretically users do not need to use any of
> SourceReaderBase & SplitReader. Would be weird to list their methods in the
> Source interface.
>
> @Piotr
>
> If we have all of the methods with default implementation in the base
> interface, the API doesn't give any clue to the user which set of methods
> are required to be implemented at the same time.
>
> I feel that no matter which option we choose this can not be solved
> entirely in either of the options, because of the point above and because
> the signature of SplitReader#pauseOrResumeSplits and
> SourceReader#pauseOrResumeSplits are slightly different (one identifies
> splits with splitId the other one passes the splits directly).
>
> Best,
>
> Dawid
> On 03/05/2022 14:30, Becket Qin wrote:
>
> Hi Piotr,
>
> Thanks for the comment.
>
> Just to clarify, I am not against the decorative interfaces, but I do
> think we should use them with caution. The main argument for add

Re: [DISCUSS] FLIP-91: Support SQL Client Gateway

2022-05-04 Thread Martijn Visser
Hi Shengkai,

> Agreed. The FLIP mainly focus on the Gateway. I think it's better to
rename the name to the "Support SQL Gateway". WDYT?

+1

> I think it's better to integrate the Gateway into the Flink code base.
The reason behind is
>  1. The Gateway relies on the Flink implementation,  I think we'd better
to maintain it inside the Flink. It really takes us much time to upgrade
the sql-gateway in ververica repo to the latest Flink version.
> 2. The Gateway is important to the Flink itself. Many users needs the
Gateway to manage the Flink SQL jobs. Actually Hive, Spark both have its
Gateway in its code base.

I would like to understand what makes the upgrades problematic. Is it because
of relying on internal interfaces? If so, should we not consider making them
public?

A downside I see with integrating the Gateway into the Flink codebase is
that a) it will not be possible to have separate releases of the Gateway,
they will be tied to individual Flink releases and b) if you want the
Gateway to support multiple Flink versions, I can see that becoming
complicated in Flink's release branching and support mechanism. For
example, what if you have a Gateway released with Flink in Flink 1.16 and
Flink 1.17, which both support Flink 1.10 up to their latest version. Then
you encounter a bug in the implementation for Flink 1.12: that means that
you have to create multiple fixes, in multiple branches, and then release
multiple new Flink versions. I don't think that the Gateway is a 'core'
function of Flink which should be included with Flink. There are a lot of
users who use the DataStream, Table or Python as their implementation
layer. They all don't need this extra capability (even though you could
argue that in the future it would be nice to have something similar for
Python).

> Because the Gateway itself relies on the Flink inner implementation...I
think we can just use one Gateway per versions. Users can manage the
gateway with other utils.

I've left my comment above because I was going through each argument one by
one, but this was my assumption already: we should not rely on internal
interfaces for capability we want to support as a community. We should then
make these interfaces public.

> After I read FLIP-91[1], I want to add an init-file option. Its
functionality is the same as option '-i' of Flink SQL Client.

I don't think that an init-file option should be added to the SQL Gateway.
+1 for keeping that in the client, not the Gateway.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Fri, 29 Apr 2022 at 04:26, Shengkai Fang  wrote:

> Hi Marijn and LuNing.
>
> Thanks for your feedback!
>
> > The FLIP is called "SQL Client Gateway", but isn't this a REST Gateway
> which would be used by Flink's SQL Client (or other applications)?
>
> Agreed. The FLIP mainly focus on the Gateway. I think it's better to rename
> the name to the "Support SQL Gateway". WDYT?
>
> > From a user perspective, I would have expected that we start with the
> REST endpoint before explaining how we would integrate this into Flink. Now
> it's quite hard to first understand what we want to offer to users and if
> that will be sufficient for a first version.
>
> emmm. Considering that api is basically the operation of some concepts, is
> it better to introduce the core concepts first? But I agree you are right
> that we should start with the RESt endpoint. I reorganize the content to
> introduce the REST first in the public interfaces.
>
> > With Flink 1.15, we're introducing an OpenAPI specification. Can we
> also do this straight away for the REST Gateway?
>
> Yes. We will organize the related APIs into OpenAPI specification.
>
> >Should we introduce the REST Gateway as part of Flink's main repository?
> >Wouldn't we be better off to maintain this in a separate repository under
> >ASF?
>
> I think it's better to integrate the Gateway into the Flink code base. The
> reason behind this is
>
> 1. The Gateway relies on the Flink implementation,  I think we'd better to
> maintain it inside the Flink. It really takes us much time to upgrade the
> sql-gateway in ververica repo to the latest Flink version.
>
> 2. The Gateway is important to the Flink itself. Many users needs the
> Gateway to manage the Flink SQL jobs. Actually Hive, Spark both have its
> Gateway in its code base.
>
> But I think it's fine to put other utils, e.g. JDBC under the ASF.
>
> > Ideally you would like to be able to support multiple Flink versions
> > with one version of the REST Gateway I think?
>
> > Users can upgrade a large number of Flink jobs versions gradually in a
> Gateway service.
>
> Because the Gateway itself relies on the Flink inner implementation...I
> think we can just use one Gateway per versions. Users can manage the
> gateway with other utils.
>
> >There's no mention of Batch or Streaming in this concept. If I recall
> >correctly, the current Flink SQL Gateway can only support Ba

Re: [DISCUSS] FLIP-223: Support HiveServer2 Endpoint

2022-05-04 Thread Martijn Visser
Hi Shengkai,

> Actually we will only rely on the API in the Hive, which only contains
the thrift file and the generated code

So this implementation would not rely in any way on Hive, only on Thrift?

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Fri, 29 Apr 2022 at 05:16, Shengkai Fang  wrote:

> Hi, Jark and Martijn
>
> Thanks for your feedback.
>
> > Kyuubi provides three ways to configure Hive metastore [1]. Could we
> provide similar abilities?
>
> Yes. I have updated the FLIP about this and it takes some time to figure
> out how the jdbc driver works. I added the section about how to use the
> hive JDBC to configure the session-level catalog.
>
> > I think we can improve the "HiveServer2 Compatibility" section.
>
> Yes. I have updated the FLIP and added more details about the
> compatibility.
>
> >  Prefer to first complete the discussion and vote on FLIP-91 then discuss
> FLIP-223
>
> Of course. We can wait until the discussion of the FLIP-91 finishes.
>
> > Maintenance concerns about the hive
>
> Actually we will only rely on the API in the Hive, which only contains the
> thrift file and the generated code[1]. I think it will not influence us to
> upgrade the java version.
>
> [1] https://github.com/apache/hive/tree/master/service-rpc
>
> Best,
> Shengkai
>
> On Tue, Apr 26, 2022 at 20:44, Martijn Visser wrote:
>
> > Hi all,
> >
> > I'm not too familiar with Hive and HiveServer2, but I do have a couple of
> > questions/concerns:
> >
> > 1. What is the relationship between this FLIP and FLIP-91? My assumption
> > would be that this FLIP (and therefore the HiveServer2) implementation
> > would need to be integrated in the REST Gateway, is that correct? If so,
> I
> > would prefer to first complete the discussion and vote on FLIP-91, else
> > we'll have two moving FLIPs who have a direct relationship with each
> other.
> >
> > 2. While I understand that Hive is important (in the Chinese ecosystem,
> not
> > so much in Europe and the US), I still have maintenance concerns on this
> > topic. We know that the current Hive integration isn't exactly ideal and
> > requires a lot of work to get in better shape. At the same time, Hive
> still
> > doesn't support Java 11 while we need (and should, given the premier
> > support has ended already) to move away from Java 8.
> >
> > Best regards,
> >
> > Martijn Visser
> > https://twitter.com/MartijnVisser82
> > https://github.com/MartijnVisser
> >
> >
> > On Mon, 25 Apr 2022 at 12:13, Jark Wu  wrote:
> >
> > > Thank Shengkai for driving this effort,
> > > I think this is an essential addition to Flink Batch.
> > >
> > > I have some small suggestions:
> > > 1) Kyuubi provides three ways to configure Hive metastore [1]. Could we
> > > provide similar abilities?
> > > Especially with the JDBC Connection URL, users can visit different Hive
> > > metastore server instances.
> > >
> > > 2) I think we can improve the "HiveServer2 Compatibility" section.
> > > We need to figure out two compatibility matrices. One is SQL Gateway
> with
> > > different versions of Hive metastore,
> > > and the other is different versions of Hive client (e.g., Hive JDBC)
> with
> > > SQL Gateway. We need to clarify
> > > what metastore and client versions we support and how users configure
> the
> > > versions.
> > >
> > > Best,
> > > Jark
> > >
> > >
> > > [1]:
> > >
> > >
> >
> https://kyuubi.apache.org/docs/r1.3.1-incubating/deployment/hive_metastore.html#activate-configurations
> > >
> > > On Sun, 24 Apr 2022 at 15:02, Shengkai Fang  wrote:
> > >
> > > > Hi, Jiang.
> > > >
> > > > Thanks for your feedback!
> > > >
> > > > > Integrating the Hive ecosystem should not require changing the
> > > > > service interface
> > > >
> > > > I moved the API change to FLIP-91. But I think it's possible we add
> > > > more interfaces to integrate the new endpoints in the future because
> > > > every endpoint's functionality is different. For example, the REST
> > > > endpoint doesn't support fetching operation-level logs but the
> > > > HiveServer2 endpoint does. In this case, we need to modify the shared
> > > > GatewayService to support the functionality exposed by the new
> > > > endpoint.
> > > >
> > > > > How to support different Hive versions?
> > > >
> > > > Do you mean supporting different HiveServer2 versions? HiveServer2
> > > > uses the version to guarantee compatibility. During openSession,
> > > > the client and server determine the protocol version
> > > > (minimum(client version, hive endpoint version)). After that the
> > > > client and the server use the determined version to communicate. In
> > > > the HiveServer2 endpoint, it determines how the endpoint deserializes
> > > > the results and the result schema. I added a section about HiveServer2
> > > > compatibility.
> > > >
> > > > > Could you please fully provide its definition including input
> > > > > parameters and the corres

[jira] [Created] (FLINK-27488) Migrate flink-docker CI to github actions

2022-05-04 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27488:


 Summary: Migrate flink-docker CI to github actions
 Key: FLINK-27488
 URL: https://issues.apache.org/jira/browse/FLINK-27488
 Project: Flink
  Issue Type: Technical Debt
  Components: Build System / CI, flink-docker
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler








Re: [DISCUSS] FLIP-218: Support SELECT clause in CREATE TABLE(CTAS)

2022-05-04 Thread Martijn Visser
Hi everyone,

Can we identify if this proposed syntax is part of the SQL standard?

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Fri, 29 Apr 2022 at 11:19, yuxia  wrote:

> Thanks for driving this work, it's going to be a useful feature.
> About FLIP-218, I have some questions.
>
> 1: Does our CTAS syntax support specifying the target table's schema,
> including column names and data types? I think it may be a useful feature
> in case we want to change the data types in the target table instead of
> always copying the source table's schema. It'll be more flexible with this
> feature. Btw, MySQL's "CREATE TABLE ... SELECT Statement" [1] supports this
> feature.
>
> 2: It seems it'll require the sink to implement a public interface to drop
> the table, so what will the interface look like?
>
> [1] https://dev.mysql.com/doc/refman/8.0/en/create-table-select.html
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Mang Zhang" 
> To: "dev" 
> Sent: Thursday, April 28, 2022, 4:57:24 PM
> Subject: [DISCUSS] FLIP-218: Support SELECT clause in CREATE TABLE(CTAS)
>
> Hi, everyone
>
>
> I would like to open a discussion on supporting a SELECT clause in CREATE
> TABLE (CTAS).
> With the development of business and the enhancement of Flink SQL
> capabilities, queries become more and more complex.
> Now the user needs to use the CREATE TABLE statement to create the target
> table first, and then execute the INSERT statement.
> However, the target table may have many columns, which brings a lot of
> work outside the business logic to the user.
> At the same time, the user has to ensure that the schema of the created
> target table is consistent with the schema of the query result.
> Using a CTAS syntax like Hive/Spark can greatly facilitate the user.
>
>
>
> You can find more details in FLIP-218[1]. Looking forward to your feedback.
>
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-218%3A+Support+SELECT+clause+in+CREATE+TABLE(CTAS)
>
>
>
>
> --
>
> Best regards,
> Mang Zhang
>


Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric

2022-05-04 Thread Martijn Visser
Hi everyone,

I don't have much to chip in, but just wanted to express that I really
appreciate the in-depth discussion on this topic and I hope that others
will join the conversation.

Best regards,

Martijn

On Tue, 3 May 2022 at 10:15, Александр Смирнов  wrote:

> Hi Qingsheng, Leonard and Jark,
>
> Thanks for your detailed feedback! However, I have questions about
> some of your statements (maybe I didn't get something?).
>
> > Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time”
>
> I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" is not
> fully implemented with caching, but as you said, users go on it
> consciously to achieve better performance (no one proposed to enable
> caching by default, etc.). Or by users do you mean other developers of
> connectors? In this case developers explicitly specify whether their
> connector supports caching or not (in the list of supported options),
> no one makes them do that if they don't want to. So what exactly is
> the difference between implementing caching in modules
> flink-table-runtime and in flink-table-common from the considered
> point of view? How does it affect on breaking/non-breaking the
> semantics of "FOR SYSTEM_TIME AS OF proc_time"?
>
> > confront a situation that allows table options in DDL to control the
> behavior of the framework, which has never happened previously and should
> be cautious
>
> If we talk about main differences of semantics of DDL options and
> config options("table.exec.xxx"), isn't it about limiting the scope of
> the options + importance for the user business logic rather than
> specific location of corresponding logic in the framework? I mean that
> in my design, for example, putting an option with lookup cache
> strategy in configurations would  be the wrong decision, because it
> directly affects the user's business logic (not just performance
> optimization) + touches just several functions of ONE table (there can
> be multiple tables with different caches). Does it really matter for
> the user (or someone else) where the logic is located, which is
> affected by the applied option?
> Also I can remember DDL option 'sink.parallelism', which in some way
> "controls the behavior of the framework" and I don't see any problem
> here.
>
> > introduce a new interface for this all-caching scenario and the design
> would become more complex
>
> This is a subject for a separate discussion, but actually in our
> internal version we solved this problem quite easily - we reused
> InputFormat class (so there is no need for a new API). The point is
> that currently all lookup connectors use InputFormat for scanning the
> data in batch mode: HBase, JDBC and even Hive - it uses class
> PartitionReader, that is actually just a wrapper around InputFormat.
> The advantage of this solution is the ability to reload cache data in
> parallel (number of threads depends on number of InputSplits, but has
> an upper limit). As a result cache reload time significantly reduces
> (as well as time of input stream blocking). I know that usually we try
> to avoid usage of concurrency in Flink code, but maybe this one can be
> an exception. BTW I don't say that it's an ideal solution, maybe there
> are better ones.
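
A hedged sketch of the reuse idea described above (the class and parameter names are illustrative, configuration and error handling are simplified, and one InputFormat instance is created per thread since the instances are not meant to be shared):

```
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.core.io.InputSplit;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Splits are created once; each split is then read on its own thread with its
// own InputFormat instance, and every record is handed to the lookup cache.
final class ParallelCacheReloader<T, S extends InputSplit> {

    void reload(Supplier<InputFormat<T, S>> formatFactory, int maxThreads, Consumer<T> cacheWriter)
            throws Exception {
        S[] splits = formatFactory.get().createInputSplits(maxThreads);
        ExecutorService pool =
                Executors.newFixedThreadPool(Math.max(1, Math.min(maxThreads, splits.length)));
        for (S split : splits) {
            pool.execute(() -> {
                try {
                    InputFormat<T, S> format = formatFactory.get(); // one instance per thread
                    format.open(split);
                    while (!format.reachedEnd()) {
                        T record = format.nextRecord(null);
                        if (record != null) {
                            cacheWriter.accept(record);
                        }
                    }
                    format.close();
                } catch (Exception e) {
                    throw new RuntimeException("Cache reload failed for split " + split, e);
                }
            });
        }
        pool.shutdown();
    }
}
```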
>
> > Providing the cache in the framework might introduce compatibility issues
>
> It's possible only in cases when the developer of the connector won't
> properly refactor his code and will use new cache options incorrectly
> (i.e. explicitly provide the same options into 2 different code
> places). For correct behavior all he will need to do is to redirect
> existing options to the framework's LookupConfig (+ maybe add an alias
> for options, if there was different naming), everything will be
> transparent for users. If the developer won't do refactoring at all,
> nothing will be changed for the connector because of backward
> compatibility. Also if a developer wants to use his own cache logic,
> he just can refuse to pass some of the configs into the framework, and
> instead make his own implementation with already existing configs and
> metrics (but actually I think that it's a rare case).
>
> > filters and projections should be pushed all the way down to the table
> function, like what we do in the scan source
>
> It's the great purpose. But the truth is that the ONLY connector that
> supports filter pushdown is FileSystemTableSource
> (no database connector supports it currently). Also for some databases
> it's simply impossible to pushdown such complex filters that we have
> in Flink.
>
> >  only applying these optimizations to the cache seems not quite useful
>
> Filters can cut off an arbitrarily large amount of data from the
> dimension table. For a simple example, suppose in dimension table
> 'users'
> we have column 'age' with values from 20 to 40, and input stream
> 'clicks' that is ~uniformly distributed by age of users. If we have
> filter 'age > 30',
> there will be twice less data in cache

Re: Source alignment for Iceberg

2022-05-04 Thread Becket Qin
Hey Piotr,

I think the mechanism FLIP-182 provided is a reasonable default one, which
ensures the watermark drift stays within an upper bound. However,
admittedly there are also other strategies for different purposes.

In the Iceberg case, I am not sure if a static, strictly enforced watermark
drift bound is desired. The source might just want to finish reading the
assigned splits as fast as possible, and it is OK to have a drift of "one
split" instead of a fixed time period.

As another example, if there are some fast readers whose splits are always
throttled, while the other slow readers are struggling to keep up with the
rest of the splits, the split enumerator may decide to reassign the slow
splits so all the readers have something to read. This would need the
SplitEnumerator to be aware of the watermark progress on each reader. So it
seems useful to expose the WatermarkAlignmentEvent information to the
SplitEnumerator as well.
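
To illustrate the kind of enumerator-side logic this would enable, here is a hypothetical, simplified sketch (it does not implement the actual SplitEnumerator interface, and the names are invented):

```
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// The enumerator-side assigner tracks per-reader watermarks and only hands out
// the next split to a reader that is not too far ahead of the slowest reader.
final class AligningSplitAssigner<SplitT> {

    private final Queue<SplitT> pendingSplits = new ArrayDeque<>();
    private final Map<Integer, Long> readerWatermarks = new HashMap<>();
    private final long maxAllowedDriftMs;

    AligningSplitAssigner(long maxAllowedDriftMs) {
        this.maxAllowedDriftMs = maxAllowedDriftMs;
    }

    void onWatermarkReport(int subtaskId, long watermark) {
        readerWatermarks.put(subtaskId, watermark);
    }

    void addSplit(SplitT split) {
        pendingSplits.add(split);
    }

    /** Returns a split for the requesting reader, or null to hold the request back. */
    SplitT nextSplitFor(int subtaskId) {
        long slowest = readerWatermarks.values().stream().min(Long::compare).orElse(Long.MIN_VALUE);
        long requester = readerWatermarks.getOrDefault(subtaskId, Long.MIN_VALUE);
        if (requester - slowest > maxAllowedDriftMs) {
            return null; // reader is too far ahead; defer the assignment
        }
        return pendingSplits.poll();
    }
}
```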

Thanks,

Jiangjie (Becket) Qin



On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski  wrote:

> Hi Steven,
>
> Isn't this redundant to FLIP-182 and FLIP-217? Can't Iceberg just emit
> all splits and let FLIP-182/FLIP-217 handle the watermark alignment and
> block the splits that are too far into the future? I can see this being an
> issue if the existence of too many blocked splits occupies too many
> resources.
>
> If that's the case, indeed the SourceCoordinator/SplitEnumerator would have to
> decide on some basis how many and which splits to assign, and in what order. But
> in that case I'm not sure how much you could use from FLIP-182 and
> FLIP-217. They seem somewhat orthogonal to me, operating on different
> levels. FLIP-182 and FLIP-217 work with whatever splits have already
> been generated and assigned. You could leverage FLIP-182 and FLIP-217 and
> take care only of the problem of limiting the number of parallel active
> splits. And here I'm not sure if it would be worth generalising a solution
> across different connectors.
>
> Regarding the global watermark, I made a related comment some time ago
> about it [1]. It sounds to me like you also need to solve this problem,
> otherwise Iceberg users will encounter late records in case of race
> conditions between assigning new splits and completing older ones.
>
> Best,
> Piotrek
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
>
Mon, 2 May 2022 at 04:26, Steven Wu wrote:
>
>> add dev@ group to the thread as Thomas suggested
>>
>> Arvid,
>>
>> The scenario 3 (Dynamic assignment + temporary no split) in the FLIP-180
>> (idleness) can happen to Iceberg source alignment, as readers can be
>> temporarily starved due to the holdback by the enumerator when assigning
>> new splits upon request.
>>
>> Totally agree that we should decouple this discussion from FLIP-217,
>> which addresses the split-level watermark alignment problem as a follow-up
>> of FLIP-182.
>>
>> Becket,
>>
>> Yes, currently the Iceberg source implements the alignment by leveraging the
>> dynamic split assignment from the FLIP-27 design. Basically, the enumerator can
>> hold back split assignments to readers when necessary. Everything is
>> centralized in the enumerator: (1) watermark extraction and aggregation,
>> (2) alignment decision and execution.
>>
>> The motivation of this discussion is to see if the Iceberg source can leverage
>> some of the watermark alignment solutions (like FLIP-182) from the Flink
>> framework. E.g., as mentioned in the doc, the Iceberg source can potentially
>> leverage the FLIP-182 framework to do the watermark extraction and
>> aggregation, while keeping the alignment decision and execution in
>> the centralized enumerator.
>>
>> Thanks,
>> Steven
>>
>> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin  wrote:
>>
>> > Hi Steven,
>> >
>> > Thanks for pulling me into this thread. I think the timestamp
>> > alignment use case here is a good example of what FLIP-27 was designed
>> for.
>> >
>> > Technically speaking, the Iceberg source can already implement the timestamp
>> > alignment in the new Flink source even without FLIP-182. However, I
>> > understand the rationale here because timestamp alignment is also trying to
>> > orchestrate the consumption of splits. However, it looks like FLIP-182 was
>> > not designed in a way that it can be easily extended for other use cases.
>> > It is probably worth thinking of a more general mechanism to answer the
>> > following questions:
>> >
>> > 1. What information whose source of truth is the Flink framework should be
>> > exposed to the SplitEnumerator and SourceReader? And how?
>> > 2. What control actions in the Flink framework are worth exposing to the
>> > SplitEnumerators and SourceReaders? And how?
>> >
>> > In the context of timestamp alignment, the first question is more
>> > relevant. For example, instead of hardcoding the ReportWatermarkEvent
>> > handl

[jira] [Created] (FLINK-27489) Allow users to run dedicated tests in the CI

2022-05-04 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-27489:


 Summary: Allow users to run dedicated tests in the CI
 Key: FLINK-27489
 URL: https://issues.apache.org/jira/browse/FLINK-27489
 Project: Flink
  Issue Type: New Feature
  Components: Build System / CI
Reporter: Dawid Wysakowicz


Users can specify a dedicated test that is run on the CI in a PR.

Users are able to run any test at any given point in time.

It should use the existing user interface (e.g. FlinkBot).
{code}
@flinkbot run org.apache.flink.test.checkpointing.TimestampedFileInputSplitTest#testSplitComparison
{code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27490) [JUnit5 Migration] Module: flink-table-code-splitter

2022-05-04 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-27490:
---

 Summary: [JUnit5 Migration] Module: flink-table-code-splitter
 Key: FLINK-27490
 URL: https://issues.apache.org/jira/browse/FLINK-27490
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Sergey Nuyanzin






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] FLIP-227: Support overdraft buffer

2022-05-04 Thread Anton Kalashnikov

Hi,

That is a good point about the possible deadlock. I have no answer for it
right now, but we should definitely take it into account.



`Integer.MAX_VALUE` as the default value was my idea as well, but now, as
Dawid mentioned, I think it is dangerous since it is too implicit for the
user: if the user submits one more job to the same TaskManager while one
subtask takes a lot of buffers, it will not be clear to the user what is
going on and why the second job cannot start. So if the user manually sets
a big overdraft number it is ok, but an implicitly big number does not look
so good.



I still don't understand how the "reserve" implementation should be limited.
I mean, if we have X buffers in total and the user sets the overdraft equal
to X, we obviously can not reserve all buffers, but how many are we allowed
to reserve? Should it be a different configuration like
percentageForReservedBuffers?



>> That's a lot of buffers. I don't think we need that many for broadcasting watermarks.


Actually, I totally agree that we don't need a lot of buffers for
overdraft. I think that even if we only implement the first task, "ignoring
maxBuffersPerChannel while processing a record", it already covers most
cases. But if we are able to allocate several more buffers (~10) to
cover the corner case when we are out of all buffers, that would be great.



In conclusion, I think that if we are talking about a small number of
overdraft buffers, both implementations look pretty similar (easy to
implement, and small/no risk of problems). But if we are talking about a
big number (~parallelism), both implementations require extra
complexity for avoiding problems (deadlocks, reserving too many buffers).


I will think about the arguments for both implementations a little more.

--

Best regards,
Anton Kalashnikov

On 04.05.2022 11:28, Dawid Wysakowicz wrote:

Hey all,

I have not replied in the thread yet, but I was following the discussion.

Personally, I like Fanrui's and Anton's idea. As far as I understand it,
the idea of distinguishing between being inside flatMap & outside would be
fairly simple, but maybe slightly indirect. The checkAvailability
would remain unchanged and is always checked between separate
invocations of the UDF. Therefore the overdraft buffers would not
apply there. However, once the pool says it is available, it means it
has at least an initial buffer. So any additional request made without
checking for availability can be considered to be inside the processing of
a single record. The one exception is the LegacySource, as I
don't think it actually checks for the availability of buffers in the
LocalBufferPool.
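
A minimal sketch of that distinction, with made-up names rather than the actual LocalBufferPool internals: availability between records is decided as before, while requests made during the processing of a record may go into a bounded overdraft instead of blocking.

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

/** Simplified buffer pool illustrating the proposed overdraft behaviour. */
public class OverdraftBufferPoolSketch {

    private final Queue<byte[]> freeBuffers = new ArrayDeque<>();
    private final int bufferSize;
    private final int maxOverdraftBuffers;
    private int overdraftInUse = 0;

    public OverdraftBufferPoolSketch(int numBuffers, int bufferSize, int maxOverdraftBuffers) {
        this.bufferSize = bufferSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
        for (int i = 0; i < numBuffers; i++) {
            freeBuffers.add(new byte[bufferSize]);
        }
    }

    /** Checked between records: unchanged, the overdraft plays no role here. */
    public synchronized boolean isAvailable() {
        return !freeBuffers.isEmpty();
    }

    /**
     * Called while a record is being processed (i.e. after availability was
     * already confirmed once). May go into overdraft instead of blocking.
     */
    public synchronized byte[] requestBufferDuringRecord() {
        if (!freeBuffers.isEmpty()) {
            return freeBuffers.poll();
        }
        if (overdraftInUse < maxOverdraftBuffers) {
            overdraftInUse++;
            // In the real design this would come from the shared network pool.
            return new byte[bufferSize];
        }
        return null; // caller would block / back-pressure here
    }

    public synchronized void recycle(byte[] buffer) {
        if (overdraftInUse > 0) {
            overdraftInUse--; // overdraft buffers go back to the shared pool instead
        } else {
            freeBuffers.add(buffer);
        }
    }
}
{code}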


In the offline chat with Anton, we also discussed if we need a limit 
of the number of buffers we could overdraft (or in other words if the 
limit should be equal to Integer.MAX_VALUE), but personally I'd prefer 
to stay on the safe side and have it limited. The pool of network 
buffers is shared for the entire TaskManager, so it means it can be 
shared even across tasks of separate jobs. However, I might be just 
unnecessarily cautious here.


Best,

Dawid

On 04/05/2022 10:54, Piotr Nowojski wrote:

Hi,

Thanks for the answers.


we may still need to discuss whether the
overdraft/reserve/spare should use extra buffers or buffers
in (exclusive + floating buffers)?

and


These things resolve the different problems (at least as I see that).
The current hardcoded "1"  says that we switch "availability" to
"unavailability" when one more buffer is left(actually a little less
than one buffer since we write the last piece of data to this last
buffer). The overdraft feature doesn't change this logic we still want
to switch to "unavailability" in such a way but if we are already in
"unavailability" and we want more buffers then we can take "overdraft
number" more. So we can not avoid this hardcoded "1" since we need to
understand when we should switch to "unavailability"

Ok, I see. So it seems to me that both of you have in mind to keep the
buffer pools as they are right now, but if we are in the middle of
processing a record, we can request extra overdraft buffers on top of
those? This is a different way to implement the overdraft from what I was
thinking. I was thinking about something like keeping the "overdraft", or
more precisely a buffer "reserve", in the buffer pool. I think my version
would be easier to implement, because it is just fiddling with the min/max
buffer calculation and a slightly modified `checkAvailability()` logic.

On the other hand, what you have in mind would better utilise the available
memory, right? It would require more code changes (how would we know when
we are allowed to request the overdraft?). However, in this case, I would
be tempted to set the number of overdraft buffers by default to
`Integer.MAX_VALUE`, and let the system request as many buffers as
necessary. The only downside that I can think of (apart from higher
complexity) would be a higher chance of hitting a known/unsolved
dea

Re: Source alignment for Iceberg

2022-05-04 Thread Steven Wu
Piotr, thanks a lot for your feedback.

> I can see this being an issue if the existence of too many blocked splits
is occupying too many resources.

This is not desirable. Eagerly assigning many splits to a reader can defeat
the benefits of pull-based dynamic split assignment. Iceberg readers
request one split at a time upon start or completion of a split. Dynamic
split assignment is better for work sharing/stealing, as Becket mentioned.
Limiting the number of active splits can be handled by the FLIP-27 Iceberg
source and is somewhat orthogonal to watermark alignment.

> Can not Iceberg just emit all splits and let FLIP-182/FLIP-217 handle the
watermark alignment and block the splits that are too much into the future?

The enumerator just assigns the next split to the requesting reader instead
of holding back the split assignment, and lets the reader handle the pause (if
the file split requires waiting for alignment), as sketched below. This
strategy might work and leverage more from the framework.

We probably need the following to make this work:
* Extract the watermark/timestamp only at the completion of a split (not at
the record level). Because records in a file are probably not sorted by the
timestamp field, the pause or watermark advancement is probably better done
at the file level.
* Source readers checkpoint the watermark. Otherwise, upon restart, readers
won't be able to determine the local watermark and pause for alignment. We
don't want to emit records upon restart due to unknown watermark info.
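
A rough sketch of that reader-side check, using made-up class names rather than the actual Iceberg/Flink source interfaces: the reader only advances its local watermark when a split completes, and pauses before taking the next split if it is too far ahead of the global minimum.

{code:java}
// Hypothetical reader-side alignment check, not the actual source code.
public class AlignedSplitReaderSketch {

    private final long maxAllowedDriftMs;
    private long localWatermark = Long.MIN_VALUE; // checkpointed in the real design

    public AlignedSplitReaderSketch(long maxAllowedDriftMs) {
        this.maxAllowedDriftMs = maxAllowedDriftMs;
    }

    /** Called once a file split has been fully read. */
    public void onSplitFinished(long splitMaxTimestamp) {
        localWatermark = Math.max(localWatermark, splitMaxTimestamp);
    }

    /** Decides whether the next split may be started right away. */
    public boolean shouldPause(long globalMinWatermark) {
        return localWatermark > globalMinWatermark + maxAllowedDriftMs;
    }
}
{code}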

All,

Any opinions on using a different timestamp for source alignment (vs. the
Flink application watermark)? For the Iceberg source, we might want to enforce
alignment on the Kafka timestamp, while the Flink application watermark may use
an event-time field from the payload.

Thanks,
Steven

On Wed, May 4, 2022 at 7:02 AM Becket Qin  wrote:

> Hey Piotr,
>
> I think the mechanism FLIP-182 provided is a reasonable default one, which
> ensures the watermarks are only drifted by an upper bound. However,
> admittedly there are also other strategies for different purposes.
>
> In the Iceberg case, I am not sure if a static strictly allowed watermark
> drift is desired. The source might just want to finish reading the assigned
> splits as fast as possible. And it is OK to have a drift of "one split",
> instead of a fixed time period.
>
> As another example, if there are some fast readers whose splits are always
> throttled, while the other slow readers are struggling to keep up with the
> rest of the splits, the split enumerator may decide to reassign the slow
> splits so all the readers have something to read. This would need the
> SplitEnumerator to be aware of the watermark progress on each reader. So it
> seems useful to expose the WatermarkAlignmentEvent information to the
> SplitEnumerator as well.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski 
> wrote:
>
> > Hi Steven,
> >
> > Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just emit
> > all splits and let FLIP-182/FLIP-217 handle the watermark alignment and
> > block the splits that are too much into the future? I can see this being
> an
> > issue if the existence of too many blocked splits is occupying too many
> > resources.
> >
> > If that's the case, indeed SourceCoordinator/SplitEnumerator would have
> to
> > decide on some basis how many and which splits to assign in what order.
> But
> > in that case I'm not sure how much you could use from FLIP-182 and
> > FLIP-217. They seem somehow orthogonal to me, operating on different
> > levels. FLIP-182 and FLIP-217 are working with whatever splits have
> already
> > been generated and assigned. You could leverage FLIP-182 and FLIP-217 and
> > take care of only the problem to limit the number of parallel active
> > splits. And here I'm not sure if it would be worth generalising a
> solution
> > across different connectors.
> >
> > Regarding the global watermark, I made a related comment sometime ago
> > about it [1]. It sounds to me like you also need to solve this problem,
> > otherwise Iceberg users will encounter late records in case of some race
> > conditions between assigning new splits and completions of older.
> >
> > Best,
> > Piotrek
> >
> > [1]
> >
> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
> >
> > pon., 2 maj 2022 o 04:26 Steven Wu  napisał(a):
> >
> >> add dev@ group to the thread as Thomas suggested
> >>
> >> Arvid,
> >>
> >> The scenario 3 (Dynamic assignment + temporary no split) in the FLIP-180
> >> (idleness) can happen to Iceberg source alignment, as readers can be
> >> temporarily starved due to the holdback by the enumerator when assigning
> >> new splits upon request.
> >>
> >> Totally agree that we should decouple this discussion with the FLIP-217,
> >> which addresses the split level watermark alignment problem as a
> follow-up
> >> of FLIP-182
> >>
> >> Becket,
> >>
> >> 

Pause processing of Timer messages temporarily.

2022-05-04 Thread Rajat Gupta
Hi Members,

I am currently working on a project for which we need to make some
modifications to our streaming application. We are using Flink to achieve
that.

One of the technical requirements is to pause the processing of timer
messages temporarily (let's say 5 minutes) and then resume processing
them.

How can we achieve this?

I found the API to shut down the TimerService for our application, but could
not find anything to restart it again.

Additional questions

1. If I shut down the TimerService, the call to `RegisterProcessingTimer`
would fail; will it also delete the timers which were already registered?

My use case is that I want to register the timers but don't want the
`OnTimer(...)` method to get invoked.

How can I achieve such functionality?
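
One possible approach, sketched below under the assumption that a KeyedProcessFunction is used: keep a "paused" flag in keyed state and, when a timer fires while the flag is set, re-register the timer for later instead of running its logic. How the flag gets flipped is up to the job; here a simple control record toggles it.

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PausableTimerFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Boolean> paused;

    @Override
    public void open(Configuration parameters) {
        paused = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timers-paused", Boolean.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if ("pause".equals(value)) {   // assumption: control records toggle the flag
            paused.update(true);
            return;
        }
        if ("resume".equals(value)) {
            paused.update(false);
            return;
        }
        // Register timers as usual.
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 60_000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        if (Boolean.TRUE.equals(paused.value())) {
            // Defer the timer while paused instead of running its logic.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 10_000L);
            return;
        }
        out.collect("timer fired at " + timestamp);
    }
}
{code}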

Thank you in advance for getting back to me.

Regards,
Rajat Gupta


[jira] [Created] (FLINK-27491) Support env replacement in flink-kubernetes-operator CR

2022-05-04 Thread Yang Wang (Jira)
Yang Wang created FLINK-27491:
-

 Summary: Support env replacement in flink-kubernetes-operator CR
 Key: FLINK-27491
 URL: https://issues.apache.org/jira/browse/FLINK-27491
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Yang Wang


Flink deployment resources support env interpolation natively using $() 
syntax[1]. Users expected this to "just work" like other resources when using 
the operator, but it does not.

It would be a great addition, simplifying job startup decision-making while 
following existing conventions.

 

 
{code:java}
job:
  jarURI: local:///my.jar
  entryClass: my.JobMainKt
  args:
    - "--kafka.bootstrap.servers"
    - "my.kafka.host:9093"
    - "--kafka.sasl.username"
    - "$(KAFKA_SASL_USERNAME)"
    - "--kafka.sasl.password"
    - "$(KAFKA_SASL_PASSWORD)" {code}
 
[1]. 
[https://kubernetes.io/docs/tasks/inject-data-application/_print/#use-environment-variables-to-define-arguments]
 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [VOTE] FLIP-216: Introduce pluggable dialect and plan for migrating Hive dialect

2022-05-04 Thread yuxia
Hi all,

The voting time for FLIP-216 has passed. I'm closing the vote now.

There were 6 votes, 5 of which are binding:
- Ron
- Martijn Visser (binding)
- Jark Wu (binding)
- Jing Zhang (binding)
- Jingsong Li (binding)
- Leonard Xu (binding)

There were no disapproving votes.

Thus, FLIP-216 has been accepted.

Thanks everyone for joining the discussion and giving feedback!



----- Original Message -----
From: "Leonard Xu" 
To: "dev" 
Sent: Friday, April 29, 2022 1:57:44 PM
Subject: Re: [VOTE] FLIP-216: Introduce pluggable dialect and plan for migrating 
Hive dialect

Thanks Yuxia for driving this work.

+1(binding)

Best,
Leonard

> On Apr 28, 2022, at 8:29 PM, Jing Zhang wrote:
> 
> +1 (binding)
> 
> Best,
> Jing Zhang
> 
> On Thu, Apr 28, 2022 at 20:11, Martijn Visser wrote:
> 
>> +1 (Binding)
>> 
>> On Thu, 28 Apr 2022 at 13:40, ron  wrote:
>> 
>>> +1
>>> 
>>> Best,
>>> Ron
>>> 
>>> 
 -----Original Message-----
 From: "Jark Wu" 
 Sent: 2022-04-28 15:46:22 (Thursday)
 To: dev 
 Cc:
 Subject: Re: [VOTE] FLIP-216: Introduce pluggable dialect and plan for
>>> migrating Hive dialect
 
 +1 (binding)
 
 Thank Yuxia, for driving this work.
 
 Best,
 Jark
 
 On Thu, 28 Apr 2022 at 11:58, Jingsong Li 
>>> wrote:
 
> +1 (Binding)
> 
> A very good step to move forward.
> 
> Best.
> Jingsong
> 
> On Wed, Apr 27, 2022 at 9:33 PM yuxia 
>>> wrote:
>> 
>> Hi, everyone
>> 
>> Thanks all for attention to FLIP-216: Introduce pluggable dialect
>> and
> plan for migrating Hive dialect [1] and participation in the
>>> discussion in
> the mail thread [2].
>> 
>> I'd like to start a vote for it. The vote will be open for at least
>>> 72
> hours unless there is an objection or not enough votes.
>> 
>> [1] [
> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-216%3A++Introduce+pluggable+dialect+and+plan+for+migrating+Hive+dialect
> |
> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-216%3A++Introduce+pluggable+dialect+and+plan+for+migrating+Hive+dialect
> ]
>> [2] [
>>> https://lists.apache.org/thread/66g79w5zlod2ylyv8k065j57pjjmv1jo
> | https://lists.apache.org/thread/66g79w5zlod2ylyv8k065j57pjjmv1jo ]
>> 
>> 
>> Best regards,
>> Yuxia
> 
>>> 
>>> 
>>> --
>>> Best,
>>> Ron
>>> 
>>


[jira] [Created] (FLINK-27492) Flink table / sql scala API is missing in the binary package

2022-05-04 Thread Yun Gao (Jira)
Yun Gao created FLINK-27492:
---

 Summary: Flink table / sql scala API is missing in the binary 
package
 Key: FLINK-27492
 URL: https://issues.apache.org/jira/browse/FLINK-27492
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.15.0, 1.16.0
Reporter: Yun Gao


Currently it seems that flink-scala-api and flink-table-api-scala-bridge are missing 
from the binary package. If we start a standalone cluster from the binary 
distribution package and then submit a Table / SQL job in Scala, it fails 
because the StreamTableEnvironment class cannot be found.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] FLIP-227: Support overdraft buffer

2022-05-04 Thread rui fan
Hi everyone,

I still have some questions.

1. If the total number of buffers in the LocalBufferPool is <= the number of
reserve buffers, will the LocalBufferPool never become available and thus
never be able to process data?
2. If the overdraft buffers use extra buffers, should the job fail to start
and then restart when the downstream task's input buffers are insufficient?
When the input buffers are initialized, they will request enough buffers,
right?

Also, I agree we can ignore overdraftBuffers=numberOfSubpartitions for now.
When we finish this feature and users start using it, we can discuss it
again if they report this issue.

Thanks
fanrui

On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz 
wrote:

> Hey all,
>
> I have not replied in the thread yet, but I was following the discussion.
>
> Personally, I like Fanrui's and Anton's idea. As far as I understand it
> the idea to distinguish between inside flatMap & outside would be fairly
> simple, but maybe slightly indirect. The checkAvailability would remain
> unchanged and it is checked always between separate invocations of the
> UDF. Therefore the overdraft buffers would not apply there. However once
> the pool says it is available, it means it has at least an initial
> buffer. So any additional request without checking for availability can
> be considered to be inside of processing a single record. This does not
> hold just for the LegacySource as I don't think it actually checks for
> the availability of buffers in the LocalBufferPool.
>
> In the offline chat with Anton, we also discussed if we need a limit of
> the number of buffers we could overdraft (or in other words if the limit
> should be equal to Integer.MAX_VALUE), but personally I'd prefer to stay
> on the safe side and have it limited. The pool of network buffers is
> shared for the entire TaskManager, so it means it can be shared even
> across tasks of separate jobs. However, I might be just unnecessarily
> cautious here.
>
> Best,
>
> Dawid
>
> On 04/05/2022 10:54, Piotr Nowojski wrote:
> > Hi,
> >
> > Thanks for the answers.
> >
> >> we may still need to discuss whether the
> >> overdraft/reserve/spare should use extra buffers or buffers
> >> in (exclusive + floating buffers)?
> > and
> >
> >> These things resolve the different problems (at least as I see that).
> >> The current hardcoded "1"  says that we switch "availability" to
> >> "unavailability" when one more buffer is left(actually a little less
> >> than one buffer since we write the last piece of data to this last
> >> buffer). The overdraft feature doesn't change this logic we still want
> >> to switch to "unavailability" in such a way but if we are already in
> >> "unavailability" and we want more buffers then we can take "overdraft
> >> number" more. So we can not avoid this hardcoded "1" since we need to
> >> understand when we should switch to "unavailability"
> > Ok, I see. So it seems to me that both of you have in mind to keep the
> > buffer pools as they are right now, but if we are in the middle of
> > processing a record, we can request extra overdraft buffers on top of
> > those? This is another way to implement the overdraft to what I was
> > thinking. I was thinking about something like keeping the "overdraft" or
> > more precisely buffer "reserve" in the buffer pool. I think my version
> > would be easier to implement, because it is just fiddling with min/max
> > buffers calculation and slightly modified `checkAvailability()` logic.
> >
> > On the other hand  what you have in mind would better utilise the
> available
> > memory, right? It would require more code changes (how would we know when
> > we are allowed to request the overdraft?). However, in this case, I would
> > be tempted to set the number of overdraft buffers by default to
> > `Integer.MAX_VALUE`, and let the system request as many buffers as
> > necessary. The only downside that I can think of (apart of higher
> > complexity) would be higher chance of hitting a known/unsolved deadlock
> [1]
> > in a scenario:
> > - downstream task hasn't yet started
> > - upstream task requests overdraft and uses all available memory segments
> > from the global pool
> > - upstream task is blocked, because downstream task hasn't started yet
> and
> > can not consume any data
> > - downstream task tries to start, but can not, as there are no available
> > buffers
> >
> >> BTW, for watermark, the number of buffers it needs is
> >> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
> >> the watermark won't block in requestMemory.
> > and
> >
> >> the best overdraft size will be equal to parallelism.
> > That's a lot of buffers. I don't think we need that many for broadcasting
> > watermarks. Watermarks are small, and remember that every subpartition
> has
> > some partially filled/empty WIP buffer, so the vast majority of
> > subpartitions will not need to request a new buffer.
> >
> > Best,
> > Piotrek
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-13203
> >
> > wt., 3 maj 2022 o 17:15 Anton Kalashni

Re: [VOTE] Apache Flink Table Store 0.1.0, release candidate #2

2022-05-04 Thread Jingsong Li
Hi Konstantin,

Thanks for your suggestions. I will add more descriptions.

Best,
Jingsong

On Sat, Apr 30, 2022 at 12:14 AM Jane Chan  wrote:
>
> Hi all,
>
> +1 for the release (non-binding). The check follows the Jira release
> note[1] and is listed as follows.
>
> - Verify that the source distributions of [2] do not contain any
> binaries;
> - Build the source distribution to ensure all source files have Apache
> headers, and test functionality against Maven Staged Artifacts under Java8
> and Scala 2.12;
> - Check README.md;
> - Check file hashes (MD5 and SHA-1 for [3], SHA-512 for [2]) and GPG
> signatures for [2] and [3] with gpg 2.3.6;
>
> Best,
> Jane Chan
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release
> [2]
> https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.1.0-rc2/
> [3] https://repository.apache.org/content/repositories/orgapacheflink-1502/
>
> On Fri, Apr 29, 2022 at 10:24 AM Jingsong Li  wrote:
>
> > Hi everyone,
> >
> > Please review and vote on the release candidate #2 for the version 0.1.0 of
> > Apache Flink Table Store, as follows:
> >
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> > **Release Overview**
> >
> > As an overview, the release consists of the following:
> > a) Table Store canonical source distribution, to be deployed to the
> > release repository at dist.apache.org
> > b) Maven artifacts to be deployed to the Maven Central Repository
> >
> > **Staging Areas to Review**
> >
> > The staging areas containing the above mentioned artifacts are as follows,
> > for your review:
> > * All artifacts for a) and b) can be found in the corresponding dev
> > repository at dist.apache.org [2]
> > * All artifacts for c) can be found at the Apache Nexus Repository [3]
> > * Pre Bundled Binaries Jar can work fine with quick start [4][5]
> >
> > All artifacts are signed with the key
> > 2C2B6A653B07086B65E4369F7C76245E0A318150 [6]
> >
> > Other links for your review:
> > * JIRA release notes [7]
> > * source code tag "release-0.1.0-rc2" [8]
> > * PR to update the website Downloads page to include Table Store
> > links [9]
> >
> > **Vote Duration**
> >
> > The voting time will run for at least 72 hours.
> > It is adopted by majority approval, with at least 3 PMC affirmative votes.
> >
> > Best,
> > Jingsong Lee
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release
> > [2]
> > https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.1.0-rc2/
> > [3]
> > https://repository.apache.org/content/repositories/orgapacheflink-1502/
> > [4]
> > https://repository.apache.org/content/repositories/orgapacheflink-1502/org/apache/flink/flink-table-store-dist/0.1.0/flink-table-store-dist-0.1.0.jar
> > [5]
> > https://nightlies.apache.org/flink/flink-table-store-docs-release-0.1/docs/try-table-store/quick-start/
> > [6] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [7]
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351234
> > [8] https://github.com/apache/flink-table-store/tree/release-0.1.0-rc2
> > [9] https://github.com/apache/flink-web/pull/531
> >


Re: [DISCUSS] FLIP-217 Support watermark alignment of source splits

2022-05-04 Thread Piotr Nowojski
Hi Becket and Dawid,

> I feel that no matter which option we choose this can not be solved
entirely in either of the options, because of the point above and because
the signatures of SplitReader#pauseOrResumeSplits and
SourceReader#pauseOrResumeSplits are slightly different (one identifies
splits by splitId, the other passes the splits directly).

Yes, that's a good point in this case and for features that need to be
implemented in more than one place.

> Is there any reason for pausing reading from a split an optional feature,
> other than that this was not included in the original interface?

An additional argument in favor of making it optional is to simplify source
implementation. But on its own I'm not sure if that would be enough to
justify making this feature optional. Maybe.

> I think it would be way simpler and clearer to just let end users and
Flink
> assume all the connectors will implement this feature.

As I wrote above, that would be an interesting choice to make (ease of
implementation for new users vs. system consistency). Regardless of that,
yes, for me the main argument is API backward compatibility. But let's
clarify a couple of points:
- The current proposal, adding methods to the base interface with default
implementations, is an OPTIONAL feature, just as the decorative version
would be.
- The decorative version could just as well "throw UnsupportedOperationException"
if the user enabled watermark alignment, and I agree that's a
better option than logging a warning (see the sketch below).
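
For illustration, a minimal sketch of that option on a simplified stand-in interface; the real SourceReader method name and signature in the FLIP may differ:

{code:java}
import java.util.Collection;

// Simplified stand-in for the real SourceReader interface, to illustrate the
// "default implementation throws UnsupportedOperationException" option.
public interface SourceReaderSketch<T, SplitT> {

    // existing reader methods omitted

    default void pauseOrResumeSplits(
            Collection<SplitT> splitsToPause, Collection<SplitT> splitsToResume) {
        // Old sources compile unchanged; the failure only surfaces if the user
        // actually enables watermark alignment for this source.
        throw new UnsupportedOperationException(
                "This source does not support pausing splits for watermark alignment.");
    }
}
{code}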

Best,
Piotrek


Wed, 4 May 2022 at 15:40, Becket Qin wrote:

> Thanks for the reply and patient discussion, Piotr and Dawid.
>
> Is there any reason for pausing reading from a split an optional feature,
> other than that this was not included in the original interface?
>
> To be honest I am really worried about the complexity of the user story
> here. Optional features like this have a high overhead. Imagine this
> feature is optional, now a user enabled watermark alignment and defined a
> few watermark groups. Would it work? Hmm, that depends on whether the
> involved Source has implemented this feature. If the Sources are well
> documented, good luck. Otherwise end users may have to look into the code
> of the Source to see whether the feature is supported. Which is something
> they shouldn't have to do.
>
> I think it would be way simpler and clearer to just let end users and Flink
> assume all the connectors will implement this feature. After all the
> watermark group is not optional to the end users. If in some rare cases,
> the feature cannot be supported, a clear UnsupportedOperationException will
> be thrown to tell users to explicitly remove this Source from the watermark
> group. I don't think we should have a warning message here, as they tend to
> be ignored in many cases. If we do this, we don't even need the supportXXX
> method in the Source for this feature. In fact this is exactly how many
> interfaces works today. For example, SplitEnumerator#addSplitsBack() is not
> supported by Pravega source because it does not support partial failover.
> In that case, it simply throws an exception to trigger a global recovery.
>
> The reason we add a default implementation in this case would just for the
> sake of backwards compatibility so the old source can still compile. Sure,
> in short term, this feature might not be supported by many existing
> sources. That is OK, and it is quite visible to the source developers that
> they did not override the default impl which throws an
> UnsupportedOperationException.
>
> @Dawid,
>
> >> the Java doc of the SupportXXX() method in the Source would be the single
> >> source of truth regarding how to implement this feature.
> >
> >
>
> I also don't find it entirely true. Half of the classes are theoretically
> > optional and are utility classes from the point of view how the
> interfaces
> > are organized. Theoretically users do not need to use any of
> > SourceReaderBase & SplitReader. Would be weird to list their methods in
> the
> > Source interface.
>
> I think the ultimate goal of java docs is to guide users to implement the
> Source. If SourceReaderBase is the preferred way to implement a
> SourceReader, it seems worth mentioning that. Even the Java language
> documentation for interfaces lists the known implementations [1] so people can
> leverage them. But for this particular case, if we make the feature
> non-optional, we don't even need the supportXXX() method for now.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Wed, May 4, 2022 at 4:37 PM Dawid Wysakowicz 
> wrote:
>
> > Hey Piotr and Becket,
> >
> > First of all, let me say I am happy with whichever option is agreed in
> the
> > discussion.
> >
> > I wanted to clarify a few points from the discussion though:
> >
> > @Becket:
> >
> > The main argument for adding the methods to the SourceReader is that
> these
> > methods are effectively NON-OPTIONAL to SourceReader impl, i.e. starting
> > from this FLI

[DISCUSS] FLIP-228: Support Within between events in CEP Pattern

2022-05-04 Thread Nicholas
Hi everyone,




The Pattern#within interface in CEP defines the maximum time interval in which a 
matching pattern has to be completed in order to be considered valid; this 
interval corresponds to the maximum time gap between the first and the last event. 
An interval representing the maximum time gap between events is needed in 
scenarios like purchasing a good within a maximum of 5 minutes after 
browsing it.




I would like to start a discussion about FLIP-228 [1], which proposes a "within 
between events" option for Pattern, to support defining the maximum time 
interval in which a partial match has to be completed in order to be considered 
valid; this interval represents the maximum time gap between events of a 
partially matching Pattern.




Hence we propose the Pattern#partialWithin interface to define the maximum time 
interval in which a partial match has to be completed in order to be considered 
valid. Please take a look at the FLIP page [1] for more details. Any feedback on 
FLIP-228 would be appreciated!




[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-228%3A+Support+Within+between+events+in+CEP+Pattern
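
A rough sketch of the browse-then-purchase example from above. Pattern#within is the existing API; partialWithin only reflects the interface proposed in this FLIP, so its final name and signature may differ:

{code:java}
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

public class PartialWithinExample {

    public static Pattern<UserEvent, ?> buildPattern() {
        return Pattern.<UserEvent>begin("browse")
                .where(new SimpleCondition<UserEvent>() {
                    @Override
                    public boolean filter(UserEvent e) {
                        return "browse".equals(e.type);
                    }
                })
                .followedBy("purchase")
                .where(new SimpleCondition<UserEvent>() {
                    @Override
                    public boolean filter(UserEvent e) {
                        return "purchase".equals(e.type);
                    }
                })
                // existing semantics: first-to-last event within 5 minutes
                .within(Time.minutes(5));
                // proposed semantics: at most 5 minutes between consecutive events
                // .partialWithin(Time.minutes(5));
    }

    /** Minimal event type for the sketch. */
    public static class UserEvent {
        public String type;
    }
}
{code}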




Best regards,

Nicholas Jiang

[ANNOUNCE] Apache Flink 1.15.0 released

2022-05-04 Thread Yun Gao
The Apache Flink community is very happy to announce the release of
Apache Flink 1.15.0, which is the first release for the Apache Flink
1.15 series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the
improvements for this release:
https://flink.apache.org/news/2022/05/05/1.15-announcement.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350442

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Joe, Till, Yun Gao


Re: [VOTE] Apache Flink Table Store 0.1.0, release candidate #2

2022-05-04 Thread Nicholas Jiang
Hi everyone,

+1 for the release (non-binding). 

- Built and compiled source codes [PASSED]
- Went through quick start guide [PASSED]
- Checked README.md [PASSED]
- Checked that the table store jar can be used to build a query table application [PASSED]

Best regards,

Nicholas Jiang

On 2022/04/29 02:24:09 Jingsong Li wrote:
> Hi everyone,
> 
> Please review and vote on the release candidate #2 for the version 0.1.0 of
> Apache Flink Table Store, as follows:
> 
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
> 
> **Release Overview**
> 
> As an overview, the release consists of the following:
> a) Table Store canonical source distribution, to be deployed to the
> release repository at dist.apache.org
> b) Maven artifacts to be deployed to the Maven Central Repository
> 
> **Staging Areas to Review**
> 
> The staging areas containing the above mentioned artifacts are as follows,
> for your review:
> * All artifacts for a) and b) can be found in the corresponding dev
> repository at dist.apache.org [2]
> * All artifacts for c) can be found at the Apache Nexus Repository [3]
> * Pre Bundled Binaries Jar can work fine with quick start [4][5]
> 
> All artifacts are signed with the key
> 2C2B6A653B07086B65E4369F7C76245E0A318150 [6]
> 
> Other links for your review:
> * JIRA release notes [7]
> * source code tag "release-0.1.0-rc2" [8]
> * PR to update the website Downloads page to include Table Store
> links [9]
> 
> **Vote Duration**
> 
> The voting time will run for at least 72 hours.
> It is adopted by majority approval, with at least 3 PMC affirmative votes.
> 
> Best,
> Jingsong Lee
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.1.0-rc2/
> [3] https://repository.apache.org/content/repositories/orgapacheflink-1502/
> [4] 
> https://repository.apache.org/content/repositories/orgapacheflink-1502/org/apache/flink/flink-table-store-dist/0.1.0/flink-table-store-dist-0.1.0.jar
> [5] 
> https://nightlies.apache.org/flink/flink-table-store-docs-release-0.1/docs/try-table-store/quick-start/
> [6] https://dist.apache.org/repos/dist/release/flink/KEYS
> [7] 
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351234
> [8] https://github.com/apache/flink-table-store/tree/release-0.1.0-rc2
> [9] https://github.com/apache/flink-web/pull/531
> 


Re: [ANNOUNCE] Apache Flink 1.15.0 released

2022-05-04 Thread Martijn Visser
Thank you Yun Gao, Till and Joe for driving this release. Your efforts are
greatly appreciated!

To everyone who has opened Jira tickets, provided PRs, reviewed code,
written documentation or contributed in any other way, this
release was (once again) made possible by you! Thank you.

Best regards,

Martijn

On Thu, 5 May 2022 at 08:38, Yun Gao wrote:

> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.15.0, which is the first release for the Apache Flink
> 1.15 series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data
> streaming applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the
> improvements for this release:
> https://flink.apache.org/news/2022/05/05/1.15-announcement.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350442
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Joe, Till, Yun Gao
>
-- 

Martijn Visser | Product Manager

mart...@ververica.com




Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time


Re: [DISCUSS] Keep the Community Member Information Up-to-Date

2022-05-04 Thread Xintong Song
Thanks Tison and Yun for sharing your opinions.

@Tison,
Keep both a potential out-of-date table and a link to the up-to-date list
sounds good to me. We can also add a notice about the page being
potentially out-of-date.

@Yun,
- I'm not sure about using someone's GitHub avatar *by default*. I think we
should at least get permission from that person, which IMO makes little
difference compared to asking that person to update the website.
- Adding a reminder in the invitation letter sounds good. However, I'm not
sure whether we have such a place holding the template of an
invitation letter. It seems to me most PMC members would just randomly pick
a previous invitation letter from the mailing list archive and replace the
name.
- Adding a notice on the website definitely makes sense. We can also link
to the up-to-date list in that notice.


Thank you~

Xintong Song



On Fri, Apr 29, 2022 at 2:59 PM Yun Tang  wrote:

> Hi Xintong,
>
>
> +1 to add a link for the full member information.
>
> And I think the avatars are very friendly for website viewers. The lazy
> updates might be caused by the fact that the invitation letters to new
> committers/PMC members do not contain a hint telling them to update the website.
> If we can:
>
>   1.  Manually update the information on the website if someone volunteers;
> we can use the avatars from their GitHub accounts by default.
>   2.  Add such a reminder to the invitation letters.
>   3.  Add a description on the website telling viewers that the information
> might not be up-to-date.
>
> I think this looks like a better solution.
>
> Best
> Yun Tang
> 
> From: tison 
> Sent: Thursday, April 28, 2022 21:52
> To: dev 
> Subject: Re: [DISCUSS] Keep the Community Member Information Up-to-Date
>
> Hi Xintong,
>
> Thanks for starting this discussion.
>
> +0 to replace the information with link to
> https://projects.apache.org/committee.html?flink.
> +1 to add such a link.
>
> My opinion is that the community doesn't have to keep the page up to
> date, since Apache has a member page [1]
> that isn't up to date either.
>
> We can add one line redirecting to the whole list so that those who are
> too "lazy" to add themselves to the page
> don't have to do it. And keep the table so that those who are proud to
> announce their membership, or want to try a commit
> with their commit access, can do so.
>
> Best,
> tison.
>
> [1] https://www.apache.org/foundation/members.html
>
>
> Xintong Song  于2022年4月28日周四 21:26写道:
>
> > >
> > > Personally I'm tempted to just link to
> > > https://projects.apache.org/committee.html?flink, if at all.
> > >
> >
> > Despite its fashionless look, I'm thinking the same thing...
> >
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Thu, Apr 28, 2022 at 8:41 PM Jingsong Li 
> > wrote:
> >
> > > One value is that this page has avatars. :-)
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Thu, Apr 28, 2022 at 8:27 PM Chesnay Schepler 
> > > wrote:
> > > >
> > > > Personally I'm tempted to just link to
> > > > https://projects.apache.org/committee.html?flink, if at all.
> > > >
> > > > I'm not sure overall whether this listing really provides value in
> the
> > > > first place.
> > > >
> > > > On 28/04/2022 13:58, Xintong Song wrote:
> > > > > Hi Flink Committers & PMC members,
> > > > >
> > > > > I just noticed that the list of community members on our website
> [1]
> > is
> > > > > quite out-of-date. According to the ASF roster [2], this project
> > > currently
> > > > > has 87 committers, 39 of which are PMC members. However, there's
> only
> > > 62
> > > > > people listed on our website, and many (e.g., me) have outdated
> > roles.
> > > > >
> > > > > I believe the list on the website is supposed to be updated by each
> > new
> > > > > committer / PMC member. I remember reading somewhere that suggested
> > new
> > > > > committers to add themselves to this list as the first trying-out
> for
> > > > > merging changes. Unfortunately I couldn't find it anymore.
> > > > >
> > > > > Do you think we should keep the page manually updated, or shall we
> > > > > investigate some way to keep it automatically synchronized?
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > > [1] https://flink.apache.org/community.html
> > > > >
> > > > > [2] https://whimsy.apache.org/roster/committee/flink
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] FLIP-91: Support SQL Client Gateway

2022-05-04 Thread Nicholas Jiang
Hi Shengkai,

I have another concern about the submission of batch jobs. Does the Flink SQL 
Gateway support submitting batch jobs? In Kyuubi, BatchProcessBuilder is used to 
submit batch jobs. What about the Flink SQL Gateway?

Best regards,
Nicholas Jiang

On 2022/04/24 03:28:36 Shengkai Fang wrote:
> Hi. Jiang.
> 
> Thanks for your feedback!
> 
> > Do the public interfaces of GatewayService refer to any service?
> 
> We will only expose one GatewayService implementation. We will put the
> interface into the common package and the developer who wants to implement
> a new endpoint can just rely on the interface package rather than the
> implementation.
> 
> > What's the behavior of SQL Client Gateway working on Yarn or K8S? Does
> the SQL Client Gateway support application or session mode on Yarn?
> 
> I think we can support the SQL Client Gateway submitting the jobs in
> application/session mode.
> 
> > Is there any event trigger in the operation state machine?
> 
> Yes. I have already updated the content and added more details about the
> state machine. During the revision, I found that I had mixed up two concepts:
> job submission and job execution. In fact, we only control the submission
> mode at the gateway layer. Therefore, we don't need to map the
> JobStatus here. If the user expects the synchronization behavior to be to
> wait for the completion of the job execution before allowing the next
> statement to be executed, then the Operation lifecycle should also contain
> the job's execution, which means users should set `table.dml-sync`.
> 
> > What's the return schema for the public interfaces of GatewayService?
> Like getTable interface, what's the return value schema?
> 
> The APIs of the GatewayService return Java objects, and the endpoint can
> organize the objects into the expected schema. The return results are also
> listed in the section ComponentAPI#GatewayService#API. The return type of
> GatewayService#getTable is `ContextResolvedTable`.
> 
> > How does the user get the operation log?
> 
> The OperationManager will register the LogAppender before the Operation
> execution. The Log Appender will hijack the logger and also write the log
> that related to the Operation to another files. When users wants to fetch
> the Operation log, the GatewayService will read the content in the file and
> return.
> 
> Best,
> Shengkai
> 
> 
> 
> 
> Nicholas Jiang  于2022年4月22日周五 16:21写道:
> 
> > Hi Shengkai.
> >
> > Thanks for driving the proposal of SQL Client Gateway. I have some
> > knowledge of Kyuubi and have some questions about the design:
> >
> > 1.Do the public interfaces of GatewayService refer to any service? If
> > referring to HiveService, does GatewayService need interfaces like
> > getQueryId etc.
> >
> > 2.What's the behavior of SQL Client Gateway working on Yarn or K8S? Does
> > the SQL Client Gateway support application or session mode on Yarn?
> >
> > 3.Is there any event trigger in the operation state machine?
> >
> > 4.What's the return schema for the public interfaces of GatewayService?
> > Like getTable interface, what's the return value schema?
> >
> > 5.How does the user get the operation log?
> >
> > Thanks,
> > Nicholas Jiang
> >
> > On 2022/04/21 06:42:30 Shengkai Fang wrote:
> > > Hi, Flink developers.
> > >
> > > I want to start a discussion about the FLIP-91: Support Flink SQL
> > > Gateway[1]. Flink SQL Gateway is a service that allows users to submit
> > and
> > > manage their jobs in the online environment with the pluggable endpoints.
> > > The reason why we introduce the Gateway with pluggable endpoints is that
> > > many users have their preferences. For example, the HiveServer2 users
> > > prefer to use the gateway with HiveServer2-style API, which has numerous
> > > tools. However, some filnk-native users may prefer to use the REST API.
> > > Therefore, we propose the SQL Gateway with pluggable endpoint.
> > >
> > > In the FLIP, we also propose the REST endpoint, which has the similar
> > > APIs compared to the gateway in the ververica/flink-sql-gateway[2]. At
> > the
> > > last, we discuss how to use the SQL Client to submit the statement to the
> > > Gateway with the REST API.
> > >
> > > I am glad that you can give some feedback about FLIP-91.
> > >
> > > Best,
> > > Shengkai
> > >
> > > [1]
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-91%3A+Support+SQL+Client+Gateway
> > > [2] https://github.com/ververica/flink-sql-gateway
> > >
> >
>