[jira] [Created] (FLINK-19749) document error
appleyuchi created FLINK-19749:
--------------------------------

Summary: document error
Key: FLINK-19749
URL: https://issues.apache.org/jira/browse/FLINK-19749
Project: Flink
Issue Type: Bug
Reporter: appleyuchi

In this link there is a description:

"Performs a field add operation. Existing fields will be replaced if add columns name is the same as the existing column name. Moreover, if the added fields have duplicate field name, then the last one is used."

Please change "add columns" to "added column's name".
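The quoted sentence appears to describe the Table API's addOrReplaceColumns operation; below is a minimal Java sketch of that behavior, assuming that is the method being documented (the column names are illustrative only, not taken from the docs).

{code:java}
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.concat;

import org.apache.flink.table.api.Table;

public class AddOrReplaceColumnsSketch {
    // If the added column's name ("desc") already exists on the input table,
    // the existing column is replaced; among duplicate added names, the last
    // definition wins.
    static Table addDescription(Table orders) {
        return orders.addOrReplaceColumns(concat($("c"), " sunny").as("desc"));
    }
}
{code}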
[jira] [Created] (FLINK-19750) Deserializer is not opened in Kafka consumer when restoring from state
Qingsheng Ren created FLINK-19750:
-----------------------------------

Summary: Deserializer is not opened in Kafka consumer when restoring from state
Key: FLINK-19750
URL: https://issues.apache.org/jira/browse/FLINK-19750
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.11.2, 1.11.1, 1.11.0
Reporter: Qingsheng Ren

When a job using Kafka consumer is recovered from a checkpoint or savepoint, the {{open}} method of the record deserializer is not called. This is possibly because {{this.deserializer.open}}
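To illustrate why the missing call matters, here is a hedged sketch of a custom DeserializationSchema that sets up a helper object in open(); if the consumer restores from state without calling open(), the first deserialize() call hits an uninitialized field. The class and its field are hypothetical, not taken from the report.

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class OpenDependentSchema implements DeserializationSchema<String> {

    // Stands in for any resource that is normally created in open().
    private transient StringBuilder scratch;

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        scratch = new StringBuilder();
    }

    @Override
    public String deserialize(byte[] message) {
        // NullPointerException here if open() was skipped on restore.
        scratch.setLength(0);
        return scratch.append(new String(message, StandardCharsets.UTF_8)).toString();
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
{code}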
Re: [DISCUSS] FLIP-149: Introduce the KTable Connector
Hi Kurt, Hi Shengkai,

thanks for answering my questions and the additional clarifications. I don't have a strong opinion on whether to extend the "kafka" connector or to introduce a new connector. So, from my perspective feel free to go with a separate connector. If we do introduce a new connector I wouldn't call it "ktable" for aforementioned reasons (In addition, we might suggest that there is also a "kstreams" connector for symmetry reasons). I don't have a good alternative name, though, maybe "kafka-compacted" or "compacted-kafka".

Thanks,

Konstantin

On Wed, Oct 21, 2020 at 4:43 AM Kurt Young wrote:

> Hi all,
>
> I want to describe the discussion process which drove us to have such conclusion, this might make some of the design choices easier to understand and keep everyone on the same page.
>
> Back to the motivation, what functionality do we want to provide in the first place? We got a lot of feedback and questions from mailing lists that people want to write Not-Insert-Only messages into kafka. They might be intentional or by accident, e.g. wrote an non-windowed aggregate query or non-windowed left outer join. And some users from KSQL world also asked about why Flink didn't leverage the Key concept of every kafka topic and make kafka as a dynamic changing keyed table.
>
> To work with kafka better, we were thinking to extend the functionality of the current kafka connector by letting it accept updates and deletions. But due to the limitation of kafka, the update has to be "update by key", aka a table with primary key.
>
> This introduces a couple of conflicts with current kafka table's options:
> 1. key.fields: as said above, we need the kafka table to have the primary key constraint. And users can also configure key.fields freely, this might cause friction. (Sure we can do some sanity check on this but it also creates friction.)
> 2. sink.partitioner: to make the semantics right, we need to make sure all the updates on the same key are written to the same kafka partition, such we should force to use a hash by key partition inside such table. Again, this has conflicts and creates friction with current user options.
>
> The above things are solvable, though not perfect or most user friendly.
>
> Let's take a look at the reading side. The keyed kafka table contains two kinds of messages: upsert or deletion. What upsert means is "If the key doesn't exist yet, it's an insert record. Otherwise it's an update record". For the sake of correctness or simplicity, the Flink SQL engine also needs such information. If we interpret all messages to "update record", some queries or operators may not work properly. It's weird to see an update record but you haven't seen the insert record before.
>
> So what Flink should do is after reading out the records from such table, it needs to create a state to record which messages have been seen and then generate the correct row type correspondingly. This kind of couples the state and the data of the message queue, and it also creates conflicts with current kafka connector.
>
> Think about if users suspend a running job (which contains some reading state now), and then change the start offset of the reader. By changing the reading offset, it actually change the whole story of "which records should be insert messages and which records should be update messages). And it will also make Flink to deal with another weird situation that it might receive a deletion on a non existing message.
>
> We were unsatisfied with all the frictions and conflicts it will create if we enable the "upsert & deletion" support to the current kafka connector. And later we begin to realize that we shouldn't treat it as a normal message queue, but should treat it as a changing keyed table. We should be able to always get the whole data of such table (by disabling the start offset option) and we can also read the changelog out of such table. It's like a HBase table with binlog support but doesn't have random access capability (which can be fulfilled by Flink's state).
>
> So our intention was instead of telling and persuading users what kind of options they should or should not use by extending current kafka connector when enable upsert support, we are actually create a whole new and different connector that has total different abstractions in SQL layer, and should be treated totally different with current kafka connector.
>
> Hope this can clarify some of the concerns.
>
> Best,
> Kurt
>
> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang wrote:
>
> > Hi devs,
> >
> > As many people are still confused about the difference option behaviours between the Kafka connector and KTable connector, Jark and I list the differences in the doc[1].
> >
> > Best,
> > Shengkai
> >
> > [1] https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
> >
> > Shengkai Fan
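For readers following along, a purely hypothetical sketch of what a DDL for such a keyed/compacted Kafka table could look like. Neither the connector name ("kafka-compacted" is only one of the names floated in this thread) nor the option keys are settled anywhere; treat all of them as placeholders, not as an existing Flink API.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompactedKafkaTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Hypothetical DDL: the primary key declares the Kafka record key, updates on
        // the same key land in the same partition, and the source reads the topic as a
        // changelog rather than an append-only stream.
        tEnv.executeSql(
                "CREATE TABLE users (" +
                "  user_id BIGINT," +
                "  user_name STRING," +
                "  region STRING," +
                "  PRIMARY KEY (user_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'kafka-compacted'," +   // placeholder name from this thread
                "  'topic' = 'users'," +
                "  'properties.bootstrap.servers' = '...'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'" +
                ")");
    }
}
{code}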
[jira] [Created] (FLINK-19751) document error for GroupBy Window Aggregation
appleyuchi created FLINK-19751:
--------------------------------

Summary: document error for GroupBy Window Aggregation
Key: FLINK-19751
URL: https://issues.apache.org/jira/browse/FLINK-19751
Project: Flink
Issue Type: Bug
Reporter: appleyuchi

I'm reading the [official document|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html]. In the "GroupBy Window Aggregation" part, please change .as("w")) to .as("w").

BTW, my IntelliJ cannot find the function "lit" used in the above link. Could this be an error as well?
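For context, a sketch approximating the documented GroupBy Window Aggregation example with balanced parentheses; the column names are placeholders rather than the exact ones from the page. The lit function the reporter could not resolve is a static method on org.apache.flink.table.api.Expressions, so it needs a static import.

{code:java}
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;

public class GroupByWindowAggregationSketch {
    static Table aggregate(Table orders) {
        return orders
                // .as("w") is closed by the single parenthesis that belongs to .window(...)
                .window(Tumble.over(lit(10).minutes()).on($("rowtime")).as("w"))
                .groupBy($("a"), $("w"))
                .select($("a"), $("w").start(), $("w").end(), $("b").sum().as("d"));
    }
}
{code}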
[jira] [Created] (FLINK-19752) Convert BigDecimal TypeInformation to DataTypes error
AT-Fieldless created FLINK-19752:
----------------------------------

Summary: Convert BigDecimal TypeInformation to DataTypes error
Key: FLINK-19752
URL: https://issues.apache.org/jira/browse/FLINK-19752
Project: Flink
Issue Type: Bug
Components: API / Type Serialization System
Affects Versions: 1.11.2
Reporter: AT-Fieldless

As mentioned in [Flink-15485|https://github.com/apache/flink/pull/10774], when I use LegacyTypeInfoDataTypeConverter to convert BigDecimal to the corresponding DataTypes, it throws an _org.apache.flink.table.types.logical.LegacyTypeInformationType cannot be cast to org.apache.flink.table.types.logical.DecimalType_ error.
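A hedged reproduction sketch of the reported cast failure: converting the BigDecimal TypeInformation yields a DataType backed by a LegacyTypeInformationType, so treating its logical type as DecimalType fails. The exact failing call site in the reporter's job is not shown in the issue, so this is only an approximation.

{code:java}
import java.math.BigDecimal;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;

public class BigDecimalConversionSketch {
    public static void main(String[] args) {
        TypeInformation<BigDecimal> typeInfo = Types.BIG_DEC;
        DataType dataType = LegacyTypeInfoDataTypeConverter.toDataType(typeInfo);

        // Prints the actual logical type class backing the converted DataType.
        System.out.println(dataType.getLogicalType().getClass().getSimpleName());

        // Per the report, this cast throws ClassCastException in the affected version.
        DecimalType decimalType = (DecimalType) dataType.getLogicalType();
        System.out.println(decimalType);
    }
}
{code}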
Re: [DISCUSS] FLIP-149: Introduce the KTable Connector
Hi,

IMO, if we are going to mix them in one connector,

1) either users need to set some options to a specific value explicitly, e.g. "scan.startup.mode=earliest", "sink.partitioner=hash", etc. This makes the connector awkward to use: users may have to fix options one by one according to the exceptions. Besides, in the future it is still possible to use "sink.partitioner=fixed" (to reduce network cost) if users are aware of the partition routing; however, it's error-prone to have "fixed" as the default for compacted mode.

2) or we give those options different default values when "compacted=true". This would be more confusing and unpredictable, because the default value of some options would change according to other options. What happens if we have a third mode in the future?

In terms of usage and options, it's very different from the original "kafka" connector. It would be handier to use and less error-prone if we separate them into two connectors. In the implementation layer, we can reuse code as much as possible. Therefore, I'm still +1 to have a new connector. The "kafka-compacted" name sounds good to me.

Best,
Jark

On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf wrote:

> Hi Kurt, Hi Shengkai,
>
> thanks for answering my questions and the additional clarifications. I don't have a strong opinion on whether to extend the "kafka" connector or to introduce a new connector. So, from my perspective feel free to go with a separate connector. If we do introduce a new connector I wouldn't call it "ktable" for aforementioned reasons (In addition, we might suggest that there is also a "kstreams" connector for symmetry reasons). I don't have a good alternative name, though, maybe "kafka-compacted" or "compacted-kafka".
>
> Thanks,
>
> Konstantin
[jira] [Created] (FLINK-19753) Introduce 'STRCMP' function to compare between two string fields
hailong wang created FLINK-19753:
----------------------------------

Summary: Introduce 'STRCMP' function to compare between two string fields
Key: FLINK-19753
URL: https://issues.apache.org/jira/browse/FLINK-19753
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Affects Versions: 1.11.0
Reporter: hailong wang

The 'STRCMP' function is supported by MySQL [1] and Calcite [2] and is useful for comparing two string fields. Its result type is Integer, and its arguments are two string fields. It returns:
1. 0 if both strings are the same.
2. -1 if the first argument is smaller than the second according to the defined order.
3. 1 if the second argument is smaller than the first.

[1] https://dev.mysql.com/doc/refman/8.0/en/string-comparison-functions.html#function_strcmp
[2] https://issues.apache.org/jira/browse/CALCITE-3704
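A plain-Java illustration of the semantics listed above; this is not Flink code, and it ignores collation, which MySQL and Calcite take into account.

{code:java}
public final class StrcmpSketch {

    // 0 if equal, -1 if the first string sorts before the second, 1 otherwise.
    static int strcmp(String first, String second) {
        return Integer.signum(first.compareTo(second));
    }

    public static void main(String[] args) {
        System.out.println(strcmp("text", "text"));   // 0
        System.out.println(strcmp("text", "text2"));  // -1
        System.out.println(strcmp("text2", "text"));  // 1
    }
}
{code}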
[jira] [Created] (FLINK-19754) Cannot have more than one execute() or executeAsync() call in a single environment.
little-tomato created FLINK-19754:
-----------------------------------

Summary: Cannot have more than one execute() or executeAsync() call in a single environment.
Key: FLINK-19754
URL: https://issues.apache.org/jira/browse/FLINK-19754
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.11.2
Environment: my code is:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
...
FlinkKafkaConsumer myConsumer = new FlinkKafkaConsumer("kafkatopic", new SimpleStringSchema(), properties);
myConsumer.setStartFromLatest();
DataStream kafkaDataStream = env.addSource(myConsumer);
SingleOutputStreamOperator sourceStream = kafkaDataStream.map(new MapFunction() { ... });
DataStream dataStreamRow = sourceStream
    .map(new MyMapFunction())
    .filter(new RuleDataProccessFunction())
    .map(new MapFunction() {
        private static final long serialVersionUID = 1L;
        @Override
        public Row map(MessageInfo value) throws Exception {
            ...
        }
    })
    .returns(new RowTypeInfo(rowTypeArr, fieldArr));

tEnv.registerFunction("test", new TestFunction());
Table table = tEnv.fromDataStream(dataStreamRow, fieldStr);
tEnv.createTemporaryView("mytable", table);

String ddl = "CREATE TABLE user_log_1155 ...from kafka topic:user_log_1155";
tEnv.executeSql(ddl);
String ddl1 = "CREATE TABLE user_test_1155 ...from kafka topic:user_test_1155";
tEnv.executeSql(ddl);

StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO user_log_1155 SELECT xxx from mytable");
stmtSet.addInsertSql("INSERT INTO user_test_1155 SELECT xxx from mytable");
stmtSet.execute();

env.execute(requestPrm.getString("xxx"));

Reporter: little-tomato

I run this code on my standalone cluster. When I submit the job, the error log is as follows:

2020-10-20 11:53:42,969 WARN org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot have more than one execute() or executeAsync() call in a single environment.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) [?:1.8.0_221]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_221]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_221]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_221]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_221]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_221]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
	at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) ~[flink-clients_2.12-1.11.0.jar:1.11.0]
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at cn.cuiot.dmp.ruleengine.job.RuleEngineJob.main(RuleEngineJob.java:556) ~[?:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native M
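A minimal sketch of the usual remedy, assuming every sink of the job is declared on the StatementSet as in the code above: submit once via stmtSet.execute() and drop the extra env.execute() call, since the DataStream source wired in through fromDataStream is already part of that submission. The helper method below is illustrative, not taken from the report.

{code:java}
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SingleSubmissionSketch {
    static void submit(StreamTableEnvironment tEnv) {
        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql("INSERT INTO user_log_1155 SELECT xxx from mytable");
        stmtSet.addInsertSql("INSERT INTO user_test_1155 SELECT xxx from mytable");
        // One submission covers both INSERTs and the upstream DataStream that was
        // registered with tEnv.fromDataStream(...). Calling env.execute() afterwards
        // is a second submission, which the web/REST job runner rejects with the
        // error reported above.
        stmtSet.execute();
    }
}
{code}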
[jira] [Created] (FLINK-19755) flink cep
jackylau created FLINK-19755:
------------------------------

Summary: flink cep
Key: FLINK-19755
URL: https://issues.apache.org/jira/browse/FLINK-19755
Project: Flink
Issue Type: Bug
Components: Library / CEP
Affects Versions: 1.11.0
Environment: I think a looping match occurs when the rows with price 17 and 14 arrive, using AFTER MATCH SKIP TO LAST A.
Reporter: jackylau
Fix For: 1.12.0

{code:java}
symbol  tax  price  rowtime
======  ===  =====  ===================
XYZ     1    7      2018-09-17 10:00:01
XYZ     2    9      2018-09-17 10:00:02
XYZ     1    10     2018-09-17 10:00:03
XYZ     2    5      2018-09-17 10:00:04
XYZ     2    17     2018-09-17 10:00:05
XYZ     2    14     2018-09-17 10:00:06

SELECT *
FROM Ticker
    MATCH_RECOGNIZE(
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            SUM(A.price) AS sumPrice,
            FIRST(rowtime) AS startTime,
            LAST(rowtime) AS endTime
        ONE ROW PER MATCH
        [AFTER MATCH STRATEGY]
        PATTERN (A+ C)
        DEFINE
            A AS SUM(A.price) < 30
    )
{code}

{code:java}
AFTER MATCH SKIP TO LAST A

symbol  sumPrice  startTime            endTime
======  ========  ===================  ===================
XYZ     26        2018-09-17 10:00:01  2018-09-17 10:00:04
XYZ     15        2018-09-17 10:00:03  2018-09-17 10:00:05
XYZ     22        2018-09-17 10:00:04  2018-09-17 10:00:06
XYZ     17        2018-09-17 10:00:05  2018-09-17 10:00:06

Again, the first result matched against the rows #1, #2, #3, #4. Compared to the previous strategy, the next match includes only row #3 (mapped to A) again for the next matching. Therefore, the second result matched against the rows #3, #4, #5. The third result matched against the rows #4, #5, #6. The last result matched against the rows #5, #6.
{code}
[jira] [Created] (FLINK-19756) Use multi-input optimization by default
Caizhi Weng created FLINK-19756:
---------------------------------

Summary: Use multi-input optimization by default
Key: FLINK-19756
URL: https://issues.apache.org/jira/browse/FLINK-19756
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Caizhi Weng
Fix For: 1.12.0

After the multiple-input operator is introduced, we should use this optimization by default. This will affect a large number of plan tests, so we will do this in an independent subtask.
[jira] [Created] (FLINK-19757) TimeStampData can cause time inconsistent problem
xiaogang zhou created FLINK-19757:
-----------------------------------

Summary: TimeStampData can cause time inconsistent problem
Key: FLINK-19757
URL: https://issues.apache.org/jira/browse/FLINK-19757
Project: Flink
Issue Type: Improvement
Components: Table SQL / Runtime
Affects Versions: 1.11.1
Reporter: xiaogang zhou

When we check the JDK LocalDateTime code, we find:

public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset offset) {
    Objects.requireNonNull(offset, "offset");
    NANO_OF_SECOND.checkValidValue(nanoOfSecond);
    long localSecond = epochSecond + offset.getTotalSeconds(); // overflow caught later
    long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY);
    int secsOfDay = (int) Math.floorMod(localSecond, SECONDS_PER_DAY);
    LocalDate date = LocalDate.ofEpochDay(localEpochDay);
    LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + nanoOfSecond);
    return new LocalDateTime(date, time);
}

With offset.getTotalSeconds() they add the offset, but in TimestampData.toLocalDateTime we don't add an offset. I'd like to add a TimeZone.getDefault().getRawOffset() in toLocalDateTime() and subtract a TimeZone.getDefault().getRawOffset() in fromLocalDateTime.
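A small sketch of the behavior described in the report: converting epoch milliseconds to TimestampData and back to LocalDateTime yields the UTC wall-clock time, because no zone offset is applied, whereas LocalDateTime.ofEpochSecond with a local offset would give the default-time-zone view. The expected output is inferred from the report, so treat it as illustrative rather than authoritative.

{code:java}
import java.time.LocalDateTime;
import java.time.ZoneOffset;

import org.apache.flink.table.data.TimestampData;

public class TimestampDataRoundTripSketch {
    public static void main(String[] args) {
        long epochMillis = 1603252800000L; // 2020-10-21 04:00:00 UTC

        // toLocalDateTime() applies no zone offset, so the result matches the
        // UTC view of the instant rather than the JVM-default-zone view.
        LocalDateTime viaTimestampData =
                TimestampData.fromEpochMillis(epochMillis).toLocalDateTime();
        LocalDateTime utcView =
                LocalDateTime.ofEpochSecond(epochMillis / 1000, 0, ZoneOffset.UTC);

        System.out.println(viaTimestampData);                 // 2020-10-21T04:00
        System.out.println(viaTimestampData.equals(utcView)); // expected: true, per the report
    }
}
{code}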
[jira] [Created] (FLINK-19758) Implement a new unified File Sink based on the new Sink API
Yun Gao created FLINK-19758:
-----------------------------

Summary: Implement a new unified File Sink based on the new Sink API
Key: FLINK-19758
URL: https://issues.apache.org/jira/browse/FLINK-19758
Project: Flink
Issue Type: Sub-task
Components: API / DataStream, Connectors / FileSystem
Affects Versions: 1.12.0
Reporter: Yun Gao
Fix For: 1.12.0
[jira] [Created] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable
Dian Fu created FLINK-19759:
-----------------------------

Summary: DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable
Key: FLINK-19759
URL: https://issues.apache.org/jira/browse/FLINK-19759
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.12.0
Reporter: Dian Fu
Fix For: 1.12.0

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8052&view=logs&j=e25d5e7e-2a9c-5589-4940-0b638d75a414&t=a6e0f756-5bb9-5ea8-a468-5f60db442a29

{code}
[ERROR] DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange:217 planAfter
expected:<...=[>(cnt, 3)]) : +- [SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], reuse_id=[1]) : +- Exchange(distribution=[single]) : +- LocalSort]Aggregate(select=[Pa...>
 but was:<...=[>(cnt, 3)]) : +- [HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], reuse_id=[1]) : +- Exchange(distribution=[single]) : +- LocalHash]Aggregate(select=[Pa...>
{code}
[jira] [Created] (FLINK-19760) Make the `GlobalCommitter` a standalone interface that does not extend the `Committer`
Guowei Ma created FLINK-19760:
-------------------------------

Summary: Make the `GlobalCommitter` a standalone interface that does not extend the `Committer`
Key: FLINK-19760
URL: https://issues.apache.org/jira/browse/FLINK-19760
Project: Flink
Issue Type: Sub-task
Reporter: Guowei Ma