Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-09-08 Thread Dawid Wysakowicz
Hey Aljoscha

A couple of thoughts for the two remaining TODOs in the doc:

# Processing Time Support in BATCH/BOUNDED execution mode

I think there are two somewhat orthogonal problems around this topic:
    1. Firing processing timers at the end of the job
    2. Having processing timers in the BATCH mode
The way I see it there are three main use cases for different
combinations of the aforementioned dimensions:
    1. Regular streaming jobs: STREAM mode with UNBOUNDED sources
       - we do want to have processing timers
       - there is no end of the job
    2. Debugging/Testing streaming jobs: STREAM mode with BOUNDED sources
       - we do want to have processing timers
       - we want to fire/wait for the timers at the end
    3. batch jobs with DataStream API:
       - we do **NOT** want processing timers, either during
processing or at the end. We want to either fail hard or ignore the
timers. Generally speaking, processing timers do not make sense in
BATCH mode, therefore it would be better to fail hard. That would be the
safest option, as some of the user logic might depend on the processing
timers, and failing hard would give the user an opportunity to react to
the changed behaviour. On the other hand, if we want to make it possible
to run the exact same program in both STREAM and BATCH mode, we must
have an option to simply ignore processing timers.
       - we never want to actually trigger the timers, neither during
runtime nor at the end

With the above in mind, I am wondering whether we should introduce two
separate options:
 * processing-time.timers = ENABLE/FAIL/IGNORE
 * processing-time.on-end-of-input = CANCEL/WAIT/TRIGGER
With the two options we can satisfy all the above cases. The default
settings would be:
STREAM:
    processing-time.timers = ENABLE
    processing-time.on-end-of-input = TRIGGER
BATCH:
    processing-time.timers = IGNORE
    processing-time.on-end-of-input = CANCEL
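A minimal sketch of how these proposed defaults could be expressed, assuming the
option keys and values above (they are part of this proposal, not an existing Flink
configuration):

import org.apache.flink.configuration.Configuration;

// Hypothetical keys/values from this proposal; shown only to make the
// suggested defaults concrete.
public final class ProcessingTimeDefaults {

    public static Configuration batchDefaults() {
        Configuration conf = new Configuration();
        conf.setString("processing-time.timers", "IGNORE");          // ENABLE | FAIL | IGNORE
        conf.setString("processing-time.on-end-of-input", "CANCEL"); // CANCEL | WAIT | TRIGGER
        return conf;
    }

    public static Configuration streamDefaults() {
        Configuration conf = new Configuration();
        conf.setString("processing-time.timers", "ENABLE");
        conf.setString("processing-time.on-end-of-input", "TRIGGER");
        return conf;
    }
}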

# Event time triggers
I say this from the implementation perspective: I find it hard to
actually ignore the event-time triggers, as we would have to adjust the
implementation of WindowOperator to do that. At the same time I see no
problem with simply keeping them working. I am wondering if we
should/could just leave them as they are.

# Broadcast State
As far as I am concerned there are no core semantic problems with the
Broadcast State. As of now, it does not give any guarantees about the
order in which the broadcast and non-broadcast sides are executed, even
in streaming. It also does not expose any mechanisms to implement
event/processing-time alignment (you cannot register timers on the
broadcast side). I can't see any of these guarantees breaking in BATCH
mode.
I do agree it would give somewhat nicer properties in BATCH if we
consumed the broadcast side first. It would make the operation
deterministic and let users implement a broadcast join properly on top
of this method. Nevertheless, I see it as an extension of the DataStream
API for BATCH execution rather than making the DataStream API work for
BATCH. Therefore I'd be fine with leaving the Broadcast State out of
the FLIP.
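For illustration, a minimal sketch of the contract described above, using the existing
KeyedBroadcastProcessFunction API: broadcast state is writable only on the broadcast
side, and timers are only exposed on the keyed side (names and logic here are made up
for the example):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RuleEnrichment
        extends KeyedBroadcastProcessFunction<String, String, String, String> {

    static final MapStateDescriptor<String, String> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        // Keyed side: broadcast state is read-only here, and timers are available.
        String rule = ctx.getBroadcastState(RULES).get("*");
        Long ts = ctx.timestamp();
        if (ts != null) {
            ctx.timerService().registerEventTimeTimer(ts + 1000L);
        }
        out.collect(value + " -> " + rule);
    }

    @Override
    public void processBroadcastElement(String rule, Context ctx, Collector<String> out)
            throws Exception {
        // Broadcast side: state is writable, but no timer service is exposed here,
        // and there is no ordering guarantee relative to the keyed side.
        ctx.getBroadcastState(RULES).put("*", rule);
    }
}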

What do you think?

On 01/09/2020 13:46, Aljoscha Krettek wrote:
> Hmm, it seems I left out the Dev ML in my mail. Looping that back in..
>
>
> On 28.08.20 13:54, Dawid Wysakowicz wrote:
>> @Aljoscha Let me bring back to the ML some of the points we discussed
>> offline.
>>
>> Ad. 1 Yes I agree it's not just about scheduling. It includes more
>> changes to the runtime. We might need to make it more prominent in the
>> write up.
>>
>> Ad. 2 You have a good point here that switching the default value for
>> TimeCharacteristic to INGESTION time might not be the best option as it
>> might hide problems if we assign ingestion time, which is rarely a right
>> choice for user programs. Maybe we could just go with the EVENT_TIME as
>> the default?
>>
>> Ad. 4 That's a very good point! I do agree with you it would be better
>> to change the behaviour of said methods for batch-style execution. Even
>> though it changes the behaviour, the overall logic is still correct.
>> Moreover I'd also recommend deprecating some of the relational-like
>> methods, which we should rather redirect to the Table API. I added a
>> section about it to the FLIP (mostly copying over your message). Let me
>> know what you think about it.
>>
>> Best,
>>
>> Dawid
>>
>> On 25/08/2020 11:39, Aljoscha Krettek wrote:
>>> Thanks for creating this FLIP! I think the general direction is very
>>> good but I think there are some specifics that we should also put in
>>> there and that we may need to discuss here as well.
>>>
>>> ## About batch vs streaming scheduling
>>>
>>> I think we shouldn't call it "scheduling", because the decision
>>> between bounded and unbounded affects more than just scheduling. It
>>> affects how we do network transfers and the semantics of time, among
>>> other things. So maybe we should differentiate between batch-st

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-08 Thread Timo Walther

Hi everyone,

I updated the FLIP again and hope that I could address the mentioned 
concerns.


@Leonard: Thanks for the explanation. I wasn't aware that ts_ms and 
source.ts_ms have different semantics. I updated the FLIP and expose the 
most commonly used properties separately. So frequently used properties 
are not hidden in the MAP anymore:


debezium-json.ingestion-timestamp
debezium-json.source.timestamp
debezium-json.source.database
debezium-json.source.schema
debezium-json.source.table

However, since other properties depend on the used connector/vendor, the 
remaining options are stored in:


debezium-json.source.properties

And accessed with:

CAST(SYSTEM_METADATA('debezium-json.source.properties') AS MAP<STRING, STRING>)['table']


Otherwise it is not possible to figure out the value and column type 
during validation.


@Jark: You convinced me to relax the CAST constraints. I added a 
dedicated sub-section to the FLIP:


To make the use of SYSTEM_METADATA easier and to avoid nested casting, we 
allow explicit casting to a target data type:


rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL 
TIME ZONE)


A connector still produces and consumes the data type returned by 
`listMetadata()`. The planner will insert necessary explicit casts.


In any case, the user must provide a CAST such that the computed column 
receives a valid data type when constructing the table schema.
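For reference, this is how the metadata keys above could appear together in a DDL,
embedded via the Table API. SYSTEM_METADATA follows the FLIP draft discussed here
(not a released syntax), and the connector options are only illustrative:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public final class MetadataColumnsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());
        tEnv.executeSql(
                "CREATE TABLE debezium_source (\n"
              + "  id INT,\n"
              + "  name STRING,\n"
              + "  origin_table STRING AS CAST(\n"
              + "    SYSTEM_METADATA('debezium-json.source.table') AS STRING),\n"
              + "  rowtime AS CAST(\n"
              + "    SYSTEM_METADATA('debezium-json.source.timestamp')\n"
              + "      AS TIMESTAMP(3) WITH LOCAL TIME ZONE)\n"
              + ") WITH (\n"
              + "  'connector' = 'kafka',\n"
              + "  'format' = 'debezium-json'\n"
              + ")");
    }
}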


"I don't see a reason why `DecodingFormat#applyReadableMetadata` needs a
DataType argument."

Correct, the DeserializationSchema doesn't need TypeInfo; it is always 
executed locally. It is the source that needs TypeInfo for serializing 
the record to the next operator. And that is what we provide.


@Danny:

“SYSTEM_METADATA("offset")` returns the NULL type by default”

We can also use some other means to represent an UNKNOWN data type. In 
the Flink type system, we use the NullType for it. The important part is 
that the final data type is known for the entire computed column. As I 
mentioned before, I would avoid the suggested option b) that would be 
similar to your suggestion. The CAST should be enough and allows for 
complex expressions in the computed column. Option b) would need parser 
changes.


Regards,
Timo



On 08.09.20 06:21, Leonard Xu wrote:

Hi, Timo

Thanks for your explanation and update. I have only one question for the 
latest FLIP.

About the MAP DataType of key 'debezium-json.source': if users 
want to use the table name metadata, they need to write:
tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source') AS MAP<STRING, STRING>)['table']

The expression is a little complex for users. Could we only support the necessary 
metadata with simple DataTypes, as follows?
tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source.table') AS 
STRING),
transactionTime BIGINT AS CAST(SYSTEM_METADATA('debezium-json.source.ts_ms') AS 
BIGINT),

In this way, we can simplify the expression. From my side, the most commonly used 
metadata in changelog formats includes 'database', 'table', 'source.ts_ms' and 'ts_ms'; 
maybe we could only support them in the first version.

Both Debezium and Canal have the above four metadata fields, and I'm willing to take some 
subtasks in the next development steps if necessary.

Debezium:
{
   "before": null,
   "after": { "id": 101, "name": "scooter" },
   "source": {
     "db": "inventory",        # 1. database name the changelog belongs to.
     "table": "products",      # 2. table name the changelog belongs to.
     "ts_ms": 1589355504100,   # 3. timestamp of the change in the database system, i.e. transaction time in the database.
     "connector": "mysql",
     ...
   },
   "ts_ms": 1589355606100,     # 4. timestamp when Debezium processed the changelog.
   "op": "c",
   "transaction": null
}

Canal:
{
   "data": [{ "id": "102", "name": "car battery" }],
   "database": "inventory",  # 1. database name the changelog belongs to.
   "table": "products",      # 2. table name the changelog belongs to.
   "es": 1589374013000,      # 3. execution time of the change in the database system, i.e. transaction time in the database.
   "ts": 1589374013680,      # 4. timestamp when Canal processed the changelog.
   "isDdl": false,
   "mysqlType": {},
   ...
}


Best
Leonard


On Sep 8, 2020, at 11:57, Danny Chan wrote:

Thanks Timo ~

The FLIP was already in pretty good shape, I have only 2 questions here:


1. “`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid read-only 
computed column for Kafka and can be extracted by the planner.”


What are the pros of following the SQL Server syntax here? Usually an expression's 
return type can be inferred automatically. But I guess SQL Server does not have a 
function like SYSTEM_METADATA, which actually does not have a specific return 
type.

And why not use the Oracle or MySQL syntax instead?

column_name [datatype] [GENERATED ALWAYS] AS (expression) [VIRTUAL]
which is more straightforward.

2. “SYSTEM_METADATA("offset")` r

[jira] [Created] (FLINK-19163) Add building py38 wheel package of PyFlink in Azure CI

2020-09-08 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-19163:


 Summary: Add building py38 wheel package of PyFlink in Azure CI
 Key: FLINK-19163
 URL: https://issues.apache.org/jira/browse/FLINK-19163
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Build System
Reporter: Huang Xingbo
 Fix For: 1.12.0


Add building py38 wheel package of PyFlink in Azure CI



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-08 Thread Jark Wu
Hi Timo,

The updated CAST SYSTEM_METADATA behavior sounds good to me. I just noticed
that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME ZONE".
So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL TIME
ZONE" as the defined type of Kafka timestamp? I think this makes sense,
because it represents the milli-seconds since epoch.

Regarding "DeserializationSchema doesn't need TypeInfo", I don't think so.
The DeserializationSchema implements ResultTypeQueryable, thus the
implementation needs to return an output TypeInfo.
Besides, FlinkKafkaConsumer also
calls DeserializationSchema.getProducedType as the produced type of the
source function [1].

Best,
Jark

[1]:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066

On Tue, 8 Sep 2020 at 16:35, Timo Walther  wrote:

> Hi everyone,
>
> I updated the FLIP again and hope that I could address the mentioned
> concerns.
>
> @Leonard: Thanks for the explanation. I wasn't aware that ts_ms and
> source.ts_ms have different semantics. I updated the FLIP and expose the
> most commonly used properties separately. So frequently used properties
> are not hidden in the MAP anymore:
>
> debezium-json.ingestion-timestamp
> debezium-json.source.timestamp
> debezium-json.source.database
> debezium-json.source.schema
> debezium-json.source.table
>
> However, since other properties depend on the used connector/vendor, the
> remaining options are stored in:
>
> debezium-json.source.properties
>
> And accessed with:
>
> CAST(SYSTEM_METADATA('debezium-json.source.properties') AS MAP<STRING, STRING>)['table']
>
> Otherwise it is not possible to figure out the value and column type
> during validation.
>
> @Jark: You convinced me in relaxing the CAST constraints. I added a
> dedicacated sub-section to the FLIP:
>
> For making the use of SYSTEM_METADATA easier and avoid nested casting we
> allow explicit casting to a target data type:
>
> rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL
> TIME ZONE)
>
> A connector still produces and consumes the data type returned by
> `listMetadata()`. The planner will insert necessary explicit casts.
>
> In any case, the user must provide a CAST such that the computed column
> receives a valid data type when constructing the table schema.
>
> "I don't see a reason why `DecodingFormat#applyReadableMetadata` needs a
> DataType argument."
>
> Correct he DeserializationSchema doesn't need TypeInfo, it is always
> executed locally. It is the source that needs TypeInfo for serializing
> the record to the next operator. And that's this is what we provide.
>
> @Danny:
>
> “SYSTEM_METADATA("offset")` returns the NULL type by default”
>
> We can also use some other means to represent an UNKNOWN data type. In
> the Flink type system, we use the NullType for it. The important part is
> that the final data type is known for the entire computed column. As I
> mentioned before, I would avoid the suggested option b) that would be
> similar to your suggestion. The CAST should be enough and allows for
> complex expressions in the computed column. Option b) would need parser
> changes.
>
> Regards,
> Timo
>
>
>
> On 08.09.20 06:21, Leonard Xu wrote:
> > Hi, Timo
> >
> > Thanks for you explanation and update,  I have only one question  for
> the latest FLIP.
> >
> > About the MAP DataType of key 'debezium-json.source', if
> user want to use the table name metadata, they need to write:
> > tableName STRING AS CAST(SYSTEM_METADATA('debeuim-json.source') AS
> MAP<STRING, STRING>)['table']
> >
> > the expression is a little complex for user, Could we only support
> necessary metas with simple DataType as following?
> > tableName STRING AS CAST(SYSTEM_METADATA('debeuim-json.source.table') AS
> STRING),
> > transactionTime LONG AS
> CAST(SYSTEM_METADATA('debeuim-json.source.ts_ms') AS BIGINT),
> >
> > In this way, we can simplify the expression, the mainly used metadata in
> changelog format may include 'database','table','source.ts_ms','ts_ms' from
> my side,
> > maybe we could only support them at first version.
> >
> > Both Debezium and Canal have above four metadata, and I‘m willing to
> take some subtasks in next development if necessary.
> >
> > Debezium:
> > {
> >"before": null,
> >"after": {  "id": 101,"name": "scooter"},
> >"source": {
> >  "db": "inventory",  # 1. database name the
> changelog belongs to.
> >  "table": "products",# 2. table name the changelog
> belongs to.
> >  "ts_ms": 1589355504100, # 3. timestamp of the change
> happened in database system, i.e.: transaction time in database.
> >  "connector": "mysql",
> >  ….
> >},
> >"ts_ms": 1589355606100,  # 4. timestamp when the debezium
> processed the changelog.
> >"op": "c",
> >"transaction": null
> > }
> >
> > Canal:
> > {
> >"data": [{  "id": "102",

Re: [VOTE] FLIP-141: Intra-Slot Managed Memory Sharing

2020-09-08 Thread Aljoscha Krettek

+1

We just need to make sure to find a good name before the release, but this 
shouldn't block any work.


Aljoscha

On 08.09.20 07:59, Xintong Song wrote:

Thanks for the vote, @Jincheng.


Concerning the namings, the original idea was, as you suggested, to have
separate configuration names for batch and rocksdb while only one of them
will take effect at a time.


It was then in the discussion thread [1] that @Stephan suggested combining
these two.


 We never have batch algos and RocksDB mixed, having this as separate
options is confusing as it suggests this can be combined arbitrarily. I
also think that a slim possibility that we may ever combine this in the
future is not enough reason to make it more complex/confusing.



This suggestion was also supported by others in the discussion thread.
That's why we are trying to come up with a name that covers both batch and
rocksdb memory consumers.


Thank you~

Xintong Song


[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-141-Intra-Slot-Managed-Memory-Sharing-tp44146p44253.html

On Tue, Sep 8, 2020 at 1:37 PM jincheng sun 
wrote:


+1 for the proposal!

Regarding the name of `BATCH_OP/ROCKSDB`, we can reserve the configuration
names for batch and rocksdb respectively: `BATCH_OP` for batch, `ROCKSDB`
for rocksdb, with the default values as follows:

{
 BATCH_OP: 70,
 ROCKSDB : 70,
 PYTHON : 30
}

Only one of `BATCH_OP` and `ROCKSDB` will work. What do you think?

Best,
Jincheng


Xintong Song wrote on Mon, Sep 7, 2020 at 1:46 PM:


Thanks for the votes.

Concerning the name for batch/RocksDB memory consumer, how about
"execution memory"?
We can further explain in docs and config option description that this is
used for job execution, which is currently dedicated to rocksdb in
streaming and batch algorithms in batch.

Thank you~

Xintong Song



On Mon, Sep 7, 2020 at 11:43 AM Yangze Guo  wrote:


+1

Best,
Yangze Guo

On Mon, Sep 7, 2020 at 10:54 AM Zhu Zhu  wrote:


+1

Thanks,
Zhu

Dian Fu wrote on Mon, Sep 7, 2020 at 10:34 AM:


+1


On Sep 3, 2020, at 8:46 PM, Till Rohrmann wrote:

Hi Xintong,

thanks for starting the vote.

+1 for the proposal given that we find a proper name for the
different memory consumers (specifically the batch/RocksDB

consumer)

and

their corresponding weights.

Cheers,
Till

On Thu, Sep 3, 2020 at 12:43 PM Xintong Song <

tonysong...@gmail.com>

wrote:



Hi devs,

I'd like to start a voting thread on FLIP-141[1], which proposes

how

managed memory should be shared by various use cases within a

slot.

The

proposal has been discussed in [2].

The vote will be open for at least 72h + weekends. I'll try to

close it

on

September 8, unless there is an objection or not enough votes.

Thank you~

Xintong Song


[1]









https://cwiki.apache.org/confluence/display/FLINK/FLIP-141%3A+Intra-Slot+Managed+Memory+Sharing#FLIP141:IntraSlotManagedMemorySharing-compatibility


[2]









http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-141-Intra-Slot-Managed-Memory-Sharing-td44146.html

















Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-08 Thread Dawid Wysakowicz
Hey Kurt,

Thank you for comments!

Ad. 1 I might have missed something here, but as far as I can see, using
the current execution stack with regular state backends (RocksDB in
particular, if we want spilling capabilities) is equivalent to
hash-based execution. I can see a different spilling state backend
implementation in the future, but I think it is not batch-specific. Or am
I missing something?

Ad. 2 Totally agree that normalized keys are important for
performance. I think, though, that TypeComparators are not a necessity for
that. Actually, this proposal is heading towards only ever performing
"normalized key" comparisons. I have not included in the proposal the
binary format which we will use for sorting (partially because I forgot,
and partially because I thought it was too much of an implementation
detail). Let me include it here though, as it might clear up the situation
a bit.

In DataSet, at times we have KeySelectors which extract keys based on
field indices or names. This allows, in certain situations, extracting the
key from serialized records. Compared to DataSet, in DataStream the key
is always described with a black-box KeySelector, or in other words with a
function which extracts a key from a deserialized record. In turn, there
is no way to create a comparator that could compare records by
extracting the key from a serialized record (neither with, nor without
key normalization). We suggest that the input for the sorter will be

<key> + <timestamp> + <record>

Without having the key prepended we would have to deserialize the record
for every key comparison.
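To illustrate the idea (a rough sketch, not the FLIP's actual sorter code; class and
method names are made up): the key is extracted and serialized once per record and
written in front of the record bytes, so later comparisons are plain byte comparisons
on the prefix, much like normalized-key comparisons in DataSet:

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.memory.DataOutputSerializer;

public final class KeyPrefixedRecords {

    public static <T, K> byte[] keyPrefixedBytes(
            T record,
            KeySelector<T, K> keySelector,
            TypeSerializer<K> keySerializer,
            TypeSerializer<T> recordSerializer) throws Exception {

        K key = keySelector.getKey(record);      // black-box key extraction
        DataOutputSerializer out = new DataOutputSerializer(64);
        keySerializer.serialize(key, out);       // key bytes first ...
        recordSerializer.serialize(record, out); // ... then the record bytes
        return out.getCopyOfBuffer();
    }

    /** Unsigned lexicographic comparison of the first keyLen bytes (assumes a fixed-length key). */
    public static int compareKeyPrefix(byte[] a, byte[] b, int keyLen) {
        for (int i = 0; i < keyLen; i++) {
            int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }
}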

Therefore if we agree that we perform binary comparison for keys (which
are always prepended), it is actually equivalent to a DataSet with
TypeComparators that support key normalization.

Let me know if that is clear, or if I have missed something here.

Best,

Dawid

On 08/09/2020 03:39, Kurt Young wrote:
> Hi Dawid, thanks for bringing this up, it's really exciting to see that
> batch execution is introduced in DataStream. From the flip, it seems
> we are sticking with sort based execution mode (at least for now), which
> will sort the whole input data before any *keyed* operation is
> executed. I have two comments here:
>
> 1. Do we want to introduce hash-based execution in the future? Sort is a
> safe choice but not the best in lots of cases. IIUC we only need
> to make sure that before the framework finishes dealing with one key, the
> operator doesn't see any data belonging to other keys, thus
> hash-based execution would also do the trick. One tricky thing the
> framework might need to deal with is memory constraints and spilling
> in the hash map, but Flink also has some good knowledge about these stuff.
>
> 2. Going back to sort-based execution and how to sort keys. From my
> experience, the performance of sorting would be one of the most important
> things if we want to achieve good performance of batch execution. And
> normalized keys are actually the key of the performance of sorting.
> If we want to get rid of TypeComparator, I think we still need to find a
> way to introduce this back.
>
> Best,
> Kurt
>
>
> On Tue, Sep 8, 2020 at 3:04 AM Aljoscha Krettek  wrote:
>
>> Yes, I think we can address the problem of indeterminacy in a separate
>> FLIP because we're already in it.
>>
>> Aljoscha
>>
>> On 07.09.20 17:00, Dawid Wysakowicz wrote:
>>> @Seth That's a very good point. I agree that RocksDB has the same
>>> problem. I think we can use the same approach for the sorted shuffles
>>> then. @Aljoscha I agree we should think about making it more resilient,
>>> as I guess users might have problems already if they use keys with
>>> non-deterministic binary representation. How do you feel about
>>> addressing that separately purely to limit the scope of this FLIP?
>>>
>>> @Aljoscha I tend to agree with you that the best place to actually place
>>> the sorting would be in the InputProcessor(s). If there are no more
>>> suggestions in respect to that issue. I'll put this proposal for voting.
>>>
>>> @all Thank you for the feedback so far. I'd like to start a voting
>>> thread on the proposal tomorrow. Therefore I'd appreciate if you comment
>>> before that, if you still have some outstanding ideas.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 04/09/2020 17:13, Aljoscha Krettek wrote:
 Seth is right, I was just about to write that as well. There is a
 problem, though, because some of our TypeSerializers are not
 deterministic even though we use them as if they were. Beam excludes
 the FloatCoder, for example, and the AvroCoder in certain cases. I'm
 pretty sure there is also weirdness going on in our KryoSerializer.

 On 04.09.20 14:59, Seth Wiesman wrote:
> There is already an implicit assumption the TypeSerializer for keys is
> stable/deterministic, RocksDB compares keys using their serialized byte
> strings. I think this is a non-issue (or at least it's not changing the
> status quo).
>
> On Fri, Sep 4, 

Re: Merge SupportsComputedColumnPushDown and SupportsWatermarkPushDown

2020-09-08 Thread Timo Walther

Hi Shengkai,

First of all, I would not consider the section "Problems of 
SupportsWatermarkPushDown" a "problem". It was planned to update the 
WatermarkProvider once the interfaces are ready. See the comment in 
WatermarkProvider:


// marker interface that will be filled after FLIP-126:
// WatermarkGenerator getWatermarkGenerator();

So far we had no sources that actually implement WatermarkStrategy.

Second, for generating watermarks I don't see a problem in merging the 
two mentioned interfaces SupportsWatermarkPushDown and 
SupportsComputedColumnPushDown into one. The described design sounds 
reasonable to me and the impact on performance should not be too large.


However, by merging these two interfaces we are also merging two 
completely separate concepts. Computed columns are not always used for 
generating a rowtime or watermark. Users can and certainly will 
implement more complex logic in there. One example could be decrypting 
encrypted records/columns, performing checksum checks, reading metadata etc.


So in any case we should still provide two interfaces:

SupportsWatermarkPushDown (functionality of computed columns + watermarks)


SupportsComputedColumnPushDown (functionality of computed columns only)

I'm fine with such a design, but it is also confusing for implementers 
that SupportsWatermarkPushDown includes the functionality of the other 
interface.


What do you think?

Regards,
Timo


On 08.09.20 04:32, Jark Wu wrote:

Thanks to Shengkai for summarizing the problems on the FLIP-95 interfaces
and solutions.

I think the new proposal, i.e. only pushing the "WatermarkStrategy" is much
cleaner and easier to develop than before.
So I'm +1 to the proposal.

Best,
Jark

On Sat, 5 Sep 2020 at 13:44, Shengkai Fang  wrote:


Hi, all. It seems the format is not normal. So I add a google doc in
link[1]. This discussion is about the interfaces in FLIP-95:  New Table
Source And Table Sink and propose to merge two interfaces
SupportsWatermarkPushDown and SupportsComputedColumnPushDown.

I am looking forward to any opinions and suggestions from the community.

Regards,
Shengkai

[1]

https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#

Shengkai Fang wrote on Fri, Sep 4, 2020 at 2:58 PM:


Hi, all. Currently, we use two separate interfaces,
SupportsComputedColumnPushDown and SupportsWatermarkPushDown, in the design. The
interface SupportsWatermarkPushDown relies on
SupportsComputedColumnPushDown when the watermark is defined on a computed
column. During the implementation, we found that the method in
SupportsWatermarkPushDown uses an out-of-date interface, WatermarkProvider,
and that there is duplication between SupportsComputedColumnPushDown and
SupportsProjectionPushDown. Therefore, we decided to propose a new
interface for SupportsWatermarkPushDown to solve the problems we mentioned.



*Problems of SupportsComputedColumnPushDown and SupportsWatermarkPushDown*

Problems of SupportsWatermarkPushDown

SupportsWatermarkPushDown uses an inner interface named WatermarkProvider to
register the WatermarkGenerator into the DynamicTableSource now. However, the
community uses org.apache.flink.api.common.eventtime.WatermarkStrategy to
create watermark generators in FLIP-126. WatermarkStrategy is a factory
of TimestampAssigner and WatermarkGenerator, and FlinkKafkaConsumer uses
the method assignTimestampsAndWatermarks(WatermarkStrategy) to generate
Kafka-partition-aware watermarks. As for the original WatermarkProvider, it
is used to generate the deprecated AssignerWithPeriodicWatermarks and
PunctuatedWatermarkAssignerProvider. Therefore, we think it's no longer
suitable to use the WatermarkProvider.
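As a sketch of that direction (the name and signature below follow this proposal and
are not a settled public API), the ability interface could simply accept the
WatermarkStrategy and leave the rest to the source:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.data.RowData;

public interface SupportsWatermarkPushDown {

    // The planner hands the fully assembled strategy (timestamp assigner plus
    // watermark generator) to the scan source, which can then emit
    // partition-aware watermarks itself, similar to what FlinkKafkaConsumer
    // does via assignTimestampsAndWatermarks(WatermarkStrategy).
    void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
}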


Problems of SupportsComputedColumnPushDown

There are two problems when using SupportsComputedColumnPushDown alone.

First, the planner will transform the computed column and the query, such as
select a+b, into a LogicalProject. When it comes to the optimization phase, we have
no means to distinguish whether a RexNode in the projection comes from a
computed column or from the query. So SupportsComputedColumnPushDown in reality
will push not only the computed column but also the calculation in the
query.

Second, when a plan matches the rule
PushComputedColumnIntoTableSourceScanRule, we have to build a new RowData to
place all the fields we require. However, the two rules
PushComputedColumnIntoTableSourceScanRule and
PushProjectIntoTableSourceScanRule will do the same work of pruning the
records read from the source. It seems that we have two duplicate rules in
the planner. But I think we should use the rule
PushProjectIntoTableSourceScanRule rather than
PushComputedColumnIntoTableSourceScanRule if we don't support watermark
push down. Compared to PushComputedColumnIntoTableSourceScanRule,
PushProjectIntoTableSourceScanRule is much lighter and we can read pruned
data from the source rather than use a map function in Flink.

Therefore, we think it's not a good idea to use two interfaces rather than
one.


*New Proposal*

First of all, let us addr

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-08 Thread Timo Walther

Hi Jark,

according to Flink's and Calcite's casting definition in [1][2] 
TIMESTAMP WITH LOCAL TIME ZONE should be castable from BIGINT. If not, 
we will make it possible ;-)


I'm aware of DeserializationSchema.getProducedType but I think that this 
method is actually misplaced. The type should rather be passed to the 
source itself.


For our Kafka SQL source, we will also not use this method because the 
Kafka source will add own metadata in addition to the 
DeserializationSchema. So DeserializationSchema.getProducedType will 
never be read.


For now I suggest leaving out the `DataType` from 
DecodingFormat.applyReadableMetadata, also because the format's physical 
type is passed later in `createRuntimeDecoder`. If necessary, it can be 
computed manually from the consumed type plus the metadata types. We will provide a 
metadata utility class for that.
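A rough sketch of such a manual computation (this is not the mentioned utility class;
field names are just examples): the produced row type is simply the format's physical
columns followed by the requested metadata columns:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public final class ProducedTypeSketch {

    public static DataType producedDataType() {
        return DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.INT()),              // physical
                DataTypes.FIELD("name", DataTypes.STRING()),         // physical
                DataTypes.FIELD("source.table", DataTypes.STRING()), // metadata
                DataTypes.FIELD("ingestion-timestamp",               // metadata
                        DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)));
    }
}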


Regards,
Timo


[1] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
[2] 
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254



On 08.09.20 10:52, Jark Wu wrote:

Hi Timo,

The updated CAST SYSTEM_METADATA behavior sounds good to me. I just noticed
that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME ZONE".
So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL TIME
ZONE" as the defined type of Kafka timestamp? I think this makes sense,
because it represents the milli-seconds since epoch.

Regarding "DeserializationSchema doesn't need TypeInfo", I don't think so.
The DeserializationSchema implements ResultTypeQueryable, thus the
implementation needs to return an output TypeInfo.
Besides, FlinkKafkaConsumer also
calls DeserializationSchema.getProducedType as the produced type of the
source function [1].

Best,
Jark

[1]:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066

On Tue, 8 Sep 2020 at 16:35, Timo Walther  wrote:


Hi everyone,

I updated the FLIP again and hope that I could address the mentioned
concerns.

@Leonard: Thanks for the explanation. I wasn't aware that ts_ms and
source.ts_ms have different semantics. I updated the FLIP and expose the
most commonly used properties separately. So frequently used properties
are not hidden in the MAP anymore:

debezium-json.ingestion-timestamp
debezium-json.source.timestamp
debezium-json.source.database
debezium-json.source.schema
debezium-json.source.table

However, since other properties depend on the used connector/vendor, the
remaining options are stored in:

debezium-json.source.properties

And accessed with:

CAST(SYSTEM_METADATA('debezium-json.source.properties') AS MAP<STRING, STRING>)['table']

Otherwise it is not possible to figure out the value and column type
during validation.

@Jark: You convinced me in relaxing the CAST constraints. I added a
dedicacated sub-section to the FLIP:

For making the use of SYSTEM_METADATA easier and avoid nested casting we
allow explicit casting to a target data type:

rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL
TIME ZONE)

A connector still produces and consumes the data type returned by
`listMetadata()`. The planner will insert necessary explicit casts.

In any case, the user must provide a CAST such that the computed column
receives a valid data type when constructing the table schema.

"I don't see a reason why `DecodingFormat#applyReadableMetadata` needs a
DataType argument."

Correct he DeserializationSchema doesn't need TypeInfo, it is always
executed locally. It is the source that needs TypeInfo for serializing
the record to the next operator. And that's this is what we provide.

@Danny:

“SYSTEM_METADATA("offset")` returns the NULL type by default”

We can also use some other means to represent an UNKNOWN data type. In
the Flink type system, we use the NullType for it. The important part is
that the final data type is known for the entire computed column. As I
mentioned before, I would avoid the suggested option b) that would be
similar to your suggestion. The CAST should be enough and allows for
complex expressions in the computed column. Option b) would need parser
changes.

Regards,
Timo



On 08.09.20 06:21, Leonard Xu wrote:

Hi, Timo

Thanks for you explanation and update,  I have only one question  for

the latest FLIP.


About the MAP DataType of key 'debezium-json.source', if

user want to use the table name metadata, they need to write:

tableName STRING AS CAST(SYSTEM_METADATA('debeuim-json.source') AS

MAP<STRING, STRING>)['table']


the expression is a little complex for user, Could we only support

necessary metas with simple DataType as following?

tableName STRING AS CAST(SYSTEM_METADATA('debeuim-json.source.table') AS

STRING),

transactionTime LONG AS

CAST(SYSTEM_METADATA('debeuim-json.source.ts_ms') AS BIGINT),


In t

Re: [DISCUSS] FLIP-33: Standardize connector metrics

2020-09-08 Thread Konstantin Knauf
Hi Becket,

Thank you for picking up this FLIP. I have a few questions:

* two thoughts on naming:
   * idleTime: In the meantime, a similar metric "idleTimeMsPerSecond" has
been introduced in https://issues.apache.org/jira/browse/FLINK-16864. They
have a similar name, but different definitions of idleness,
e.g. "idleTimeMsPerSecond" considers the SourceTask idle, when it is
backpressured. Can we make it clearer that these two metrics mean different
things?

  * "current(Fetch)Latency" I am wondering if "eventTimeLag(Before|After)"
is more descriptive/clear. What do others think?

  * Current(Fetch)Latency implies that the timestamps are directly
extracted in the source connector, right? Will this be the default for
FLIP-27 sources anyway?

* Does FLIP-33 also include the implementation of these metrics (to the
extent possible) for all connectors currently available in Apache Flink or
is the "per-connector implementation" out-of-scope?

Thanks,

Konstantin





On Fri, Sep 4, 2020 at 4:56 PM Becket Qin  wrote:

> Hi all,
>
> To complete the Source refactoring work, I'd like to revive this
> discussion. Since the mail thread has been dormant for more than a year,
> just to recap the motivation of the FLIP:
>
> 1. The FLIP proposes to standardize the connector metrics by giving
> guidance on the metric specifications, including the name, type and meaning
> of the metrics.
> 2. It is OK for a connector to not emit some of the metrics in the metric
> guidance, but if a metric of the same semantic is emitted, the
> implementation should follow the guidance.
> 3. It is OK for a connector to emit more metrics than what are listed in
> the FLIP. This includes having an alias for a metric specified in the FLIP.
> 4. We will implement some of the metrics out of the box in the default
> implementation of FLIP-27, as long as it is applicable.
>
> The FLIP wiki is following:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33
> %3A+Standardize+Connector+Metrics
>
> Any thoughts?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Fri, Jun 14, 2019 at 2:29 PM Piotr Nowojski 
> wrote:
>
> > > we will need to revisit the convention list and adjust them accordingly
> > when FLIP-27 is ready
> >
> >
> > Yes, this sounds good :)
> >
> > Piotrek
> >
> > > On 13 Jun 2019, at 13:35, Becket Qin  wrote:
> > >
> > > Hi Piotr,
> > >
> > > That's great to know. Chances are that we will need to revisit the
> > > convention list and adjust them accordingly when FLIP-27 is ready, At
> > that
> > > point we can mark some of the metrics as available by default for
> > > connectors implementing the new interface.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Thu, Jun 13, 2019 at 3:51 PM Piotr Nowojski 
> > wrote:
> > >
> > >> Thanks for driving this. I’ve just noticed one small thing. With new
> > >> SourceReader interface Flink will be able to provide `idleTime` metric
> > >> automatically.
> > >>
> > >> Piotrek
> > >>
> > >>> On 13 Jun 2019, at 03:30, Becket Qin  wrote:
> > >>>
> > >>> Thanks all for the feedback and discussion.
> > >>>
> > >>> Since there wasn't any concern raised, I've started the voting thread
> > for
> > >>> this FLIP, but please feel free to continue the discussion here if
> you
> > >>> think something still needs to be addressed.
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Jiangjie (Becket) Qin
> > >>>
> > >>>
> > >>>
> > >>> On Mon, Jun 10, 2019 at 9:10 AM Becket Qin 
> > wrote:
> > >>>
> >  Hi Piotr,
> > 
> >  Thanks for the comments. Yes, you are right. Users will have to look
> > at
> >  other metrics to decide whether the pipeline is healthy or not in
> the
> > >> first
> >  place before they can use the time-based metric to fix the
> bottleneck.
> > 
> >  I agree that once we have FLIP-27 ready, some of the metrics can
> just
> > be
> >  reported by the abstract implementation.
> > 
> >  I've updated FLIP-33 wiki page to add the pendingBytes and
> > >> pendingRecords
> >  metric. Please let me know if you have any concern over the updated
> > >> metric
> >  convention proposal.
> > 
> >  @Chesnay Schepler  @Stephan Ewen
> >   will you also have time to take a look at
> the
> >  proposed metric convention? If there is no further concern I'll
> start
> > a
> >  voting thread for this FLIP in two days.
> > 
> >  Thanks,
> > 
> >  Jiangjie (Becket) Qin
> > 
> > 
> > 
> >  On Wed, Jun 5, 2019 at 6:54 PM Piotr Nowojski 
> > >> wrote:
> > 
> > > Hi Becket,
> > >
> > > Thanks for the answer :)
> > >
> > >> By time-based metric, I meant the portion of time spent on
> producing
> > >> the
> > >> record to downstream. For example, a source connector can report
> > that
> > > it's
> > >> spending 80% of time to emit record to downstream processing
> > pipeline.
> > > In
> > >> another case, a sink connector may report that its spending 30% of
> > >> time
> 

1.11.1 Hive connector doesn't work with Hive 1.0 or 1.1

2020-09-08 Thread Rui Li
Hello dev,

A user hits the following issue when using Flink 1.11.1 to connect to Hive
1.1.0:

java.lang.NoSuchMethodError:
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(Lorg/apache/hadoop/conf/Configuration;)V
> at
> org.apache.flink.table.catalog.hive.client.HiveShimV100.getHiveMetastoreClient(HiveShimV100.java:97)


The HiveMetaStoreClient constructor takes a HiveConf parameter in Hive
2.3.4 [1], which is the default hive version we build with. But in Hive
3.1.1, the signature changed and takes a Configuration parameter instead
[2].

So I suspect the hive connector jar we published in 1.11.1 was somehow
built against Hive 3.x -- we do have a Hive-3.1.1 maven profile to build &
test with 3.1.1. If that's the case, then the 1.11.1 hive connector won't
work with Hive 1.0 and 1.1. I asked the user to fall back to 1.11.0 and the
problem goes away.
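For what it's worth, a quick illustrative check (not part of Flink) of which
HiveMetaStoreClient constructor a given hive-metastore jar exposes, based on the
signatures referenced in [1] and [2]:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

public final class MetastoreCtorCheck {
    public static void main(String[] args) throws Exception {
        Class<?> clientClass =
                Class.forName("org.apache.hadoop.hive.metastore.HiveMetaStoreClient");
        try {
            clientClass.getConstructor(HiveConf.class);
            System.out.println("Hive <= 2.x style: HiveMetaStoreClient(HiveConf)");
        } catch (NoSuchMethodException e) {
            clientClass.getConstructor(Configuration.class);
            System.out.println("Hive 3.x style: HiveMetaStoreClient(Configuration)");
        }
    }
}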

Could anybody let me know how we built the maven artifacts for the release?
Thanks.

[1]
https://github.com/apache/hive/blob/rel/release-2.3.4/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java#L128
[2]
https://github.com/apache/hive/blob/rel/release-3.1.1/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java#L136

-- 
Best regards!
Rui Li


Re: 1.11.1 Hive connector doesn't work with Hive 1.0 or 1.1

2020-09-08 Thread Dian Fu
Hi Rui, 

The maven artifacts are built using the script: 
releasing/deploy_staging_jars.sh [1]. 

Regards,
Dian

[1] https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release

> On Sep 8, 2020, at 7:19 PM, Rui Li wrote:
> 
> maven artifacts



Re: Merge SupportsComputedColumnPushDown and SupportsWatermarkPushDown

2020-09-08 Thread Jark Wu
Hi Timo,

Regarding "pushing other computed columns into source, e.g. encrypted
records/columns, performing checksum checks, reading metadata etc.",
I'm not sure about this.
1. the planner doesn't know which computed column should be pushed into the source
2. it seems that we can't improve performance if we push down complex logic
into the source; we still need to calculate it anyway.
3. the computed column is a regular expression; if the computed column
should be pushed down, then shall we push the expressions in the following
Projection too?
If yes, then the name of "SupportsComputedColumnPushDown" might not be
correct.
4. regarding reading metadata, according to FLIP-107, we don't use the
existing SupportsComputedColumnPushDown, but a new interface.

Therefore, I don't find a strong use case that needs this interface so far.

Best,
Jark




On Tue, 8 Sep 2020 at 17:13, Timo Walther  wrote:

> Hi Shengkai,
>
> first of I would not consider the section Problems of
> SupportsWatermarkPushDown" as a "problem". It was planned to update the
> WatermarkProvider once the interfaces are ready. See the comment in
> WatermarkProvider:
>
> // marker interface that will be filled after FLIP-126:
> // WatermarkGenerator getWatermarkGenerator();
>
> So far we had no sources that actually implement WatermarkStrategy.
>
> Second, for generating watermarks I don't see a problem in merging the
> two mentioned interfaces SupportsWatermarkPushDown and
> SupportsComputedColumnPushDown into one. The descibed design sounds
> reasonable to me and the impact on performance should not be too large.
>
> However, by merging these two interfaces we are also merging two
> completely separate concepts. Computed columns are not always used for
> generating a rowtime or watermark. Users can and certainly will
> implement more complex logic in there. One example could be decrypting
> encrypted records/columns, performing checksum checks, reading metadata
> etc.
>
> So in any case we should still provide two interfaces:
>
> SupportsWatermarkPushDown (functionality of computed columns + watermarks)
>
>
> SupportsComputedColumnPushDown (functionality of computed columns only)
>
> I'm fine with such a design, but it is also confusing for implementers
> that SupportsWatermarkPushDown includes the functionality of the other
> interface.
>
> What do you think?
>
> Regards,
> Timo
>
>
> On 08.09.20 04:32, Jark Wu wrote:
> > Thanks to Shengkai for summarizing the problems on the FLIP-95 interfaces
> > and solutions.
> >
> > I think the new proposal, i.e. only pushing the "WatermarkStrategy" is
> much
> > cleaner and easier to develop than before.
> > So I'm +1 to the proposal.
> >
> > Best,
> > Jark
> >
> > On Sat, 5 Sep 2020 at 13:44, Shengkai Fang  wrote:
> >
> >> Hi, all. It seems the format is not normal. So I add a google doc in
> >> link[1]. This discussion is about the interfaces in FLIP-95:  New Table
> >> Source And Table Sink and propose to merge two interfaces
> >> SupportsWatermarkPushDown and SupportsComputedColumnPushDown.
> >>
> >> I am looking forward to any opinions and suggestions from the community.
> >>
> >> Regards,
> >> Shengkai
> >>
> >> [1]
> >>
> >>
> https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#
> >>
> >> Shengkai Fang  于2020年9月4日周五 下午2:58写道:
> >>
> >>> Hi, all. Currently, we use two seperated interfaces
> >>> SupportsComputedColumnPushDown and SupportsWatermarkPushDown in design.
> >> The
> >>> interface SupportsWatermarkPushDown relies on
> >>> SupportsComputedColumnPushDown when watermark is defined on a computed
> >>> column. During the implementation, we find the method in
> >>> SupportsWatermarkPushDown uses an out-of-date interface
> WatermarkProvider
> >>> and the duplication of SupportsComputedColumnPushDown and
> >>> SupportsProjectionPushDown. Therefore, we decide to propose a new
> >>> interface of SupportsWatermarkPushDown to solve the problems we
> >> mentioned.
> >>>
> >>>
> >>> *Problems of SupportsComputedColumnPushDown and
> >> SupportsWatermarkPushDown*Problems
> >>> of SupportsWatermarkPushDown
> >>>
> >>> SupportsWatermarkPushDown uses an inner interface named
> >> WatermarkProvider to
> >>> register WatermarkGenerator into DynamicTableSource now. However, the
> >>> community uses org.apache.flink.api.common.eventtime.WatermarkStrategy
> to
> >>> create watermark generators in FLIP-126. WatermarkStrategy is a factory
> >>> of TimestampAssigner and WatermarkGeneartor and FlinkKafkaConsumer uses
> >>> the method assignTimestampsAndWatermarks(WatermarkStrategy) to generate
> >>> Kafka-partition-aware watermarks. As for the origin WatermarkProvider,
> it
> >>> is used to generate deprecated AssignerWithPeriodicWatermarks and
> >>> PunctuatedWatermarkAssignerProvider. Therefore, we think it's not
> >>> suitable to use the WatermarkProvider any more.
> >>>
> >>>
> >>> Problems of SupportsComputedColumnPushDown
> >>>
> >>> There are two problems around when using Supp

Re: 1.11.1 Hive connector doesn't work with Hive 1.0 or 1.1

2020-09-08 Thread Rui Li
Thanks Dian. The script looks all right to me. I'll double check with the
user whether the issue is related to his building environment.

On Tue, Sep 8, 2020 at 7:36 PM Dian Fu  wrote:

> Hi Rui,
>
> The maven artifacts are built using the
> script: releasing/deploy_staging_jars.sh [1].
>
> Regards,
> Dian
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release
>
> On Sep 8, 2020, at 7:19 PM, Rui Li wrote:
>
> maven artifacts
>
>
>

-- 
Best regards!
Rui Li


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-08 Thread Danny Chan
Hi, Timo ~

"It is not about changelog mode compatibility, it is about the type
compatibility.”

For fromDataStream(dataStream, Schema), there should not be a compatibility 
problem or data type inconsistency. We know the logical type from the Schema and 
the physical type from the DataStream itself.

For toDataStream(table, AbstractDataType), we can get the logical type from 
the table itself and the physical type from the passed data type.

If both behaviors are deterministic, what's the problem with type compatibility 
and safety?

My concern is that in most cases people use an "insert stream" and do not need 
to care about the data stream's ChangelogMode, so there is no need to distinguish 
them in the APIs; an optional parameter is enough. If we introduce 2 new APIs there, 
people have to choose between them. Can fromChangelogStream() accept an insert 
stream? What is the behavior if fromInsertStream() accepts a changelog stream?


"This means potentially duplicate definition of fields and their data types etc”

I agree, because table already has an underlying schema there.

Best,
Danny Chan
在 2020年9月3日 +0800 PM8:12,Timo Walther ,写道:
> Hi Danny,
>
> "if ChangelogMode.INSERT is the default, existing pipelines should be
> compatible"
>
> It is not about changelog mode compatibility, it is about the type
> compatibility. The renaming to `toInsertStream` is only to have a mean
> of dealing with data type inconsistencies that could break existing
> pipelines.
>
> As the FLIP describes, the following new behavior should be implemented:
>
> - It does this by translating the TypeInformation to DataType.
> - This will happen with a new TypeInfoDataTypeConverter that will no
> longer produce LegacyTypeInformationType.
> - All types from DataStream API should be supported by this converter.
> - TupleTypeInfoBase will be translated into a proper RowType or
> StructuredType.
> - BigDecimals will be converted to DECIMAL(38,18) by default.
> - Composite types (tuples, POJOs, rows) will be flattened by default if
> they are used as top-level records (similar to the old behavior).
> - The order of POJO field's is determined by the DataTypeExtractor and
> must not be defined manually anymore.
> - GenericTypeInfo is converted to RawType immediately by considering the
> current configuration.
> - A DataStream that originated from Table API will keep its DataType
> information due to ExternalTypeInfo implementing DataTypeQueryable.
>
> I would feel safer if we do this under a new method name.
>
> "toDataStream(table, schema.bindTo(DataType))"
>
> This is what I meant with "integrate the DataType into the Schema class
> itself". Yes, we can do that if everybody is fine with it. But why
> should a user specify both a schema and a data type? This means
> potentially duplicate definition of fields and their data types etc.
>
> Regards,
> Timo
>
>
> On 03.09.20 11:31, Danny Chan wrote:
> > "It is a more conservative approach to introduce that in a
> > new method rather than changing the existing one under the hood and
> > potentially break existing pipelines silently”
> >
> > I like the idea actually, but if ChangelogMode.INSERT is the default, 
> > existing pipelines should be compatible. We can see the other kinds of 
> > ChangelogMode as an extension.
> >
> > “for `toDataStream` users need to be
> > able to express whether they would prefer Row, POJO or atomic”
> >
> > I think most of the cases people do not need to convert the stream to a Row 
> > or POJO, because the table projection always returns a flatternned internal 
> > row, if people did want a POJO there, how about we bind the DataType to the 
> > existing schema, like this
> >
> > toDataStream(table, schema.bindTo(DataType))
> >
> > Best,
> > Danny Chan
> > 在 2020年9月3日 +0800 PM3:18,dev@flink.apache.org,写道:
> > >
> > > It is a more conservative approach to introduce that in a
> > > new method rather than changing the existing one under the hood and
> > > potentially break existing pipelines silently
> >
>


Re: Merge SupportsComputedColumnPushDown and SupportsWatermarkPushDown

2020-09-08 Thread Shengkai Fang
Hi Timo and Jark.Thanks for your replies.

Maybe I didn't explain this clearly in the doc. I think the main reason is that we
have no means to distinguish the calculations in a LogicalProject. Let me give you an
example to illustrate the reason. Assume we have 2 cases:


case 1:

create table MyTable (
  a int,
  b int
) with (
  ...
)

we use the sql "select a, b, a+b as c from MyTable" to get the results.


and


case 2:

create table MyTableWithComputedColumn (
  a int,
  b int,
  c as a + b
) with (
  ...
)

we use the sql "select a, b, c from MyTableWithComputedColumn" to get the results.


When it comes to the planner, the two SQL queries will have the same plan, which means
we would also push the calculation from the query into the scan if we supported
computed column push down.


As a supplement to Jark's response, currently
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#assignTimestampsAndWatermarks(WatermarkStrategy)
uses WatermarkStrategy to register the watermark generator supplier. I think
it's OK to use WatermarkStrategy directly because FLIP-126 has been
finished.



Jark Wu wrote on Tue, Sep 8, 2020 at 7:38 PM:

> Hi Timo,
>
> Regarding "pushing other computed columns into source, e.g. encrypted
> records/columns, performing checksum checks, reading metadata etc.",
> I'm not sure about this.
> 1. the planner don't know which computed column should be pushed into
> source
> 2. it seems that we can't improve performances if we pushdown complex logic
> into source, we still need to calculate them anyway.
> 3. the computed column is a regular expression, if the computed column
> should be pushed down, then shall we push the expressions in the following
> Projection too?
> If yes, then the name of "SupportsComputedColumnPushDown" might be not
> correct.
> 4. regarding reading metadata, according to FLIP-107, we don't use the
> existing SupportsComputedColumnPushDown, but a new interface.
>
> Therefore, I don't find a strong use case that needs this interface so far.
>
> Best,
> Jark
>
>
>
>
> On Tue, 8 Sep 2020 at 17:13, Timo Walther  wrote:
>
> > Hi Shengkai,
> >
> > first of I would not consider the section Problems of
> > SupportsWatermarkPushDown" as a "problem". It was planned to update the
> > WatermarkProvider once the interfaces are ready. See the comment in
> > WatermarkProvider:
> >
> > // marker interface that will be filled after FLIP-126:
> > // WatermarkGenerator getWatermarkGenerator();
> >
> > So far we had no sources that actually implement WatermarkStrategy.
> >
> > Second, for generating watermarks I don't see a problem in merging the
> > two mentioned interfaces SupportsWatermarkPushDown and
> > SupportsComputedColumnPushDown into one. The descibed design sounds
> > reasonable to me and the impact on performance should not be too large.
> >
> > However, by merging these two interfaces we are also merging two
> > completely separate concepts. Computed columns are not always used for
> > generating a rowtime or watermark. Users can and certainly will
> > implement more complex logic in there. One example could be decrypting
> > encrypted records/columns, performing checksum checks, reading metadata
> > etc.
> >
> > So in any case we should still provide two interfaces:
> >
> > SupportsWatermarkPushDown (functionality of computed columns +
> watermarks)
> >
> >
> > SupportsComputedColumnPushDown (functionality of computed columns only)
> >
> > I'm fine with such a design, but it is also confusing for implementers
> > that SupportsWatermarkPushDown includes the functionality of the other
> > interface.
> >
> > What do you think?
> >
> > Regards,
> > Timo
> >
> >
> > On 08.09.20 04:32, Jark Wu wrote:
> > > Thanks to Shengkai for summarizing the problems on the FLIP-95
> interfaces
> > > and solutions.
> > >
> > > I think the new proposal, i.e. only pushing the "WatermarkStrategy" is
> > much
> > > cleaner and easier to develop than before.
> > > So I'm +1 to the proposal.
> > >
> > > Best,
> > > Jark
> > >
> > > On Sat, 5 Sep 2020 at 13:44, Shengkai Fang  wrote:
> > >
> > >> Hi, all. It seems the format is not normal. So I add a google doc in
> > >> link[1]. This discussion is about the interfaces in FLIP-95:  New
> Table
> > >> Source And Table Sink and propose to merge two interfaces
> > >> SupportsWatermarkPushDown and SupportsComputedColumnPushDown.
> > >>
> > >> I am looking forward to any opinions and suggestions from the
> community.
> > >>
> > >> Regards,
> > >> Shengkai
> > >>
> > >> [1]
> > >>
> > >>
> >
> https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#
> > >>
> > >> Shengkai Fang  于2020年9月4日周五 下午2:58写道:
> > >>
> > >>> Hi, all. Currently, we use two seperated interfaces
> > >>> SupportsComputedColumnPushDown and SupportsWatermarkPushDown in
> design.
> > >> The
> > >>> interface SupportsWatermarkPushDown relies on
> > >>> SupportsComputedColumnPushDown when watermark is defined on a
> computed
> > >>> column. During the implementation, we find the method in
> > >>> SupportsWaterma

Re: [VOTE] FLIP-141: Intra-Slot Managed Memory Sharing

2020-09-08 Thread Yu Li
+1

Best Regards,
Yu


On Tue, 8 Sep 2020 at 17:03, Aljoscha Krettek  wrote:

> +1
>
> We just need to make sure to find a good name before the release but
> shouldn't block any work on this.
>
> Aljoscha
>
> On 08.09.20 07:59, Xintong Song wrote:
> > Thanks for the vote, @Jincheng.
> >
> >
> > Concerning the namings, the original idea was, as you suggested, to have
> > separate configuration names for batch and rocksdb while only one of them
> > will take effect at a time.
> >
> >
> > It was then in the discussion thread [1] that @Stepahn suggested to
> combine
> > these two.
> >
> >>  We never have batch algos and RocksDB mixed, having this as
> separate
> >> options is confusing as it suggests this can be combined arbitrarily. I
> >> also think that a slim possibility that we may ever combine this in the
> >> future is not enough reason to make it more complex/confusing.
> >>
> >
> > This suggestion was also supported by others in the discussion thread.
> > That's why we are trying to come up with a name that covers both batch
> and
> > rocksdb memory consumers.
> >
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> > [1]
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-141-Intra-Slot-Managed-Memory-Sharing-tp44146p44253.html
> >
> > On Tue, Sep 8, 2020 at 1:37 PM jincheng sun 
> > wrote:
> >
> >> +1 for the proposal!
> >>
> >> Regarding the name of `BATCH_OP/ROCKSDB`, we can reserve the
> configuration
> >> names for batch and rocksdb respectively, `BATCH_OP` for batch,
> "ROCKSDB"
> >> for rocksdb, and the default values as follows:
> >>
> >> {
> >>  BATCH_OP: 70,
> >>  ROCKSDB : 70,
> >>  PYTHON : 30
> >> }
> >>
> >> Only one of `BATCH_OP` and `ROCKSDB` will work. What do you think?
> >>
> >> Best,
> >> Jincheng
> >>
> >>
> >> Xintong Song  于2020年9月7日周一 下午1:46写道:
> >>
> >>> Thanks for the votes.
> >>>
> >>> Concerning the name for batch/RocksDB memory consumer, how about
> >> "execution
> >>> memory"?
> >>> We can further explain in docs and config option description that this
> is
> >>> used for job execution, which is currently dedicated to rocksdb in
> >>> streaming and batch algorithms in batch.
> >>>
> >>> Thank you~
> >>>
> >>> Xintong Song
> >>>
> >>>
> >>>
> >>> On Mon, Sep 7, 2020 at 11:43 AM Yangze Guo  wrote:
> >>>
>  +1
> 
>  Best,
>  Yangze Guo
> 
>  On Mon, Sep 7, 2020 at 10:54 AM Zhu Zhu  wrote:
> >
> > +1
> >
> > Thanks,
> > Zhu
> >
> > Dian Fu  于2020年9月7日周一 上午10:34写道:
> >
> >> +1
> >>
> >>> 在 2020年9月3日,下午8:46,Till Rohrmann  写道:
> >>>
> >>> Hi Xintong,
> >>>
> >>> thanks for starting the vote.
> >>>
> >>> +1 for the proposal given that we find a proper name for the
> >>> different memory consumers (specifically the batch/RocksDB
> >>> consumer)
>  and
> >>> their corresponding weights.
> >>>
> >>> Cheers,
> >>> Till
> >>>
> >>> On Thu, Sep 3, 2020 at 12:43 PM Xintong Song <
> >>> tonysong...@gmail.com>
> >> wrote:
> >>>
>  Hi devs,
> 
>  I'd like to start a voting thread on FLIP-141[1], which proposes
> >>> how
>  managed memory should be shared by various use cases within a
> >>> slot.
>  The
>  proposal has been discussed in [2].
> 
>  The vote will be open for at least 72h + weekends. I'll try to
>  close it
> >> on
>  September 8, unless there is an objection or not enough votes.
> 
>  Thank you~
> 
>  Xintong Song
> 
> 
>  [1]
> 
> 
> >>
> 
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-141%3A+Intra-Slot+Managed+Memory+Sharing#FLIP141:IntraSlotManagedMemorySharing-compatibility
> 
>  [2]
> 
> 
> >>
> 
> >>>
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-141-Intra-Slot-Managed-Memory-Sharing-td44146.html
> 
> >>
> >>
> 
> >>>
> >>
> >
>
>


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-08 Thread Timo Walther

Hi Danny,

Your proposed signatures sound good to me.

fromDataStream(dataStream, Schema)
toDataStream(table, AbstractDataType)

They address all my concerns. The API would not be symmetric anymore, 
but this is fine with me. Others raised concerns about deprecating 
`fromDataStream(dataStream, Expression)`. Are they fine with this as well?
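
For illustration, a minimal sketch of how the two methods could look in user 
code (the Schema builder calls and the Order POJO are assumptions for the 
example, not an existing API):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // hypothetical POJO with fields: long id, String name, java.time.Instant ts
    DataStream<Order> orders = env.fromElements(new Order(1L, "ABC", Instant.now()));

    // fromDataStream(dataStream, Schema): columns are derived from the POJO,
    // only the rowtime/watermark declaration is added on top.
    Table table = tEnv.fromDataStream(
        orders,
        Schema.newBuilder()
            .watermark("ts", "ts - INTERVAL '5' SECOND")
            .build());

    // toDataStream(table, AbstractDataType): the data type drives the output
    // class, e.g. converting the rows back into the POJO.
    DataStream<Order> result = tEnv.toDataStream(table, DataTypes.of(Order.class));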


If there are no objections, I would update the FLIP with the methods 
above. But let me briefly summarize my thoughts on this again, so that we 
are all on the same page:

- The biggest discussion point seems the fromInsertStream/toInsertStream.
- I don’t have a strong opinion on naming, from/toDataStream would be 
fine for me. But:
- It has slightly different type mappings and might break existing pipelines 
silently. This point can be neglected as the differences are only minor.
- We need a way of declaring the rowtime attribute but without declaring 
all columns again. Reduce manual schema work as much as possible.
- Both Dawid and I don’t like the current expression logic (either “position based” or 
“name based”) that looks like a projection but is not.
- Actually name based expressions are not necessary, since we have 
positions for all new data types.
- Schema is not suitable to influence the output type for toDataStream. 
It should be DataType.


All items are solved by Danny's suggestion.

Regards,
Timo



On 08.09.20 14:04, Danny Chan wrote:

Hi, Timo ~

"It is not about changelog mode compatibility, it is about the type
compatibility.”

For fromDataStream(dataStream, Schema), there should not be a compatibility 
problem or data type inconsistency. We know the logical type from the Schema and 
the physical type from the dataStream itself.

For toDataStream(table, AbstractDataType), we can get the logical type from 
the table itself
and the physical type from the passed data type.

If both behaviors are deterministic, what's the problem with type compatibility 
and safety?

My concern is that in most of the cases where people use the "insert stream", they 
do not need to care about
the data stream ChangelogMode, so there is no need to distinguish them in the 
APIs; an optional param is enough. If we introduce 2 new APIs there, people 
have to choose between them. And can fromChangelogStream()
accept an insert stream? What is the behavior if fromInsertStream() accepts a 
changelog stream?


"This means potentially duplicate definition of fields and their data types etc”

I agree, because table already has an underlying schema there.

Best,
Danny Chan
在 2020年9月3日 +0800 PM8:12,Timo Walther ,写道:

Hi Danny,

"if ChangelogMode.INSERT is the default, existing pipelines should be
compatible"

It is not about changelog mode compatibility, it is about the type
compatibility. The renaming to `toInsertStream` is only to have a means
of dealing with data type inconsistencies that could break existing
pipelines.

As the FLIP describes, the following new behavior should be implemented:

- It does this by translating the TypeInformation to DataType.
- This will happen with a new TypeInfoDataTypeConverter that will no
longer produce LegacyTypeInformationType.
- All types from DataStream API should be supported by this converter.
- TupleTypeInfoBase will be translated into a proper RowType or
StructuredType.
- BigDecimals will be converted to DECIMAL(38,18) by default.
- Composite types (tuples, POJOs, rows) will be flattened by default if
they are used as top-level records (similar to the old behavior).
- The order of POJO fields is determined by the DataTypeExtractor and
must not be defined manually anymore.
- GenericTypeInfo is converted to RawType immediately by considering the
current configuration.
- A DataStream that originated from Table API will keep its DataType
information due to ExternalTypeInfo implementing DataTypeQueryable.

I would feel safer if we do this under a new method name.

"toDataStream(table, schema.bindTo(DataType))"

This is what I meant with "integrate the DataType into the Schema class
itself". Yes, we can do that if everybody is fine with it. But why
should a user specify both a schema and a data type? This means
potentially duplicate definition of fields and their data types etc.

Regards,
Timo


On 03.09.20 11:31, Danny Chan wrote:

"It is a more conservative approach to introduce that in a
new method rather than changing the existing one under the hood and
potentially break existing pipelines silently”

I like the idea actually, but if ChangelogMode.INSERT is the default, existing 
pipelines should be compatible. We can see the other kinds of ChangelogMode as 
an extension.

“for `toDataStream` users need to be
able to express whether they would prefer Row, POJO or atomic”

I think in most of the cases people do not need to convert the stream to a Row or 
POJO, because the table projection always returns a flattened internal row, 
if people did want a POJO there, how about we bind the DataType to the existing 
schema, like this

toDataStream(table, schema.bindTo(

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-08 Thread Kurt Young
Regarding #1, yes, using the state backend is definitely hash-based execution.
However, there are some differences from
batch hash-based execution. The key difference is *random access &
read/write mixed workload*. For example, by using a
state backend in streaming execution, one has to mix the read and write
operations, and all of them are actually random
access. But in a batch hash execution, we could divide the phases into
write and read. For example, we can build the
hash table first, with only write operations. And once the build is done,
we can start to read and trigger the user codes.
Take the hash aggregation implemented in the blink planner as an example: during
the building phase, as long as the hash map
could fit into memory, we will update the accumulators directly in the hash
map. And once we are running out of memory,
we then fall back to sort based execution. It improves the performance a
lot if the incoming data can be processed in
memory.

Regarding #2, IIUC you are actually describing a binary format of the key, not
the normalized key which is used in DataSet. I will
take String as an example. Say we have lots of keys with lengths all greater
than, let's say, 20. In your proposal, you will encode
the whole string in the prefix of your composed data (<key> + <timestamp>
+ <record>). And when you compare
records, you will actually compare the *whole* key of the record. For
normalized key, it's fixed-length in this case, IIRC it will
take 8 bytes to represent the string. And the sorter will store the
normalized key and offset in a dedicated array. When doing
the sorting, it only sorts this *small* array. If the normalized keys are
different, you could immediately tell which is greater from
normalized keys. You only have to compare the full keys if the normalized
keys are equal and you know in this case the normalized
key couldn't represent the full key. The reason why Dataset is doing this
is it's super cache efficient by sorting the *small* array.
The idea is borrowed from this paper [1]. Let me know if I missed or
misunderstood anything.

[1] https://dl.acm.org/doi/10./615232.615237 (AlphaSort: a
cache-sensitive parallel external sort)
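
To make the normalized-key idea above more concrete, here is a small generic
sketch (illustrative only, not the actual DataSet sorter code): the sorter keeps
a compact array of fixed-length key prefixes plus record offsets, sorts that
small array, and only falls back to full-key comparison when two prefixes are equal.

    import java.util.Arrays;

    /** Illustrative only: AlphaSort-style sorting over (normalizedKey, offset) entries. */
    final class NormalizedKeySortSketch {

        interface FullKeyComparator {
            // deserializes and compares the full keys at the given record offsets
            int compareAtOffsets(long offsetA, long offsetB);
        }

        static final int PREFIX_LEN = 8; // fixed-length normalized key, e.g. 8 bytes

        static final class Entry {
            final byte[] normalizedKey; // first PREFIX_LEN bytes of the key, padded if shorter
            final long recordOffset;    // points into the record buffer

            Entry(byte[] normalizedKey, long recordOffset) {
                this.normalizedKey = normalizedKey;
                this.recordOffset = recordOffset;
            }
        }

        static void sort(Entry[] entries, FullKeyComparator fullKeys) {
            Arrays.sort(entries, (l, r) -> {
                // 1. cache-friendly comparison of the small fixed-length prefixes
                int cmp = Arrays.compareUnsigned(l.normalizedKey, r.normalizedKey);
                if (cmp != 0) {
                    return cmp;
                }
                // 2. prefixes are equal and possibly truncated: fall back to the full keys
                return fullKeys.compareAtOffsets(l.recordOffset, r.recordOffset);
            });
        }
    }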

Best,
Kurt


On Tue, Sep 8, 2020 at 5:05 PM Dawid Wysakowicz 
wrote:

> Hey Kurt,
>
> Thank you for comments!
>
> Ad. 1 I might have missed something here, but as far as I see it is that
> using the current execution stack with regular state backends (RocksDB
> in particular if we want to have spilling capabilities) is equivalent to
> hash-based execution. I can see a different spilling state backend
> implementation in the future, but I think it is not batch specifc. Or am
> I missing something?
>
> Ad. 2 Totally agree that normalized keys are important to the
> performance. I think though TypeComparators are not a necessity to have
> that. Actually  this proposal is heading towards only ever performing
> "normalized keys" comparison. I have not included in the proposal the
> binary format which we will use for sorting (partially because I forgot,
> and partially because I thought it was too much of an implementation
> detail). Let me include it here though, as it might clear the situation
> a bit here.
>
> In DataSet, at times we have KeySelectors which extract keys based on
> field indices or names. This allows in certain situation to extract the
> key from serialized records. Compared to DataSet, in DataStream, the key
> is always described with a black-box KeySelector, or differently with a
> function which extracts a key from a deserialized record.  In turn there
> is no way to create a comparator that could compare records by
> extracting the key from a serialized record (neither with, nor without
> key normalization). We suggest that the input for the sorter will be
>
> <key> + <timestamp> + <record>
>
> Without having the key prepended we would have to deserialize the record
> for every key comparison.
>
> Therefore if we agree that we perform binary comparison for keys (which
> are always prepended), it is actually equivalent to a DataSet with
> TypeComparators that support key normalization.
>
> Let me know if that is clear, or I have missed something here.
>
> Best,
>
> Dawid
>
> On 08/09/2020 03:39, Kurt Young wrote:
> > Hi Dawid, thanks for bringing this up, it's really exciting to see that
> > batch execution is introduced in DataStream. From the flip, it seems
> > we are sticking with sort based execution mode (at least for now), which
> > will sort the whole input data before any *keyed* operation is
> > executed. I have two comments here:
> >
> > 1. Do we want to introduce hash-based execution in the future? Sort is a
> > safe choice but not the best in lots of cases. IIUC we only need
> > to make sure that before the framework finishes dealing with one key, the
> > operator doesn't see any data belonging to other keys, thus
> > hash-based execution would also do the trick. One tricky thing the
> > framework might need to deal with is memory constraint and spilling
> > in the hash map, but Flink also has some

Re: Merge SupportsComputedColumnPushDown and SupportsWatermarkPushDown

2020-09-08 Thread Timo Walther

Hi Jark, Hi Shengkai,

"shall we push the expressions in the following Projection too?"

This is something that we should at least consider.

I also don't find a strong use case. But what I see is that we are 
merging concepts that actually can be separated. And we are executing 
the same code twice. Regardless of what kind of code is executed (simple 
timestamp casting or more complex stuff).


In any case, we cannot model ingestion time with the merged interfaces. 
Because the computed timestamp column is evaluated twice. If a function 
in the computed column is not deterministic, a watermark_rowtime != 
projection_rowtime mismatch is very hard to debug.


Regards,
Timo



On 08.09.20 14:11, Shengkai Fang wrote:

Hi Timo and Jark.Thanks for your replies.

Maybe I didn't explain this clearly in the doc. I think the main reason is that we
have no means to distinguish the calc in LogicalProject. Let me give you an
example to illustrate the reason. Assume we have 2 cases:


case 1:

create table MyTable (
  a int,
  b int
) with (
  ...
)

we use sql "select a, b, a+b as c from MyTable" to get the results.

and

case 2:

create table MyTableWithComputedColumn (
  a int,
  b int,
  c as a + b
) with (
  ...
)

we use sql "select a, b, c from MyTableWithComputedColumn" to get the results.


When they reach the planner, the two queries will have the same plan, which means
we would also push the calculation from the query into the scan if we supported
computed column push down.


As a supplement to Jark's response, currently
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#assignTimestampAndWatermarks(WatermarkStrategy)
uses WatermarkStrategy to register watermark generator supplier. I think
it's ok to use WatermarkStrategy directly because FLIP-126 has been
finished.



Jark Wu  于2020年9月8日周二 下午7:38写道:


Hi Timo,

Regarding "pushing other computed columns into source, e.g. encrypted
records/columns, performing checksum checks, reading metadata etc.",
I'm not sure about this.
1. the planner doesn't know which computed column should be pushed into the
source
2. it seems that we can't improve performance if we push down complex logic
into the source, we still need to calculate it anyway.
3. the computed column is a regular expression, if the computed column
should be pushed down, then shall we push the expressions in the following
Projection too?
 If yes, then the name of "SupportsComputedColumnPushDown" might be not
correct.
4. regarding reading metadata, according to FLIP-107, we don't use the
existing SupportsComputedColumnPushDown, but a new interface.

Therefore, I don't find a strong use case that needs this interface so far.

Best,
Jark




On Tue, 8 Sep 2020 at 17:13, Timo Walther  wrote:


Hi Shengkai,

first of all, I would not consider the section "Problems of
SupportsWatermarkPushDown" as a "problem". It was planned to update the
WatermarkProvider once the interfaces are ready. See the comment in
WatermarkProvider:

// marker interface that will be filled after FLIP-126:
// WatermarkGenerator getWatermarkGenerator();

So far we had no sources that actually implement WatermarkStrategy.

Second, for generating watermarks I don't see a problem in merging the
two mentioned interfaces SupportsWatermarkPushDown and
SupportsComputedColumnPushDown into one. The described design sounds
reasonable to me and the impact on performance should not be too large.

However, by merging these two interfaces we are also merging two
completely separate concepts. Computed columns are not always used for
generating a rowtime or watermark. Users can and certainly will
implement more complex logic in there. One example could be decrypting
encrypted records/columns, performing checksum checks, reading metadata
etc.

So in any case we should still provide two interfaces:

SupportsWatermarkPushDown (functionality of computed columns +

watermarks)



SupportsComputedColumnPushDown (functionality of computed columns only)

I'm fine with such a design, but it is also confusing for implementers
that SupportsWatermarkPushDown includes the functionality of the other
interface.

What do you think?

Regards,
Timo


On 08.09.20 04:32, Jark Wu wrote:

Thanks to Shengkai for summarizing the problems on the FLIP-95

interfaces

and solutions.

I think the new proposal, i.e. only pushing the "WatermarkStrategy" is

much

cleaner and easier to develop than before.
So I'm +1 to the proposal.

Best,
Jark

On Sat, 5 Sep 2020 at 13:44, Shengkai Fang  wrote:


Hi, all. It seems the format is not normal. So I add a google doc in
link[1]. This discussion is about the interfaces in FLIP-95:  New

Table

Source And Table Sink and propose to merge two interfaces
SupportsWatermarkPushDown and SupportsComputedColumnPushDown.

I am looking forward to any opinions and suggestions from the

community.


Regards,
Shengkai

[1]





https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#


Shengkai Fang  于2020年9月4日周五 下午2:58写道:


 Hi, all. Currently, 

Re: [DISCUSS] FLIP-33: Standardize connector metrics

2020-09-08 Thread Becket Qin
On Tue, Sep 8, 2020 at 6:55 PM Konstantin Knauf  wrote:

> Hi Becket,
>
> Thank you for picking up this FLIP. I have a few questions:
>
> * two thoughts on naming:
>* idleTime: In the meantime, a similar metric "idleTimeMsPerSecond" has
> been introduced in https://issues.apache.org/jira/browse/FLINK-16864. They
> have a similar name, but different definitions of idleness,
> e.g. "idleTimeMsPerSecond" considers the SourceTask idle, when it is
> backpressured. Can we make it clearer that these two metrics mean different
> things?
>
>   * "current(Fetch)Latency" I am wondering if "eventTimeLag(Before|After)"
> is more descriptive/clear. What do others think?
>
>   * Current(Fetch)Latency implies that the timestamps are directly
> extracted in the source connector, right? Will this be the default for
> FLIP-27 sources anyway?
>
> * Does FLIP-33 also include the implementation of these metrics (to the
> extent possible) for all connectors currently available in Apache Flink or
> is the "per-connector implementation" out-of-scope?
>
> Thanks,
>
> Konstantin
>
>
>
>
>
> On Fri, Sep 4, 2020 at 4:56 PM Becket Qin  wrote:
>
> > Hi all,
> >
> > To complete the Source refactoring work, I'd like to revive this
> > discussion. Since the mail thread has been dormant for more than a year,
> > just to recap the motivation of the FLIP:
> >
> > 1. The FLIP proposes to standardize the connector metrics by giving
> > guidance on the metric specifications, including the name, type and
> meaning
> > of the metrics.
> > 2. It is OK for a connector to not emit some of the metrics in the metric
> > guidance, but if a metric of the same semantic is emitted, the
> > implementation should follow the guidance.
> > 3. It is OK for a connector to emit more metrics than what are listed in
> > the FLIP. This includes having an alias for a metric specified in the
> FLIP.
> > 4. We will implement some of the metrics out of the box in the default
> > implementation of FLIP-27, as long as it is applicable.
> >
> > The FLIP wiki is following:
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33
> > %3A+Standardize+Connector+Metrics
> >
> > Any thoughts?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Fri, Jun 14, 2019 at 2:29 PM Piotr Nowojski 
> > wrote:
> >
> > > > we will need to revisit the convention list and adjust them
> accordingly
> > > when FLIP-27 is ready
> > >
> > >
> > > Yes, this sounds good :)
> > >
> > > Piotrek
> > >
> > > > On 13 Jun 2019, at 13:35, Becket Qin  wrote:
> > > >
> > > > Hi Piotr,
> > > >
> > > > That's great to know. Chances are that we will need to revisit the
> > > > convention list and adjust them accordingly when FLIP-27 is ready, At
> > > that
> > > > point we can mark some of the metrics as available by default for
> > > > connectors implementing the new interface.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Thu, Jun 13, 2019 at 3:51 PM Piotr Nowojski 
> > > wrote:
> > > >
> > > >> Thanks for driving this. I’ve just noticed one small thing. With new
> > > >> SourceReader interface Flink will be able to provide `idleTime`
> metric
> > > >> automatically.
> > > >>
> > > >> Piotrek
> > > >>
> > > >>> On 13 Jun 2019, at 03:30, Becket Qin  wrote:
> > > >>>
> > > >>> Thanks all for the feedback and discussion.
> > > >>>
> > > >>> Since there wasn't any concern raised, I've started the voting
> thread
> > > for
> > > >>> this FLIP, but please feel free to continue the discussion here if
> > you
> > > >>> think something still needs to be addressed.
> > > >>>
> > > >>> Thanks,
> > > >>>
> > > >>> Jiangjie (Becket) Qin
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Mon, Jun 10, 2019 at 9:10 AM Becket Qin 
> > > wrote:
> > > >>>
> > >  Hi Piotr,
> > > 
> > >  Thanks for the comments. Yes, you are right. Users will have to
> look
> > > at
> > >  other metrics to decide whether the pipeline is healthy or not in
> > the
> > > >> first
> > >  place before they can use the time-based metric to fix the
> > bottleneck.
> > > 
> > >  I agree that once we have FLIP-27 ready, some of the metrics can
> > just
> > > be
> > >  reported by the abstract implementation.
> > > 
> > >  I've updated FLIP-33 wiki page to add the pendingBytes and
> > > >> pendingRecords
> > >  metric. Please let me know if you have any concern over the
> updated
> > > >> metric
> > >  convention proposal.
> > > 
> > >  @Chesnay Schepler  @Stephan Ewen
> > >   will you also have time to take a look at
> > the
> > >  proposed metric convention? If there is no further concern I'll
> > start
> > > a
> > >  voting thread for this FLIP in two days.
> > > 
> > >  Thanks,
> > > 
> > >  Jiangjie (Becket) Qin
> > > 
> > > 
> > > 
> > >  On Wed, Jun 5, 2019 at 6:54 PM Piotr Nowojski <
> pi...@ververica.com>
> > > >> wrote:
> > > 
> > > > Hi Becket,
> > > >
> > > > Thanks for the answer :)
> 

Re: Merge SupportsComputedColumnPushDown and SupportsWatermarkPushDown

2020-09-08 Thread Shengkai Fang
Hi, Timo.
I agree with you that the concepts Watermark and ComputedColumn should be
separated. However, we are actually merging the interfaces SupportsCalcPushDown and
SupportsWatermarkPushDown. The concept of a computed column has
disappeared during optimization.
As for the drawback you mentioned, I have already given a solution.
We can place the calculated computed timestamp column on StreamRecord and
replace the calculation in LogicalProject with a call of
StreamRecordTimestampSqlFunction which will read the timestamp on
StreamRecord.


Regards,
Shengkai.

Timo Walther  于2020年9月8日周二 下午8:51写道:

> Hi Jark, Hi Shengkai,
>
> "shall we push the expressions in the following Projection too?"
>
> This is something that we should at least consider.
>
> I also don't find a strong use case. But what I see is that we are
> merging concepts that actually can be separated. And we are executing
> the same code twice. Regardless of what kind of code is executed (simple
> timestamp casting or more complex stuff).
>
> In any case, we cannot model ingestion time with the merged interfaces.
> Because the computed timestamp column is evaluated twice. If a function
> in the computed column is not deterministic, a watermark_rowtime !=
> projection_rowtime mismatch is very hard to debug.
>
> Regards,
> Timo
>
>
>
> On 08.09.20 14:11, Shengkai Fang wrote:
> > Hi Timo and Jark.Thanks for your replies.
> >
> > Maybe I don't explain clearly in doc. I think the main reason behind is
> we
> > have no means to distinguish the calc in LogicalProject. Let me give you
> an
> > example to illustrate the reason. Assume we have 2 cases:
> >
> >
> > case 1:
> >
> > create table MyTable (
> >
> > int a,
> >
> > int b
> >
> > ) with (
> >
> > ...
> >
> > )
> >
> >
> > we use sql "select a, b, a+b as c from MyTable" to get the results.
> >
> >
> > and
> >
> >
> > case 2:
> >
> > create table MyTableWithComputedColumn (
> >
> > a int,
> >
> > b int,
> >
> > c as a + b
> >
> > ) with (
> >
> > ...
> >
> > )
> >
> >
> > we use sql "select a, b, c from MyTableWithComputedColumn" to get the
> > results.
> >
> >
> > When coming to planner, the two sqls will have the same plan, which means
> > we will also push calculation from query to scan if we support computed
> > column push down.
> >
> >
> > As a supplement of Jark's response, currently
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#assignTimestampAndWatermarks(WatermarkStrategy)
> > uses WatermarkStrategy to register watermark generator supplier. I think
> > it's ok to use WatermarkStrategy directly because FLIP-126 has been
> > finished.
> >
> >
> >
> > Jark Wu  于2020年9月8日周二 下午7:38写道:
> >
> >> Hi Timo,
> >>
> >> Regarding "pushing other computed columns into source, e.g. encrypted
> >> records/columns, performing checksum checks, reading metadata etc.",
> >> I'm not sure about this.
> >> 1. the planner don't know which computed column should be pushed into
> >> source
> >> 2. it seems that we can't improve performances if we pushdown complex
> logic
> >> into source, we still need to calculate them anyway.
> >> 3. the computed column is a regular expression, if the computed column
> >> should be pushed down, then shall we push the expressions in the
> following
> >> Projection too?
> >>  If yes, then the name of "SupportsComputedColumnPushDown" might be
> not
> >> correct.
> >> 4. regarding reading metadata, according to FLIP-107, we don't use the
> >> existing SupportsComputedColumnPushDown, but a new interface.
> >>
> >> Therefore, I don't find a strong use case that needs this interface so
> far.
> >>
> >> Best,
> >> Jark
> >>
> >>
> >>
> >>
> >> On Tue, 8 Sep 2020 at 17:13, Timo Walther  wrote:
> >>
> >>> Hi Shengkai,
> >>>
> >>> first of I would not consider the section Problems of
> >>> SupportsWatermarkPushDown" as a "problem". It was planned to update the
> >>> WatermarkProvider once the interfaces are ready. See the comment in
> >>> WatermarkProvider:
> >>>
> >>> // marker interface that will be filled after FLIP-126:
> >>> // WatermarkGenerator getWatermarkGenerator();
> >>>
> >>> So far we had no sources that actually implement WatermarkStrategy.
> >>>
> >>> Second, for generating watermarks I don't see a problem in merging the
> >>> two mentioned interfaces SupportsWatermarkPushDown and
> >>> SupportsComputedColumnPushDown into one. The descibed design sounds
> >>> reasonable to me and the impact on performance should not be too large.
> >>>
> >>> However, by merging these two interfaces we are also merging two
> >>> completely separate concepts. Computed columns are not always used for
> >>> generating a rowtime or watermark. Users can and certainly will
> >>> implement more complex logic in there. One example could be decrypting
> >>> encrypted records/columns, performing checksum checks, reading metadata
> >>> etc.
> >>>
> >>> So in any case we should still provide two interfaces:
> >>>
> >>> SupportsWatermarkPushDown (functionality of computed colum

Re: [DISCUSS] FLIP-33: Standardize connector metrics

2020-09-08 Thread Becket Qin
Hey Konstantin,

Thanks for the feedback and suggestions. Please see the reply below.

* idleTime: In the meantime, a similar metric "idleTimeMsPerSecond" has
> been introduced in https://issues.apache.org/jira/browse/FLINK-16864. They
> have a similar name, but different definitions of idleness,
> e.g. "idleTimeMsPerSecond" considers the SourceTask idle, when it is
> backpressured. Can we make it clearer that these two metrics mean different
> things?


That is a good point. I did not notice this metric earlier. It seems that
both metrics are useful to the users. One tells them how busy the source is
and how much more throughput the source can handle. The other tells the
users how long it has been since the source last saw a record, which is useful
for debugging. I'll update the FLIP to make it clear.

  * "current(Fetch)Latency" I am wondering if "eventTimeLag(Before|After)"
> is more descriptive/clear. What do others think?


I am quite open to the ideas on these names. In fact I also feel
"current(Fetch)Latency" are not super intuitive. So it would be great if we
can have better names.

  * Current(Fetch)Latency implies that the timestamps are directly
> extracted in the source connector, right? Will this be the default for
> FLIP-27 sources anyway?


The "currentFetchLatency" will probably be reported by each source
implementation, because the data fetching is done by SplitReaders and there
is no base implementation. The "currentLatency", on the other hand, can be
reported by the SourceReader base implementation. However, since developers
can actually implement their own source connector without using our base
implementation, this metric guidance is still useful for the connector
developers in that case.

* Does FLIP-33 also include the implementation of these metrics (to the
> extent possible) for all connectors currently available in Apache Flink or
> is the "per-connector implementation" out-of-scope?


FLIP-33 itself does not specify any implementation of those metrics. But
the connectors we provide in Apache Flink will follow the guidance of
FLIP-33 to emit those metrics when applicable. Maybe We can have some
public static Strings defined for the metric names to help other connector
developers follow the same guidance. I can also add that to the public
interface section of the FLIP if we decide to do that.
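
For example, such shared constants could look like this (purely illustrative,
using the metric names discussed in this thread; not an existing class):

    /** Illustrative sketch only; not part of any existing Flink API. */
    public final class SourceMetricNames {
        public static final String NUM_RECORDS_IN = "numRecordsIn";
        public static final String NUM_BYTES_IN = "numBytesIn";
        public static final String CURRENT_LATENCY = "currentLatency";
        public static final String CURRENT_FETCH_LATENCY = "currentFetchLatency";
        public static final String IDLE_TIME = "idleTime";
        public static final String PENDING_RECORDS = "pendingRecords";
        public static final String PENDING_BYTES = "pendingBytes";

        private SourceMetricNames() {}
    }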

Thanks,

Jiangjie (Becket) Qin

On Tue, Sep 8, 2020 at 9:02 PM Becket Qin  wrote:

>
>
> On Tue, Sep 8, 2020 at 6:55 PM Konstantin Knauf  wrote:
>
>> Hi Becket,
>>
>> Thank you for picking up this FLIP. I have a few questions:
>>
>> * two thoughts on naming:
>>* idleTime: In the meantime, a similar metric "idleTimeMsPerSecond" has
>> been introduced in https://issues.apache.org/jira/browse/FLINK-16864.
>> They
>> have a similar name, but different definitions of idleness,
>> e.g. "idleTimeMsPerSecond" considers the SourceTask idle, when it is
>> backpressured. Can we make it clearer that these two metrics mean
>> different
>> things?
>>
>>   * "current(Fetch)Latency" I am wondering if "eventTimeLag(Before|After)"
>> is more descriptive/clear. What do others think?
>>
>>   * Current(Fetch)Latency implies that the timestamps are directly
>> extracted in the source connector, right? Will this be the default for
>> FLIP-27 sources anyway?
>>
>> * Does FLIP-33 also include the implementation of these metrics (to the
>> extent possible) for all connectors currently available in Apache Flink or
>> is the "per-connector implementation" out-of-scope?
>>
>> Thanks,
>>
>> Konstantin
>>
>>
>>
>>
>>
>> On Fri, Sep 4, 2020 at 4:56 PM Becket Qin  wrote:
>>
>> > Hi all,
>> >
>> > To complete the Source refactoring work, I'd like to revive this
>> > discussion. Since the mail thread has been dormant for more than a year,
>> > just to recap the motivation of the FLIP:
>> >
>> > 1. The FLIP proposes to standardize the connector metrics by giving
>> > guidance on the metric specifications, including the name, type and
>> meaning
>> > of the metrics.
>> > 2. It is OK for a connector to not emit some of the metrics in the
>> metric
>> > guidance, but if a metric of the same semantic is emitted, the
>> > implementation should follow the guidance.
>> > 3. It is OK for a connector to emit more metrics than what are listed in
>> > the FLIP. This includes having an alias for a metric specified in the
>> FLIP.
>> > 4. We will implement some of the metrics out of the box in the default
>> > implementation of FLIP-27, as long as it is applicable.
>> >
>> > The FLIP wiki is following:
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33
>> > %3A+Standardize+Connector+Metrics
>> >
>> > Any thoughts?
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> >
>> > On Fri, Jun 14, 2019 at 2:29 PM Piotr Nowojski 
>> > wrote:
>> >
>> > > > we will need to revisit the convention list and adjust them
>> accordingly
>> > > when FLIP-27 is ready
>> > >
>> > >
>> > > Yes, this sounds good :)
>> > >
>> > > Piotrek
>> > >
>

[jira] [Created] (FLINK-19164) Release scripts break other dependency versions unintentionally

2020-09-08 Thread Serhat Soydan (Jira)
Serhat Soydan created FLINK-19164:
-

 Summary: Release scripts break other dependency versions 
unintentionally
 Key: FLINK-19164
 URL: https://issues.apache.org/jira/browse/FLINK-19164
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Scripts, Release System
Reporter: Serhat Soydan


All the scripts below have a line to change the old version to the new version in 
the pom files.

[https://github.com/apache/flink/blob/master/tools/change-version.sh#L31]

[https://github.com/apache/flink/blob/master/tools/releasing/create_release_branch.sh#L60]

[https://github.com/apache/flink/blob/master/tools/releasing/update_branch_version.sh#L52]

 

It works like find & replace so it is prone to unintentional errors. Any 
dependency with a version equal to the "old version" might be automatically 
changed to the "new version". See below for how to reproduce a similar case. 

 

+How to re-produce the bug:+
 * Clone/Fork Flink repo and for example checkout version v*1.11.1* 
 * Apply any changes you need
 * Run "create_release_branch.sh" script with OLD_VERSION=*1.11.1* 
NEW_VERSION={color:#de350b}*1.12.0*{color}
 ** In parent pom.xml, an auto find&replace of maven-dependency-analyzer 
version will be done automatically and *unintentionally* which will break the 
build.

 

<dependency>
  <groupId>org.apache.maven.shared</groupId>
  <artifactId>maven-dependency-analyzer</artifactId>
  <version>1.11.1</version>
</dependency>

<dependency>
  <groupId>org.apache.maven.shared</groupId>
  <artifactId>maven-dependency-analyzer</artifactId>
  <version>1.12.0</version>
</dependency>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-08 Thread Leonard Xu
HI, Timo

Thanks for driving this FLIP.

Sorry but I have a concern about Writing metadata via DynamicTableSink section:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
  headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
) WITH (
  ...
)
An insert statement could look like:

INSERT INTO kafka_table VALUES (
  (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
)

The proposed INSERT syntax does not make sense to me, because it contains 
computed (generated) columns.
Both SQL Server and PostgreSQL do not allow inserting values into computed columns 
even when they are persisted; this breaks the generated column semantics and may 
confuse users. 

For SQL server computed column[1]:
> column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] ]...  
> NOTE: A computed column cannot be the target of an INSERT or UPDATE statement.

For Postgresql generated column[2]:
>  height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
> NOTE: A generated column cannot be written to directly. In INSERT or UPDATE 
> commands, a value cannot be specified for a generated column, but the keyword 
> DEFAULT may be specified.

It shouldn't be allowed to set/update the value of a generated column. After looking up 
the SQL:2016 standard:
> <insert statement> ::=
> INSERT INTO <insertion target> <insert columns and source>
>
> If <contextually typed table value constructor> CTTVC is specified, then 
> every <contextually typed row value constructor element> simply contained in CTTVC whose positionally 
> corresponding <column name>
> in <insert column list> references a column of which some underlying column 
> is a generated column shall
> be a <default specification>.
> A <default specification> specifies the default value of some associated item.


[1] 
https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
 

 
[2] https://www.postgresql.org/docs/12/ddl-generated-columns.html 
 

> 在 2020年9月8日,17:31,Timo Walther  写道:
> 
> Hi Jark,
> 
> according to Flink's and Calcite's casting definition in [1][2] TIMESTAMP 
> WITH LOCAL TIME ZONE should be castable from BIGINT. If not, we will make it 
> possible ;-)
> 
> I'm aware of DeserializationSchema.getProducedType but I think that this 
> method is actually misplaced. The type should rather be passed to the source 
> itself.
> 
> For our Kafka SQL source, we will also not use this method because the Kafka 
> source will add own metadata in addition to the DeserializationSchema. So 
> DeserializationSchema.getProducedType will never be read.
> 
> For now I suggest to leave out the `DataType` from 
> DecodingFormat.applyReadableMetadata. Also because the format's physical type 
> is passed later in `createRuntimeDecoder`. If necessary, it can be computed 
> manually by consumedType + metadata types. We will provide a metadata utility 
> class for that.
> 
> Regards,
> Timo
> 
> 
> [1] 
> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
> [2] 
> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
> 
> 
> On 08.09.20 10:52, Jark Wu wrote:
>> Hi Timo,
>> The updated CAST SYSTEM_METADATA behavior sounds good to me. I just noticed
>> that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME ZONE".
>> So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL TIME
>> ZONE" as the defined type of Kafka timestamp? I think this makes sense,
>> because it represents the milli-seconds since epoch.
>> Regarding "DeserializationSchema doesn't need TypeInfo", I don't think so.
>> The DeserializationSchema implements ResultTypeQueryable, thus the
>> implementation needs to return an output TypeInfo.
>> Besides, FlinkKafkaConsumer also
>> calls DeserializationSchema.getProducedType as the produced type of the
>> source function [1].
>> Best,
>> Jark
>> [1]:
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
>> On Tue, 8 Sep 2020 at 16:35, Timo Walther  wrote:
>>> Hi everyone,
>>> 
>>> I updated the FLIP again and hope that I could address the mentioned
>>> concerns.
>>> 
>>> @Leonard: Thanks for the explanation. I wasn't aware that ts_ms and
>>> source.ts_ms have different semantics. I updated the FLIP and expose the
>>> most commonly used properties separately. So frequently used properties
>>> are not hidden in the MAP anymore:
>>> 
>>> debezium-json.ingestion-timestamp
>>> debezium-json.source.timestamp
>>> debezium-json.source.database
>>> debezium-json.source.schema
>>> debezium-json.source.table
>>> 
>>> However, since other properties depend on the used connector/vendor, the
>>> remaining options are stored in:
>>> 
>>> debezium-j

Re: 1.11.1 Hive connector doesn't work with Hive 1.0 or 1.1

2020-09-08 Thread Rui Li
Verified the issue was related to the building environment. The published
jar is good. Thanks Dian for the help.

On Tue, Sep 8, 2020 at 7:49 PM Rui Li  wrote:

> Thanks Dian. The script looks all right to me. I'll double check with the
> user whether the issue is related to his building environment.
>
> On Tue, Sep 8, 2020 at 7:36 PM Dian Fu  wrote:
>
>> Hi Rui,
>>
>> The maven artifacts are built using the
>> script: releasing/deploy_staging_jars.sh [1].
>>
>> Regards,
>> Dian
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release
>>
>> 在 2020年9月8日,下午7:19,Rui Li  写道:
>>
>> maven artifacts
>>
>>
>>
>
> --
> Best regards!
> Rui Li
>


-- 
Best regards!
Rui Li


Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-08 Thread Dawid Wysakowicz
Ad. 1

Yes, you are right in principle.

Let me, though, clarify my proposal a bit. The proposed sort-style
execution aims at a generic KeyedProcessFunction where all the
"aggregations" are actually performed in the user code. It tries to
improve the performance by actually removing the need to use RocksDB e.g.:

    private static final class Summer
            extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {
        // (the generic parameters, state declarations and onTimer() were lost in the
        //  archived message and are reconstructed here; the String key type is illustrative)

        private transient ValueState<Boolean> timerRegistered;
        private transient ValueState<Integer> counter;

        @Override
        public void open(Configuration parameters) {
            timerRegistered = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("timerRegistered", Types.BOOLEAN));
            counter = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("counter", Types.INT));
        }

        @Override
        public void processElement(
                Tuple2<String, Integer> value,
                Context ctx,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
                ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
                timerRegistered.update(true);
            }
            Integer v = counter.value();
            Integer incomingValue = value.f1;
            if (v != null) {
                v += incomingValue;
            } else {
                v = incomingValue;
            }
            counter.update(v);
        }

        // the timer registered above fires once the (bounded) input ends and emits the sum
        @Override
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            out.collect(Tuple2.of(ctx.getCurrentKey(), counter.value()));
        }
    }

Therefore I don't think the first part of your reply with separating the
write and read workload applies here. We do not aim to create a
competing API with the Table API. We think operations such as joins or
analytical aggregations should be performed in Table API.

As for the second part I agree it would be nice to fall back to the
sorting approach only if a certain threshold of memory in a State
Backend is used. This has some problems though. We would need a way to
estimate the size of the occupied memory to tell when the threshold is
reached. That is not easily doable by default e.g. in a
MemoryStateBackend, as we do not serialize the values in the state
backend by default. We would have to add that, but this would add the
overhead of the serialization.

This proposal aims at the cases where we do have a large state that will
not fit into the memory and without the change users are forced to use
RocksDB. If the state fits in memory I agree it will be better to do
hash-based aggregations e.g. using the MemoryStateBackend. Therefore I
think it is important to give users the choice to use one or the other
approach. We might discuss which approach should be the default for
RuntimeMode.BATCH proposed in FLIP-134. Should it be hash-based with
user configured state backend or sorting-based with a single key at a
time backend. Moreover we could think if we should let users choose the
sort vs hash "state backend" per operator. Would that suffice?

Ad. 2

I still think we can just use the first X bytes of the serialized form
as the normalized key and fall back to comparing full keys on clashes. It
is because we are actually not interested in a logical order, but we
care only about the "grouping" aspect of the sorting. Therefore I think
its enough to compare only parts of the full key as the normalized key.

Thanks again for the really nice and thorough feedback!

Best,

Dawid

On 08/09/2020 14:47, Kurt Young wrote:
> Regarding #1, yes the state backend is definitely hash-based execution.
> However there are some differences between
> batch hash-based execution. The key difference is *random access &
> read/write mixed workload". For example, by using
> state backend in streaming execution, one have to mix the read and write
> operations and all of them are actually random
> access. But in a batch hash execution, we could divide the phases into
> write and read. For example, we can build the
> hash table first, with only write operations. And once the build is done,
> we can start to read and trigger the user codes.
> Take hash aggregation which blink planner implemented as an example, during
> building phase, as long as the hash map
> could fit into memory, we will update the accumulators directly in the hash
> map. And once we are running out of memory,
> we then fall back to sort based execution. It improves the performance a
> lot if the incoming data can be processed in
> memory.
>
> Regarding #2, IIUC you are actually describing a binary format of key, not
> normalized key which is used in DataSet. I will
> take String for example. If we have lots of keys with length all greater
> than, let's say 20. In your proposal, you will encode
> the whole string in the prefix of your composed data (<key> + <timestamp>
> + <record>). And when you compare
> records, you will actually compare the *whole* key of the record. For
> normalized key, it's fixed-length in this case, IIRC it will
> take 8 bytes to represent the string. And the sorter will store the
> normalized key and offset in a dedicated array. When doing
> the sorting, it only sorts this *small* array. If the normalized keys are
> different, you could immediately tell which is greater from
> normalized keys. You only have to compare the full keys if the normalized
> keys are equal and you know in this case the normalized
> key couldn't represent the full key. The reason why Dataset is doing this
> is it's super cache efficient 

Re: Flink stateful functions : compensating callback to invoked functions on a timeout

2020-09-08 Thread Igal Shilman
Hi, Thanks for posting this interesting question, and welcome to StateFun!
:-)

The first thing that I would like to mention is that, if your original
motivation for that scenario is a concern of a transient failures such as:
- did function Y ever received a message sent by function X ?
- did sending a message failed?
- is the target function there to accept a message sent to it?
- did the order of message got mixed up?
- etc'

Then, StateFun eliminates all of these problems and a whole class of
transient errors that otherwise you would have to deal with by yourself in
your business logic (like retries, backoffs, service discovery etc').

Now if your motivating scenario is not about transient errors but more
about transactional workflows, then as Dawid mentioned you would have to
implement
this in your application logic. I think that the way you have described the
flow should map directly to a coordinating function (per flow instance)
that keeps track of results/timeouts in its internal state.

Here is a sketch:

1. A Flow Coordinator Function - it would be invoked with the input
necessary to kick off a flow. It would start invoking the relevant
functions (as defined by the flow's DAG) and would keep an internal state
indicating
what functions (addresses) were invoked and their completion statuses.
When the flow completes successfully the coordinator can safely discard its
state.
In any case that the coordinator decides to abort the flow (an internal
timeout / an external message / etc') it would have to check its internal
state and kick off a compensating workflow (sending a special message to
the already succeeded / in-progress functions)

2. Each function in the flow has to accept a message from the coordinator,
in turn, and reply with either a success or a failure.
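
To make the sketch above a bit more concrete, here is a rough outline of the
coordinator, assuming the StateFun Java SDK primitives (PersistedValue,
Context#send, Context#sendAfter). The message types, the FlowProgress state
class and the flow DAG handling are hypothetical placeholders, not part of
any existing API:

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;

    import org.apache.flink.statefun.sdk.Address;
    import org.apache.flink.statefun.sdk.Context;
    import org.apache.flink.statefun.sdk.StatefulFunction;
    import org.apache.flink.statefun.sdk.annotations.Persisted;
    import org.apache.flink.statefun.sdk.state.PersistedValue;

    public class FlowCoordinatorFn implements StatefulFunction {

        @Persisted
        private final PersistedValue<FlowProgress> progress =
                PersistedValue.of("progress", FlowProgress.class);

        @Override
        public void invoke(Context context, Object input) {
            if (input instanceof StartFlow) {
                // kick off the flow: message the first function(s) of the DAG and arm a timeout
                FlowProgress state = new FlowProgress();
                for (Address target : ((StartFlow) input).initialTargets) {
                    context.send(target, new StepRequest());
                    state.status.put(target.toString(), "IN_PROGRESS");
                }
                progress.set(state);
                context.sendAfter(Duration.ofMinutes(5), context.self(), new FlowTimeout());
            } else if (input instanceof StepResult) {
                // record the outcome reported by the function that just replied
                FlowProgress state = progress.get();
                state.status.put(
                        context.caller().toString(),
                        ((StepResult) input).succeeded ? "SUCCESS" : "FAILED");
                progress.set(state);
                // if every step reported SUCCESS, the flow is complete and the state can be cleared
            } else if (input instanceof FlowTimeout) {
                // timeout fired before all results arrived: kick off the compensating workflow
                FlowProgress state = progress.get();
                if (state != null) {
                    // send a Compensate message to every already succeeded / in-progress
                    // function (rebuilding the Address from the stored key is omitted here)
                    progress.clear();
                }
            }
        }

        // hypothetical placeholder messages / state; a real application would define these properly
        public static class FlowProgress { public HashMap<String, String> status = new HashMap<>(); }
        public static class StartFlow { public List<Address> initialTargets; }
        public static class StepRequest {}
        public static class StepResult { public boolean succeeded; }
        public static class FlowTimeout {}
    }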

Let me know if you have any followup/clarifying questions,
Good luck!
Igal.

On Mon, Sep 7, 2020 at 6:17 PM Dawid Wysakowicz 
wrote:

> Hi,
>
> I am not an expert on Stateful functions, but I doubt there is something
> in particular that would solve you use case. I think you need to
> implement that in the user space.
>
> You will need some way of keeping track of which state changes come from
> which originating event and then have some transactional protocol where
> you could commit or revoke the state on the time out.
>
> I am cc'ing Gordon and Igal as well, who work on the Stateful functions.
> They might have some other ideas or pointers.
>
> Best,
>
> Dawid
>
> On 07/09/2020 08:43, Mazen Ezzeddine wrote:
> > Hi all,
> >
> > I am implementing a use case in Flink stateful functions. My
> specification
> > highlights that starting from a *stateful function f* a business workflow
> > (in other words a group of stateful functions f1, f2, … fn are called
> either
> > sequentially or in parallel or both ). Stateful function f waits for a
> > result to be returned to update a local state, it as well starts a
> timeout
> > callback i.e. a message to itself. At timeout, f checks if the local
> state
> > is updated (it has received a result), if this is the case life is good.
> >
> > However, if at timeout f discovers that it has not received a result
> yet, it
> > has to launch a compensating workflow to undo any changes that /stateful
> > functions f1, f2, … fn/ might have received.
> >
> > Does Flink stateful functions framework support such as a design
> pattern/use
> > case, or it should be implemented at the application level? What is the
> > simplest design to achieve such a solution? For instance, how to know
> what
> > functions of the workflow stateful functions f1, f2, … fn were affected
> by
> > the timedout invocation (where the control flow has been timed out)? How
> > does Flink sateful functions and the concept of* integrated messaging and
> > state* facilitate such a pattern?
> >
> > Thank you.
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>
>


Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-08 Thread Timo Walther

Hi Leonard,

the only alternative I see is that we introduce a concept that is 
completely different from computed columns. This is also mentioned in the 
rejected alternative section of the FLIP. Something like:


CREATE TABLE kafka_table (
id BIGINT,
name STRING,
timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
headers MAP<STRING, BYTES> SYSTEM_METADATA("headers") PERSISTED
) WITH (
   ...
)

This way we would avoid the confusion altogether and can easily map columns to 
metadata columns. The disadvantage is that users cannot call UDFs or 
parse timestamps. This would need to be done in a real computed column.


I'm happy about better alternatives.

Regards,
Timo


On 08.09.20 15:37, Leonard Xu wrote:

HI, Timo

Thanks for driving this FLIP.

Sorry but I have a concern about Writing metadata via DynamicTableSink section:

CREATE TABLE kafka_table (
   id BIGINT,
   name STRING,
   timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
   headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
) WITH (
   ...
)
An insert statement could look like:

INSERT INTO kafka_table VALUES (
   (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
)

The proposed INSERT syntax does not make sense to me, because it contains 
computed (generated) columns.
Both SQL Server and PostgreSQL do not allow inserting values into computed columns 
even when they are persisted; this breaks the generated column semantics and may 
confuse users.

For SQL server computed column[1]:

column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] ]...
NOTE: A computed column cannot be the target of an INSERT or UPDATE statement.


For Postgresql generated column[2]:

  height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
NOTE: A generated column cannot be written to directly. In INSERT or UPDATE 
commands, a value cannot be specified for a generated column, but the keyword 
DEFAULT may be specified.


It shouldn't be allowed to set/update value for generated column after lookup 
the SQL 2016:

<insert statement> ::=
INSERT INTO <insertion target> <insert columns and source>

If <contextually typed table value constructor> CTTVC is specified, then every
<contextually typed row value constructor element> simply contained in CTTVC whose positionally corresponding
<column name>
in <insert column list> references a column of which some underlying column is 
a generated column shall
be a <default specification>.
A <default specification> specifies the default value of some associated item.



[1] 
https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
 

[2] https://www.postgresql.org/docs/12/ddl-generated-columns.html 



在 2020年9月8日,17:31,Timo Walther  写道:

Hi Jark,

according to Flink's and Calcite's casting definition in [1][2] TIMESTAMP WITH 
LOCAL TIME ZONE should be castable from BIGINT. If not, we will make it 
possible ;-)

I'm aware of DeserializationSchema.getProducedType but I think that this method 
is actually misplaced. The type should rather be passed to the source itself.

For our Kafka SQL source, we will also not use this method because the Kafka 
source will add own metadata in addition to the DeserializationSchema. So 
DeserializationSchema.getProducedType will never be read.

For now I suggest to leave out the `DataType` from 
DecodingFormat.applyReadableMetadata. Also because the format's physical type 
is passed later in `createRuntimeDecoder`. If necessary, it can be computed 
manually by consumedType + metadata types. We will provide a metadata utility 
class for that.

Regards,
Timo


[1] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
[2] 
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254


On 08.09.20 10:52, Jark Wu wrote:

Hi Timo,
The updated CAST SYSTEM_METADATA behavior sounds good to me. I just noticed
that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME ZONE".
So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL TIME
ZONE" as the defined type of Kafka timestamp? I think this makes sense,
because it represents the milli-seconds since epoch.
Regarding "DeserializationSchema doesn't need TypeInfo", I don't think so.
The DeserializationSchema implements ResultTypeQueryable, thus the
implementation needs to return an output TypeInfo.
Besides, FlinkKafkaConsumer also
calls DeserializationSchema.getProducedType as the produced type of the
source function [1].
Best,
Jark
[1]:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
On Tue, 8 Sep 2020 at 16:35, Timo Walther  wrote:

Hi everyone,

I updated the FLIP again and hope that I could address the mentioned
concerns.

@Leonard: Thanks for the explanation. I wasn't aware

[DISCUSS] FLIP-142: Disentangle StateBackends from Checkpointing

2020-09-08 Thread Seth Wiesman
Hi Devs,

I'd like to propose an update to how state backends and checkpoint storage
are configured to help users better understand Flink.

Apache Flink's durability story is a mystery to many users. One of the most
common recurring questions from users comes from not understanding the
relationship between state, state backends, and snapshots. Some of this
confusion can be abated with learning material but the question is so
pervasive that we believe Flink’s user APIs should better communicate
what different components are responsible for.


https://cwiki.apache.org/confluence/display/FLINK/FLIP-142%3A+Disentangle+StateBackends+from+Checkpointing


I look forward to a healthy discussion.


Seth


[jira] [Created] (FLINK-19165) Clean up the UnilateralSortMerger

2020-09-08 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-19165:


 Summary: Clean up the UnilateralSortMerger
 Key: FLINK-19165
 URL: https://issues.apache.org/jira/browse/FLINK-19165
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.12.0


This is a preparation step for 
[FLIP-140|https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+bounded+style+execution+for+keyed+streams].
The purpose of the task is twofold:
* break down the implementation into a more composable pieces
* introduce a way to produce records in a push-based manner instead of 
pull-based with an additional reading thread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-33: Standardize connector metrics

2020-09-08 Thread Becket Qin
Hi Stephan,

Thanks for the input. Just a few more clarifications / questions.

*Num Bytes / Records Metrics*

1. At this point, the *numRecordsIn(Rate)* metrics exist in both
OperatorIOMetricGroup and TaskIOMetricGroup. I did not find
*numRecordsIn(Rate)* in the TaskIOMetricGroup updated anywhere other than
in the unit tests. Am I missing something?

2. *numBytesIn(Rate)* metrics only exist in TaskIOMetricGroup. At this
point, the SourceReaders only have access to a SourceReaderContext which
provides an OperatorMetricGroup. So it seems that the connector developers
are not able to update the *numBytesIn(Rate)*. With the multiple Source
chaining support, it is possible that there are multiple Sources in the
same task. So it looks like we need to add *numBytesIn(Rate)* to the
operator metrics as well.


*Current (Fetch) Latency*

*currentFetchLatency* helps clearly tell whether the latency is caused by
Flink or not. Backpressure is not the only reason that we see fetch
latency. Even if there is no back pressure, the records may have passed a
long pipeline before they entered Flink. For example, say the *currentLatency*
is 10 seconds and there is no backpressure. Does that mean the record
spent 10 seconds in the Source operator? If not, how much did Flink
contribute to those 10 seconds of latency? These questions are frequently
asked and hard to answer without the fetch latency.

For "currentFetchLatency", we would need to understand timestamps before
> the records are decoded. That is only possible for some sources, where the
> client gives us the records in a (partially) decoded form already (like
> Kafka). Then, some work has been done between the fetch time and the time
> we update the metric already, so it is already a bit closer to the
> "currentFetchLatency". I think following this train of thought, there is
> diminished benefit from that specific metric.


We may not have to report the fetch latency before records are decoded. One
solution is to remember the *FetchTime* when the encoded records are
fetched, and report the fetch latency after the records are decoded by
computing (*FetchTime - EventTime*). An approximate implementation would be
adding a *FetchTime* field to the *RecordsWithSplitIds*, assuming that all
the records in that data structure are fetched at the same time.
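
A rough sketch of that idea (the class, field and metric names below are made
up for illustration; this is not a proposal for the actual RecordsWithSplitIds
API):

// Stamp the fetch time on the raw batch and compute the latency only after decoding.
final class FetchStampedBatch {
    final byte[][] encodedRecords;
    final long fetchTimeMillis;

    FetchStampedBatch(byte[][] encodedRecords, long fetchTimeMillis) {
        this.encodedRecords = encodedRecords;
        this.fetchTimeMillis = fetchTimeMillis;
    }
}

final class FetchLatencyTracker {
    private volatile long currentFetchLatencyMillis;

    void onRecordDecoded(FetchStampedBatch batch, long eventTimeMillis) {
        // currentFetchLatency = FetchTime - EventTime, reported after decoding
        currentFetchLatencyMillis = batch.fetchTimeMillis - eventTimeMillis;
    }

    long getCurrentFetchLatencyMillis() {
        return currentFetchLatencyMillis;
    }
}
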

Thoughts?

Thanks,

Jiangjie (Becket) Qin

On Wed, Sep 9, 2020 at 12:42 AM Stephan Ewen  wrote:

> Thanks for reviving this, Becket!
>
> I think Konstantin's comments are great. I'd add these points:
>
> *Num Bytes / Records Metrics*
>
> For "numBytesIn" and "numRecordsIn", we should reuse the OperatorIOMetric
> group, then it also gets reported to the overview page in the Web UI.
>
> The "numBytesInPerSecond" and "numRecordsInPerSecond" are automatically
> derived metrics, no need to do anything once we populate the above two
> metrics
>
>
> *Current (Fetch) Latency*
>
> I would really go for "eventTimeLag" rather than "fetchLatency". I think
> "eventTimeLag" is a term that has some adoption in the Flink community and
> beyond.
>
> I am not so sure that I see the benefit between "currentLatency" and
> "currentFetchLatency", (or event time lag before/after) as this only is
> different by the time it takes to emit a batch.
>  - In a non-backpressured case, these should be virtually identical
> (and both dominated by watermark lag, not the actual time it takes the
> fetch to be emitted)
>  - In a backpressured case, why do you care about when data was
> fetched, as opposed to emitted? Emitted time is relevant for application
> semantics and checkpoints. Fetch time seems to be an implementation detail
> (how much does the source buffer).
>
> The "currentLatency" (eventTimeLagAfter) can be computed out-of-the-box,
> independent of a source implementation, so that is also a good argument to
> make it the main metric.
> We know timestamps and watermarks in the source. Except for cases where no
> watermarks have been defined at all (batch jobs or pure processing time
> jobs), in which case this metric should probably be "Infinite".
>
> For "currentFetchLatency", we would need to understand timestamps before
> the records are decoded. That is only possible for some sources, where the
> client gives us the records in a (partially) decoded form already (like
> Kafka). Then, some work has been done between the fetch time and the time
> we update the metric already, so it is already a bit closer to the
> "currentFetchLatency". I think following this train of thought, there is
> diminished benefit from that specific metric.
>
>
> *Idle Time*
>
> I agree, it would be great to rename this. Maybe to "sourceWaitTime" or
> "sourceIdleTime" so to make clear that this is not exactly the time that
> Flink's processing pipeline is idle, but the time where the source does not
> have new data.
>
> This is not an easy metric to collect, though (except maybe for the
> sources that are only idle while they have no split assigned, like
> contin

[jira] [Created] (FLINK-19166) StreamingFileWriter should register Listener before the initialization of buckets

2020-09-08 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-19166:


 Summary: StreamingFileWriter should register Listener before the 
initialization of buckets
 Key: FLINK-19166
 URL: https://issues.apache.org/jira/browse/FLINK-19166
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Runtime
Affects Versions: 1.11.1
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: 1.11.2


In 
[http://apache-flink.147419.n8.nabble.com/StreamingFileSink-hive-metadata-td6898.html]

The feedback from the user indicates that some partitions have not been committed 
after the job failed.

This may be due to FLINK-18110: that change fixed Buckets but missed 
{{StreamingFileWriter}}. It should register the Listener before the 
initialization of the buckets, otherwise listener notifications are lost as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Releasing Flink 1.11.2

2020-09-08 Thread Jingsong Li
Hi Zhu Zhu,

Add a new blocker: https://issues.apache.org/jira/browse/FLINK-19166

Will fix it soon.

Best,
Jingsong

On Tue, Sep 8, 2020 at 12:26 AM Zhu Zhu  wrote:

> Hi All,
>
> Since there are still two 1.11.2 blockers items in progress, RC1 creation
> will be postponed to tomorrow.
>
> Thanks,
> Zhu
>
> Zhu Zhu  于2020年9月4日周五 下午3:50写道:
>
> > @Dawid
> > Thanks for the information and preparing a fix for FLINK-19133!
> > I have made it a blocker for 1.11.2 and will keep tracking its status.
> >
> > @Till
> > Thanks for the updates and efforts for FLINK-18959!
> >
> > Thanks,
> > Zhu
> >
> > Till Rohrmann  于2020年9月4日周五 下午3:41写道:
> >
> >> Fyi, FLINK-18959 has been merged into the release-1.10 branch.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Thu, Sep 3, 2020 at 2:38 PM Dawid Wysakowicz  >
> >> wrote:
> >>
> >>> User has just reported another issue FLINK-19133 which I think should
> be
> >>> a blocker for the 1.11.2 release. I'll try to prepare a fix as soon as
> >>> possible.
> >>>
> >>> On 03/09/2020 09:36, Zhu Zhu wrote:
> >>> > Thanks for the inputs!
> >>> > I have made FLINK-14942 and FLINK-18641 blockers for 1.11.2.
> >>> >
> >>> > And thanks a lot for offering help, zhijiang!
> >>> >
> >>> > Thanks,
> >>> > Zhu
> >>> >
> >>> > Congxian Qiu  于2020年9月3日周四 下午3:18写道:
> >>> >
> >>> >> Hi
> >>> >> I'd like to include FLINK-14942 into 1.11.2. FLINK-14942 (this
> >>> fixes a
> >>> >> bug introduced in 1.11.0), there is a PR for it already.
> >>> >> Best,
> >>> >> Congxian
> >>> >>
> >>> >>
> >>> >> Zhou, Brian  于2020年9月3日周四 上午11:21写道:
> >>> >>
> >>> >>> Hi,
> >>> >>>
> >>> >>> Thanks Becket for addressing the issue. FLINK-18641 is now a
> blocker
> >>> for
> >>> >>> Pravega connector integration, hope we can have it included in
> 1.11.2
> >>> >>> release.
> >>> >>>
> >>> >>> Best Regards,
> >>> >>> Brian
> >>> >>>
> >>> >>> -Original Message-
> >>> >>> From: Becket Qin 
> >>> >>> Sent: Thursday, September 3, 2020 11:18
> >>> >>> To: dev
> >>> >>> Cc: khachatryan.ro...@gmail.com; Till Rohrmann; david; Jingsong Li
> >>> >>> Subject: Re: [DISCUSS] Releasing Flink 1.11.2
> >>> >>>
> >>> >>>
> >>> >>> [EXTERNAL EMAIL]
> >>> >>>
> >>> >>> Hi Zhuzhu,
> >>> >>>
> >>> >>> Thanks for starting the discussion.
> >>> >>>
> >>> >>> I'd like to include FLINK-18641 into 1.11.2 as well. It is a
> >>> regression
> >>> >>> from previous versions and is currently blocking the development of
> >>> >> Pravega
> >>> >>> source on top of FLIP-27.
> >>> >>>
> >>> >>> Thanks,
> >>> >>>
> >>> >>> Jiangjie (Becket) Qin
> >>> >>>
> >>> >>> On Wed, Sep 2, 2020 at 11:13 PM Zhu Zhu  wrote:
> >>> >>>
> >>>  Thank you all for the inputs!
> >>> 
> >>>  I agree with Till that we should set a soft deadline first.
> >>>  I'd like to propose next Monday if no new blocker issue pops up.
> >>>  But feel free to raise your concerns if you feel next Monday as a
> >>>  deadline may not work for fixes which should be a blocker for
> >>> 1.11.2.
> >>> 
> >>>  Here's a summary of the wanted/blocker but still open fixes:
> >>>  - FLINK-19121 Avoid accessing HDFS frequently in
> >>> HiveBulkWriterFactory
> >>>  - FLINK-19109 Split Reader eats chained periodic watermarks
> >>>  - (not a strict blocker) FLINK-18959 Fail to archiveExecutionGraph
> >>>  because job is not finished when dispatcher close
> >>>  - FLINK-18934 Idle stream does not advance watermark in connected
> >>>  stream
> >>> 
> >>>  Thanks,
> >>>  Zhu
> >>> 
> >>>  Konstantin Knauf  于2020年9月2日周三 下午9:00写道:
> >>> 
> >>> > I think it would be nice to include a fix for
> >>> > https://issues.apache.org/jira/browse/FLINK-18934, too, as it
> >>> > affects a highly requested feature of Flink 1.11 quite severely.
> >>> >
> >>> > On Wed, Sep 2, 2020 at 2:51 PM Till Rohrmann <
> trohrm...@apache.org
> >>> >
> >>>  wrote:
> >>> >> Thanks a lot for starting this discussion Zhu Zhu and for
> >>> >> volunteering
> >>>  as
> >>> >> the release manager. Big +1 for creating the next 1.11 bug fix
> >>>  release. I
> >>> >> think we already collected quite a bit of fixes which our users
> >>> >> will benefit from.
> >>> >>
> >>> >> For the pending fixes, I would suggest setting a soft deadline
> >>> >> (maybe
> >>> > until
> >>> >> beginning of next week) and then starting to cut the release
> >>> >> (given
> >>>  that
> >>> > no
> >>> >> other blocker issues pop up). Maybe we are able to resolve the
> >>> >> issues
> >>> > even
> >>> >> earlier which would allow us to cut the release also earlier.
> >>> >>
> >>> >> From my side I would like to include FLINK-18959 in the release.
> >>> >> But it
> >>> > is
> >>> >> not a strict release blocker.
> >>> >>
> >>> >> Cheers,
> >>> >> Till
> >>> >>
> >>> >> On Wed, Sep 2, 2020 at 2:40 PM David Anderson
> >>> >> 
> >>> >> wrote:
> >>

Re: [VOTE] FLIP-137: Support Pandas UDAF in PyFlink

2020-09-08 Thread Xingbo Huang
Hi all,

Thanks a lot for the discussion and votes. I will summarize the result in a
separate email.

Best,
Xingbo

Zhu Zhu  于2020年9月8日周二 上午11:16写道:

> +1
>
> Thanks,
> Zhu
>
> Hequn Cheng  于2020年9月8日周二 上午8:57写道:
>
> > +1 (binding)
> >
> >
> > On Tue, Sep 8, 2020 at 7:43 AM jincheng sun 
> > wrote:
> >
> > > +1(binding)
> > >
> > > Best,
> > > Jincheng
> > >
> > >
> > > Shuiqiang Chen  于2020年9月7日周一 下午3:50写道:
> > >
> > > > +1 (non-binding)
> > > >
> > > > > 在 2020年9月7日,下午2:38,Wei Zhong  写道:
> > > > >
> > > > > +1 (non-binding)
> > > > >
> > > > >> 在 2020年9月7日,10:00,Dian Fu  写道:
> > > > >>
> > > > >> +1
> > > > >>
> > > > >>> 在 2020年9月4日,上午11:12,Xingbo Huang  写道:
> > > > >>>
> > > > >>> Hi all,
> > > > >>> I would like to start the vote for FLIP-137[1], which is
> discussed
> > > and
> > > > >>> reached a consensus in the discussion thread[2].
> > > > >>>
> > > > >>> The vote will be open for at least 72h, unless there is an
> > objection
> > > > or not
> > > > >>> enough votes.
> > > > >>>
> > > > >>> Best,
> > > > >>> Xingbo
> > > > >>>
> > > > >>> [1]
> > > > >>>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-137%3A+Support+Pandas+UDAF+in+PyFlink
> > > > >>> [2]
> > > > >>>
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-137-Support-Pandas-UDAF-in-PyFlink-tt44060.html
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
>


[RESULT][VOTE] FLIP-137: Support Pandas UDAF in PyFlink

2020-09-08 Thread Xingbo Huang
Hi all,

The voting time for FLIP-137[1] has passed. I'm closing the vote now.

There 6 + 1 votes, 4 of which are binding:
- Dian (binding)
- Jincheng (binding)
- Hequn (binding)
- Zhu Zhu (binding)
- Wei (non-binding)
- Shuiqiang (non-binding)

There are no disapproving votes.

Thus, FLIP-137 has been accepted.

Thanks a lot for everyone for joining this discussion and the votes.

Best,
Xingbo

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-137%3A+Support+Pandas+UDAF+in+PyFlink


Re: [DISCUSS] Releasing Flink 1.11.2

2020-09-08 Thread Jingsong Li
Hi Zhu Zhu,

Some more context on the impact of [1]:
For HiveStreamingSink & FileSystemSink in Table/SQL, the partition commit makes
a partition visible to downstream Hive/Spark engines.
But due to FLINK-19166, some partitions will not be committed after a job
failover in some cases, especially for short-lived partitions.
From the user's point of view the data is lost, which does not conform to
exactly-once.

[1]https://issues.apache.org/jira/browse/FLINK-19166
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit

Best,
Jingsong

On Wed, Sep 9, 2020 at 11:08 AM Jingsong Li  wrote:

> Hi Zhu Zhu,
>
> Add a new blocker: https://issues.apache.org/jira/browse/FLINK-19166
>
> Will fix it soon.
>
> Best,
> Jingsong
>
> On Tue, Sep 8, 2020 at 12:26 AM Zhu Zhu  wrote:
>
>> Hi All,
>>
>> Since there are still two 1.11.2 blockers items in progress, RC1 creation
>> will be postponed to tomorrow.
>>
>> Thanks,
>> Zhu
>>
>> Zhu Zhu  于2020年9月4日周五 下午3:50写道:
>>
>> > @Dawid
>> > Thanks for the information and preparing a fix for FLINK-19133!
>> > I have made it a blocker for 1.11.2 and will keep tracking its status.
>> >
>> > @Till
>> > Thanks for the updates and efforts for FLINK-18959!
>> >
>> > Thanks,
>> > Zhu
>> >
>> > Till Rohrmann  于2020年9月4日周五 下午3:41写道:
>> >
>> >> Fyi, FLINK-18959 has been merged into the release-1.10 branch.
>> >>
>> >> Cheers,
>> >> Till
>> >>
>> >> On Thu, Sep 3, 2020 at 2:38 PM Dawid Wysakowicz <
>> dwysakow...@apache.org>
>> >> wrote:
>> >>
>> >>> User has just reported another issue FLINK-19133 which I think should
>> be
>> >>> a blocker for the 1.11.2 release. I'll try to prepare a fix as soon as
>> >>> possible.
>> >>>
>> >>> On 03/09/2020 09:36, Zhu Zhu wrote:
>> >>> > Thanks for the inputs!
>> >>> > I have made FLINK-14942 and FLINK-18641 blockers for 1.11.2.
>> >>> >
>> >>> > And thanks a lot for offering help, zhijiang!
>> >>> >
>> >>> > Thanks,
>> >>> > Zhu
>> >>> >
>> >>> > Congxian Qiu  于2020年9月3日周四 下午3:18写道:
>> >>> >
>> >>> >> Hi
>> >>> >> I'd like to include FLINK-14942 into 1.11.2. FLINK-14942 (this
>> >>> fixes a
>> >>> >> bug introduced in 1.11.0), there is a PR for it already.
>> >>> >> Best,
>> >>> >> Congxian
>> >>> >>
>> >>> >>
>> >>> >> Zhou, Brian  于2020年9月3日周四 上午11:21写道:
>> >>> >>
>> >>> >>> Hi,
>> >>> >>>
>> >>> >>> Thanks Becket for addressing the issue. FLINK-18641 is now a
>> blocker
>> >>> for
>> >>> >>> Pravega connector integration, hope we can have it included in
>> 1.11.2
>> >>> >>> release.
>> >>> >>>
>> >>> >>> Best Regards,
>> >>> >>> Brian
>> >>> >>>
>> >>> >>> -Original Message-
>> >>> >>> From: Becket Qin 
>> >>> >>> Sent: Thursday, September 3, 2020 11:18
>> >>> >>> To: dev
>> >>> >>> Cc: khachatryan.ro...@gmail.com; Till Rohrmann; david; Jingsong
>> Li
>> >>> >>> Subject: Re: [DISCUSS] Releasing Flink 1.11.2
>> >>> >>>
>> >>> >>>
>> >>> >>> [EXTERNAL EMAIL]
>> >>> >>>
>> >>> >>> Hi Zhuzhu,
>> >>> >>>
>> >>> >>> Thanks for starting the discussion.
>> >>> >>>
>> >>> >>> I'd like to include FLINK-18641 into 1.11.2 as well. It is a
>> >>> regression
>> >>> >>> from previous versions and is currently blocking the development
>> of
>> >>> >> Pravega
>> >>> >>> source on top of FLIP-27.
>> >>> >>>
>> >>> >>> Thanks,
>> >>> >>>
>> >>> >>> Jiangjie (Becket) Qin
>> >>> >>>
>> >>> >>> On Wed, Sep 2, 2020 at 11:13 PM Zhu Zhu 
>> wrote:
>> >>> >>>
>> >>>  Thank you all for the inputs!
>> >>> 
>> >>>  I agree with Till that we should set a soft deadline first.
>> >>>  I'd like to propose next Monday if no new blocker issue pops up.
>> >>>  But feel free to raise your concerns if you feel next Monday as a
>> >>>  deadline may not work for fixes which should be a blocker for
>> >>> 1.11.2.
>> >>> 
>> >>>  Here's a summary of the wanted/blocker but still open fixes:
>> >>>  - FLINK-19121 Avoid accessing HDFS frequently in
>> >>> HiveBulkWriterFactory
>> >>>  - FLINK-19109 Split Reader eats chained periodic watermarks
>> >>>  - (not a strict blocker) FLINK-18959 Fail to
>> archiveExecutionGraph
>> >>>  because job is not finished when dispatcher close
>> >>>  - FLINK-18934 Idle stream does not advance watermark in connected
>> >>>  stream
>> >>> 
>> >>>  Thanks,
>> >>>  Zhu
>> >>> 
>> >>>  Konstantin Knauf  于2020年9月2日周三 下午9:00写道:
>> >>> 
>> >>> > I think it would be nice to include a fix for
>> >>> > https://issues.apache.org/jira/browse/FLINK-18934, too, as it
>> >>> > affects a highly requested feature of Flink 1.11 quite severely.
>> >>> >
>> >>> > On Wed, Sep 2, 2020 at 2:51 PM Till Rohrmann <
>> trohrm...@apache.org
>> >>> >
>> >>>  wrote:
>> >>> >> Thanks a lot for starting this discussion Zhu Zhu and for
>> >>> >> volunteering
>> >>>  as
>> >>> >> the release manager. Big +1 for creating the next 1.11 bug fix
>> >>>  release. I
>> >>> >> think we already collected quite a bit of fixes which our user

Re: [DISCUSS] Releasing Flink 1.11.2

2020-09-08 Thread Zhu Zhu
Thanks for reporting this issue and offering to fix it @Jingsong Li

Agreed it is a reasonable blocker. I will postpone 1.11.2 RC1 creation
until it is fixed.

Thanks,
Zhu

Jingsong Li  于2020年9月9日周三 上午11:27写道:

> Hi Zhu Zhu,
>
> Some more context on the impact of [1]:
> For HiveStreamingSink & FileSystemSink in Table/SQL, the partition commit makes
> a partition visible to downstream Hive/Spark engines.
> But due to FLINK-19166, some partitions will not be committed after a job
> failover in some cases, especially for short-lived partitions.
> From the user's point of view the data is lost, which does not conform to
> exactly-once.
>
> [1]https://issues.apache.org/jira/browse/FLINK-19166
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit
>
> Best,
> Jingsong
>
> On Wed, Sep 9, 2020 at 11:08 AM Jingsong Li 
> wrote:
>
> > Hi Zhu Zhu,
> >
> > Add a new blocker: https://issues.apache.org/jira/browse/FLINK-19166
> >
> > Will fix it soon.
> >
> > Best,
> > Jingsong
> >
> > On Tue, Sep 8, 2020 at 12:26 AM Zhu Zhu  wrote:
> >
> >> Hi All,
> >>
> >> Since there are still two 1.11.2 blockers items in progress, RC1
> creation
> >> will be postponed to tomorrow.
> >>
> >> Thanks,
> >> Zhu
> >>
> >> Zhu Zhu  于2020年9月4日周五 下午3:50写道:
> >>
> >> > @Dawid
> >> > Thanks for the information and preparing a fix for FLINK-19133!
> >> > I have made it a blocker for 1.11.2 and will keep tracking its status.
> >> >
> >> > @Till
> >> > Thanks for the updates and efforts for FLINK-18959!
> >> >
> >> > Thanks,
> >> > Zhu
> >> >
> >> > Till Rohrmann  于2020年9月4日周五 下午3:41写道:
> >> >
> >> >> Fyi, FLINK-18959 has been merged into the release-1.10 branch.
> >> >>
> >> >> Cheers,
> >> >> Till
> >> >>
> >> >> On Thu, Sep 3, 2020 at 2:38 PM Dawid Wysakowicz <
> >> dwysakow...@apache.org>
> >> >> wrote:
> >> >>
> >> >>> User has just reported another issue FLINK-19133 which I think
> should
> >> be
> >> >>> a blocker for the 1.11.2 release. I'll try to prepare a fix as soon
> as
> >> >>> possible.
> >> >>>
> >> >>> On 03/09/2020 09:36, Zhu Zhu wrote:
> >> >>> > Thanks for the inputs!
> >> >>> > I have made FLINK-14942 and FLINK-18641 blockers for 1.11.2.
> >> >>> >
> >> >>> > And thanks a lot for offering help, zhijiang!
> >> >>> >
> >> >>> > Thanks,
> >> >>> > Zhu
> >> >>> >
> >> >>> > Congxian Qiu  于2020年9月3日周四 下午3:18写道:
> >> >>> >
> >> >>> >> Hi
> >> >>> >> I'd like to include FLINK-14942 into 1.11.2. FLINK-14942
> (this
> >> >>> fixes a
> >> >>> >> bug introduced in 1.11.0), there is a PR for it already.
> >> >>> >> Best,
> >> >>> >> Congxian
> >> >>> >>
> >> >>> >>
> >> >>> >> Zhou, Brian  于2020年9月3日周四 上午11:21写道:
> >> >>> >>
> >> >>> >>> Hi,
> >> >>> >>>
> >> >>> >>> Thanks Becket for addressing the issue. FLINK-18641 is now a
> >> blocker
> >> >>> for
> >> >>> >>> Pravega connector integration, hope we can have it included in
> >> 1.11.2
> >> >>> >>> release.
> >> >>> >>>
> >> >>> >>> Best Regards,
> >> >>> >>> Brian
> >> >>> >>>
> >> >>> >>> -Original Message-
> >> >>> >>> From: Becket Qin 
> >> >>> >>> Sent: Thursday, September 3, 2020 11:18
> >> >>> >>> To: dev
> >> >>> >>> Cc: khachatryan.ro...@gmail.com; Till Rohrmann; david; Jingsong
> >> Li
> >> >>> >>> Subject: Re: [DISCUSS] Releasing Flink 1.11.2
> >> >>> >>>
> >> >>> >>>
> >> >>> >>> [EXTERNAL EMAIL]
> >> >>> >>>
> >> >>> >>> Hi Zhuzhu,
> >> >>> >>>
> >> >>> >>> Thanks for starting the discussion.
> >> >>> >>>
> >> >>> >>> I'd like to include FLINK-18641 into 1.11.2 as well. It is a
> >> >>> regression
> >> >>> >>> from previous versions and is currently blocking the development
> >> of
> >> >>> >> Pravega
> >> >>> >>> source on top of FLIP-27.
> >> >>> >>>
> >> >>> >>> Thanks,
> >> >>> >>>
> >> >>> >>> Jiangjie (Becket) Qin
> >> >>> >>>
> >> >>> >>> On Wed, Sep 2, 2020 at 11:13 PM Zhu Zhu 
> >> wrote:
> >> >>> >>>
> >> >>>  Thank you all for the inputs!
> >> >>> 
> >> >>>  I agree with Till that we should set a soft deadline first.
> >> >>>  I'd like to propose next Monday if no new blocker issue pops
> up.
> >> >>>  But feel free to raise your concerns if you feel next Monday
> as a
> >> >>>  deadline may not work for fixes which should be a blocker for
> >> >>> 1.11.2.
> >> >>> 
> >> >>>  Here's a summary of the wanted/blocker but still open fixes:
> >> >>>  - FLINK-19121 Avoid accessing HDFS frequently in
> >> >>> HiveBulkWriterFactory
> >> >>>  - FLINK-19109 Split Reader eats chained periodic watermarks
> >> >>>  - (not a strict blocker) FLINK-18959 Fail to
> >> archiveExecutionGraph
> >> >>>  because job is not finished when dispatcher close
> >> >>>  - FLINK-18934 Idle stream does not advance watermark in
> connected
> >> >>>  stream
> >> >>> 
> >> >>>  Thanks,
> >> >>>  Zhu
> >> >>> 
> >> >>>  Konstantin Knauf  于2020年9月2日周三 下午9:00写道:
> >> >>> 
> >> >>> > I think it would be nice to include a fix for
> >> >>> > https://issues.apache.org

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-08 Thread Kurt Young
I doubt that any sorting algorithm would work with only knowing that the keys
are different but without
information about which one is greater.

Best,
Kurt


On Tue, Sep 8, 2020 at 10:59 PM Dawid Wysakowicz 
wrote:

> Ad. 1
>
> Yes, you are right in principle.
>
> Let me though clarify my proposal a bit. The proposed sort-style
> execution aims at a generic KeyedProcessFunction were all the
> "aggregations" are actually performed in the user code. It tries to
> improve the performance by actually removing the need to use RocksDB e.g.:
>
> private static final class Summer
>         extends KeyedProcessFunction<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
>
>     ...
>
>     @Override
>     public void processElement(
>             Tuple2<Integer, Integer> value,
>             Context ctx,
>             Collector<Tuple2<Integer, Integer>> out) throws Exception {
>         if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
>             ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>             timerRegistered.update(true);
>         }
>         Integer v = counter.value();
>         Integer incomingValue = value.f1;
>         if (v != null) {
>             v += incomingValue;
>         } else {
>             v = incomingValue;
>         }
>         counter.update(v);
>     }
>
>     ...
>
> }
>
> Therefore I don't think the first part of your reply with separating the
> write and read workload applies here. We do not aim to create a
> competing API with the Table API. We think operations such as joins or
> analytical aggregations should be performed in Table API.
>
> As for the second part I agree it would be nice to fall back to the
> sorting approach only if a certain threshold of memory in a State
> Backend is used. This has some problems though. We would need a way to
> estimate the size of the occupied memory to tell when the threshold is
> reached. That is not easily doable by default e.g. in a
> MemoryStateBackend, as we do not serialize the values in the state
> backend by default. We would have to add that, but this would add the
> overhead of the serialization.
>
> This proposal aims at the cases where we do have a large state that will
> not fit into the memory and without the change users are forced to use
> RocksDB. If the state fits in memory I agree it will be better to do
> hash-based aggregations e.g. using the MemoryStateBackend. Therefore I
> think it is important to give users the choice to use one or the other
> approach. We might discuss which approach should be the default for
> RuntimeMode.BATCH proposed in FLIP-134. Should it be hash-based with
> user configured state backend or sorting-based with a single key at a
> time backend? Moreover we could think about whether we should let users choose the
> sort vs hash "state backend" per operator. Would that suffice?
>
> Ad. 2
>
> I still think we can just use the first X bytes of the serialized form
> as the normalized key and fallback to comparing full keys on clashes. It
> is because we are actually not interested in a logical order, but we
> care only about the "grouping" aspect of the sorting. Therefore I think
> it's enough to compare only parts of the full key as the normalized key.
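
As a rough sketch of that prefix-compare idea (illustrative only, not Flink's
actual TypeComparator/normalized-key machinery):

import java.util.Comparator;

// Order serialized keys by a fixed-size byte prefix and fall back to the rest
// of the bytes only on prefix clashes. Any total order is fine here, since the
// sort is only used for grouping equal keys together.
final class SerializedKeyComparator implements Comparator<byte[]> {
    private final int prefixLen;

    SerializedKeyComparator(int prefixLen) {
        this.prefixLen = prefixLen;
    }

    @Override
    public int compare(byte[] a, byte[] b) {
        // Compare the "normalized key" prefix first.
        int n = Math.min(prefixLen, Math.min(a.length, b.length));
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        // Prefixes clash: fall back to the remaining bytes of the full keys.
        int m = Math.min(a.length, b.length);
        for (int i = n; i < m; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
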
>
> Thanks again for the really nice and thorough feedback!
>
> Best,
>
> Dawid
>
> On 08/09/2020 14:47, Kurt Young wrote:
> > Regarding #1, yes the state backend is definitely hash-based execution.
> > However there are some differences compared to
> > batch hash-based execution. The key difference is "random access &
> > read/write mixed workload". For example, by using a
> > state backend in streaming execution, one has to mix the read and write
> > operations and all of them are actually random
> > access. But in a batch hash execution, we could divide the phases into
> > write and read. For example, we can build the
> > hash table first, with only write operations. And once the build is done,
> > we can start to read and trigger the user codes.
> > Take hash aggregation which blink planner implemented as an example,
> during
> > building phase, as long as the hash map
> > could fit into memory, we will update the accumulators directly in the
> hash
> > map. And once we are running out of memory,
> > we then fall back to sort based execution. It improves the performance a
> > lot if the incoming data can be processed in
> > memory.
> >
> > Regarding #2, IIUC you are actually describing a binary format of key,
> not
> > normalized key which is used in DataSet. I will
> > take String for example. If we have lots of keys with length all greater
> > than, let's say 20. In your proposal, you will encode
> > the whole string in the prefix of your composed data (<key> + <timestamp>
> > + <record>). And when you compare
> > records, you will actually compare the *whole* key of the record. For
> > normalized key, it's fixed-length in this case, IIRC it will
> > take 8 bytes to represent the string. And the sorter will store the
> > normalized key and offset in a

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-08 Thread Danny Chan
Thanks for the summary Timo ~

I want to clarify a little bit: what is the conclusion about 
fromChangelogStream and toChangelogStream? Should we use these names, or should we use 
fromDataStream with an optional ChangelogMode flag?

Best,
Danny Chan
在 2020年9月8日 +0800 PM8:22,Timo Walther ,写道:
> Hi Danny,
>
> Your proposed signatures sound good to me.
>
> fromDataStream(dataStream, Schema)
> toDataStream(table, AbstractDataType)
>
> They address all my concerns. The API would not be symmetric anymore,
> but this is fine with me. Others raised concerns about deprecating
> `fromDataStream(dataStream, Expression)`. Are they fine with this as well?
>
> If there are no objections, I would update the FLIP with the methods
> above. But let me briefly summarize my thoughts on this again, so that we
> are all on the same page:
> - The biggest discussion point seems the fromInsertStream/toInsertStream.
> - I don’t have a strong opinion on naming, from/toDataStream would be
> fine for me. But:
> - It has slightly different type mappings and might break existing pipelines
> silently. This point can be neglected as the differences are only minor.
> - We need a way of declaring the rowtime attribute but without declaring
> all columns again. Reduce manual schema work as much as possible.
> - Both Dawid and I don’t like the current either “position based” or
> “name based” expression logic that looks like a projection but is not.
> - Actually name based expressions are not necessary, since we have
> positions for all new data types.
> - Schema is not suitable to influence the output type for toDataStream.
> It should be DataType.
>
> All items are solved by Danny's suggestion.
>
> Regards,
> Timo
>
>
>
> On 08.09.20 14:04, Danny Chan wrote:
> > Hi, Timo ~
> >
> > "It is not about changelog mode compatibility, it is about the type
> > compatibility.”
> >
> > For fromDataStream(dataStream, Schema), there should not be compatibility 
> > problem or data type inconsistency. We know the logical type from Schema 
> > and physical type from the dataStream itself.
> >
> > For toDataStream(table, AbstractDataType), we can get the logical type 
> > from the table itself
> > and the physical type from the passed data type.
> >
> > If both behavior are deterministic, what's the problem for type 
> > compatibility and safety?
> >
> > My concern is that in most of the cases, people use the "insert stream", 
> > they do not need to care about
> > the data stream ChangelogMode, so there is no need to distinguish them from 
> > the APIs, an optional param is enough. If we introduce 2 new APIs there, 
> > people have to choose between them, and can fromChangelogStream()
> > accept an insert stream ? What is the behavior if fromInsertStream() 
> > accepts a changelog stream ?
> >
> >
> > "This means potentially duplicate definition of fields and their data types 
> > etc”
> >
> > I agree, because table already has an underlying schema there.
> >
> > Best,
> > Danny Chan
> > 在 2020年9月3日 +0800 PM8:12,Timo Walther ,写道:
> > > Hi Danny,
> > >
> > > "if ChangelogMode.INSERT is the default, existing pipelines should be
> > > compatible"
> > >
> > > It is not about changelog mode compatibility, it is about the type
> > > compatibility. The renaming to `toInsertStream` is only to have a mean
> > > of dealing with data type inconsistencies that could break existing
> > > pipelines.
> > >
> > > As the FLIP describes, the following new behavior should be implemented:
> > >
> > > - It does this by translating the TypeInformation to DataType.
> > > - This will happen with a new TypeInfoDataTypeConverter that will no
> > > longer produce LegacyTypeInformationType.
> > > - All types from DataStream API should be supported by this converter.
> > > - TupleTypeInfoBase will be translated into a proper RowType or
> > > StructuredType.
> > > - BigDecimals will be converted to DECIMAL(38,18) by default.
> > > - Composite types (tuples, POJOs, rows) will be flattened by default if
> > > they are used as top-level records (similar to the old behavior).
> > > - The order of POJO fields is determined by the DataTypeExtractor and
> > > must not be defined manually anymore.
> > > - GenericTypeInfo is converted to RawType immediately by considering the
> > > current configuration.
> > > - A DataStream that originated from Table API will keep its DataType
> > > information due to ExternalTypeInfo implementing DataTypeQueryable.
> > >
> > > I would feel safer if we do this under a new method name.
> > >
> > > "toDataStream(table, schema.bindTo(DataType))"
> > >
> > > This is what I meant with "integrate the DataType into the Schema class
> > > itself". Yes, we can do that if everybody is fine with it. But why
> > > should a user specify both a schema and a data type? This means
> > > potentially duplicate definition of fields and their data types etc.
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 03.09.20 11:31, Danny Chan wrote:
> > > > "It is a more conservative approach

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-08 Thread Jark Wu
Hi,

I'm +1 to use the naming of from/toDataStream, rather than
from/toInsertStream. So we don't need to deprecate the existing
`fromDataStream`.

I'm +1 to Danny's proposal: fromDataStream(dataStream, Schema) and
toDataStream(table, AbstractDataType)
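
For illustration, the two proposed signatures might be used roughly like this
(the Schema builder and DataTypes calls below are assumptions made for the
sketch, not part of the proposal text):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class InteropSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Tuple2<Long, String>> input = env.fromElements(Tuple2.of(1L, "ABC"));

        // fromDataStream(dataStream, Schema): the Schema refines what cannot be
        // derived from the stream's type (spelled out fully here for readability).
        Table table = tEnv.fromDataStream(
                input,
                Schema.newBuilder()
                        .column("f0", DataTypes.BIGINT())
                        .column("f1", DataTypes.STRING())
                        .build());

        // toDataStream(table, AbstractDataType): the passed DataType decides the
        // physical output type; ROW(...) converts to a DataStream<Row>.
        DataStream<Row> output = tEnv.toDataStream(
                table,
                DataTypes.ROW(
                        DataTypes.FIELD("f0", DataTypes.BIGINT()),
                        DataTypes.FIELD("f1", DataTypes.STRING())));

        output.print();
        env.execute();
    }
}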

I think we can also keep the method `createTemporaryView(path,
DataStream)`.
I don't have a strong opinion on deprecating fromDataStream(datastream,
exprs), but slightly prefer to keep them.

Best,
Jark

On Wed, 9 Sep 2020 at 14:34, Danny Chan  wrote:

> Thanks for the summary Timo ~
>
> I want to clarify a little bit, what is the conclusion about the
> fromChangelogStream and toChangelogStream, should we use this name or we
> use fromDataStream with an optional ChangelogMode flag ?
>
> Best,
> Danny Chan
> 在 2020年9月8日 +0800 PM8:22,Timo Walther ,写道:
> > Hi Danny,
> >
> > Your proposed signatures sound good to me.
> >
> > fromDataStream(dataStream, Schema)
> > toDataStream(table, AbstractDataType)
> >
> > They address all my concerns. The API would not be symmetric anymore,
> > but this is fine with me. Others raised concerns about deprecating
> > `fromDataStream(dataStream, Expression)`. Are they fine with this as
> well?
> >
> > If there are no objections, I would update the FLIP with the methods
> > above. Bu let me briefly summarize my thoughts on this again, so that we
> > are all on the same page:
> > - The biggest discussion point seems the fromInsertStream/toInsertStream.
> > - I don’t have a strong opinion on naming, from/toDataStream would be
> > fine for me. But:
> > - It slightly different type mappings and might break existing pipelines
> > silently. This point can be neglected as the differences are only minor.
> > - We need a way of declaring the rowtime attribute but without declaring
> > all columns again. Reduce manual schema work as much as possible.
> > - Both Dawid and I don’t like the current either “position based” or
> > “name based” expression logic that looks like a projection but is not.
> > - Actually name based expressions are not necessary, since we have
> > positions for all new data types.
> > - Schema is not suitable to influence the output type for toDataStream.
> > It should be DataType.
> >
> > All items are solved by Danny's suggestion.
> >
> > Regards,
> > Timo
> >
> >
> >
> > On 08.09.20 14:04, Danny Chan wrote:
> > > Hi, Timo ~
> > >
> > > "It is not about changelog mode compatibility, it is about the type
> > > compatibility.”
> > >
> > > For fromDataStream(dataStream, Schema), there should not be
> compatibility problem or data type inconsistency. We know the logical type
> from Schema and physical type from the dataStream itself.
> > >
> > > For toDataStream(table, AbstractDataType), we can get the logical
> type from the table itself
> > > and the physical type from the passed data type.
> > >
> > > If both behavior are deterministic, what's the problem for type
> compatibility and safety?
> > >
> > > My concern is that in most of the cases, people use the "insert
> stream", they do not need to care about
> > > the data stream ChangelogMode, so there is no need to distinguish them
> from the APIs, an optional param is enough. If we introduces 2 new API
> there, people have to choose between them, and can fromChangelogStream()
> > > accept an insert stream ? What is the behavior if fromInsertStream()
> accepts a changelog stream ?
> > >
> > >
> > > "This means potentially duplicate definition of fields and their data
> types etc”
> > >
> > > I agree, because table already has an underlying schema there.
> > >
> > > Best,
> > > Danny Chan
> > > 在 2020年9月3日 +0800 PM8:12,Timo Walther ,写道:
> > > > Hi Danny,
> > > >
> > > > "if ChangelogMode.INSERT is the default, existing pipelines should be
> > > > compatible"
> > > >
> > > > It is not about changelog mode compatibility, it is about the type
> > > > compatibility. The renaming to `toInsertStream` is only to have a
> mean
> > > > of dealing with data type inconsistencies that could break existing
> > > > pipelines.
> > > >
> > > > As the FLIP describes, the following new behavior should be
> implemented:
> > > >
> > > > - It does this by translating the TypeInformation to DataType.
> > > > - This will happen with a new TypeInfoDataTypeConverter that will no
> > > > longer produce LegacyTypeInformationType.
> > > > - All types from DataStream API should be supported by this
> converter.
> > > > - TupleTypeInfoBase will be translated into a proper RowType or
> > > > StructuredType.
> > > > - BigDecimals will be converted to DECIMAL(38,18) by default.
> > > > - Composite types (tuples, POJOs, rows) will be flattened by default
> if
> > > > they are used as top-level records (similar to the old behavior).
> > > > - The order of POJO field's is determined by the DataTypeExtractor
> and
> > > > must not be defined manually anymore.
> > > > - GenericTypeInfo is converted to RawType immediately by considering
> the
> > > > current configuration.
> > > > - A DataStream that origi

[jira] [Created] (FLINK-19168) Upgrade Kafka client version

2020-09-08 Thread darion yaphet (Jira)
darion yaphet created FLINK-19168:
-

 Summary: Upgrade Kafka client version
 Key: FLINK-19168
 URL: https://issues.apache.org/jira/browse/FLINK-19168
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: darion yaphet


Currently we are using Kafka client 0.11.0.2, which was released in 2017, while the 
latest version is 2.6.0. Is there a reason it has not been updated?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19167) Process Function Example could not work

2020-09-08 Thread tinny cat (Jira)
tinny cat created FLINK-19167:
-

 Summary: Process Function Example could not work
 Key: FLINK-19167
 URL: https://issues.apache.org/jira/browse/FLINK-19167
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.11.1
Reporter: tinny cat


Section "*Porccess Function Example*" of 
[https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
 current is:
{code:java}
// Some comments here
@Override
public void processElement(
        Tuple2<String, String> value,
        Context ctx,
        Collector<Tuple2<String, Long>> out) throws Exception {

    // retrieve the current count
    CountWithTimestamp current = state.value();
    if (current == null) {
        current = new CountWithTimestamp();
        current.key = value.f0;
    }

    // update the state's count
    current.count++;

    // set the state's timestamp to the record's assigned event time timestamp
    current.lastModified = ctx.timestamp();

    // write the state back
    state.update(current);

    // schedule the next timer 60 seconds from the current event time
    ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}

@Override
public void onTimer(
        long timestamp,
        OnTimerContext ctx,
        Collector<Tuple2<String, Long>> out) throws Exception {

    // get the state for the key that scheduled the timer
    CountWithTimestamp result = state.value();

    // check if this is an outdated timer or the latest timer
    // this will never happen
    if (timestamp == result.lastModified + 60000) {
        // emit the state on timeout
        out.collect(new Tuple2<String, Long>(result.key, result.count));
    }
}
{code}
however, it should be:
{code:java}
@Override
public void processElement(
        Tuple2<String, String> value,
        Context ctx,
        Collector<Tuple2<String, Long>> out) throws Exception {

    // retrieve the current count
    CountWithTimestamp current = state.value();
    if (current == null) {
        current = new CountWithTimestamp();
        current.key = value.f0;
    }

    // update the state's count
    current.count++;

    // set the state's timestamp to the record's assigned event time timestamp
    // it should be the previous watermark
    current.lastModified = ctx.timerService().currentWatermark();

    // write the state back
    state.update(current);

    // schedule the next timer 60 seconds from the current event time
    ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}

@Override
public void onTimer(
        long timestamp,
        OnTimerContext ctx,
        Collector<Tuple2<String, Long>> out) throws Exception {

    // get the state for the key that scheduled the timer
    CountWithTimestamp result = state.value();

    // check if this is an outdated timer or the latest timer
    if (timestamp == result.lastModified + 60000) {
        // emit the state on timeout
        out.collect(new Tuple2<String, Long>(result.key, result.count));
    }
}
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-08 Thread Kurt Young
I also share the concern that reusing the computed column syntax but have
different semantics
would confuse users a lot.

Besides, I think metadata fields are conceptually not the same as
computed columns. A metadata
field is a connector-specific thing and it only carries the information
of where the field comes
from (on the source side) or where the field needs to be written to (on the
sink side). It's more similar to normal
fields, with the assumption that all these fields go to the data part.

Thus I lean more towards the rejected alternative that Timo mentioned. And I
think we don't need the
PERSISTED keyword, SYSTEM_METADATA should be enough.

During implementation, the framework only needs to pass such information to the
connector, and the logic of handling such fields inside the connector
should be straightforward.
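
Purely as an illustration of that plumbing (the interface and method names are
made up for the sketch and are not actual FLIP interfaces):

import java.util.List;
import java.util.Map;

// The planner asks the connector which metadata keys it offers and then tells
// it which keys the DDL actually references.
interface SupportsMetadataFields {
    Map<String, Class<?>> listMetadataFields();

    void applyMetadataFields(List<String> requestedKeys);
}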

Regarding the downside Timo mentioned:

> The disadvantage is that users cannot call UDFs or parse timestamps.

I think this is fairly simple to solve. Since the metadata field isn't a
computed column anymore, we can support
referencing such fields in the computed column. For example:

CREATE TABLE kafka_table (
 id BIGINT,
 name STRING,
 timestamp STRING SYSTEM_METADATA("timestamp"),  // get the timestamp
field from metadata
 ts AS to_timestamp(timestamp) // normal computed column, parse the
string to TIMESTAMP type by using the metadata field
) WITH (
...
)

Best,
Kurt


On Tue, Sep 8, 2020 at 11:57 PM Timo Walther  wrote:

> Hi Leonard,
>
> the only alternative I see is that we introduce a concept that is
> completely different to computed columns. This is also mentioned in the
> rejected alternative section of the FLIP. Something like:
>
> CREATE TABLE kafka_table (
>  id BIGINT,
>  name STRING,
>  timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
>  headers MAP SYSTEM_METADATA("headers") PERSISTED
> ) WITH (
> ...
> )
>
> This way we would avoid confusion at all and can easily map columns to
> metadata columns. The disadvantage is that users cannot call UDFs or
> parse timestamps. This would need to be done in a real computed column.
>
> I'm happy about better alternatives.
>
> Regards,
> Timo
>
>
> On 08.09.20 15:37, Leonard Xu wrote:
> > HI, Timo
> >
> > Thanks for driving this FLIP.
> >
> > Sorry but I have a concern about Writing metadata via DynamicTableSink
> section:
> >
> > CREATE TABLE kafka_table (
> >id BIGINT,
> >name STRING,
> >timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
> >headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>)
> PERSISTED
> > ) WITH (
> >...
> > )
> > An insert statement could look like:
> >
> > INSERT INTO kafka_table VALUES (
> >(1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
> > )
> >
> > The proposed INSERT syntax does not make sense to me, because it contains a
> > computed (generated) column.
> > Both SQL Server and PostgreSQL do not allow inserting values into computed
> > columns even if they are persisted; this breaks the generated column semantics
> > and may confuse users a lot.
> >
> > For SQL server computed column[1]:
> >> column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] ]...
> >> NOTE: A computed column cannot be the target of an INSERT or UPDATE
> statement.
> >
> > For Postgresql generated column[2]:
> >>   height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
> >> NOTE: A generated column cannot be written to directly. In INSERT or
> UPDATE commands, a value cannot be specified for a generated column, but
> the keyword DEFAULT may be specified.
> >
> > It shouldn't be allowed to set/update value for generated column after
> lookup the SQL 2016:
> >> <insert statement> ::=
> >>   INSERT INTO <insertion target> <insert columns and source>
> >>
> >> If <contextually typed table value constructor> CTTVC is specified,
> >> then every <contextually typed row value constructor element> simply
> >> contained in CTTVC whose positionally corresponding <column name>
> >> in <insert column list> references a column of which some underlying
> >> column is a generated column shall be a <default specification>.
> >> A <default specification> specifies the default value of some
> >> associated item.
> >
> >
> > [1]
> https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> <
> https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> >
> > [2] https://www.postgresql.org/docs/12/ddl-generated-columns.html <
> https://www.postgresql.org/docs/12/ddl-generated-columns.html>
> >
> >> 在 2020年9月8日,17:31,Timo Walther  写道:
> >>
> >> Hi Jark,
> >>
> >> according to Flink's and Calcite's casting definition in [1][2]
> TIMESTAMP WITH LOCAL TIME ZONE should be castable from BIGINT. If not, we
> will make it possible ;-)
> >>
> >> I'm aware of DeserializationSchema.getProducedType but I think that
> this method is actually misplaced. The type should rather be passed to the
> source itself.
> >>
> >> For our Kafka SQL source, we will also not use this method because the
> Kafka source will add own metadata in addition to

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-08 Thread Timo Walther
The conclusion is that we will drop `fromChangelogStream(ChangelogMode, 
DataStream)` but will keep `fromChangelogStream(DataStream)`. 
The latter is necessary to have a per-record changeflag.


We could think about merging `fromChangelogStream`/`fromDataStream` and 
`toChangelogStream`/`toDataStream`. But I think the planner needs to 
know whether the input is insert-only or not. Esp. for bounded streams 
this information will be useful in the future. Also, outputting a query 
to a non-Row type for retraction queries doesn't make much sense if a 
changeflag is missing.
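
As an illustration of the per-record changeflag, fromChangelogStream(DataStream)
could be used roughly like this (everything beyond the discussed method names is
an assumption of the sketch):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class ChangelogSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Each Row carries its own RowKind, so no ChangelogMode argument is needed.
        DataStream<Row> changelog = env.fromElements(
                Row.ofKind(RowKind.INSERT, "Alice", 12),
                Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
                Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

        Table table = tEnv.fromChangelogStream(changelog);
        tEnv.toChangelogStream(table).print();
        env.execute();
    }
}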


What do you think?

Regards,
Timo

On 09.09.20 08:34, Danny Chan wrote:

Thanks for the summary Timo ~

I want to clarify a little bit, what is the conclusion about the 
fromChangelogStream and toChangelogStream, should we use this name or we use 
fromDataStream with an optional ChangelogMode flag ?

Best,
Danny Chan
在 2020年9月8日 +0800 PM8:22,Timo Walther ,写道:

Hi Danny,

Your proposed signatures sound good to me.

fromDataStream(dataStream, Schema)
toDataStream(table, AbstractDataType)

They address all my concerns. The API would not be symmetric anymore,
but this is fine with me. Others raised concerns about deprecating
`fromDataStream(dataStream, Expression)`. Are they fine with this as well?

If there are no objections, I would update the FLIP with the methods
above. Bu let me briefly summarize my thoughts on this again, so that we
are all on the same page:
- The biggest discussion point seems the fromInsertStream/toInsertStream.
- I don’t have a strong opinion on naming, from/toDataStream would be
fine for me. But:
- It slightly different type mappings and might break existing pipelines
silently. This point can be neglected as the differences are only minor.
- We need a way of declaring the rowtime attribute but without declaring
all columns again. Reduce manual schema work as much as possible.
- Both Dawid and I don’t like the current either “position based” or
“name based” expression logic that looks like a projection but is not.
- Actually name based expressions are not necessary, since we have
positions for all new data types.
- Schema is not suitable to influence the output type for toDataStream.
It should be DataType.

All items are solved by Danny's suggestion.

Regards,
Timo



On 08.09.20 14:04, Danny Chan wrote:

Hi, Timo ~

"It is not about changelog mode compatibility, it is about the type
compatibility.”

For fromDataStream(dataStream, Schema), there should not be compatibility 
problem or data type inconsistency. We know the logical type from Schema and 
physical type from the dataStream itself.

For toDataStream(table, AbstractDataType), we can get the logical type from 
the table itself
and the physical type from the passed data type.

If both behavior are deterministic, what's the problem for type compatibility 
and safety?

My concern is that in most of the cases, people use the "insert stream", they 
do not need to care about
the data stream ChangelogMode, so there is no need to distinguish them from the 
APIs, an optional param is enough. If we introduces 2 new API there, people 
have to choose between them, and can fromChangelogStream()
accept an insert stream ? What is the behavior if fromInsertStream() accepts a 
changelog stream ?


"This means potentially duplicate definition of fields and their data types etc”

I agree, because table already has an underlying schema there.

Best,
Danny Chan
在 2020年9月3日 +0800 PM8:12,Timo Walther ,写道:

Hi Danny,

"if ChangelogMode.INSERT is the default, existing pipelines should be
compatible"

It is not about changelog mode compatibility, it is about the type
compatibility. The renaming to `toInsertStream` is only to have a mean
of dealing with data type inconsistencies that could break existing
pipelines.

As the FLIP describes, the following new behavior should be implemented:

- It does this by translating the TypeInformation to DataType.
- This will happen with a new TypeInfoDataTypeConverter that will no
longer produce LegacyTypeInformationType.
- All types from DataStream API should be supported by this converter.
- TupleTypeInfoBase will be translated into a proper RowType or
StructuredType.
- BigDecimals will be converted to DECIMAL(38,18) by default.
- Composite types (tuples, POJOs, rows) will be flattened by default if
they are used as top-level records (similar to the old behavior).
- The order of POJO field's is determined by the DataTypeExtractor and
must not be defined manually anymore.
- GenericTypeInfo is converted to RawType immediately by considering the
current configuration.
- A DataStream that originated from Table API will keep its DataType
information due to ExternalTypeInfo implementing DataTypeQueryable.

I would feel safer if we do this under a new method name.

"toDataStream(table, schema.bindTo(DataType))"

This is what I meant with "integrate the DataType into the Schema class
itself". Yes, we can do that if everybody is fine with it. But why
shoul