[jira] [Created] (FLINK-19749) document error

2020-10-21 Thread appleyuchi (Jira)
appleyuchi created FLINK-19749:
--

 Summary: document error
 Key: FLINK-19749
 URL: https://issues.apache.org/jira/browse/FLINK-19749
 Project: Flink
  Issue Type: Bug
Reporter: appleyuchi


In this link there is a description:
"Performs a field add operation. Existing fields will be replaced if add columns 
name is the same as the existing column name. Moreover, if the added fields 
have duplicate field name, then the last one is used."

Please change "add columns name" to "added column's name".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19750) Deserializer is not opened in Kafka consumer when restoring from state

2020-10-21 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-19750:
-

 Summary: Deserializer is not opened in Kafka consumer when 
restoring from state
 Key: FLINK-19750
 URL: https://issues.apache.org/jira/browse/FLINK-19750
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.11.2, 1.11.1, 1.11.0
Reporter: Qingsheng Ren


When a job using the Kafka consumer is recovered from a checkpoint or savepoint, 
the {{open}} method of the record deserializer is not called. This is possibly 
because {{this.deserializer.open}}
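
For context, a hypothetical deserialization schema (the class below is made up 
for illustration, not Flink code) showing why a skipped {{open}} call matters: 
state initialized in {{open}} stays null after restoring, so the first 
{{deserialize}} call fails.

{code:java}
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class JsonRecordDeserializer implements KafkaDeserializationSchema<String> {

    // Only initialized in open(); stays null if open() is never called.
    private transient ObjectMapper mapper;

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        mapper = new ObjectMapper();
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // NullPointerException here if open() was skipped when restoring from state.
        return mapper.readTree(record.value()).toString();
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
{code}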



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-149: Introduce the KTable Connector

2020-10-21 Thread Konstantin Knauf
Hi Kurt, Hi Shengkai,

thanks for answering my questions and the additional clarifications. I
don't have a strong opinion on whether to extend the "kafka" connector or
to introduce a new connector. So, from my perspective feel free to go with
a separate connector. If we do introduce a new connector, I wouldn't call it
"ktable" for the aforementioned reasons (in addition, it might suggest that
there is also a "kstreams" connector, for symmetry reasons). I don't have a
good alternative name, though, maybe "kafka-compacted" or
"compacted-kafka".

Thanks,

Konstantin


On Wed, Oct 21, 2020 at 4:43 AM Kurt Young  wrote:

> Hi all,
>
> I want to describe the discussion process which drove us to have such
> conclusion, this might make some of
> the design choices easier to understand and keep everyone on the same page.
>
> Back to the motivation: what functionality do we want to provide in the
> first place? We got a lot of feedback and questions from the mailing lists
> that people want to write not-insert-only messages into kafka. They might
> be intentional or accidental, e.g. a non-windowed aggregate query or a
> non-windowed left outer join. Some users from the KSQL world also asked
> why Flink didn't leverage the key concept of every kafka topic and treat
> kafka as a dynamically changing keyed table.
>
> To work with kafka better, we were thinking of extending the functionality
> of the current kafka connector by letting it accept updates and deletions.
> But due to the limitations of kafka, the update has to be "update by key",
> i.e. a table with a primary key.
>
> This introduces a couple of conflicts with the current kafka table's options:
> 1. key.fields: as said above, we need the kafka table to have the primary
> key constraint, but users can also configure key.fields freely; this might
> cause friction. (Sure, we can do some sanity checks on this, but that also
> creates friction.)
> 2. sink.partitioner: to make the semantics right, we need to make sure all
> the updates on the same key are written to the same kafka partition, so we
> would have to force a hash-by-key partitioner inside such a table. Again,
> this conflicts and creates friction with the current user options.
>
> The above things are solvable, though not perfect or most user friendly.
>
> Let's take a look at the reading side. The keyed kafka table contains two
> kinds of messages: upserts and deletions. Upsert means "if the key doesn't
> exist yet, it's an insert record; otherwise it's an update record". For the
> sake of correctness or simplicity, the Flink SQL engine also needs this
> information. If we interpret all messages as "update record", some queries
> or operators may not work properly; it's weird to see an update record when
> you haven't seen the insert record before.
>
> So what Flink should do is, after reading the records from such a table,
> create state to record which messages have been seen and then generate the
> correct row type accordingly. This somewhat couples the state with the data
> of the message queue, and it also creates conflicts with the current kafka
> connector.
>
> Think about what happens if users suspend a running job (which now contains
> some reading state) and then change the start offset of the reader. By
> changing the reading offset, they actually change the whole story of "which
> records should be insert messages and which records should be update
> messages". It will also make Flink deal with another weird situation: it
> might receive a deletion for a non-existing message.
>
> We were unsatisfied with all the friction and conflicts this would create
> if we enabled "upsert & deletion" support in the current kafka connector.
> Later we began to realize that we shouldn't treat it as a normal message
> queue, but as a changing keyed table. We should always be able to get the
> whole data of such a table (by disabling the start offset option), and we
> can also read the changelog out of it. It's like an HBase table with binlog
> support but without random access capability (which can be fulfilled by
> Flink's state).
>
> So our intention is, instead of extending the current kafka connector with
> upsert support and then telling and persuading users which options they
> should or should not use, to create a whole new connector that has totally
> different abstractions in the SQL layer and should be treated quite
> differently from the current kafka connector.
>
> Hope this can clarify some of the concerns.
>
> Best,
> Kurt
>
>
> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang  wrote:
>
> > Hi devs,
> >
> > As many people are still confused about the different option behaviours
> > of the Kafka connector and the KTable connector, Jark and I have listed
> > the differences in the doc [1].
> >
> > Best,
> > Shengkai
> >
> > [1]
> >
> >
> https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
> >
> > Shengkai Fan

[jira] [Created] (FLINK-19751) document error for GroupBy Window Aggregation

2020-10-21 Thread appleyuchi (Jira)
appleyuchi created FLINK-19751:
--

 Summary: document error for GroupBy Window Aggregation
 Key: FLINK-19751
 URL: https://issues.apache.org/jira/browse/FLINK-19751
 Project: Flink
  Issue Type: Bug
Reporter: appleyuchi


I'm learning the [Official 
document|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html].

In the "GroupBy Window Aggregation" section, please change
.as("w")) -> .as("w")

BTW, my IntelliJ cannot find the function "lit" used in that section.
Could this be an error?
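
For reference, a sketch of the documented pattern (table and column names are 
assumed): the lit function is a static method of 
org.apache.flink.table.api.Expressions, so an IDE cannot resolve it without 
the static import shown below.

{code:java}
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;

// tEnv: an existing TableEnvironment; hypothetical table with columns a, b
// and a rowtime attribute.
Table orders = tEnv.from("Orders");
Table result = orders
    .window(Tumble.over(lit(5).minutes()).on($("rowtime")).as("w"))  // define window
    .groupBy($("a"), $("w"))                                         // group by key and window
    .select($("a"), $("w").start(), $("w").end(), $("b").sum().as("d"));
{code}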





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19752) Convert BigDicimal TypeInformation to DataTypes error

2020-10-21 Thread AT-Fieldless (Jira)
AT-Fieldless created FLINK-19752:


 Summary: Convert BigDicimal TypeInformation to DataTypes error
 Key: FLINK-19752
 URL: https://issues.apache.org/jira/browse/FLINK-19752
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.11.2
Reporter: AT-Fieldless


As mentioned in [FLINK-15485|https://github.com/apache/flink/pull/10774], when I 
use LegacyTypeInfoDataTypeConverter to convert BigDecimal to the corresponding 
DataType, it throws an 
_org.apache.flink.table.types.logical.LegacyTypeInformationType cannot be cast 
to org.apache.flink.table.types.logical.DecimalType_ error.
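
A minimal sketch that should reproduce this (assuming the conversion goes 
through LegacyTypeInfoDataTypeConverter.toDataType as described): the converter 
maps the legacy BigDecimal type information to a LegacyTypeInformationType with 
the DECIMAL type root rather than to a concrete DecimalType, so the cast fails.

{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;

public class BigDecimalConversion {
    public static void main(String[] args) {
        DataType dataType = LegacyTypeInfoDataTypeConverter.toDataType(Types.BIG_DEC);
        LogicalType logicalType = dataType.getLogicalType();
        System.out.println(logicalType.getClass());          // LegacyTypeInformationType
        DecimalType decimalType = (DecimalType) logicalType; // throws ClassCastException
    }
}
{code}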



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-149: Introduce the KTable Connector

2020-10-21 Thread Jark Wu
Hi,

IMO, if we are going to mix them in one connector,
1) either users need to set some options to a specific value explicitly,
e.g. "scan.startup.mode=earliest", "sink.partitioner=hash", etc..
This makes the connector awkward to use: users may have to fix options one
by one according to the exceptions.
Besides, in the future it would still be possible to use
"sink.partitioner=fixed" (to reduce network cost) if users are aware of
the partition routing;
however, it's error-prone to have "fixed" as the default for compacted mode.

2) or give those options different default values when "compacted=true".
It would be confusing and unpredictable if the default values of options
changed according to other options.
What happens if we add a third mode in the future?

In terms of usage and options, it's very different from the
original "kafka" connector.
It would be handier to use and less error-prone if we separate them into two
connectors.
In the implementation layer, we can reuse code as much as possible.

Therefore, I'm still +1 to have a new connector.
The "kafka-compacted" name sounds good to me.

Best,
Jark


On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf  wrote:

> Hi Kurt, Hi Shengkai,
>
> thanks for answering my questions and the additional clarifications. I
> don't have a strong opinion on whether to extend the "kafka" connector or
> to introduce a new connector. So, from my perspective feel free to go with
> a separate connector. If we do introduce a new connector I wouldn't call it
> "ktable" for aforementioned reasons (In addition, we might suggest that
> there is also a "kstreams" connector for symmetry reasons). I don't have a
> good alternative name, though, maybe "kafka-compacted" or
> "compacted-kafka".
>
> Thanks,
>
> Konstantin
>
>
> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young  wrote:
>
> > [...]

[jira] [Created] (FLINK-19753) Introduce 'STRCMP' function to compare between two string fields

2020-10-21 Thread hailong wang (Jira)
hailong wang created FLINK-19753:


 Summary: Introduce 'STRCMP' function to compare between two string 
fields
 Key: FLINK-19753
 URL: https://issues.apache.org/jira/browse/FLINK-19753
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.11.0
Reporter: hailong wang


The 'STRCMP' function is supported by MySQL [1] and Calcite [2] and is useful 
for comparing two string fields.

The 'STRCMP' function's result type is Integer, and its arguments are two string 
fields. It returns:

1. 0 if both strings are the same.

2. -1 if the first argument is smaller than the second according to the 
defined order.

3. 1 if the second argument is smaller than the first.

 

[1] 
[https://dev.mysql.com/doc/refman/8.0/en/string-comparison-functions.html#function_strcmp]

[2] https://issues.apache.org/jira/browse/CALCITE-3704
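
For illustration, a sketch of the proposed semantics written as a user-defined 
scalar function (this is not the planner built-in being proposed, just a 
behavioural reference):

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

public class StrCmpFunction extends ScalarFunction {

    // Returns 0 if equal, -1 if s1 sorts before s2, 1 otherwise; null on null input.
    public Integer eval(String s1, String s2) {
        if (s1 == null || s2 == null) {
            return null;
        }
        return Integer.signum(s1.compareTo(s2));
    }
}
{code}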



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19754) Cannot have more than one execute() or executeAsync() call in a single environment.

2020-10-21 Thread little-tomato (Jira)
little-tomato created FLINK-19754:
-

 Summary: Cannot have more than one execute() or executeAsync() 
call in a single environment.
 Key: FLINK-19754
 URL: https://issues.apache.org/jira/browse/FLINK-19754
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.11.2
 Environment: my code is:

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
...
FlinkKafkaConsumer<String> myConsumer =
    new FlinkKafkaConsumer<>("kafkatopic", new SimpleStringSchema(), properties);
myConsumer.setStartFromLatest();

DataStream<String> kafkaDataStream = env.addSource(myConsumer);

SingleOutputStreamOperator sourceStream = kafkaDataStream
    .map(new MapFunction() {
        ...
    });

DataStream<Row> dataStreamRow = sourceStream
    .map(new MyMapFunction())
    .filter(new RuleDataProccessFunction())
    .map(new MapFunction<MessageInfo, Row>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Row map(MessageInfo value) throws Exception {
            ...
        }
    }).returns(new RowTypeInfo(rowTypeArr, fieldArr));

tEnv.registerFunction("test", new TestFunction());
Table table = tEnv.fromDataStream(dataStreamRow, fieldStr);
tEnv.createTemporaryView("mytable", table);

String ddl = "CREATE TABLE user_log_1155 ...from kafka topic:user_log_1155";
tEnv.executeSql(ddl);

String ddl1 = "CREATE TABLE user_test_1155 ...from kafka topic:user_test_1155";
tEnv.executeSql(ddl);

StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO user_log_1155 SELECT xxx from mytable");
stmtSet.addInsertSql("INSERT INTO user_test_1155 SELECT xxx from mytable");
stmtSet.execute();
env.execute(requestPrm.getString("xxx"));
Reporter: little-tomato


I run this code on my Standalone Cluster. When I submit the job, the error log 
is as follows:

2020-10-20 11:53:42,969 WARN 
org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - 
Could not execute application: 
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Cannot have more than one execute() or executeAsync() call in 
a single environment.
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
 ~[flink-clients_2.12-1.11.0.jar:1.11.0]
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 ~[flink-clients_2.12-1.11.0.jar:1.11.0]
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
~[flink-clients_2.12-1.11.0.jar:1.11.0]
 at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
 ~[flink-clients_2.12-1.11.0.jar:1.11.0]
 at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
 ~[flink-clients_2.12-1.11.0.jar:1.11.0]
 at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
 at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 [?:1.8.0_221]
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_221]
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221]
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_221]
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 [?:1.8.0_221]
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_221]
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_221]
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than 
one execute() or executeAsync() call in a single environment.
 at 
org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139)
 ~[flink-clients_2.12-1.11.0.jar:1.11.0]
 at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127)
 ~[flink-clients_2.12-1.11.0.jar:1.11.0]
 at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
 ~[flink-clients_2.12-1.11.0.jar:1.11.0]
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
 at cn.cuiot.dmp.ruleengine.job.RuleEngineJob.main(RuleEngineJob.java:556) 
~[?:?]
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native M
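
A minimal sketch of a likely fix (assuming the job only needs the two SQL 
inserts; the names are taken from the code above): StatementSet.execute() 
already submits a job, so the extra env.execute() is the second execution call 
that this deployment mode rejects.

{code:java}
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO user_log_1155 SELECT xxx from mytable");
stmtSet.addInsertSql("INSERT INTO user_test_1155 SELECT xxx from mytable");
stmtSet.execute();   // submits one job containing both INSERTs
// env.execute(...)  // omit: a second execute()/executeAsync() is rejected here
{code}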

[jira] [Created] (FLINK-19755) flink cep

2020-10-21 Thread jackylau (Jira)
jackylau created FLINK-19755:


 Summary: flink cep 
 Key: FLINK-19755
 URL: https://issues.apache.org/jira/browse/FLINK-19755
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.11.0
 Environment: I think a looping match can occur when the rows with prices 17 and 
14 arrive, when using AFTER MATCH SKIP TO LAST A
Reporter: jackylau
 Fix For: 1.12.0


{code:java}
 symbol   tax   price rowtime
 = === =
 XYZ  1 7   2018-09-17 10:00:01
 XYZ  2 9   2018-09-17 10:00:02
 XYZ  1 10  2018-09-17 10:00:03
 XYZ  2 5   2018-09-17 10:00:04
 XYZ  2 17  2018-09-17 10:00:05
 XYZ  2 14  2018-09-17 10:00:06


SELECT *
FROM Ticker
MATCH_RECOGNIZE(
PARTITION BY symbol
ORDER BY rowtime
MEASURES
SUM(A.price) AS sumPrice,
FIRST(rowtime) AS startTime,
LAST(rowtime) AS endTime
ONE ROW PER MATCH
[AFTER MATCH STRATEGY]
PATTERN (A+ C)
DEFINE
A AS SUM(A.price) < 30
)
 

{code}
{code:java}
AFTER MATCH SKIP TO LAST A 

symbol   sumPricestartTime  endTime
 == = =
 XYZ  26 2018-09-17 10:00:01   2018-09-17 10:00:04
 XYZ  15 2018-09-17 10:00:03   2018-09-17 10:00:05
 XYZ  22 2018-09-17 10:00:04   2018-09-17 10:00:06
 XYZ  17 2018-09-17 10:00:05   2018-09-17 10:00:06

Again, the first result matched against the rows #1, #2, #3, #4. Compared to the 
previous strategy, the next match includes only row #3 (mapped to A) again for 
the next matching.

Therefore, the second result matched against the rows #3, #4, #5.

The third result matched against the rows #4, #5, #6.

The last result matched against the rows #5, #6.{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19756) Use multi-input optimization by default

2020-10-21 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-19756:
---

 Summary: Use multi-input optimization by default
 Key: FLINK-19756
 URL: https://issues.apache.org/jira/browse/FLINK-19756
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Caizhi Weng
 Fix For: 1.12.0


After the multiple-input operator is introduced, we should enable this 
optimization by default. This will affect a large number of plan tests, so we 
will do it in an independent subtask.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19757) TimeStampData can cause time inconsistent problem

2020-10-21 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-19757:
-

 Summary: TimeStampData can cause time inconsistent problem
 Key: FLINK-19757
 URL: https://issues.apache.org/jira/browse/FLINK-19757
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.11.1
Reporter: xiaogang zhou


When we check the JDK LocalDateTime code, we find:

public static LocalDateTime ofEpochSecond(long epochSecond, int nanoOfSecond,
        ZoneOffset offset) {
    Objects.requireNonNull(offset, "offset");
    NANO_OF_SECOND.checkValidValue(nanoOfSecond);
    long localSecond = epochSecond + offset.getTotalSeconds(); // overflow caught later
    long localEpochDay = Math.floorDiv(localSecond, SECONDS_PER_DAY);
    int secsOfDay = (int) Math.floorMod(localSecond, SECONDS_PER_DAY);
    LocalDate date = LocalDate.ofEpochDay(localEpochDay);
    LocalTime time = LocalTime.ofNanoOfDay(secsOfDay * NANOS_PER_SECOND + nanoOfSecond);
    return new LocalDateTime(date, time);
}

 

With offset.getTotalSeconds(), the JDK adds the zone offset, but in TimestampData's
toLocalDateTime we don't add an offset.

I'd like to add TimeZone.getDefault().getRawOffset() in toLocalDateTime()
and subtract TimeZone.getDefault().getRawOffset() in fromLocalDateTime().
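
A small, self-contained illustration of the offset question above (plain JDK 
code, not Flink code): converting the same epoch millisecond with and without 
the local zone offset yields different LocalDateTime values unless the zone is 
UTC.

{code:java}
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class OffsetDemo {
    public static void main(String[] args) {
        long epochMilli = 0L; // 1970-01-01T00:00:00Z
        LocalDateTime withLocalOffset =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMilli), ZoneId.systemDefault());
        LocalDateTime atUtc =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMilli), ZoneOffset.UTC);
        System.out.println(withLocalOffset); // e.g. 1970-01-01T08:00 in UTC+8
        System.out.println(atUtc);           // 1970-01-01T00:00
    }
}
{code}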



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19758) Implement a new unified File Sink based on the new Sink API

2020-10-21 Thread Yun Gao (Jira)
Yun Gao created FLINK-19758:
---

 Summary: Implement a new unified File Sink based on the new Sink 
API
 Key: FLINK-19758
 URL: https://issues.apache.org/jira/browse/FLINK-19758
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream, Connectors / FileSystem
Affects Versions: 1.12.0
Reporter: Yun Gao
 Fix For: 1.12.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19759) DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange is instable

2020-10-21 Thread Dian Fu (Jira)
Dian Fu created FLINK-19759:
---

 Summary: DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange 
is instable
 Key: FLINK-19759
 URL: https://issues.apache.org/jira/browse/FLINK-19759
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.12.0
Reporter: Dian Fu
 Fix For: 1.12.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8052&view=logs&j=e25d5e7e-2a9c-5589-4940-0b638d75a414&t=a6e0f756-5bb9-5ea8-a468-5f60db442a29

{code}
[ERROR]   DeadlockBreakupTest.testSubplanReuse_AddSingletonExchange:217 
planAfter expected:<...=[>(cnt, 3)])
:  +- [SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
reuse_id=[1])
: +- Exchange(distribution=[single])
:+- LocalSort]Aggregate(select=[Pa...> but was:<...=[>(cnt, 3)])
:  +- [HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS cnt], 
reuse_id=[1])
: +- Exchange(distribution=[single])
:+- LocalHash]Aggregate(select=[Pa...>
[INFO] 
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19760) Make the `GlobalCommitter` a standalone interface that does not extend the `Committer`

2020-10-21 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19760:
-

 Summary: Make the `GlobalCommitter` a standalone interface that 
does not extend the `Committer`
 Key: FLINK-19760
 URL: https://issues.apache.org/jira/browse/FLINK-19760
 Project: Flink
  Issue Type: Sub-task
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)