[jira] [Created] (FLINK-24563) Comparing timstamp_ltz with result of date_format with specific format throws NullPointerException

2021-10-15 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-24563:
---

 Summary: Comparing timstamp_ltz with result of date_format with 
specific format throws NullPointerException
 Key: FLINK-24563
 URL: https://issues.apache.org/jira/browse/FLINK-24563
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.15.0
Reporter: Caizhi Weng
 Fix For: 1.15.0, 1.14.1, 1.13.4


Add the following test case to 
{{org.apache.flink.table.planner.runtime.batch.sql.CalcITCase}} to reproduce 
this issue.
{code:scala}
@Test
def myTest(): Unit = {
  val data: Seq[Row] = Seq(
row(
  LocalDateTime.of(2021, 10, 15, 0, 0, 0),
  LocalDateTime.of(2021, 10, 15, 0, 0, 0).toInstant(ZoneOffset.UTC)))
  val dataId = TestValuesTableFactory.registerData(data)
  val ddl =
s"""
   |CREATE TABLE MyTable (
   |  ts TIMESTAMP,
   |  ltz TIMESTAMP_LTZ
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'true'
   |)
   |""".stripMargin
  tEnv.executeSql(ddl)

  checkResult("SELECT DATE_FORMAT(ts, 'MMdd') = ltz FROM MyTable", 
Seq(row(false)))
}
{code}

The exception stack is
{code}
java.lang.RuntimeException: Failed to fetch next result

at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
at 
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at 
org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:109)
at 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:300)
at 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:140)
at 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:106)
at 
org.apache.flink.table.planner.runtime.batch.sql.CalcITCase.myTest(CalcITCase.scala:91)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
a

[jira] [Created] (FLINK-24564) Change the default compression to snappy for parquet, orc, avro in table

2021-10-15 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-24564:


 Summary: Change the default compression to snappy for parquet, 
orc, avro in table
 Key: FLINK-24564
 URL: https://issues.apache.org/jira/browse/FLINK-24564
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
SQL / Runtime
Reporter: Jingsong Lee
 Fix For: 1.15.0


According to the experience of other frameworks, snappy compression is 
recommended by default, which will reduce the file size.
This does not affect reading, because these formats will automatically 
uncompress the file according to the head information of the file.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24565) Port Avro FileSystemFormatFactory to StreamFormat

2021-10-15 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24565:
---

 Summary: Port Avro FileSystemFormatFactory to StreamFormat
 Key: FLINK-24565
 URL: https://issues.apache.org/jira/browse/FLINK-24565
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Caizhi Weng






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24566) Remove CsvFileSystemFormatFactory

2021-10-15 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24566:
---

 Summary: Remove CsvFileSystemFormatFactory
 Key: FLINK-24566
 URL: https://issues.apache.org/jira/browse/FLINK-24566
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani


The DeserializationSchema implementation we have should be enough



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [Discussion] Avoid redundancy of Javadoc @return tag comment

2021-10-15 Thread Jing Ge
Since images do not work with pony mail, enclosed please find them as
attachments.

Best regards
Jing

On Thu, Oct 14, 2021 at 6:43 PM Jing Ge  wrote:

> Agree. If the description contains more information than the return tag
> comment. Part of content overlap is acceptable. Otherwise I think it
> depends on the visibility and influence of the API. Agree again, it is not
> a big issue for internal interfaces and classes. But for some core public
> APIs that will be used by many application developers, such redundancy
> looks unprofessional and sometimes even confused, e.g. using read mode in
> Intellij Idea, it will look like:
>
> [image: image.png]
> [image: image.png]
>
> When I read these for the first time, I was confused, why again and again?
> Is there anything wrong with my Intellij idea, maybe the rendering has a
> bug? After I realized it was the "paragon of redundancy" - word from
> Robert C. Martin, you could imagine my feeling about the code quality at
> that time :-).
>
> I would suggest doing clean-up for some core public APIs that have an
> impact on application developers.
>
> best regards
> Jing
>
> On Thu, Oct 14, 2021 at 3:24 PM Piotr Nowojski 
> wrote:
>
>> +1 for trying to clean this up, but I'm not entirely sure in which
>> direction. Usually I was fixing this towards option 1. I agree, in your
>> example option 2. looks much better, but usually the javadoc contains a
>> bit
>> more information, not only copy/pasted @return text. For example:
>>
>> /**
>>  * Send out a request to a specified coordinator and return the
>> response.
>>  *
>>  * @param operatorId specifies which coordinator to receive the
>> request
>>  * @param request the request to send
>>  * @return the response from the coordinator
>>  */
>> CompletableFuture sendCoordinationRequest
>>
>> Sometimes this can be split quite nicely between @return and main java
>> doc:
>>
>> /**
>>  * Try to transition the execution state from the current state to the
>> new state.
>>  *
>>  * @param currentState of the execution
>>  * @param newState of the execution
>>  * @return true if the transition was successful, otherwise false
>>  */
>> private boolean transitionState(ExecutionState currentState,
>> ExecutionState newState);
>>
>> but that's not always the case.
>>
>> At the same time I don't have hard feelings either direction. After all it
>> doesn't seem to be that big of an issue even if we leave it as is.
>>
>> Best,
>> Piotrek
>>
>> czw., 14 paź 2021 o 14:25 Jing Ge  napisał(a):
>>
>> > Hi Flink developers,
>> >
>> > It might be a good idea to avoid the redundant javadoc comment found in
>> > some classes, e.g. org.apache.flink.core.fs.Path w.r.t. the @Return tag
>> > comment on some methods.
>> >
>> > To make the discussion clear, let's focus on a concrete example(there
>> are
>> > many more):
>> >
>> > > /**
>> > >  * Returns the FileSystem that owns this Path.
>> > >  *
>> > >  * @return the FileSystem that owns this Path
>> > >  * @throws IOException thrown if the file system could not be
>> retrieved
>> > >  */
>> > > public FileSystem getFileSystem() throws IOException {
>> > > return FileSystem.get(this.toUri());
>> > > }
>> > >
>> > >
>> > In order to remove the redundancy, there are two options:
>> >
>> > option 1: keep the description and remove the @Return tag comment:
>> >
>> > > /**
>> > >  * Returns the FileSystem that owns this Path.
>> > >  *
>> > >  * @throws IOException thrown if the file system could not be
>> retrieved
>> > >  */
>> > > public FileSystem getFileSystem() throws IOException {
>> > > return FileSystem.get(this.toUri());
>> > > }
>> > >
>> > > option 2: keep the @return tag comment and remove the duplicated
>> > description:
>> >
>> > > /**
>> > >  * @return the FileSystem that owns this Path
>> > >  * @throws IOException thrown if the file system could not be
>> retrieved
>> > >  */
>> > > public FileSystem getFileSystem() throws IOException {
>> > > return FileSystem.get(this.toUri());
>> > > }
>> > >
>> > > It looks like these two options are similar. From the developer's
>> > perspective, I would prefer using @return tag comment, i.e. option 2.
>> > Having an explicit @return tag makes it easier for me to find the return
>> > value quickly.
>> >
>> > This issue is very common, it has been used as a Noise Comments example
>> in
>> > Uncle Bob's famous "Clean Code" book on page 64 but unfortunately
>> without
>> > giving any clear recommendation about how to solve it. From
>> Stackoverflow,
>> > we can find an interesting discussion about this issue and developer's
>> > thoughts behind it:
>> >
>> >
>> https://stackoverflow.com/questions/10088311/javadoc-return-tag-comment-duplication-necessary
>> > .
>> > Javadoc 16 provides even a new feature to solve this common issue.
>> >
>> > Since @return is recommended to use for the javadoc, I would suggest
>> Flink
>> > community following it and therefore open this di

[jira] [Created] (FLINK-24567) Support create tables using CatalogView defination

2021-10-15 Thread Xianxun Ye (Jira)
Xianxun Ye created FLINK-24567:
--

 Summary: Support create tables using CatalogView defination
 Key: FLINK-24567
 URL: https://issues.apache.org/jira/browse/FLINK-24567
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Xianxun Ye


Now when we run this sql will throw a ValidationException
{code:java}
// code placeholder
sql("create view vi as select * from slothTest.testdb.animal");
sql("CREATE TABLE print_table WITH ('connector' = 'blackhole') LIKE vi 
(EXCLUDING ALL)");
{code}
{code:java}
org.apache.flink.table.api.ValidationException: Source table 
'`default_catalog`.`default_database`.`vi`' of the LIKE clause can not be a 
VIEW, at line 1, column 60
{code}
Create table from some views during running a perfermence test job could be 
usefull. 
 hello [~dwysakowicz] what do you think about this feature?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-10-15 Thread Etienne Chauchot

Hi Sandeep and community,

I'm a PMC member of the Beam community and also a contributor on Flink. 
I also used parquet in conjunction with Beam pipelines running with 
Flink Beam runner. So I should be able to help:


I don't think it is a Flink issue as you have a Beam pipeline so you use 
only Beam IOs that are wrapped by Beam runner inside a Flink operator at 
(Beam) translation time.


I don't think It is related to S3 either but much more to ParquetIO in 
Beam (read and write connector).


I'll check at the Beam side and get back to you.

Best

Etienne.

On 30/09/2021 14:42, Till Rohrmann wrote:

Hi Sandeep,

I am not a Beam expert. The problem might be caused by the used S3
filesystem implementation. Have you tried whether the same problem occurs
when using vanilla Flink's latest version? Alternatively, you could also
reach out to the Beam community or ask on Flink's user ML whether people
have experience with such a problem.

Some of the exceptions look as if your network is a bit flakey. You might
wanna look into the infrastructure you are running on.

Cheers,
Till

On Tue, Sep 14, 2021 at 5:22 PM Kathula, Sandeep
 wrote:


Hi,
We have a simple Beam application which reads from Kafka, converts to
parquet and write to S3 with Flink runner (Beam 2.29 and Flink 1.12). We
have a fixed window of 5 minutes after conversion to
PCollection and then writing to S3. We have around 320
columns in our data. Our intention is to write large files of size 128MB or
more so that it won’t have a small file problem when reading back from
Hive. But from what we observed it is taking too much memory to write to S3
(giving memory of 8GB to heap is not enough to write 50 MB files and it is
going OOM). When I increase memory for heap to 32GB then it take lot of
time to write records to s3.
For instance it takes:

20 MB file - 30 sec
50 MB file - 1 min 16 sec
75 MB file - 2 min 15 sec
83 MB file - 2 min 40 sec

Code block to write to S3:
PCollection parquetRecord = …….

parquetRecord.apply(FileIO.write()
 .via(ParquetIO.sink(getOutput_schema()))
 .to(outputPath.isEmpty() ? outputPath() : outputPath)
 .withNumShards(5)
 .withNaming(new CustomFileNaming("snappy.parquet")));


We are also getting different exceptions like:


   1.  UserCodeException:

Caused by: org.apache.beam.sdk.util.UserCodeException:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
 at
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
 at
com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
Source)
 at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
 at
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
 at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
 at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
 at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
 at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
 at
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
 at
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
 at
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
 at
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
 at
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)
 at java.lang.Iterable.forEach(Iterable.java:75)
 at
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)
 at
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoke

[DISCUSS] Creating an external connector repository

2021-10-15 Thread Arvid Heise
Dear community,

Today I would like to kickstart a series of discussions around creating an
external connector repository. The main idea is to decouple the release
cycle of Flink with the release cycles of the connectors. This is a common
approach in other big data analytics projects and seems to scale better
than the current approach. In particular, it will yield the following
changes.


   -

   Faster releases of connectors: New features can be added more quickly,
   bugs can be fixed immediately, and we can have faster security patches in
   case of direct or indirect (through dependencies) security flaws.
   -

   New features can be added to old Flink versions: If the connector API
   didn’t change, the same connector jar may be used with different Flink
   versions. Thus, new features can also immediately be used with older Flink
   versions. A compatibility matrix on each connector page will help users to
   find suitable connector versions for their Flink versions.
   -

   More activity and contributions around connectors: If we ease the
   contribution and development process around connectors, we will see faster
   development and also more connectors. Since that heavily depends on the
   chosen approach discussed below, more details will be shown there.
   -

   An overhaul of the connector page: In the future, all known connectors
   will be shown on the same page in a similar layout independent of where
   they reside. They could be hosted on external project pages (e.g., Iceberg
   and Hudi), on some company page, or may stay within the main Flink reposi
   tory. Connectors may receive some sort of quality seal such that users
   can quickly access the production-readiness and we could also add which
   community/company promises which kind of support.
   -

   If we take out (some) connectors out of Flink, Flink CI will be faster
   and Flink devs will experience less build stabilities (which mostly come
   from connectors). That would also speed up Flink development.


Now I’d first like to collect your viewpoints on the ideal state. Let’s
first recap which approaches, we currently have:


   -

   We have half of the connectors in the main Flink repository. Relatively
   few of them have received updates in the past couple of months.
   -

   Another large chunk of connectors are in Apache Bahir. It recently has
   seen the first release in 3 years.
   -

   There are a few other (Apache) projects that maintain a Flink connector,
   such as Apache Iceberg, Apache Hudi, and Pravega.
   -

   A few connectors are listed on company-related repositories, such as
   Apache Pulsar on StreamNative and CDC connectors on Ververica.


My personal observation is that having a repository per connector seems to
increase the activity on a connector as it’s easier to maintain. For
example, in Apache Bahir all connectors are built against the same Flink
version, which may not be desirable when certain APIs change; for example,
SinkFunction will be eventually deprecated and removed but new Sink
interface may gain more features.

Now, I'd like to outline different approaches. All approaches will allow
you to host your connector on any kind of personal, project, or company
repository. We still want to provide a default place where users can
contribute their connectors and hopefully grow a community around it. The
approaches are:


   1.

   Create a mono-repo under the Apache umbrella where all connectors will
   reside, for example, github.com/apache/flink-connectors. That repository
   needs to follow its rules: No GitHub issues, no Dependabot or similar
   tools, and a strict manual release process. It would be under the Flink
   community, such that Flink committers can write to that repository but
   no-one else.
   2.

   Create a GitHub organization with small repositories, for example
   github.com/flink-connectors. Since it’s not under the Apache umbrella,
   we are free to use whatever process we deem best (up to a future
   discussion). Each repository can have a shared list of maintainers +
   connector specific committers. We can provide more automation. We may even
   allow different licenses to incorporate things like a connector to Oracle
   that cannot be released under ASL.
   3.

   ??? <- please provide your additional approaches


In both cases, we will provide opinionated module/repository templates
based on a connector testing framework and guidelines. Depending on the
approach, we may need to enforce certain things.

I’d like to first focus on what the community would ideally seek and
minimize the discussions around legal issues, which we would discuss later.
For now, I’d also like to postpone the discussion if we move all or only a
subset of connectors from Flink to the new default place as it seems to be
orthogonal to the fundamental discussion.

PS: If the external repository for connectors is successful, I’d also like
to move out other things like formats, filesystems, and metri

Re: [DISCUSS] Creating an external connector repository

2021-10-15 Thread Ingo Bürk
Hi Arvid,

In general I think breaking up the big repo would be a good move with many
benefits (which you have outlined already). One concern would be how to
proceed with our docs / examples if we were to really separate out all
connectors.

1. More real-life examples would essentially now depend on external
projects. Particularly if hosted outside the ASF, this would feel somewhat
odd. Or to put it differently, if flink-connector-foo is not part of Flink
itself, should the Flink Docs use it for any examples?
2. Generation of documentation (config options) wouldn't be possible unless
the docs depend on these external projects, which would create weird
version dependency cycles (Flink 1.X's docs depend on flink-connector-foo
1.X which depends on Flink 1.X).
3. Documentation would inevitably be much less consistent when split across
many repositories.

As for your approaches, how would (A) allow hosting personal / company
projects if only Flink committers can write to it?

> Connectors may receive some sort of quality seal

This sounds like a lot of work and process, and could easily become a
source of frustration.


Best
Ingo

On Fri, Oct 15, 2021 at 2:47 PM Arvid Heise  wrote:

> Dear community,
>
> Today I would like to kickstart a series of discussions around creating an
> external connector repository. The main idea is to decouple the release
> cycle of Flink with the release cycles of the connectors. This is a common
> approach in other big data analytics projects and seems to scale better
> than the current approach. In particular, it will yield the following
> changes.
>
>
>-
>
>Faster releases of connectors: New features can be added more quickly,
>bugs can be fixed immediately, and we can have faster security patches in
>case of direct or indirect (through dependencies) security flaws.
>-
>
>New features can be added to old Flink versions: If the connector API
>didn’t change, the same connector jar may be used with different Flink
>versions. Thus, new features can also immediately be used with older Flink
>versions. A compatibility matrix on each connector page will help users to
>find suitable connector versions for their Flink versions.
>-
>
>More activity and contributions around connectors: If we ease the
>contribution and development process around connectors, we will see faster
>development and also more connectors. Since that heavily depends on the
>chosen approach discussed below, more details will be shown there.
>-
>
>An overhaul of the connector page: In the future, all known connectors
>will be shown on the same page in a similar layout independent of where
>they reside. They could be hosted on external project pages (e.g., Iceberg
>and Hudi), on some company page, or may stay within the main Flink reposi
>tory. Connectors may receive some sort of quality seal such that users
>can quickly access the production-readiness and we could also add which
>community/company promises which kind of support.
>-
>
>If we take out (some) connectors out of Flink, Flink CI will be faster
>and Flink devs will experience less build stabilities (which mostly come
>from connectors). That would also speed up Flink development.
>
>
> Now I’d first like to collect your viewpoints on the ideal state. Let’s
> first recap which approaches, we currently have:
>
>
>-
>
>We have half of the connectors in the main Flink repository.
>Relatively few of them have received updates in the past couple of months.
>-
>
>Another large chunk of connectors are in Apache Bahir. It recently has
>seen the first release in 3 years.
>-
>
>There are a few other (Apache) projects that maintain a Flink
>connector, such as Apache Iceberg, Apache Hudi, and Pravega.
>-
>
>A few connectors are listed on company-related repositories, such as
>Apache Pulsar on StreamNative and CDC connectors on Ververica.
>
>
> My personal observation is that having a repository per connector seems to
> increase the activity on a connector as it’s easier to maintain. For
> example, in Apache Bahir all connectors are built against the same Flink
> version, which may not be desirable when certain APIs change; for example,
> SinkFunction will be eventually deprecated and removed but new Sink
> interface may gain more features.
>
> Now, I'd like to outline different approaches. All approaches will allow
> you to host your connector on any kind of personal, project, or company
> repository. We still want to provide a default place where users can
> contribute their connectors and hopefully grow a community around it. The
> approaches are:
>
>
>1.
>
>Create a mono-repo under the Apache umbrella where all connectors will
>reside, for example, github.com/apache/flink-connectors. That
>repository needs to follow its rules: No GitHub issues, no Dependabot or
>similar tools, and a strict manual

Re: [DISCUSS] Creating an external connector repository

2021-10-15 Thread Chesnay Schepler
My opinion of splitting the Flink repositories hasn't changed; I'm still 
in favor of it.


While it would technically be possible to release individual connectors 
even if they are part of the Flink repo,
it is quite a hassle to do so and error prone due to the current branch 
structure.

A split would also force us to watch out much more for API stability.

I'm gonna assume that we will move out all connectors:

What I'm concerned about, and which we never really covered in past 
discussions about split repositories, are

a) ways to share infrastructure (e.g., CI/release utilities/codestyle)
b) testing
c) documentation integration

Particularly for b) we still lack any real public utilities.
Even fundamental things such as the MiniClusterResource are not 
annotated in any way.

I would argue that we need to sort this out before a split can happen.
We've seen with the flink-benchmarks repo and recent discussions how 
easily things can break.


Related to that, there is the question on how Flink is then supposed to 
ensure that things don't break. My impression is that we heavily rely on 
the connector tests to that end at the moment.
Similarly, what connector (version) would be used for examples (like the 
WordCount which reads from Kafka) or (e2e) tests that want to read 
something other than a file? You end up with this circular dependency 
which are always troublesome.


As for for the repo structure, I would think that a single one could 
work quite well (because having 10+ connector repositories is just a 
mess), but currently I wouldn't set it up as a single project.
I would rather have something like N + 1 projects (one for each 
connectors + a shared testing project) which are released individually 
as required, without any snapshot dependencies in-between.
Then 1 branch for each major Flink version (again, no snapshot 
dependencies). Individual connectors can be released at any time against 
any of the latest bugfix releases, which due to lack of binaries (and 
python releases) would be a breeze.


I don't like the idea of moving existing connectors out of the Apache 
organization. At the very least, not all of them. While some are 
certainly ill-maintained (e.g., Cassandra) where it would be neat if 
external projects could maintain them, others (like Kafka) are not and 
quite fundamental to actually using Flink.


On 15/10/2021 14:47, Arvid Heise wrote:

Dear community,

Today I would like to kickstart a series of discussions around creating an
external connector repository. The main idea is to decouple the release
cycle of Flink with the release cycles of the connectors. This is a common
approach in other big data analytics projects and seems to scale better
than the current approach. In particular, it will yield the following
changes.


-

Faster releases of connectors: New features can be added more quickly,
bugs can be fixed immediately, and we can have faster security patches in
case of direct or indirect (through dependencies) security flaws.
-

New features can be added to old Flink versions: If the connector API
didn’t change, the same connector jar may be used with different Flink
versions. Thus, new features can also immediately be used with older Flink
versions. A compatibility matrix on each connector page will help users to
find suitable connector versions for their Flink versions.
-

More activity and contributions around connectors: If we ease the
contribution and development process around connectors, we will see faster
development and also more connectors. Since that heavily depends on the
chosen approach discussed below, more details will be shown there.
-

An overhaul of the connector page: In the future, all known connectors
will be shown on the same page in a similar layout independent of where
they reside. They could be hosted on external project pages (e.g., Iceberg
and Hudi), on some company page, or may stay within the main Flink reposi
tory. Connectors may receive some sort of quality seal such that users
can quickly access the production-readiness and we could also add which
community/company promises which kind of support.
-

If we take out (some) connectors out of Flink, Flink CI will be faster
and Flink devs will experience less build stabilities (which mostly come
from connectors). That would also speed up Flink development.


Now I’d first like to collect your viewpoints on the ideal state. Let’s
first recap which approaches, we currently have:


-

We have half of the connectors in the main Flink repository. Relatively
few of them have received updates in the past couple of months.
-

Another large chunk of connectors are in Apache Bahir. It recently has
seen the first release in 3 years.
-

There are a few other (Apache) projects that maintain a Flink connector,
such as Apache Iceberg, Apache Hudi, and Pravega.
-

 

Re: [VOTE] Release 1.13.3, release candidate #1

2021-10-15 Thread Matthias Pohl
Thanks Chesnay for driving this.

+1 (non-binding)

- verified the checksums
- build 1.13.3-rc1 from sources
- went over the pom file diff to see whether we missed newly added
dependency in the NOTICE file
- went over the release blog post
- checked that scala 2.11 and 2.12 artifacts are present in the Maven repo
- Run example jobs without noticing any issues in the logs
- Triggered e2e test run on VVP based on 1.13.3 RC1

On Tue, Oct 12, 2021 at 7:22 PM Chesnay Schepler  wrote:

> Hi everyone,
> Please review and vote on the release candidate #1 for the version
> 1.13.3, as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release and binary convenience releases to
> be deployed to dist.apache.org [2], which are signed with the key with
> fingerprint C2EED7B111D464BA [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "release-1.13.3-rc1" [5],
> * website pull request listing the new release and adding announcement
> blog post [6].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Release Manager
>
> [1]
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350329
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.3-rc1/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1453
> [5] https://github.com/apache/flink/tree/release-1.13.3-rc1
> [6] https://github.com/apache/flink-web/pull/473
>


[jira] [Created] (FLINK-24568) Configuration equals method does not properly check equality

2021-10-15 Thread Jessie Anderson (Jira)
Jessie Anderson created FLINK-24568:
---

 Summary: Configuration equals method does not properly check 
equality
 Key: FLINK-24568
 URL: https://issues.apache.org/jira/browse/FLINK-24568
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration
Reporter: Jessie Anderson


Configuration's 
[equals|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java#L938]
 method only checks the configuration keys of the object the method is called 
on, meaning that the method will still return true if the object passed as the 
function argument contains additional config parameters. For example:
{code:java}
Configuration a = new Configuration();
Configuration b = new Configuration();

a.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024mb"));
b.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024mb"));
b.set(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY, true);

a.equals(b); // true
b.equals(a); // false {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)