Re: [VOTE] FLIP-92: Add N-Ary Stream Operator in Flink
+1, it is really nice to have the N-Ary stream operator, which is meaningful in some scenarios.

Best,
Zhijiang


------------------------------------------------------------------
From: Jingsong Li
Send Time: 2020 Jan. 10 (Fri.) 11:00
To: dev
Subject: Re: [VOTE] FLIP-92: Add N-Ary Stream Operator in Flink

+1 non-binding to the N-Ary Stream Operator. Thanks Piotr for driving.
Looks like the previous FLIP-92 did not change the "Next FLIP Number" on the FLIP page.

Best,
Jingsong Lee

On Fri, Jan 10, 2020 at 8:40 AM Benchao Li wrote:

> Hi Piotr,
>
> It seems that we have the 'FLIP-92' already.
> see:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+JDBC+catalog+and+Postgres+catalog
>
> Piotr Nowojski wrote on Thu, Jan 9, 2020 at 11:25 PM:
>
> > Hi,
> >
> > I would like to start a vote for adding the N-Ary Stream Operator in Flink
> > as discussed in the discussion thread [1].
> >
> > This vote will be opened at least until Wednesday, January 15th 8:00 UTC.
> >
> > Piotrek
> >
> > [1]
> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-N-Ary-Stream-Operator-td11341.html
>
> --
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel: +86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn

--
Best, Jingsong Lee
[jira] [Created] (FLINK-15547) Support access to Hive avro table
Rui Li created FLINK-15547:
---------------------------

Summary: Support access to Hive avro table
Key: FLINK-15547
URL: https://issues.apache.org/jira/browse/FLINK-15547
Project: Flink
Issue Type: Task
Components: Connectors / Hive
Reporter: Rui Li
Re: [VOTE] FLIP-92: Add N-Ary Stream Operator in Flink
non-binding +1

On Fri, Jan 10, 2020 at 9:11 AM Zhijiang wrote:

> +1, it is really nice to have the N-Ary stream operator, which is
> meaningful in some scenarios.
>
> Best,
> Zhijiang
>
> ------------------------------------------------------------------
> From: Jingsong Li
> Send Time: 2020 Jan. 10 (Fri.) 11:00
> To: dev
> Subject: Re: [VOTE] FLIP-92: Add N-Ary Stream Operator in Flink
>
> +1 non-binding to the N-Ary Stream Operator. Thanks Piotr for driving.
> Looks like the previous FLIP-92 did not change the "Next FLIP Number" in
> the FLIP page.
>
> Best,
> Jingsong Lee
>
> On Fri, Jan 10, 2020 at 8:40 AM Benchao Li wrote:
>
> > Hi Piotr,
> >
> > It seems that we have the 'FLIP-92' already.
> > see:
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+JDBC+catalog+and+Postgres+catalog
> >
> > Piotr Nowojski wrote on Thu, Jan 9, 2020 at 11:25 PM:
> >
> > > Hi,
> > >
> > > I would like to start a vote for adding the N-Ary Stream Operator in Flink
> > > as discussed in the discussion thread [1].
> > >
> > > This vote will be opened at least until Wednesday, January 15th 8:00 UTC.
> > >
> > > Piotrek
> > >
> > > [1]
> > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-N-Ary-Stream-Operator-td11341.html
> >
> > --
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel: +86-15650713730
> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
> --
> Best, Jingsong Lee
Re: [DISCUSS] A mechanism to validate the precision of columns for connectors
Hi Jingsong Lee,

You are right that the connectors don't validate data types either right now.
We seem to lack a mechanism to validate properties[1], data types, etc. for CREATE TABLE.

[1] https://issues.apache.org/jira/browse/FLINK-15509

*Best Regards,*
*Zhenghua Gao*

On Fri, Jan 10, 2020 at 2:59 PM Jingsong Li wrote:

> Hi Zhenghua,
>
> I think it's not just about precision of type. Connectors do not validate the
> types either.
> Now there is "SchemaValidator", but this validator is just used to validate
> type properties, not connector type support.
> I think we can have something like "DataTypeValidator" to help connectors
> validate their type support.
>
> Considering the current validator design, the validator is called by the connector itself,
> so it's more like a util class than a mechanism.
>
> Best,
> Jingsong Lee
>
> On Fri, Jan 10, 2020 at 11:47 AM Zhenghua Gao wrote:
>
> > Hi dev,
> >
> > I'd like to kick off a discussion on a mechanism to validate the precision
> > of columns for some connectors.
> >
> > We come to an agreement that the user should be informed if the connector
> > does not support the desired precision. And from the connector developer's
> > view, there are 3 levels of information to be considered:
> >
> >    - the ability of external systems (e.g. Apache Derby supports
> >      TIMESTAMP(9), MySQL supports TIMESTAMP(6), etc.)
> >
> > Connector developers should use this information to validate the user's DDL and
> > make sure to throw an exception if a concrete column is out of range.
> >
> >    - schema of referenced tables in external systems
> >
> > If the schema information of referenced tables is available at compile
> > time, connector developers could use it to find mismatches with the DDL.
> > But in most cases, the schema information is unavailable because of network
> > isolation or authority management. We should use it with caution.
> >
> >    - schema-less external systems (e.g. HBase)
> >
> > If the external system is schema-less like HBase, the connector developer
> > should make sure the connector doesn't cause precision loss (e.g.
> > flink-hbase serializes java.sql.Timestamp to long in bytes, which only keeps
> > millisecond precision).
> >
> > To make it more specific, some scenarios of the JDBC connector are listed as
> > follows:
> >
> >    - The underlying DB supports DECIMAL(65, 30), which is out of the range
> >      of Flink's Decimal
> >    - The underlying DB supports TIMESTAMP(6), and the user wants to define a
> >      table with TIMESTAMP(9) in Flink
> >    - The user defines a table with DECIMAL(10, 4) in the underlying DB, and wants to
> >      define a table with DECIMAL(5, 2) in Flink
> >    - The precision of the underlying DB varies between different versions
> >
> > What do you think about this? Any feedback is appreciated.
> >
> > *Best Regards,*
> > *Zhenghua Gao*
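The "DataTypeValidator" mentioned in the quoted message does not exist in Flink; the following is only a minimal sketch of what such a helper might look like. The class name comes from the proposal, while the constructor, the checked type, and the error message are illustrative assumptions.

{code:java}
// Hypothetical sketch of the "DataTypeValidator" idea discussed above.
// Neither this class nor its wiring into connectors exists in Flink today;
// names and signatures are illustrative only.
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.TimestampType;

public class DataTypeValidator {

    /** Highest TIMESTAMP precision the external system supports, e.g. 6 for MySQL. */
    private final int maxTimestampPrecision;

    public DataTypeValidator(int maxTimestampPrecision) {
        this.maxTimestampPrecision = maxTimestampPrecision;
    }

    /** Throws if the DDL declares a precision the external system cannot store. */
    public void validate(String columnName, DataType dataType) {
        if (dataType.getLogicalType().getTypeRoot()
                == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
            int precision = ((TimestampType) dataType.getLogicalType()).getPrecision();
            if (precision > maxTimestampPrecision) {
                throw new ValidationException(
                        String.format(
                                "Column '%s' is declared as TIMESTAMP(%d), but the external "
                                        + "system only supports precision up to %d.",
                                columnName, precision, maxTimestampPrecision));
            }
        }
        // Similar checks could cover DECIMAL precision/scale, VARCHAR length, etc.
    }
}
{code}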
[jira] [Created] (FLINK-15548) Make KeyedCoProcessOperatorWithWatermarkDelay extends KeyedCoProcessOperator instead of LegacyKeyedCoProcessOperator
wangsan created FLINK-15548:
----------------------------

Summary: Make KeyedCoProcessOperatorWithWatermarkDelay extends KeyedCoProcessOperator instead of LegacyKeyedCoProcessOperator
Key: FLINK-15548
URL: https://issues.apache.org/jira/browse/FLINK-15548
Project: Flink
Issue Type: Improvement
Components: Table SQL / Runtime
Affects Versions: 1.10.0
Reporter: wangsan
Fix For: 1.10.0

`LegacyKeyedCoProcessOperator` is marked as deprecated, so we should make `KeyedCoProcessOperatorWithWatermarkDelay` extend `KeyedCoProcessOperator` instead.
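For orientation, a rough sketch of what the proposed change could look like, assuming the operator keeps a watermark-delay behavior and simply switches its superclass. The package placement, constructor, and exact delay logic below are assumptions for illustration, not the actual Flink source.

{code:java}
// Illustrative sketch only: the real class lives in the blink table runtime and
// may differ in details; only "extend KeyedCoProcessOperator instead of
// LegacyKeyedCoProcessOperator" is taken from the ticket.
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;

public class KeyedCoProcessOperatorWithWatermarkDelay<K, IN1, IN2, OUT>
        extends KeyedCoProcessOperator<K, IN1, IN2, OUT> { // instead of LegacyKeyedCoProcessOperator

    private final long watermarkDelay;

    public KeyedCoProcessOperatorWithWatermarkDelay(
            KeyedCoProcessFunction<K, IN1, IN2, OUT> function, long watermarkDelay) {
        super(function);
        this.watermarkDelay = watermarkDelay;
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // forward the watermark held back by the configured delay
        super.processWatermark(new Watermark(mark.getTimestamp() - watermarkDelay));
    }
}
{code}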
[jira] [Created] (FLINK-15549) integer overflow in SpillingResettableMutableObjectIterator
caojian0613 created FLINK-15549:
--------------------------------

Summary: integer overflow in SpillingResettableMutableObjectIterator
Key: FLINK-15549
URL: https://issues.apache.org/jira/browse/FLINK-15549
Project: Flink
Issue Type: Bug
Components: Runtime / Task
Affects Versions: 1.9.1, 1.9.0
Reporter: caojian0613

The SpillingResettableMutableObjectIterator has a data overflow problem if the number of elements in a single input exceeds Integer.MAX_VALUE.

The reason is that the SpillingResettableMutableObjectIterator tracks the total number of elements and the number of elements currently read with two int-typed fields (elementCount and currentElementNum), and if the number of elements exceeds Integer.MAX_VALUE, these counters overflow.

After an overflow, in the next iteration after the input is reset, the data will not be read at all, or only part of it will be read.

Therefore, we should change the type of these two fields of SpillingResettableMutableObjectIterator from int to long, and we also need a pre-check mechanism before such a numerical limit is exceeded.
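To make the overflow concrete, here is a minimal, self-contained illustration (not the Flink code itself) of how an int element counter wraps around while a long counter keeps counting correctly. The field names are simplified stand-ins for the iterator's internals.

{code:java}
// Minimal illustration of the reported problem.
public class ElementCounterExample {

    public static void main(String[] args) {
        int intCount = Integer.MAX_VALUE;
        intCount++; // wraps to a negative value once more than 2^31 - 1 elements are tracked
        System.out.println("int counter after overflow:  " + intCount);  // -2147483648

        long longCount = Integer.MAX_VALUE;
        longCount++; // a long counter continues counting correctly
        System.out.println("long counter, same position: " + longCount); // 2147483648
    }
}
{code}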
[jira] [Created] (FLINK-15550) testCancelTaskExceptionAfterTaskMarkedFailed failed on azure
Yun Tang created FLINK-15550:
-----------------------------

Summary: testCancelTaskExceptionAfterTaskMarkedFailed failed on azure
Key: FLINK-15550
URL: https://issues.apache.org/jira/browse/FLINK-15550
Project: Flink
Issue Type: Bug
Components: Runtime / Task
Affects Versions: 1.11.0
Reporter: Yun Tang

Instance: https://dev.azure.com/rmetzger/Flink/_build/results?buildId=4241&view=ms.vss-test-web.build-test-results-tab&runId=12434&resultId=108939&paneView=debug

{code:java}
java.lang.AssertionError: expected: but was: 
	at org.apache.flink.runtime.taskmanager.TaskTest.testCancelTaskExceptionAfterTaskMarkedFailed(TaskTest.java:525)
{code}

{code:java}
expected: but was: 
{code}
[jira] [Created] (FLINK-15551) Streaming File Sink s3 end-to-end test FAIL
Leonard Xu created FLINK-15551:
-------------------------------

Summary: Streaming File Sink s3 end-to-end test FAIL
Key: FLINK-15551
URL: https://issues.apache.org/jira/browse/FLINK-15551
Project: Flink
Issue Type: Bug
Components: Tests
Affects Versions: 1.10.0
Reporter: Leonard Xu
Fix For: 1.10.0

This issue happens in the latest release-1.10 branch[1]; it is similar to FLINK-455 but has a different cause:

{code:java}
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.getTimeDuration(Ljava/lang/String;Ljava/lang/String;Ljava/util/concurrent/TimeUnit;)J
	at org.apache.hadoop.fs.s3a.S3ARetryPolicy.<init>(S3ARetryPolicy.java:113)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:257)
	at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:126)
	at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:61)
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:441)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
	at org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage.<init>(MemoryBackendCheckpointStorage.java:85)
	at org.apache.flink.runtime.state.memory.MemoryStateBackend.createCheckpointStorage(MemoryStateBackend.java:295)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:279)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:205)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:486)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
	at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:253)
	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:225)
	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:213)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:117)
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
	at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
	... 10 more
End of exception on server side>]
	at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
	... 4 more
Job could not be submitted.
{code}

[1] https://api.travis-ci.org/v3/job/634842391/log.txt
[jira] [Created] (FLINK-15552) SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory
Terry Wang created FLINK-15552:
-------------------------------

Summary: SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory
Key: FLINK-15552
URL: https://issues.apache.org/jira/browse/FLINK-15552
Project: Flink
Issue Type: Bug
Components: Table SQL / Client, Table SQL / Runtime
Reporter: Terry Wang

How to reproduce:

First, I start a SQL Client and use `-l` to point to a kafka connector directory:
{code}
bin/sql-client.sh embedded -l /xx/connectors/kafka/
{code}

Then I create a Kafka table like the following:
{code}
Flink SQL> CREATE TABLE MyUserTable (
>   content String
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'test',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'csv'
> );
[INFO] Table has been created.
{code}

Then I select from the just-created table and an exception is thrown:
{code}
Flink SQL> select * from MyUserTable;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.properties.bootstrap.servers=localhost:9092
connector.properties.group.id=testGroup
connector.properties.zookeeper.connect=localhost:2181
connector.startup-mode=earliest-offset
connector.topic=test
connector.type=kafka
connector.version=universal
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=content

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
{code}

Potential reason:
We currently use `TableFactoryUtil#findAndCreateTableSource` to convert a CatalogTable to a TableSource, but when calling `TableFactoryService.find` we don't pass the current classloader to that method; the default loader will be the bootstrap classloader, which can not find our factory.
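The direction described in the last paragraph could look roughly like the following. This is only an illustration: the helper class and variable names are invented, and only the idea of passing the SQL Client's user classloader (built from the `-l`/`--library` jars) into the factory discovery is taken from the report. The ClassLoader-accepting overload of `TableFactoryService.find` is believed to exist in Flink 1.9/1.10, but the exact call site in the fix may differ.

{code:java}
// Sketch of passing the user classloader into factory discovery instead of
// relying on the default classloader. Surrounding context (how the properties
// and classloader are obtained) is assumed for illustration.
import java.util.Map;

import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSourceFactory;

public class FactoryLookupSketch {

    public static TableSourceFactory<?> findSourceFactory(
            Map<String, String> tableProperties, ClassLoader userClassLoader) {
        // Passing the classloader explicitly lets the ServiceLoader-based discovery
        // see factories that only exist in the jars added via --library / -l.
        return TableFactoryService.find(
                TableSourceFactory.class, tableProperties, userClassLoader);
    }
}
{code}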
[jira] [Created] (FLINK-15553) Create table ddl support comment after computed column
hailong wang created FLINK-15553:
---------------------------------

Summary: Create table ddl support comment after computed column
Key: FLINK-15553
URL: https://issues.apache.org/jira/browse/FLINK-15553
Project: Flink
Issue Type: Improvement
Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: hailong wang
Fix For: 1.11.0

For now, we can define a computed column in the CREATE TABLE DDL, but we can not add a comment after it the way we can for a regular table column. We should support it; its grammar would be as follows:
{code:java}
col_name AS expr [COMMENT 'string']
{code}
My idea is that we can introduce a class
{code:java}
SqlTableComputedColumn
{code}
to wrap the name, expression and comment, and then just reading those elements from it will be enough. As for parserImpls.ftl, it can look as follows:
{code:java}
identifier = SimpleIdentifier()
expr = Expression(ExprContext.ACCEPT_NON_QUERY)
[
    {
        String p = SqlParserUtil.parseString(token.image);
        comment = SqlLiteral.createCharString(p, getPos());
    }
]
{
    SqlTableComputedColumn tableComputedColumn =
        new SqlTableComputedColumn(identifier, expr, comment, getPos());
    context.columnList.add(tableComputedColumn);
}
{code}
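For illustration, here is the difference the ticket asks for, shown as DDL strings held in Java constants. The PROPOSED form uses the requested grammar and is not accepted by the current parser; table and column names are made up, and the WITH clause is elided.

{code:java}
// Illustration of the requested grammar only; nothing here is executed.
public class ComputedColumnCommentDdl {

    // Regular columns can already carry a comment in the DDL today.
    static final String SUPPORTED =
            "CREATE TABLE orders (\n"
          + "  price DECIMAL(10, 2) COMMENT 'unit price',\n"
          + "  quantity INT\n"
          + ") WITH (...)";

    // Proposed: allow the same COMMENT clause after a computed column.
    static final String PROPOSED =
            "CREATE TABLE orders (\n"
          + "  price DECIMAL(10, 2),\n"
          + "  quantity INT,\n"
          + "  total AS price * quantity COMMENT 'computed total amount'\n"
          + ") WITH (...)";
}
{code}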
[jira] [Created] (FLINK-15554) Bump jetty-util-ajax to 9.3.24
Chesnay Schepler created FLINK-15554:
-------------------------------------

Summary: Bump jetty-util-ajax to 9.3.24
Key: FLINK-15554
URL: https://issues.apache.org/jira/browse/FLINK-15554
Project: Flink
Issue Type: Improvement
Components: Build System, Connectors / FileSystem
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
Fix For: 1.9.2, 1.10.0

{{flink-fs-hadoop-azure}} has a transitive dependency on jetty-util-ajax:9.3.19, which has a security vulnerability: https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2017-7657

This was fixed in {{9.3.24.v20180605}} ([source|https://bugs.eclipse.org/bugs/show_bug.cgi?id=535668]). Starting from version 3.2.1, {{hadoop-azure}} uses this version as well, but for a quick resolution I propose bumping this single dependency for the time being.
[jira] [Created] (FLINK-15555) Delete TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED option for subplan reuse
hailong wang created FLINK-15555:
---------------------------------

Summary: Delete TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED option for subplan reuse
Key: FLINK-15555
URL: https://issues.apache.org/jira/browse/FLINK-15555
Project: Flink
Issue Type: Improvement
Components: Table SQL / Runtime
Affects Versions: 1.10.0
Reporter: hailong wang
Fix For: 1.11.0

The Blink planner supports subplan reuse. If TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED is true, the optimizer will try to find duplicated sub-plans and reuse them. And if TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED is true, the optimizer will try to find duplicated table sources and reuse them.

The option TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED is meant to define whether a TableSourceScan should be reused. But if the parent relNode of the TableSourceScan can be reused, the scan will be reused as well even if TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED is false, as in the following SQL:
{code:java}
WITH t AS (SELECT a, b, e FROM x, y WHERE x.a = y.d)
SELECT t1.*, t2.* FROM t t1, t t2 WHERE t1.b = t2.e AND t1.a < 10 AND t2.a > 5
{code}
The plan may be as follows:
{code:java}
HashJoin(joinType=[InnerJoin], where=[=(b, e0)], select=[a, b, e, a0, b0, e0], build=[right])
:- Exchange(distribution=[hash[b]], shuffle_mode=[BATCH])
:  +- Calc(select=[a, b, e])
:     +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d, e], build=[left])
:        :- Exchange(distribution=[hash[a]])
:        :  +- Calc(select=[a, b], where=[<(a, 10)])
:        :     +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
:        +- Exchange(distribution=[hash[d]], reuse_id=[1])
:           +- Calc(select=[d, e])
:              +- TableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+- Exchange(distribution=[hash[e]])
   +- Calc(select=[a, b, e])
      +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, d, e], build=[left])
         :- Exchange(distribution=[hash[a]])
         :  +- Calc(select=[a, b], where=[>(a, 5)])
         :     +- TableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
         +- Reused(reference_id=[1])
{code}
So I think it is not useful to define this option; TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED alone will be enough.
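For reference, the two options in question can be toggled through the TableConfig today. The string keys below are, to the best of my knowledge, the keys those OptimizerConfigOptions constants map to in Flink 1.10; under this proposal only the sub-plan key would remain.

{code:java}
// Sketch of toggling the two optimizer options discussed above (Flink 1.10).
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SubPlanReuseConfigExample {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // sub-plan reuse (TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED)
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.optimizer.reuse-sub-plan-enabled", true);

        // the source-specific switch this ticket proposes to remove
        // (TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED)
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.optimizer.reuse-source-enabled", true);
    }
}
{code}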
[jira] [Created] (FLINK-15556) Add a switch for PushProjectIntoTableSourceScanRule
hailong wang created FLINK-15556:
---------------------------------

Summary: Add a switch for PushProjectIntoTableSourceScanRule
Key: FLINK-15556
URL: https://issues.apache.org/jira/browse/FLINK-15556
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: hailong wang
Fix For: 1.11.0

In some cases, project push-down makes the digests of scans of the same tableSource differ. For example, if we create a JDBC tableSource and it is used twice afterwards with different columns, the source can not be reused because the digests differ, and the query then reads from the external storage twice, making the query I/O more expensive. So we could add a switch to turn off project push-down and allow the source to be reused.
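A purely hypothetical sketch of the requested switch follows; the option key, default value, and description are invented here, and no such option exists in Flink yet.

{code:java}
// Hypothetical config option for the requested switch; the key, default, and
// wiring into PushProjectIntoTableSourceScanRule are all assumptions.
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ProjectPushDownOptions {

    public static final ConfigOption<Boolean> TABLE_OPTIMIZER_PROJECT_PUSH_DOWN_ENABLED =
            ConfigOptions.key("table.optimizer.source.project-push-down-enabled")
                    .defaultValue(true)
                    .withDescription(
                            "When false, PushProjectIntoTableSourceScanRule is skipped so that "
                                    + "scans of the same table keep identical digests and can be reused.");
}
{code}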
Re: [DISCUSS] Integrate Flink Docker image publication into Flink release process
Thanks a lot for starting this discussion Patrick!

I think it is a very good idea to move Flink's Docker image more under the jurisdiction of the Flink PMC and to make releasing new Docker images part of Flink's release process (not saying that we cannot release new Docker images independent of Flink's release cycle).

One thing I have no strong opinion about is where to place the Dockerfiles (apache/flink.git vs. apache/flink-docker.git). I see the point that one wants to separate concerns (Flink code vs. Dockerfiles) and, hence, that having separate repositories might help with this objective. But on the other hand, I don't have a lot of experience with Docker Hub and how to best host Dockerfiles. Consequently, it would be helpful if others who have some experience could share it with us.

Cheers,
Till

On Sat, Dec 21, 2019 at 2:28 PM Hequn Cheng wrote:

> Hi Patrick,
>
> Thanks a lot for your continued work on the Docker images. That's really
> really a great job! And I have also benefited from it.
>
> Big +1 for integrating docker image publication into the Flink release
> process since we can leverage the Flink release process to make sure of a more
> legitimate docker publication. We can also check and vote on it during the
> release.
>
> I think the most important thing we need to discuss first is whether to have a
> dedicated git repo for the Dockerfiles.
>
> Although it is a convention shared by nearly every other "official" image on
> Docker Hub to have a dedicated repo, I'm still not sure about it. Maybe I
> have missed something important. From my point of view, I think it's better
> to have the Dockerfiles in the (main) Flink repo.
> - First, I think the Dockerfiles can be treated as part of the release.
>   And it is also natural to put the corresponding version of the Dockerfile
>   in the corresponding Flink release.
> - Second, we can put the Dockerfiles in a path like
>   flink/docker-flink/version/ where the version varies between releases.
>   For example, for release 1.8.3, we have a flink/docker-flink/1.8.3
>   folder (or maybe flink/docker-flink/1.8). Even though all Dockerfiles for
>   supported versions are not in one path, they are still in one Git tree
>   with different refs.
> - Third, it seems Docker Hub also supports specifying different refs.
>   For the file[1], we can change the GitRepo link from
>   https://github.com/docker-flink/docker-flink.git to
>   https://github.com/apache/flink.git and add a GitFetch for each tag, e.g.,
>   GitFetch: refs/tags/release-1.8.3. There are some examples in the file of
>   ubuntu[2].
>
> If the above assumptions are right and there are no more obstacles, I
> intend to have these Dockerfiles in the main Flink repo. In this case, we
> can reduce the number of repos and reduce the management overhead.
> What do you think?
>
> Best,
> Hequn
>
> [1] https://github.com/docker-library/official-images/blob/master/library/flink
> [2] https://github.com/docker-library/official-images/blob/master/library/ubuntu
>
> On Fri, Dec 20, 2019 at 5:29 PM Yang Wang wrote:
>
> > Big +1 for this effort.
> >
> > It is really exciting we have started this great work. More and more
> > companies start to use Flink in container environments (docker, Kubernetes,
> > Mesos, even Yarn-3.x). So it is very important that we could have a unified
> > official image building and releasing process.
> >
> > The image building process in this proposal is really good and I just have
> > the following thoughts.
> > >> Keep a dedicated repo for Dockerfiles to build official image
> > I think this is a good way and we do not need to make some unnecessary
> > changes to the Flink repository.
> >
> > >> Integrate building image into the Flink release process
> > It will bring a better experience for container environment users. In my
> > opinion, a complete release includes the official image. It should be
> > verified to work well.
> >
> > >> Nightly building
> > Do we support all the release branches or just the master branch?
> >
> > >> Multiple purpose Flink images
> > It is really needed. In the developing and testing process, we need some
> > profiling tools to help us investigate some problems. Currently, we do not
> > even have jstack/jmap in the image.
> >
> > >> Unify the Dockerfile in Flink repository
> > In the current code base, we have flink-contrib/docker-flink/Dockerfile to
> > build an image for a session cluster. However, it is not updated. For a
> > per-job cluster, flink-container/docker/Dockerfile could be used to build a
> > flink image with user artifacts. I think we need to unify them and provide
> > a more powerful build script and entry point.
> >
> > Best,
> > Yang
> >
> > Patrick Lucas wrote on Thu, Dec 19, 2019 at 9:20 PM:
> >
> > > Hi everyone,
> > >
> > > I would like to start a discussion about integrating publication of the
> > > Flink Docker images hosted on Docker Hub[1] more tightly with the Flink
> > > release process.
[jira] [Created] (FLINK-15557) Cannot connect to Azure Event Hub/Kafka since Jan 5th 2020. Kafka version issue
Chris created FLINK-15557:
--------------------------

Summary: Cannot connect to Azure Event Hub/Kafka since Jan 5th 2020. Kafka version issue
Key: FLINK-15557
URL: https://issues.apache.org/jira/browse/FLINK-15557
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Environment: Java 8, Flink 1.9.1, Azure Event Hub
Reporter: Chris

As of Jan 5th we can no longer consume messages from Azure Event Hub using Flink 1.9.1. I was able to fix the issue on several of our Spring Boot projects by upgrading to Spring Boot 2.2.2, Kafka 2.3.1, and kafka-clients 2.3.1.

{code}
2020-01-10 19:36:30,364 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=] Bootstrap broker *.servicebus.windows.net:9093 (id: -1 rack: null) disconnected
{code}
Re: [DISCUSS] FLIP-92: JDBC catalog and Postgres catalog
Thanks everyone for the prompt feedback. Please see my response below.

> In Postgress, the TIME/TIMESTAMP WITH TIME ZONE has the java.time.Instant semantic,
> and should be mapped to Flink's TIME/TIMESTAMP WITH LOCAL TIME ZONE

Zhenghua, you are right that pg's 'timestamp with timezone' should be translated into Flink's 'timestamp with local timezone'. I don't find a 'time with (local) timezone' in Flink though, so we may not support that type from pg in Flink.

> I suggest that the parameters can be completely consistent with the JDBCTableSource /
> JDBCTableSink. If you take a look to JDBC api: "DriverManager.getConnection". That allow
> "default db, username, pwd" things optional. They can included in URL. Of course JDBC api
> also allows establishing connections to different databases in a db instance. So I think we
> don't need provide a "base_url", we can just provide a real "url". To be consistent with JDBC api.

Jingsong, what I'm saying is that a builder can be added on demand later if enough users request it, and it doesn't need to be a core part of the FLIP. Besides, unfortunately Postgres doesn't allow changing databases via JDBC. JDBC provides different connecting options as you mentioned, but I'd like to keep our design and API simple and avoid having to handle extra parsing logic. And it doesn't shut the door for what you proposed as a future effort.

> Since the PostgreSQL does not have catalog but schema under database, why not mapping the
> PG-database to Flink catalog and PG-schema to Flink database

Danny, because 1) there are frequent use cases where users want to switch databases or reference objects across databases in a pg instance, 2) schema is an optional namespace layer in pg, it always has a default value ("public") and can be invisible to users if they'd like, as shown in the FLIP, and 3) as you mentioned it is specific to Postgres, and I don't feel it's necessary to map Postgres substantially differently than other DBMSs with additional complexity.

> 'base_url' configuration: We are following the configuration format guideline [1] which suggest
> to use dash (-) instead of underline (_). And I'm a little confused the meaning of "base_url" at
> the first glance, another idea is split it into several configurations: 'driver', 'hostname', 'port'.

Jark, I agree we should use "base-url" in the yaml config. I'm not sure about having hostname and port separately because you can specify multiple hosts with ports in JDBC, like "jdbc:dbms/host1:port1,host2:port2/", for connection failovers. Separating them would make the configuration harder. I will add clear docs and examples to avoid any possible confusion.

> 'default-database' is optional, then which database will be used or what is the behavior when
> the default database is not selected.

This should be DBMS specific. For Postgres, it will be the database.

On Thu, Jan 9, 2020 at 9:48 PM Zhenghua Gao wrote:

> Hi Bowen, Thanks for driving this.
> I think it would be very convenient to use tables in external DBs with a
> JDBC Catalog.
>
> I have one concern about the "Flink-Postgres Data Type Mapping" part:
>
> In Postgress, the TIME/TIMESTAMP WITH TIME ZONE has the java.time.Instant
> semantic,
> and should be mapped to Flink's TIME/TIMESTAMP WITH LOCAL TIME ZONE
>
> *Best Regards,*
> *Zhenghua Gao*
>
> On Fri, Jan 10, 2020 at 11:09 AM Jingsong Li wrote:
>
> > Hi Bowen, thanks for reply and updating.
> >
> > > I don't see much value in providing a builder for jdbc catalogs, as they
> > only have 4 or 5 required params, no optional ones.
I prefer users just > > provide a base url without default db, usrname, pwd so we don't need to > > parse url all around, as I mentioned jdbc catalog may need to establish > > connections to different databases in a db instance, > > > > I suggest that the parameters can be completely consistent with the > > JDBCTableSource / JDBCTableSink. > > If you take a look to JDBC api: "DriverManager.getConnection". > > That allow "default db, username, pwd" things optional. They can included > > in URL. Of course JDBC api also allows establishing connections to > > different databases in a db instance. > > So I think we don't need provide a "base_url", we can just provide a real > > "url". > > To be consistent with JDBC api. > > > > Best, > > Jingsong Lee > > > > On Fri, Jan 10, 2020 at 10:34 AM Jark Wu wrote: > > > > > Thanks Bowen for the reply, > > > > > > A user-facing JDBCCatalog and 'catalog.type' = 'jdbc' sounds good to > me. > > > > > > I have some other minor comments when I went through the updated > > > documentation: > > > > > > 1) 'base_url' configuration: We are following the configuration format > > > guideline [1] which suggest to use dash (-) instead of underline (_). > > > And I'm a little confused the meaning of "base_url" at the first > > > glance, another idea is split it into several configurations: 'driver', > > > 'hostname', 'port'. > > > > > > 2) 'default-database' is optional, then which database will be used
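To make the type-mapping point in the reply above concrete, here is a minimal sketch of how a JDBC catalog could map the Postgres temporal types to Flink data types. The mapping method and the pg type-name strings are illustrative assumptions, not the FLIP's actual API; only the TIMESTAMP WITH TIME ZONE -> TIMESTAMP WITH LOCAL TIME ZONE correspondence is taken from the discussion.

{code:java}
// Illustrative sketch of the Postgres -> Flink type mapping discussed above.
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class PostgresTypeMappingSketch {

    public static DataType toFlinkType(String pgTypeName, int precision) {
        switch (pgTypeName) {
            case "timestamp":   // TIMESTAMP WITHOUT TIME ZONE
                return DataTypes.TIMESTAMP(precision);
            case "timestamptz": // TIMESTAMP WITH TIME ZONE -> instant semantics in Flink
                return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(precision);
            case "time":        // TIME WITHOUT TIME ZONE
                return DataTypes.TIME(precision);
            default:
                throw new UnsupportedOperationException("Unsupported pg type: " + pgTypeName);
        }
    }
}
{code}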
Re: [DISCUSS] A mechanism to validate the precision of columns for connectors
Hi Zhenghua,

For external systems with schema, I think the schema information is available most of the time and should be the single source of truth for programmatically mapping column precision via Flink catalogs, to minimize users' effort in creating schemas redundantly again and to avoid any human errors. They will be a subset of the system's supported types and precisions, and thus you don't need to validate the 1st category of "the ability of external systems". This would apply to most schema storage systems, like relational DBMSs, the Hive metastore, or Avro schemas in the Confluent schema registry for Kafka.

From my observation, the real problem right now is that Flink cannot truly leverage external system schemas via Flink catalogs, as documented in [1]. I'm not sure if there's any unsolvable network or authorization problem, as most systems nowadays can be read with a simple access id/key pair via VPC, intranet, or internet. What problems have you run into?

For schemaless systems, we'd have to rely on full testing coverage in Flink.

[1] https://issues.apache.org/jira/browse/FLINK-15545

On Fri, Jan 10, 2020 at 1:12 AM Zhenghua Gao wrote:

> Hi Jingsong Lee
>
> You are right that the connectors don't validate data types either now.
> We seems lack a mechanism to validate with properties[1], data types, etc
> for CREATE TABLE.
>
> [1] https://issues.apache.org/jira/browse/FLINK-15509
>
> *Best Regards,*
> *Zhenghua Gao*
>
> On Fri, Jan 10, 2020 at 2:59 PM Jingsong Li wrote:
>
> > Hi Zhenghua,
> >
> > I think it's not just about precision of type. Connectors not validate the
> > types either.
> > Now there is "SchemaValidator", this validator is just used to validate
> > type properties. But not for connector type support.
> > I think we can have something like "DataTypeValidator" to help connectors
> > validating their type support.
> >
> > Consider current validator design, validator is called by connector itself.
> > it's more like a util class than a mechanism.
> >
> > Best,
> > Jingsong Lee
> >
> > On Fri, Jan 10, 2020 at 11:47 AM Zhenghua Gao wrote:
> >
> > > Hi dev,
> > >
> > > I'd like to kick off a discussion on a mechanism to validate the precision
> > > of columns for some connectors.
> > >
> > > We come to an agreement that the user should be informed if the connector
> > > does not support the desired precision. And from the connector developer's
> > > view, there are 3-levels information to be considered:
> > >
> > >    - the ability of external systems (e.g. Apache Derby support
> > >      TIMESTAMP(9), Mysql support TIMESTAMP(6), etc)
> > >
> > > Connector developers should use this information to validate user's DDL and
> > > make sure throw an exception if concrete column is out of range.
> > >
> > >    - schema of referenced tables in external systems
> > >
> > > If the schema information of referenced tables is available in Compile
> > > Time, connector developers could use it to find the mismatch between DDL.
> > > But in most cases, the schema information is unavailable because of network
> > > isolation or authority management. We should use it with caution.
> > >
> > >    - schema-less external systems (e.g. HBase)
> > >
> > > If the external systems is schema-less like HBase, the connector developer
> > > should make sure the connector doesn't cause precision loss (e.g.
> > > flink-hbase serializes java.sql.Timestamp to long in bytes which only keep
> > > millisecond's precision.)
> > > > > > To make it more specific, some scenarios of JDBC Connector are list as > > > following: > > > > > >- The underlying DB supports DECIMAL(65, 30), which is out of the > > range > > >of Flink's Decimal > > >- The underlying DB supports TIMESTAMP(6), and user want to define a > > >table with TIMESTAMP(9) in Flink > > >- User defines a table with DECIMAL(10, 4) in underlying DB, and > want > > to > > >define a table with DECIMAL(5, 2) in Flink > > >- The precision of the underlying DB varies between different > versions > > > > > > > > > What do you think about this? any feedback are appreciates. > > > > > > *Best Regards,* > > > *Zhenghua Gao* > > > > > > > > > -- > > Best, Jingsong Lee > > >
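To illustrate the precision-loss concern about flink-hbase raised in the quoted discussion: encoding a java.sql.Timestamp as its epoch-millisecond long silently drops everything below millisecond precision. The following self-contained example (not the connector's actual code) shows the effect.

{code:java}
import java.sql.Timestamp;

public class TimestampPrecisionLossExample {

    public static void main(String[] args) {
        Timestamp ts = Timestamp.valueOf("2020-01-10 12:00:00.123456789");

        long millis = ts.getTime();                     // what a long-based encoding keeps
        Timestamp roundTripped = new Timestamp(millis); // value restored from the long

        System.out.println("original:             " + ts);           // ...12:00:00.123456789
        System.out.println("after long roundtrip: " + roundTripped); // ...12:00:00.123
    }
}
{code}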
[jira] [Created] (FLINK-15558) Bump Elasticsearch version from 7.3.2 to 7.5.1 for es7 connector
vinoyang created FLINK-15558:
-----------------------------

Summary: Bump Elasticsearch version from 7.3.2 to 7.5.1 for es7 connector
Key: FLINK-15558
URL: https://issues.apache.org/jira/browse/FLINK-15558
Project: Flink
Issue Type: Wish
Components: Connectors / ElasticSearch
Reporter: vinoyang

It would be better to track the newest ES 7.x client version, just like we have done for the Kafka universal connector. Currently, the ES7 connector tracks version 7.3.2 while the latest ES 7.x version is 7.5.1. We can upgrade it.