Re: [VOTE] FLIP-92: Add N-Ary Stream Operator in Flink

2020-01-10 Thread Zhijiang
+1, it is really nice to have the N-Ary stream operator which is meaningful in 
some scenarios.

best,
Zhijiang


--
From:Jingsong Li 
Send Time:2020 Jan. 10 (Fri.) 11:00
To:dev 
Subject:Re: [VOTE] FLIP-92: Add N-Ary Stream Operator in Flink

+1 non-binding to the N-Ary Stream Operator. Thanks Piotr for driving.
Looks like the previous FLIP-92 did not change the "Next FLIP Number" in
FLIP page.

Best,
Jingsong Lee

On Fri, Jan 10, 2020 at 8:40 AM Benchao Li  wrote:

> Hi Piotr,
>
> It seems that we have the 'FLIP-92' already.
> see:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+JDBC+catalog+and+Postgres+catalog
>
>
> On Thu, Jan 9, 2020 at 11:25 PM Piotr Nowojski  wrote:
>
> > Hi,
> >
> > I would like to start a vote for adding the N-Ary Stream Operator in
> Flink
> > as discussed in the discussion thread [1].
> >
> > This vote will be opened at least until Wednesday, January 15th 8:00 UTC.
> >
> > Piotrek
> >
> > [1]
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-N-Ary-Stream-Operator-td11341.html
> > <
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-N-Ary-Stream-Operator-td11341.html
> > >
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


-- 
Best, Jingsong Lee



[jira] [Created] (FLINK-15547) Support access to Hive avro table

2020-01-10 Thread Rui Li (Jira)
Rui Li created FLINK-15547:
--

 Summary: Support access to Hive avro table
 Key: FLINK-15547
 URL: https://issues.apache.org/jira/browse/FLINK-15547
 Project: Flink
  Issue Type: Task
  Components: Connectors / Hive
Reporter: Rui Li






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-92: Add N-Ary Stream Operator in Flink

2020-01-10 Thread Arvid Heise
non-binding +1

On Fri, Jan 10, 2020 at 9:11 AM Zhijiang 
wrote:

> +1, it is really nice to have the N-Ary stream operator which is
> meaningful in some scenarios.
>
> best,
> Zhijiang
>
>
> --
> From:Jingsong Li 
> Send Time:2020 Jan. 10 (Fri.) 11:00
> To:dev 
> Subject:Re: [VOTE] FLIP-92: Add N-Ary Stream Operator in Flink
>
> +1 non-binding to the N-Ary Stream Operator. Thanks Piotr for driving.
> Looks like the previous FLIP-92 did not change the "Next FLIP Number" in
> FLIP page.
>
> Best,
> Jingsong Lee
>
> On Fri, Jan 10, 2020 at 8:40 AM Benchao Li  wrote:
>
> > Hi Piotr,
> >
> > It seems that we have the 'FLIP-92' already.
> > see:
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+JDBC+catalog+and+Postgres+catalog
> >
> >
> > On Thu, Jan 9, 2020 at 11:25 PM Piotr Nowojski  wrote:
> >
> > > Hi,
> > >
> > > I would like to start a vote for adding the N-Ary Stream Operator in
> > Flink
> > > as discussed in the discussion thread [1].
> > >
> > > This vote will be opened at least until Wednesday, January 15th 8:00
> UTC.
> > >
> > > Piotrek
> > >
> > > [1]
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-N-Ary-Stream-Operator-td11341.html
> > > <
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-N-Ary-Stream-Operator-td11341.html
> > > >
> >
> >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >
>
>
> --
> Best, Jingsong Lee
>
>


Re: [DISCUSS] A mechanism to validate the precision of columns for connectors

2020-01-10 Thread Zhenghua Gao
Hi Jingsong Lee

You are right that the connectors don't validate data types either at the moment.
We seem to lack a mechanism to validate properties [1], data types, etc.
for CREATE TABLE.

[1] https://issues.apache.org/jira/browse/FLINK-15509
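
A rough sketch of what such a per-connector check could look like (it assumes
Flink's TimestampType and ValidationException; the validator method itself is
made up for illustration):

    // e.g. Apache Derby supports TIMESTAMP(9) while MySQL only supports TIMESTAMP(6)
    static void validateTimestampPrecision(TimestampType type, int maxSupportedPrecision) {
        if (type.getPrecision() > maxSupportedPrecision) {
            throw new ValidationException(
                "TIMESTAMP(" + type.getPrecision() + ") exceeds the maximum precision "
                    + maxSupportedPrecision + " supported by this connector.");
        }
    }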

*Best Regards,*
*Zhenghua Gao*


On Fri, Jan 10, 2020 at 2:59 PM Jingsong Li  wrote:

> Hi Zhenghua,
>
> I think it's not just about precision of type. Connectors not validate the
> types either.
> Now there is "SchemaValidator", this validator is just used to validate
> type properties. But not for connector type support.
> I think we can have something like "DataTypeValidator" to help connectors
> validating their type support.
>
> Consider current validator design, validator is called by connector itself.
> it's more like a util class than a mechanism.
>
> Best,
> Jingsong Lee
>
> On Fri, Jan 10, 2020 at 11:47 AM Zhenghua Gao  wrote:
>
> > Hi dev,
> >
> > I'd like to kick off a discussion on a mechanism to validate the
> precision
> > of columns for some connectors.
> >
> > We come to an agreement that the user should be informed if the connector
> > does not support the desired precision. And from the connector
> developer's
> > view, there are 3-levels information to be considered:
> >
> >-  the ability of external systems (e.g. Apache Derby support
> >TIMESTAMP(9), Mysql support TIMESTAMP(6), etc)
> >
> > Connector developers should use this information to validate user's DDL
> and
> > make sure throw an exception if concrete column is out of range.
> >
> >
> >- schema of referenced tables in external systems
> >
> > If the schema information of referenced tables is available in Compile
> > Time, connector developers could use it to find the mismatch between DDL.
> > But in most cases, the schema information is unavailable because of
> network
> > isolation or authority management. We should use it with caution.
> >
> >
> >- schema-less external systems (e.g. HBase)
> >
> > If the external systems is schema-less like HBase, the connector
> developer
> > should make sure the connector doesn't cause precision loss (e.g.
> > flink-hbase serializes java.sql.Timestamp to long in bytes which only
> keep
> > millisecond's precision.)
> >
> > To make it more specific, some scenarios of JDBC Connector are list as
> > following:
> >
> >- The underlying DB supports DECIMAL(65, 30), which is out of the
> range
> >of Flink's Decimal
> >- The underlying DB supports TIMESTAMP(6), and user want to define a
> >table with TIMESTAMP(9) in Flink
> >- User defines a table with DECIMAL(10, 4) in underlying DB, and want
> to
> >define a table with DECIMAL(5, 2) in Flink
> >- The precision of the underlying DB varies between different versions
> >
> >
> > What do you think about this? any feedback are appreciates.
> >
> > *Best Regards,*
> > *Zhenghua Gao*
> >
>
>
> --
> Best, Jingsong Lee
>


[jira] [Created] (FLINK-15548) Make KeyedCoProcessOperatorWithWatermarkDelay extends KeyedCoProcessOperator instead of LegacyKeyedCoProcessOperator

2020-01-10 Thread wangsan (Jira)
wangsan created FLINK-15548:
---

 Summary: Make KeyedCoProcessOperatorWithWatermarkDelay extends 
KeyedCoProcessOperator instead of LegacyKeyedCoProcessOperator
 Key: FLINK-15548
 URL: https://issues.apache.org/jira/browse/FLINK-15548
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.10.0
Reporter: wangsan
 Fix For: 1.10.0


`LegacyKeyedCoProcessOperator` is marked as deprecated, so we should make 
`KeyedCoProcessOperatorWithWatermarkDelay` extend `KeyedCoProcessOperator` 
instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15549) integer overflow in SpillingResettableMutableObjectIterator

2020-01-10 Thread caojian0613 (Jira)
caojian0613 created FLINK-15549:
---

 Summary: integer overflow in 
SpillingResettableMutableObjectIterator
 Key: FLINK-15549
 URL: https://issues.apache.org/jira/browse/FLINK-15549
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.9.1, 1.9.0
Reporter: caojian0613


The SpillingResettableMutableObjectIterator has a data overflow problem if the 
number of elements in a single input exceeds Integer.MAX_VALUE.

The reason is that inside the SpillingResettableMutableObjectIterator, the total 
number of elements and the number of elements currently read are tracked with two 
int-typed fields (elementCount and currentElementNum), and if the number of 
elements exceeds Integer.MAX_VALUE, they overflow.

If there is an overflow, then in the next iteration, after resetting the input, 
the data will not be read at all or only part of the data will be read.

Therefore, we should change the type of these two fields of 
SpillingResettableMutableObjectIterator from int to long, and we also need a 
pre-check mechanism to detect such numerical overflows early.
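
For illustration, a minimal sketch of the proposed change (the class and methods 
below are made up; only the two counter fields correspond to the actual ones):
{code:java}
// Widen the counters to long so that inputs with more than Integer.MAX_VALUE
// elements do not wrap around during reset/replay. Only the counter handling is
// shown; the real iterator also manages the spilling buffers.
public class CounterSketch {
    private long elementCount = 0L;      // was: int elementCount
    private long currentElementNum = 0L; // was: int currentElementNum

    public void onElementWritten() {
        elementCount++;
    }

    public boolean hasNextOnReplay() {
        // with int counters this check misbehaves once elementCount wraps negative
        return currentElementNum < elementCount;
    }

    public void onElementRead() {
        currentElementNum++;
    }
}
{code}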



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15550) testCancelTaskExceptionAfterTaskMarkedFailed failed on azure

2020-01-10 Thread Yun Tang (Jira)
Yun Tang created FLINK-15550:


 Summary: testCancelTaskExceptionAfterTaskMarkedFailed failed on 
azure
 Key: FLINK-15550
 URL: https://issues.apache.org/jira/browse/FLINK-15550
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.11.0
Reporter: Yun Tang


Instance: 
https://dev.azure.com/rmetzger/Flink/_build/results?buildId=4241&view=ms.vss-test-web.build-test-results-tab&runId=12434&resultId=108939&paneView=debug


{code:java}
java.lang.AssertionError: expected: but was:
at 
org.apache.flink.runtime.taskmanager.TaskTest.testCancelTaskExceptionAfterTaskMarkedFailed(TaskTest.java:525)
{code}


{code:java}
expected: but was:
{code}





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15551) Streaming File Sink s3 end-to-end test FAIL

2020-01-10 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-15551:
--

 Summary: Streaming File Sink s3 end-to-end test FAIL
 Key: FLINK-15551
 URL: https://issues.apache.org/jira/browse/FLINK-15551
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.10.0
Reporter: Leonard Xu
 Fix For: 1.10.0


This issue happens on the latest release-1.10 branch [1]; 
it is similar to FLINK-455 
but has a different cause:
{code:java}
Caused by: java.lang.NoSuchMethodError: 
org.apache.hadoop.conf.Configuration.getTimeDuration(Ljava/lang/String;Ljava/lang/String;Ljava/util/concurrent/TimeUnit;)J
at 
org.apache.hadoop.fs.s3a.S3ARetryPolicy.<init>(S3ARetryPolicy.java:113)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:257)
at 
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:126)
at 
org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:61)
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:441)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at 
org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage.<init>(MemoryBackendCheckpointStorage.java:85)
at 
org.apache.flink.runtime.state.memory.MemoryStateBackend.createCheckpointStorage(MemoryStateBackend.java:295)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:279)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:205)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:486)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:253)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:225)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:213)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:117)
at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
... 10 more

End of exception on server side>]
at 
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 4 more
Job could not be submitted.
{code}
 

[1][https://api.travis-ci.org/v3/job/634842391/log.txt]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15552) SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory

2020-01-10 Thread Terry Wang (Jira)
Terry Wang created FLINK-15552:
--

 Summary: SQL Client can not correctly create kafka table using 
--library to indicate a kafka connector directory
 Key: FLINK-15552
 URL: https://issues.apache.org/jira/browse/FLINK-15552
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client, Table SQL / Runtime
Reporter: Terry Wang


How to Reproduce:
First, I start a SQL client and use `-l` to point to a kafka connector 
directory:

`
 bin/sql-client.sh embedded -l /xx/connectors/kafka/

`

Then, I create a Kafka table like the following:
`
Flink SQL> CREATE TABLE MyUserTable (
>   content String
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'test',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'csv'
>  );
[INFO] Table has been created.
`

Then I select from the just-created table and an exception is thrown: 

`
Flink SQL> select * from MyUserTable;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.properties.bootstrap.servers=localhost:9092
connector.properties.group.id=testGroup
connector.properties.zookeeper.connect=localhost:2181
connector.startup-mode=earliest-offset
connector.topic=test
connector.type=kafka
connector.version=universal
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=content

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
`
Potential Reasons:
Now we use `TableFactoryUtil#findAndCreateTableSource` to convert a 
CatalogTable to a TableSource, but when calling `TableFactoryService.find` we don't 
pass the current classloader to this method; the default loader will be the 
bootstrap classloader, which cannot find our factory.
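
A hedged sketch of the suggested direction, assuming TableFactoryService offers a 
find overload that takes a ClassLoader (exact signatures may differ between Flink 
versions):
{code:java}
import java.util.Map;

import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sources.TableSource;

public class ClassLoaderAwareLookup {

    // Resolve the factory with an explicit classloader (e.g. the one holding the
    // jars passed via -l/--library) instead of relying on the default lookup.
    public static TableSource<?> findTableSource(
            CatalogTable table, ClassLoader userClassLoader) {
        Map<String, String> properties = table.toProperties();
        TableSourceFactory<?> factory = TableFactoryService.find(
                TableSourceFactory.class, properties, userClassLoader);
        return factory.createTableSource(properties);
    }
}
{code}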



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15553) Create table ddl support comment after computed column

2020-01-10 Thread hailong wang (Jira)
hailong wang created FLINK-15553:


 Summary: Create table ddl support  comment after computed column
 Key: FLINK-15553
 URL: https://issues.apache.org/jira/browse/FLINK-15553
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: hailong wang
 Fix For: 1.11.0


For now, we can define a computed column in the CREATE TABLE DDL, but we cannot add 
a comment after it like for a regular table column. So we should support it; the 
grammar is as follows:
{code:java}
col_name AS expr  [COMMENT 'string']
{code}
My idea is that we can introduce a class
{code:java}
 SqlTableComputedColumn{code}
to wrap the name, expression and comment, and then simply read these elements from 
it.

As for parserImpls.ftl, it could look as follows:
{code:java}
identifier = SimpleIdentifier()
<AS>
expr = Expression(ExprContext.ACCEPT_NON_QUERY)
[ <COMMENT> <QUOTED_STRING> {
    String p = SqlParserUtil.parseString(token.image);
    comment = SqlLiteral.createCharString(p, getPos());
} ]
{
    SqlTableComputedColumn tableComputedColumn =
        new SqlTableComputedColumn(identifier, expr, comment, getPos());
    context.columnList.add(tableComputedColumn);
}{code}
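
A hedged sketch of the proposed wrapper class (the shape is illustrative; the real 
class would live in the flink-sql-parser module and may need to extend Calcite's 
SqlNode machinery):
{code:java}
import java.util.Optional;

import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParserPos;

public class SqlTableComputedColumn {

    private final SqlIdentifier name;
    private final SqlNode expr;
    private final SqlCharStringLiteral comment; // null if no COMMENT was given
    private final SqlParserPos pos;

    public SqlTableComputedColumn(
            SqlIdentifier name, SqlNode expr, SqlCharStringLiteral comment, SqlParserPos pos) {
        this.name = name;
        this.expr = expr;
        this.comment = comment;
        this.pos = pos;
    }

    public SqlIdentifier getName() { return name; }

    public SqlNode getExpr() { return expr; }

    public Optional<SqlCharStringLiteral> getComment() {
        return Optional.ofNullable(comment);
    }

    public SqlParserPos getPos() { return pos; }
}
{code}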
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15554) Bump jetty-util-ajax to 9.3.24

2020-01-10 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-15554:


 Summary: Bump jetty-util-ajax to 9.3.24
 Key: FLINK-15554
 URL: https://issues.apache.org/jira/browse/FLINK-15554
 Project: Flink
  Issue Type: Improvement
  Components: Build System, Connectors / FileSystem
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.9.2, 1.10.0


{{flink-fs-hadoop-azure}} has a transitive dependency on jetty-util-ajax:9.3.19, 
which has a security vulnerability: 
https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2017-7657

This was fixed in {{9.3.24.v20180605}} 
([source|https://bugs.eclipse.org/bugs/show_bug.cgi?id=535668]). Starting from 
version 3.2.1 {{hadoop-azure}} is using this version as well, but for a quick 
resolution I propose bumping this single dependency for the time being.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15555) Delete TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED option for subplan reuse

2020-01-10 Thread hailong wang (Jira)
hailong wang created FLINK-15555:


 Summary: Delete TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED option for 
subplan reuse
 Key: FLINK-15555
 URL: https://issues.apache.org/jira/browse/FLINK-15555
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.10.0
Reporter: hailong wang
 Fix For: 1.11.0


Blink planner supports subplan reuse. If TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED 
is true, the optimizer will try to find duplicated sub-plans and reuse 
them. And if TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED is true, the optimizer will 
try to find duplicated table sources and reuse them.

The option TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED is used to define whether a 
TableSourceScan should be reused.

But if the parent relNode of a TableSourceScan can be reused, it will also be 
reused even if TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED is false, as in the following 
SQL:
{code:java}
WITH t AS (SELECT a, b, e FROM x, y WHERE x.a = y.d)
SELECT t1.*, t2.* FROM t t1, t t2 WHERE t1.b = t2.e AND t1.a < 10 AND t2.a > 5
{code}
the plan may be as follow:
{code:java}
HashJoin(joinType=[InnerJoin], where=[=(b, e0)], select=[a, b, e, a0, b0, e0], 
build=[right])
:- Exchange(distribution=[hash[b]], shuffle_mode=[BATCH])
:  +- Calc(select=[a, b, e])
: +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d, e], 
build=[left])
::- Exchange(distribution=[hash[a]])
::  +- Calc(select=[a, b], where=[<(a, 10)])
:: +- TableSourceScan(table=[[default_catalog, default_database, x, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
:+- Exchange(distribution=[hash[d]], reuse_id=[1])
:   +- Calc(select=[d, e])
:  +- TableSourceScan(table=[[default_catalog, default_database, y, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Exchange(distribution=[hash[e]])
   +- Calc(select=[a, b, e])
  +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d, e], 
build=[left])
 :- Exchange(distribution=[hash[a]])
 :  +- Calc(select=[a, b], where=[>(a, 5)])
 : +- TableSourceScan(table=[[default_catalog, default_database, x, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 +- Reused(reference_id=[1])
{code}
So I think it is unnecessary to define this option; keeping only 
TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED will be enough.
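
For reference, a hedged sketch of how the remaining switch would then be toggled 
(assuming the blink planner's OptimizerConfigOptions; only sub-plan reuse is 
configured):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.OptimizerConfigOptions;

public class SubPlanReuseExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Sub-plan reuse already covers reused TableSourceScans, so this would be
        // the single switch after removing TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED.
        tEnv.getConfig().getConfiguration().setBoolean(
                OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true);
    }
}
{code}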

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15556) Add a switch for PushProjectIntoTableSourceScanRule

2020-01-10 Thread hailong wang (Jira)
hailong wang created FLINK-15556:


 Summary: Add a switch for PushProjectIntoTableSourceScanRule
 Key: FLINK-15556
 URL: https://issues.apache.org/jira/browse/FLINK-15556
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: hailong wang
 Fix For: 1.11.0


In some cases, with projection push-down the digests of table sources may 
differ. For example, if we create a JDBC table source and it is used twice 
afterwards for different columns, the source cannot be reused because the 
digests differ. In this case the extra query IO is costly, so we can add a switch 
to turn off projection push-down and let the source be reused.
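
A hedged sketch of what such a switch could look like (the option key and default 
below are made up for illustration; no such option exists yet):
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ProjectionPushDownOptions {

    public static final ConfigOption<Boolean> TABLE_OPTIMIZER_PROJECTION_PUSHDOWN_ENABLED =
            ConfigOptions.key("table.optimizer.projection-pushdown-enabled")
                    .defaultValue(true)
                    .withDescription(
                            "When false, PushProjectIntoTableSourceScanRule is skipped so that "
                                    + "identical table sources keep the same digest and can be reused.");
}
{code}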



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Integrate Flink Docker image publication into Flink release process

2020-01-10 Thread Till Rohrmann
Thanks a lot for starting this discussion Patrick! I think it is a very
good idea to move Flink's docker image more under the jurisdiction of the
Flink PMC and to make releasing new docker images part of Flink's
release process (not saying that we cannot release new docker images
independent of Flink's release cycle).

One thing I have no strong opinion about is where to place the Dockerfiles
(apache/flink.git vs. apache/flink-docker.git). I see the point that one
wants to separate concerns (Flink code vs. Dockerfiles) and, hence, that
having separate repositories might help with this objective. But on the
other hand, I don't have a lot of experience with Docker Hub and how to
best host Dockerfiles. Consequently, it would be helpful if others who have
gained some experience could share it with us.

Cheers,
Till

On Sat, Dec 21, 2019 at 2:28 PM Hequn Cheng  wrote:

> Hi Patrick,
>
> Thanks a lot for your continued work on the Docker images. That’s really
> really a great job! And I have also benefited from it.
>
> Big +1 for integrating docker image publication into the Flink release
> process since we can leverage the Flink release process to make sure a more
> legitimacy docker publication. We can also check and vote on it during the
> release.
>
> I think the most import thing we need to discuss first is whether to have a
> dedicated git repo for the Dockerfiles.
>
> Although it is convention shared by nearly every other “official” image on
> Docker Hub to have a dedicated repo, I'm still not sure about it. Maybe I
> have missed something important. From my point of view, I think it’s better
> to have the Dockerfiles in the (main)Flink repo.
>   - First, I think the Dockerfiles can be treated as part of the release.
> And it is also natural to put the corresponding version of the Dockerfile
> in the corresponding Flink release.
>   - Second, we can put the Dockerfiles in the path like
> flink/docker-flink/version/ and the version varies in different releases.
> For example, for release 1.8.3, we have a flink/docker-flink/1.8.3
> folder(or maybe flink/docker-flink/1.8). Even though all Dockerfiles for
> supported versions are not in one path but they are still in one Git tree
> with different refs.
>   - Third, it seems the Docker Hub also supports specifying different refs.
> For the file[1], we can change the GitRepo link from
> https://github.com/docker-flink/docker-flink.git to
> https://github.com/apache/flink.git and add a GitFetch for each tag, e.g.,
> GitFetch: refs/tags/release-1.8.3. There are some examples in the file of
> ubuntu[2].
>
> If the above assumptions are right and there are no more obstacles, I'm
> intended to have these Dockerfiles in the main Flink repo. In this case, we
> can reduce the number of repos and reduce the management overhead.
> What do you think?
>
> Best,
> Hequn
>
> [1]
> https://github.com/docker-library/official-images/blob/master/library/flink
> [2]
>
> https://github.com/docker-library/official-images/blob/master/library/ubuntu
>
>
> On Fri, Dec 20, 2019 at 5:29 PM Yang Wang  wrote:
>
> >  Big +1 for this effort.
> >
> > It is really exciting we have started this great work. More and more
> > companies start to
> > use Flink in container environment(docker, Kubernetes, Mesos, even
> > Yarn-3.x). So it is
> > very important that we could have unified official image building and
> > releasing process.
> >
> >
> > The image building process in this proposal is really good and i just
> have
> > the following thoughts.
> >
> > >> Keep a dedicated repo for Dockerfiles to build official image
> > I think this is a good way and we do not need to make some unnecessary
> > changes to Flink repository.
> >
> > >> Integrate building image into the Flink release process
> > It will bring a better experience for container environment users. In my
> > opinion, a complete
> > release includes the official image. It should be verified to work well.
> >
> > >> Nightly building
> > Do we support for all the release branch or just master branch?
> >
> > >> Multiple purpose Flink images
> > It is really indeed. In developing and testing process, we need some
> > profiling tools to help
> > us investigate some problems. Currently, we do not even have jstack/jmap
> in
> > the image.
> >
> > >> Unify the Dockerfile in Flink repository
> > In the current code base, we have flink-contrib/docker-flink/Dockerfile
> to
> > build a image
> > for session cluster. However, it is not updated. For per-job cluster,
> > flink-container/docker/Dockerfile
> > could be used to build a flink image with user artifacts. I think we need
> > to unify them and
> > provide a more powerful build script and entry point.
> >
> >
> >
> > Best,
> > Yang
> >
> > > On Thu, Dec 19, 2019 at 9:20 PM Patrick Lucas  wrote:
> >
> > > Hi everyone,
> > >
> > >
> > > I would like to start a discussion about integrating publication of the
> > > Flink Docker images hosted on Docker Hub[1] more tightly with the Flink
> > > release process. 

[jira] [Created] (FLINK-15557) Cannot connect to Azure Event Hub/Kafka since Jan 5th 2020. Kafka version issue

2020-01-10 Thread Chris (Jira)
Chris created FLINK-15557:
-

 Summary: Cannot connect to Azure Event Hub/Kafka since Jan 5th 
2020. Kafka version issue
 Key: FLINK-15557
 URL: https://issues.apache.org/jira/browse/FLINK-15557
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
 Environment: Java 8, Flink 1.9.1, Azure Event Hub
Reporter: Chris


As of Jan 5th we can no longer consume messages from Azure Event Hub using 
Flink 1.9.1. I was able to fix the issue on several of our Spring Boot 
projects by upgrading to Spring Boot 2.2.2, Kafka 2.3.1, and kafka-clients 2.3.1.

 
2020-01-10 19:36:30,364 WARN org.apache.kafka.clients.NetworkClient - [Consumer 
clientId=consumer-1, groupId=] Bootstrap broker 
*.servicebus.windows.net:9093 (id: -1 rack: null) disconnected



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-92: JDBC catalog and Postgres catalog

2020-01-10 Thread Bowen Li
Thanks everyone for the prompt feedback. Please see my response below.

> In Postgress, the TIME/TIMESTAMP WITH TIME ZONE has the java.time.Instant
semantic, and should be mapped to Flink's TIME/TIMESTAMP WITH LOCAL TIME
ZONE

Zhenghua, you are right that pg's 'timestamp with time zone' should be
translated into Flink's 'timestamp with local time zone'. I don't find a 'time
with (local) time zone' type though, so we may not support that type from pg in
Flink.

> I suggest that the parameters can be completely consistent with the
JDBCTableSource / JDBCTableSink. If you take a look to JDBC api:
"DriverManager.getConnection".
That allow "default db, username, pwd" things optional. They can included
in URL. Of course JDBC api also allows establishing connections to
different databases in a db instance. So I think we don't need provide a
"base_url", we can just provide a real "url". To be consistent with JDBC
api.

Jingsong, what I'm saying is that a builder can be added on demand later if
enough users request it; it doesn't need to be a core part of
the FLIP.

Besides, unfortunately Postgres doesn't allow changing databases via JDBC.

JDBC provides different connection options as you mentioned, but I'd like
to keep our design and API simple and avoid having to handle extra parsing logic.
And it doesn't shut the door for what you proposed as a future effort.

> Since the PostgreSQL does not have catalog but schema under database, why
not mapping the PG-database to Flink catalog and PG-schema to Flink database

Danny, because 1) there are frequent use cases where users want to switch
databases or reference objects across databases in a pg instance; 2)
schema is an optional namespace layer in pg, it always has a default value
("public") and can stay invisible to users if they'd like, as shown in the
FLIP; 3) as you mentioned, it is specific to Postgres, and I don't feel it's
necessary to map Postgres substantially differently than other DBMSs at the cost
of additional complexity.

>'base_url' configuration: We are following the configuration format
guideline [1] which suggest to use dash (-) instead of underline (_). And
I'm a little confused the meaning of "base_url" at the first glance,
another idea is split it into several configurations: 'driver', 'hostname',
'port'.

Jark, I agree we should use "base-url" in the yaml config.

I'm not sure about having hostname and port separately because you can
specify multiple hosts with ports in jdbc, like
"jdbc:dbms/host1:port1,host2:port2/", for connection failovers. Separating
them would make configurations harder.

I will add clear doc and example to avoid any possible confusion.
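
For example (illustrative only; variable names are placeholders):

    // The base-url deliberately carries no database suffix, so the catalog can
    // open a connection to any database in the instance by appending the db name:
    String baseUrl = "jdbc:postgresql://host1:5432,host2:5432/";
    Connection conn = DriverManager.getConnection(baseUrl + databaseName, username, pwd);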

> 'default-database' is optional, then which database will be used or what
is the behavior when the default database is not selected.

This should be DBMS specific. For postgres, it will be the 
database.


On Thu, Jan 9, 2020 at 9:48 PM Zhenghua Gao  wrote:

> Hi Bowen, Thanks for driving this.
> I think it would be very convenience to use tables in external DBs with
> JDBC Catalog.
>
> I have one concern about "Flink-Postgres Data Type Mapping" part:
>
> In Postgress, the TIME/TIMESTAMP WITH TIME ZONE has the java.time.Instant
> semantic,
> and should be mapped to Flink's TIME/TIMESTAMP WITH LOCAL TIME ZONE
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Fri, Jan 10, 2020 at 11:09 AM Jingsong Li 
> wrote:
>
> > Hi Bowen, thanks for reply and updating.
> >
> > > I don't see much value in providing a builder for jdbc catalogs, as
> they
> > only have 4 or 5 required params, no optional ones. I prefer users just
> > provide a base url without default db, usrname, pwd so we don't need to
> > parse url all around, as I mentioned jdbc catalog may need to establish
> > connections to different databases in a db instance,
> >
> > I suggest that the parameters can be completely consistent with the
> > JDBCTableSource / JDBCTableSink.
> > If you take a look to JDBC api: "DriverManager.getConnection".
> > That allow "default db, username, pwd" things optional. They can included
> > in URL. Of course JDBC api also allows establishing connections to
> > different databases in a db instance.
> > So I think we don't need provide a "base_url", we can just provide a real
> > "url".
> > To be consistent with JDBC api.
> >
> > Best,
> > Jingsong Lee
> >
> > On Fri, Jan 10, 2020 at 10:34 AM Jark Wu  wrote:
> >
> > > Thanks Bowen for the reply,
> > >
> > > A user-facing JDBCCatalog and 'catalog.type' = 'jdbc'  sounds good to
> me.
> > >
> > > I have some other minor comments when I went through the updated
> > > documentation:
> > >
> > > 1) 'base_url' configuration: We are following the configuration format
> > > guideline [1] which suggest to use dash (-) instead of underline (_).
> > >  And I'm a little confused the meaning of "base_url" at the first
> > > glance, another idea is split it into several configurations: 'driver',
> > > 'hostname', 'port'.
> > >
> > > 2) 'default-database' is optional, then which database will be used

Re: [DISCUSS] A mechanism to validate the precision of columns for connectors

2020-01-10 Thread Bowen Li
Hi Zhenghua,

For external systems with schema, I think the schema information is
available most of the time and should be the single source of truth for
programmatically mapping column precision via Flink catalogs, to minimize
users' effort in creating schemas redundantly and to avoid human errors.
They will be a subset of the system's supported types and precisions, and
thus you don't need to validate the 1st category of "the ability of
external systems". This would apply to most schema storage systems, like
relational DBMSs, Hive metastore, or Avro schemas in the Confluent schema
registry for Kafka.

From my observation, the real problem right now is that Flink cannot truly
leverage external system schemas via Flink Catalogs, as documented in [1].

I'm not sure there are any unsolvable network or authorization problems,
as most systems nowadays can be read with a simple access id/key pair via
VPC, intranet, or the internet. What problems have you run into?

For schemaless systems, we'd have to rely on full testing coverage in Flink.

[1] https://issues.apache.org/jira/browse/FLINK-15545

On Fri, Jan 10, 2020 at 1:12 AM Zhenghua Gao  wrote:

> Hi Jingsong Lee
>
> You are right that the connectors don't validate data types either now.
> We seems lack a mechanism to validate with properties[1], data types, etc
> for CREATE TABLE.
>
> [1] https://issues.apache.org/jira/browse/FLINK-15509
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Fri, Jan 10, 2020 at 2:59 PM Jingsong Li 
> wrote:
>
> > Hi Zhenghua,
> >
> > I think it's not just about precision of type. Connectors not validate
> the
> > types either.
> > Now there is "SchemaValidator", this validator is just used to validate
> > type properties. But not for connector type support.
> > I think we can have something like "DataTypeValidator" to help connectors
> > validating their type support.
> >
> > Consider current validator design, validator is called by connector
> itself.
> > it's more like a util class than a mechanism.
> >
> > Best,
> > Jingsong Lee
> >
> > On Fri, Jan 10, 2020 at 11:47 AM Zhenghua Gao  wrote:
> >
> > > Hi dev,
> > >
> > > I'd like to kick off a discussion on a mechanism to validate the
> > precision
> > > of columns for some connectors.
> > >
> > > We come to an agreement that the user should be informed if the
> connector
> > > does not support the desired precision. And from the connector
> > developer's
> > > view, there are 3-levels information to be considered:
> > >
> > >-  the ability of external systems (e.g. Apache Derby support
> > >TIMESTAMP(9), Mysql support TIMESTAMP(6), etc)
> > >
> > > Connector developers should use this information to validate user's DDL
> > and
> > > make sure throw an exception if concrete column is out of range.
> > >
> > >
> > >- schema of referenced tables in external systems
> > >
> > > If the schema information of referenced tables is available in Compile
> > > Time, connector developers could use it to find the mismatch between
> DDL.
> > > But in most cases, the schema information is unavailable because of
> > network
> > > isolation or authority management. We should use it with caution.
> > >
> > >
> > >- schema-less external systems (e.g. HBase)
> > >
> > > If the external systems is schema-less like HBase, the connector
> > developer
> > > should make sure the connector doesn't cause precision loss (e.g.
> > > flink-hbase serializes java.sql.Timestamp to long in bytes which only
> > keep
> > > millisecond's precision.)
> > >
> > > To make it more specific, some scenarios of JDBC Connector are list as
> > > following:
> > >
> > >- The underlying DB supports DECIMAL(65, 30), which is out of the
> > range
> > >of Flink's Decimal
> > >- The underlying DB supports TIMESTAMP(6), and user want to define a
> > >table with TIMESTAMP(9) in Flink
> > >- User defines a table with DECIMAL(10, 4) in underlying DB, and
> want
> > to
> > >define a table with DECIMAL(5, 2) in Flink
> > >- The precision of the underlying DB varies between different
> versions
> > >
> > >
> > > What do you think about this? any feedback are appreciates.
> > >
> > > *Best Regards,*
> > > *Zhenghua Gao*
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>


[jira] [Created] (FLINK-15558) Bump Elasticsearch version from 7.3.2 to 7.5.1 for es7 connector

2020-01-10 Thread vinoyang (Jira)
vinoyang created FLINK-15558:


 Summary: Bump Elasticsearch version from 7.3.2 to 7.5.1 for es7 
connector
 Key: FLINK-15558
 URL: https://issues.apache.org/jira/browse/FLINK-15558
 Project: Flink
  Issue Type: Wish
  Components: Connectors / ElasticSearch
Reporter: vinoyang


It would be better to track the newest ES 7.x client version just like we have 
done for the Kafka universal connector.
Currently, the ES7 connector tracks version 7.3.2 while the latest ES 7.x version 
is 7.5.1. We can upgrade it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)