Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Timo Walther

Hi Kurt,

thanks for sharing your opinion. I'm totally up for not reusing computed 
columns. I think Jark was a big supporter of this syntax, @Jark are you 
fine with this as well? The non-computed column approach was only a 
"slightly rejected alternative".


Furthermore, we would need to think about how such a new design 
influences the LIKE clause though.


However, we should still keep the `PERSISTED` keyword as it influences 
the query->sink schema. If you look at the list of metadata for existing 
connectors and formats, we currently offer only two writable metadata 
fields. Otherwise, one would need to declare two tables whenever a 
metadata columns is read (one for the source, one for the sink). This 
can be quite inconvientient e.g. for just reading the topic.


Regards,
Timo


On 09.09.20 08:52, Kurt Young wrote:

I also share the concern that reusing the computed column syntax but have
different semantics
would confuse users a lot.

Besides, I think metadata fields are conceptually not the same with
computed columns. The metadata
field is a connector specific thing and it only contains the information
that where does the field come
from (during source) or where does the field need to write to (during
sink). It's more similar with normal
fields, with assumption that all these fields need going to the data part.

Thus I'm more lean to the rejected alternative that Timo mentioned. And I
think we don't need the
PERSISTED keyword, SYSTEM_METADATA should be enough.

During implementation, the framework only needs to pass such  information to the
connector, and the logic of handling such fields inside the connector
should be straightforward.

Regarding the downside Timo mentioned:


The disadvantage is that users cannot call UDFs or parse timestamps.


I think this is fairly simple to solve. Since the metadata field isn't a
computed column anymore, we can support
referencing such fields in the computed column. For example:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  timestamp STRING SYSTEM_METADATA("timestamp"),  // get the timestamp
field from metadata
  ts AS to_timestamp(timestamp) // normal computed column, parse the
string to TIMESTAMP type by using the metadata field
) WITH (
 ...
)

Best,
Kurt


On Tue, Sep 8, 2020 at 11:57 PM Timo Walther  wrote:


Hi Leonard,

the only alternative I see is that we introduce a concept that is
completely different to computed columns. This is also mentioned in the
rejected alternative section of the FLIP. Something like:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
  headers MAP SYSTEM_METADATA("headers") PERSISTED
) WITH (
 ...
)

This way we would avoid confusion at all and can easily map columns to
metadata columns. The disadvantage is that users cannot call UDFs or
parse timestamps. This would need to be done in a real computed column.

I'm happy about better alternatives.

Regards,
Timo


On 08.09.20 15:37, Leonard Xu wrote:

HI, Timo

Thanks for driving this FLIP.

Sorry but I have a concern about Writing metadata via DynamicTableSink

section:


CREATE TABLE kafka_table (
id BIGINT,
name STRING,
timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
headers AS CAST(SYSTEM_METADATA("headers") AS MAP)

PERSISTED

) WITH (
...
)
An insert statement could look like:

INSERT INTO kafka_table VALUES (
(1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
)

The proposed INERT syntax does not make sense to me, because it contains

computed(generated) column.

Both SQL server and Postgresql do not allow to insert value to computed

columns even they are persisted, this boke the generated column semantics
and may confuse user much.


For SQL server computed column[1]:

column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] ]...
NOTE: A computed column cannot be the target of an INSERT or UPDATE

statement.


For Postgresql generated column[2]:

   height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
NOTE: A generated column cannot be written to directly. In INSERT or

UPDATE commands, a value cannot be specified for a generated column, but
the keyword DEFAULT may be specified.


It shouldn't be allowed to set/update value for generated column after

lookup the SQL 2016:

 ::=
INSERT INTO  

If  CTTVC is specified,

then every 
value constructor element> simply contained in CTTVC whose positionally

corresponding 

in  references a column of which some underlying

column is a generated column shall

be a .
A  specifies the default value of some

associated item.



[1]

https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
<
https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15


[2] https://www.postgresql.org/docs/12/ddl-generated-

[jira] [Created] (FLINK-19169) Support Pandas UDAF in PyFlink (FLIP-137)

2020-09-09 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-19169:


 Summary: Support Pandas UDAF in PyFlink (FLIP-137)
 Key: FLINK-19169
 URL: https://issues.apache.org/jira/browse/FLINK-19169
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Huang Xingbo
 Fix For: 1.12.0


Pandas UDF has been supported in FLINK 1.11 
([FLIP-97|https://cwiki.apache.org/confluence/display/FLINK/FLIP-97%3A+Support+Scalar+Vectorized+Python+UDF+in+PyFlink]).
 It solves the high serialization/deserialization overhead in Python UDF and 
makes it convenient to leverage the popular Python libraries such as Pandas, 
Numpy, etc. Since Pandas UDF has so many advantages, we want to support Pandas 
UDAF to extend usage of Pandas UDF.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-09 Thread Dawid Wysakowicz
That's for sure. I am not claiming against it. What I am saying is that
we don't necessarily need a true "sorting" in this particular use case.
We only need to cluster records with the same keys together. We don't
need the keys to be logically sorted. What I am saying is that for
clustering the keys a binary order is enough. I agree this would not
work if we we were to implement an operation such as DataStream#sort.

Best,

Dawid

On 09/09/2020 08:22, Kurt Young wrote:
> I doubt that any sorting algorithm would work with only knowing the
> keys are different but without
> information of which is greater. 
>  
> Best,
> Kurt
>
>
> On Tue, Sep 8, 2020 at 10:59 PM Dawid Wysakowicz
> mailto:dwysakow...@apache.org>> wrote:
>
> Ad. 1
>
> Yes, you are right in principle.
>
> Let me though clarify my proposal a bit. The proposed sort-style
> execution aims at a generic KeyedProcessFunction were all the
> "aggregations" are actually performed in the user code. It tries to
> improve the performance by actually removing the need to use
> RocksDB e.g.:
>
>     private static final class Summer
>     extends KeyedProcessFunction,
> Tuple2> {
>
>         
>
>     @Override
>     public void processElement(
>     Tuple2 value,
>     Context ctx,
>     Collector> out) throws Exception {
>     if (!Objects.equals(timerRegistered.value(),
> Boolean.TRUE)) {
>    
> ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
>     timerRegistered.update(true);
>     }
>     Integer v = counter.value();
>     Integer incomingValue = value.f1;
>     if (v != null) {
>     v += incomingValue;
>     } else {
>     v = incomingValue;
>     }
>     counter.update(v);
>     }
>
>         
>
>    }
>
> Therefore I don't think the first part of your reply with
> separating the
> write and read workload applies here. We do not aim to create a
> competing API with the Table API. We think operations such as joins or
> analytical aggregations should be performed in Table API.
>
> As for the second part I agree it would be nice to fall back to the
> sorting approach only if a certain threshold of memory in a State
> Backend is used. This has some problems though. We would need a way to
> estimate the size of the occupied memory to tell when the threshold is
> reached. That is not easily doable by default e.g. in a
> MemoryStateBackend, as we do not serialize the values in the state
> backend by default. We would have to add that, but this would add the
> overhead of the serialization.
>
> This proposal aims at the cases where we do have a large state
> that will
> not fit into the memory and without the change users are forced to use
> RocksDB. If the state fits in memory I agree it will be better to do
> hash-based aggregations e.g. using the MemoryStateBackend. Therefore I
> think it is important to give users the choice to use one or the other
> approach. We might discuss which approach should be the default for
> RuntimeMode.BATCH proposed in FLIP-134. Should it be hash-based with
> user configured state backend or sorting-based with a single key at a
> time backend. Moreover we could think if we should let users
> choose the
> sort vs hash "state backend" per operator. Would that suffice?
>
> Ad. 2
>
> I still think we can just use the first X bytes of the serialized form
> as the normalized key and fallback to comparing full keys on
> clashes. It
> is because we are actually not interested in a logical order, but we
> care only about the "grouping" aspect of the sorting. Therefore I
> think
> its enough to compare only parts of the full key as the normalized
> key.
>
> Thanks again for the really nice and thorough feedback!
>
> Best,
>
> Dawid
>
> On 08/09/2020 14:47, Kurt Young wrote:
> > Regarding #1, yes the state backend is definitely hash-based
> execution.
> > However there are some differences between
> > batch hash-based execution. The key difference is *random access &
> > read/write mixed workload". For example, by using
> > state backend in streaming execution, one have to mix the read
> and write
> > operations and all of them are actually random
> > access. But in a batch hash execution, we could divide the
> phases into
> > write and read. For example, we can build the
> > hash table first, with only write operations. And once the build
> is done,
> > we can start to read and trigger the user codes.
> > Take hash aggregation which blink planner implemented as an
> example, during
> > building phase, as

[jira] [Created] (FLINK-19170) Parameter naming error

2020-09-09 Thread sulei (Jira)
sulei created FLINK-19170:
-

 Summary: Parameter naming error
 Key: FLINK-19170
 URL: https://issues.apache.org/jira/browse/FLINK-19170
 Project: Flink
  Issue Type: Bug
Reporter: sulei






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19171) K8s Resource Manager may lead to resource leak after pod deleted

2020-09-09 Thread Yi Tang (Jira)
Yi Tang created FLINK-19171:
---

 Summary: K8s Resource Manager may lead to resource leak after pod 
deleted
 Key: FLINK-19171
 URL: https://issues.apache.org/jira/browse/FLINK-19171
 Project: Flink
  Issue Type: Bug
Reporter: Yi Tang


{code:java}
private void terminatedPodsInMainThread(List pods) {
   getMainThreadExecutor().execute(() -> {
  for (KubernetesPod pod : pods) {
 if (pod.isTerminated()) {
...
 }
  }
   });
}
{code}
Looks like that the RM only remove the pod from ledger if the pod 
"isTerminated", 

and the pod has been taken accounted after being created.

However, it is not complete by checking pod "isTerminated", e.g. a Pending pod 
is deleted manually.

 

Pls let me know if i misunderstand, thanks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-09 Thread Danny Chan
“But I think the planner needs to
know whether the input is insert-only or not.”

Does fromDataStream(dataStream, schema, changelogMode)

solve your concerns ?  People can pass around whatever ChangelogMode they like 
as an optional param.
By default: fromDataStream(dataStream, schema), the ChangelogMode is INSERT.

Best,
Danny Chan
在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:
>
> But I think the planner needs to
> know whether the input is insert-only or not.


Re: [VOTE] FLIP-141: Intra-Slot Managed Memory Sharing

2020-09-09 Thread Andrey Zagrebin
+1

Best,
Andrey

On Tue, Sep 8, 2020 at 2:16 PM Yu Li  wrote:

> +1
>
> Best Regards,
> Yu
>
>
> On Tue, 8 Sep 2020 at 17:03, Aljoscha Krettek  wrote:
>
> > +1
> >
> > We just need to make sure to find a good name before the release but
> > shouldn't block any work on this.
> >
> > Aljoscha
> >
> > On 08.09.20 07:59, Xintong Song wrote:
> > > Thanks for the vote, @Jincheng.
> > >
> > >
> > > Concerning the namings, the original idea was, as you suggested, to
> have
> > > separate configuration names for batch and rocksdb while only one of
> them
> > > will take effect at a time.
> > >
> > >
> > > It was then in the discussion thread [1] that @Stepahn suggested to
> > combine
> > > these two.
> > >
> > >>  We never have batch algos and RocksDB mixed, having this as
> > separate
> > >> options is confusing as it suggests this can be combined arbitrarily.
> I
> > >> also think that a slim possibility that we may ever combine this in
> the
> > >> future is not enough reason to make it more complex/confusing.
> > >>
> > >
> > > This suggestion was also supported by others in the discussion thread.
> > > That's why we are trying to come up with a name that covers both batch
> > and
> > > rocksdb memory consumers.
> > >
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > > [1]
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-141-Intra-Slot-Managed-Memory-Sharing-tp44146p44253.html
> > >
> > > On Tue, Sep 8, 2020 at 1:37 PM jincheng sun 
> > > wrote:
> > >
> > >> +1 for the proposal!
> > >>
> > >> Regarding the name of `BATCH_OP/ROCKSDB`, we can reserve the
> > configuration
> > >> names for batch and rocksdb respectively, ` batch_ OP` for batch,
> > "ROCKSDB"
> > >> for roockdb. and the default value as follows:
> > >>
> > >> {
> > >>  BATCH_OP: 70,
> > >>  ROCKSDB : 70,
> > >>  PYTHON : 30
> > >> }
> > >>
> > >> Only one of `BATCH_ OP` and `ROCKSDB` will work. What do you think?
> > >>
> > >> Best,
> > >> Jincheng
> > >>
> > >>
> > >> Xintong Song  于2020年9月7日周一 下午1:46写道:
> > >>
> > >>> Thanks for the votes.
> > >>>
> > >>> Concerning the name for batch/RocksDB memory consumer, how about
> > >> "execution
> > >>> memory"?
> > >>> We can further explain in docs and config option description that
> this
> > is
> > >>> used for job execution, which is currently dedicated to rocksdb in
> > >>> streaming and batch algorithms in batch.
> > >>>
> > >>> Thank you~
> > >>>
> > >>> Xintong Song
> > >>>
> > >>>
> > >>>
> > >>> On Mon, Sep 7, 2020 at 11:43 AM Yangze Guo 
> wrote:
> > >>>
> >  +1
> > 
> >  Best,
> >  Yangze Guo
> > 
> >  On Mon, Sep 7, 2020 at 10:54 AM Zhu Zhu  wrote:
> > >
> > > +1
> > >
> > > Thanks,
> > > Zhu
> > >
> > > Dian Fu  于2020年9月7日周一 上午10:34写道:
> > >
> > >> +1
> > >>
> > >>> 在 2020年9月3日,下午8:46,Till Rohrmann  写道:
> > >>>
> > >>> Hi Xintong,
> > >>>
> > >>> thanks for starting the vote.
> > >>>
> > >>> +1 for the proposal given that we find a proper name for the
> > >>> different memory consumers (specifically the batch/RocksDB
> > >>> consumer)
> >  and
> > >>> their corresponding weights.
> > >>>
> > >>> Cheers,
> > >>> Till
> > >>>
> > >>> On Thu, Sep 3, 2020 at 12:43 PM Xintong Song <
> > >>> tonysong...@gmail.com>
> > >> wrote:
> > >>>
> >  Hi devs,
> > 
> >  I'd like to start a voting thread on FLIP-141[1], which proposes
> > >>> how
> >  managed memory should be shared by various use cases within a
> > >>> slot.
> >  The
> >  proposal has been discussed in [2].
> > 
> >  The vote will be open for at least 72h + weekends. I'll try to
> >  close it
> > >> on
> >  September 8, unless there is an objection or not enough votes.
> > 
> >  Thank you~
> > 
> >  Xintong Song
> > 
> > 
> >  [1]
> > 
> > 
> > >>
> > 
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-141%3A+Intra-Slot+Managed+Memory+Sharing#FLIP141:IntraSlotManagedMemorySharing-compatibility
> > 
> >  [2]
> > 
> > 
> > >>
> > 
> > >>>
> > >>
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-141-Intra-Slot-Managed-Memory-Sharing-td44146.html
> > 
> > >>
> > >>
> > 
> > >>>
> > >>
> > >
> >
> >
>


Re: [VOTE] FLIP-141: Intra-Slot Managed Memory Sharing

2020-09-09 Thread Andrey Zagrebin
For the option name, maybe:
*flink.main*
or
*flink.managed* (this may be a bit confusing for existing users as we said
that the overall managed memory is managed by Flink)

On Wed, Sep 9, 2020 at 9:56 AM Andrey Zagrebin  wrote:

> +1
>
> Best,
> Andrey
>
> On Tue, Sep 8, 2020 at 2:16 PM Yu Li  wrote:
>
>> +1
>>
>> Best Regards,
>> Yu
>>
>>
>> On Tue, 8 Sep 2020 at 17:03, Aljoscha Krettek 
>> wrote:
>>
>> > +1
>> >
>> > We just need to make sure to find a good name before the release but
>> > shouldn't block any work on this.
>> >
>> > Aljoscha
>> >
>> > On 08.09.20 07:59, Xintong Song wrote:
>> > > Thanks for the vote, @Jincheng.
>> > >
>> > >
>> > > Concerning the namings, the original idea was, as you suggested, to
>> have
>> > > separate configuration names for batch and rocksdb while only one of
>> them
>> > > will take effect at a time.
>> > >
>> > >
>> > > It was then in the discussion thread [1] that @Stepahn suggested to
>> > combine
>> > > these two.
>> > >
>> > >>  We never have batch algos and RocksDB mixed, having this as
>> > separate
>> > >> options is confusing as it suggests this can be combined
>> arbitrarily. I
>> > >> also think that a slim possibility that we may ever combine this in
>> the
>> > >> future is not enough reason to make it more complex/confusing.
>> > >>
>> > >
>> > > This suggestion was also supported by others in the discussion thread.
>> > > That's why we are trying to come up with a name that covers both batch
>> > and
>> > > rocksdb memory consumers.
>> > >
>> > >
>> > > Thank you~
>> > >
>> > > Xintong Song
>> > >
>> > >
>> > > [1]
>> > >
>> >
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-141-Intra-Slot-Managed-Memory-Sharing-tp44146p44253.html
>> > >
>> > > On Tue, Sep 8, 2020 at 1:37 PM jincheng sun > >
>> > > wrote:
>> > >
>> > >> +1 for the proposal!
>> > >>
>> > >> Regarding the name of `BATCH_OP/ROCKSDB`, we can reserve the
>> > configuration
>> > >> names for batch and rocksdb respectively, ` batch_ OP` for batch,
>> > "ROCKSDB"
>> > >> for roockdb. and the default value as follows:
>> > >>
>> > >> {
>> > >>  BATCH_OP: 70,
>> > >>  ROCKSDB : 70,
>> > >>  PYTHON : 30
>> > >> }
>> > >>
>> > >> Only one of `BATCH_ OP` and `ROCKSDB` will work. What do you think?
>> > >>
>> > >> Best,
>> > >> Jincheng
>> > >>
>> > >>
>> > >> Xintong Song  于2020年9月7日周一 下午1:46写道:
>> > >>
>> > >>> Thanks for the votes.
>> > >>>
>> > >>> Concerning the name for batch/RocksDB memory consumer, how about
>> > >> "execution
>> > >>> memory"?
>> > >>> We can further explain in docs and config option description that
>> this
>> > is
>> > >>> used for job execution, which is currently dedicated to rocksdb in
>> > >>> streaming and batch algorithms in batch.
>> > >>>
>> > >>> Thank you~
>> > >>>
>> > >>> Xintong Song
>> > >>>
>> > >>>
>> > >>>
>> > >>> On Mon, Sep 7, 2020 at 11:43 AM Yangze Guo 
>> wrote:
>> > >>>
>> >  +1
>> > 
>> >  Best,
>> >  Yangze Guo
>> > 
>> >  On Mon, Sep 7, 2020 at 10:54 AM Zhu Zhu  wrote:
>> > >
>> > > +1
>> > >
>> > > Thanks,
>> > > Zhu
>> > >
>> > > Dian Fu  于2020年9月7日周一 上午10:34写道:
>> > >
>> > >> +1
>> > >>
>> > >>> 在 2020年9月3日,下午8:46,Till Rohrmann  写道:
>> > >>>
>> > >>> Hi Xintong,
>> > >>>
>> > >>> thanks for starting the vote.
>> > >>>
>> > >>> +1 for the proposal given that we find a proper name for the
>> > >>> different memory consumers (specifically the batch/RocksDB
>> > >>> consumer)
>> >  and
>> > >>> their corresponding weights.
>> > >>>
>> > >>> Cheers,
>> > >>> Till
>> > >>>
>> > >>> On Thu, Sep 3, 2020 at 12:43 PM Xintong Song <
>> > >>> tonysong...@gmail.com>
>> > >> wrote:
>> > >>>
>> >  Hi devs,
>> > 
>> >  I'd like to start a voting thread on FLIP-141[1], which
>> proposes
>> > >>> how
>> >  managed memory should be shared by various use cases within a
>> > >>> slot.
>> >  The
>> >  proposal has been discussed in [2].
>> > 
>> >  The vote will be open for at least 72h + weekends. I'll try to
>> >  close it
>> > >> on
>> >  September 8, unless there is an objection or not enough votes.
>> > 
>> >  Thank you~
>> > 
>> >  Xintong Song
>> > 
>> > 
>> >  [1]
>> > 
>> > 
>> > >>
>> > 
>> > >>>
>> > >>
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-141%3A+Intra-Slot+Managed+Memory+Sharing#FLIP141:IntraSlotManagedMemorySharing-compatibility
>> > 
>> >  [2]
>> > 
>> > 
>> > >>
>> > 
>> > >>>
>> > >>
>> >
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-141-Intra-Slot-Managed-Memory-Sharing-td44146.html
>> > 
>> > >>
>> > >>
>> > 
>> > >>>
>> > >>
>> > >
>> >
>> >
>>
>


Re: [DISCUSS] FLIP-142: Disentangle StateBackends from Checkpointing

2020-09-09 Thread Konstantin Knauf
Thanks for the initiative. Big +1. Would be interested to hear if the
proposed interfaces still make sense in the face of the new fault-tolerance
work that is planned. Stephan/Piotr will know.

On Tue, Sep 8, 2020 at 7:05 PM Seth Wiesman  wrote:

> Hi Devs,
>
> I'd like to propose an update to how state backends and checkpoint storage
> are configured to help users better understand Flink.
>
> Apache Flink's durability story is a mystery to many users. One of the most
> common recurring questions from users comes from not understanding the
> relationship between state, state backends, and snapshots. Some of this
> confusion can be abated with learning material but the question is so
> pervasive that we believe Flink’s user APIs should be better communicate
> what different components are responsible for.
>
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-142%3A+Disentangle+StateBackends+from+Checkpointing
>
>
> I look forward to a healthy discussion.
>
>
> Seth
>


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Jark Wu
Hi everyone,

I think we have a conclusion that the writable metadata shouldn't be
defined as a computed column, but a normal column.

"timestamp STRING SYSTEM_METADATA('timestamp')" is one of the approaches.
However, it is not SQL standard compliant, we need to be cautious enough
when adding new syntax.
Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to
resolve the query-sink schema problem if it is read-only metadata. That
adds more stuff to learn for users.

>From my point of view, the "timestamp", "headers" are something like "key"
and "value" that stores with the real data. So why not define the
"timestamp" in the same way with "key" by using a "timestamp.field"
connector option?
On the other side, the read-only metadata, such as "offset", shouldn't be
defined as a normal column. So why not use the existing computed column
syntax for such metadata? Then we don't have the query-sink schema problem.
So here is my proposal:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  col1 STRING,
  col2 STRING,
  ts TIMESTAMP(3) WITH LOCAL TIME ZONE,-- ts is a normal field, so can
be read and written.
  offset AS SYSTEM_METADATA("offset")
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'key.fields' = 'id, name',
  'key.format' = 'csv',
  'value.format' = 'avro',
  'timestamp.field' = 'ts'-- define the mapping of Kafka timestamp
);

INSERT INTO kafka_table
SELECT id, name, col1, col2, rowtime FROM another_table;

I think this can solve all the problems without introducing any new syntax.
The only minor disadvantage is that we separate the definition way/syntax
of read-only metadata and read-write fields.
However, I don't think this is a big problem.

Best,
Jark


On Wed, 9 Sep 2020 at 15:09, Timo Walther  wrote:

> Hi Kurt,
>
> thanks for sharing your opinion. I'm totally up for not reusing computed
> columns. I think Jark was a big supporter of this syntax, @Jark are you
> fine with this as well? The non-computed column approach was only a
> "slightly rejected alternative".
>
> Furthermore, we would need to think about how such a new design
> influences the LIKE clause though.
>
> However, we should still keep the `PERSISTED` keyword as it influences
> the query->sink schema. If you look at the list of metadata for existing
> connectors and formats, we currently offer only two writable metadata
> fields. Otherwise, one would need to declare two tables whenever a
> metadata columns is read (one for the source, one for the sink). This
> can be quite inconvientient e.g. for just reading the topic.
>
> Regards,
> Timo
>
>
> On 09.09.20 08:52, Kurt Young wrote:
> > I also share the concern that reusing the computed column syntax but have
> > different semantics
> > would confuse users a lot.
> >
> > Besides, I think metadata fields are conceptually not the same with
> > computed columns. The metadata
> > field is a connector specific thing and it only contains the information
> > that where does the field come
> > from (during source) or where does the field need to write to (during
> > sink). It's more similar with normal
> > fields, with assumption that all these fields need going to the data
> part.
> >
> > Thus I'm more lean to the rejected alternative that Timo mentioned. And I
> > think we don't need the
> > PERSISTED keyword, SYSTEM_METADATA should be enough.
> >
> > During implementation, the framework only needs to pass such  > metadata field> information to the
> > connector, and the logic of handling such fields inside the connector
> > should be straightforward.
> >
> > Regarding the downside Timo mentioned:
> >
> >> The disadvantage is that users cannot call UDFs or parse timestamps.
> >
> > I think this is fairly simple to solve. Since the metadata field isn't a
> > computed column anymore, we can support
> > referencing such fields in the computed column. For example:
> >
> > CREATE TABLE kafka_table (
> >   id BIGINT,
> >   name STRING,
> >   timestamp STRING SYSTEM_METADATA("timestamp"),  // get the
> timestamp
> > field from metadata
> >   ts AS to_timestamp(timestamp) // normal computed column, parse the
> > string to TIMESTAMP type by using the metadata field
> > ) WITH (
> >  ...
> > )
> >
> > Best,
> > Kurt
> >
> >
> > On Tue, Sep 8, 2020 at 11:57 PM Timo Walther  wrote:
> >
> >> Hi Leonard,
> >>
> >> the only alternative I see is that we introduce a concept that is
> >> completely different to computed columns. This is also mentioned in the
> >> rejected alternative section of the FLIP. Something like:
> >>
> >> CREATE TABLE kafka_table (
> >>   id BIGINT,
> >>   name STRING,
> >>   timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
> >>   headers MAP SYSTEM_METADATA("headers") PERSISTED
> >> ) WITH (
> >>  ...
> >> )
> >>
> >> This way we would avoid confusion at all and can easily map columns to
> >> metadata columns. The disadvantage is that users cannot call UDFs or
> >> parse timestamps. This would nee

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-09 Thread Timo Walther
I had this in the inital design, but Jark had concerns at least for the 
`toChangelogStream(ChangelogMode)` (see earlier discussion).


`fromDataStream(dataStream, schema, changelogMode)` would be possible.

But in this case I would vote for a symmetric API. If we keep 
toChangelogStream we should also have a fromChangelogStream.


And if we unify `toChangelogStream` and `toDataStream`, retractions 
cannot be represented for non-Rows and users will experience duplicate 
records with a missing changeflag.


Regards,
Timo


On 09.09.20 09:31, Danny Chan wrote:

“But I think the planner needs to
know whether the input is insert-only or not.”

Does fromDataStream(dataStream, schema, changelogMode)

solve your concerns ?  People can pass around whatever ChangelogMode they like 
as an optional param.
By default: fromDataStream(dataStream, schema), the ChangelogMode is INSERT.

Best,
Danny Chan
在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:


But I think the planner needs to
know whether the input is insert-only or not.






[jira] [Created] (FLINK-19172) [AbstractFileStateBackend]

2020-09-09 Thread Alessio Savi (Jira)
Alessio Savi created FLINK-19172:


 Summary: [AbstractFileStateBackend]
 Key: FLINK-19172
 URL: https://issues.apache.org/jira/browse/FLINK-19172
 Project: Flink
  Issue Type: Bug
  Components: FileSystems, Runtime / Checkpointing, Runtime / State 
Backends
Affects Versions: 1.8.0
Reporter: Alessio Savi
 Attachments: Flink.PNG

The method `validatePath` of class `AbstractFileStateBackend` does not check if 
the pathPart retrived from the input `Path` is blank. Instead, it only check if 
it is null.

Is this a bug?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-09 Thread Jark Wu
I prefer to have separate APIs for them as changelog stream requires Row
type.
It would make the API more straightforward and reduce the confusion.

Best,
Jark

On Wed, 9 Sep 2020 at 16:21, Timo Walther  wrote:

> I had this in the inital design, but Jark had concerns at least for the
> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>
> `fromDataStream(dataStream, schema, changelogMode)` would be possible.
>
> But in this case I would vote for a symmetric API. If we keep
> toChangelogStream we should also have a fromChangelogStream.
>
> And if we unify `toChangelogStream` and `toDataStream`, retractions
> cannot be represented for non-Rows and users will experience duplicate
> records with a missing changeflag.
>
> Regards,
> Timo
>
>
> On 09.09.20 09:31, Danny Chan wrote:
> > “But I think the planner needs to
> > know whether the input is insert-only or not.”
> >
> > Does fromDataStream(dataStream, schema, changelogMode)
> >
> > solve your concerns ?  People can pass around whatever ChangelogMode
> they like as an optional param.
> > By default: fromDataStream(dataStream, schema), the ChangelogMode is
> INSERT.
> >
> > Best,
> > Danny Chan
> > 在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:
> >>
> >> But I think the planner needs to
> >> know whether the input is insert-only or not.
> >
>
>


Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Timo Walther

Hi Jark,

now we are back at the original design proposed by Dawid :D Yes, we 
should be cautious about adding new syntax. But the length of this 
discussion shows that we are looking for a good long-term solution. In 
this case I would rather vote for a deep integration into the syntax.


Computed columns are also not SQL standard compliant. And our DDL is 
neither, so we have some degree of freedom here.


Trying to solve everything via properties sounds rather like a hack to 
me. You are right that one could argue that "timestamp", "headers" are 
something like "key" and "value". However, mixing


`offset AS SYSTEM_METADATA("offset")`

and

`'timestamp.field' = 'ts'`

looks more confusing to users that an explicit

`offset AS CAST(SYSTEM_METADATA("offset") AS INT)`

or

`offset INT SYSTEM_METADATA("offset")`

that is symetric for both source and sink.

What do others think?

Regards,
Timo


On 09.09.20 10:09, Jark Wu wrote:

Hi everyone,

I think we have a conclusion that the writable metadata shouldn't be
defined as a computed column, but a normal column.

"timestamp STRING SYSTEM_METADATA('timestamp')" is one of the approaches.
However, it is not SQL standard compliant, we need to be cautious enough
when adding new syntax.
Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to
resolve the query-sink schema problem if it is read-only metadata. That
adds more stuff to learn for users.


From my point of view, the "timestamp", "headers" are something like "key"

and "value" that stores with the real data. So why not define the
"timestamp" in the same way with "key" by using a "timestamp.field"
connector option?
On the other side, the read-only metadata, such as "offset", shouldn't be
defined as a normal column. So why not use the existing computed column
syntax for such metadata? Then we don't have the query-sink schema problem.
So here is my proposal:

CREATE TABLE kafka_table (
   id BIGINT,
   name STRING,
   col1 STRING,
   col2 STRING,
   ts TIMESTAMP(3) WITH LOCAL TIME ZONE,-- ts is a normal field, so can
be read and written.
   offset AS SYSTEM_METADATA("offset")
) WITH (
   'connector' = 'kafka',
   'topic' = 'test-topic',
   'key.fields' = 'id, name',
   'key.format' = 'csv',
   'value.format' = 'avro',
   'timestamp.field' = 'ts'-- define the mapping of Kafka timestamp
);

INSERT INTO kafka_table
SELECT id, name, col1, col2, rowtime FROM another_table;

I think this can solve all the problems without introducing any new syntax.
The only minor disadvantage is that we separate the definition way/syntax
of read-only metadata and read-write fields.
However, I don't think this is a big problem.

Best,
Jark


On Wed, 9 Sep 2020 at 15:09, Timo Walther  wrote:


Hi Kurt,

thanks for sharing your opinion. I'm totally up for not reusing computed
columns. I think Jark was a big supporter of this syntax, @Jark are you
fine with this as well? The non-computed column approach was only a
"slightly rejected alternative".

Furthermore, we would need to think about how such a new design
influences the LIKE clause though.

However, we should still keep the `PERSISTED` keyword as it influences
the query->sink schema. If you look at the list of metadata for existing
connectors and formats, we currently offer only two writable metadata
fields. Otherwise, one would need to declare two tables whenever a
metadata columns is read (one for the source, one for the sink). This
can be quite inconvientient e.g. for just reading the topic.

Regards,
Timo


On 09.09.20 08:52, Kurt Young wrote:

I also share the concern that reusing the computed column syntax but have
different semantics
would confuse users a lot.

Besides, I think metadata fields are conceptually not the same with
computed columns. The metadata
field is a connector specific thing and it only contains the information
that where does the field come
from (during source) or where does the field need to write to (during
sink). It's more similar with normal
fields, with assumption that all these fields need going to the data

part.


Thus I'm more lean to the rejected alternative that Timo mentioned. And I
think we don't need the
PERSISTED keyword, SYSTEM_METADATA should be enough.

During implementation, the framework only needs to pass such  information to the
connector, and the logic of handling such fields inside the connector
should be straightforward.

Regarding the downside Timo mentioned:


The disadvantage is that users cannot call UDFs or parse timestamps.


I think this is fairly simple to solve. Since the metadata field isn't a
computed column anymore, we can support
referencing such fields in the computed column. For example:

CREATE TABLE kafka_table (
   id BIGINT,
   name STRING,
   timestamp STRING SYSTEM_METADATA("timestamp"),  // get the

timestamp

field from metadata
   ts AS to_timestamp(timestamp) // normal computed column, parse the
string to TIMESTAMP type by using the metadata field
) WITH (
  ...

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Kurt Young
 I would vote for `offset INT SYSTEM_METADATA("offset")`.

I don't think we can stick with the SQL standard in DDL part forever,
especially as there are more and more
requirements coming from different connectors and external systems.

Best,
Kurt


On Wed, Sep 9, 2020 at 4:40 PM Timo Walther  wrote:

> Hi Jark,
>
> now we are back at the original design proposed by Dawid :D Yes, we
> should be cautious about adding new syntax. But the length of this
> discussion shows that we are looking for a good long-term solution. In
> this case I would rather vote for a deep integration into the syntax.
>
> Computed columns are also not SQL standard compliant. And our DDL is
> neither, so we have some degree of freedom here.
>
> Trying to solve everything via properties sounds rather like a hack to
> me. You are right that one could argue that "timestamp", "headers" are
> something like "key" and "value". However, mixing
>
> `offset AS SYSTEM_METADATA("offset")`
>
> and
>
> `'timestamp.field' = 'ts'`
>
> looks more confusing to users that an explicit
>
> `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
>
> or
>
> `offset INT SYSTEM_METADATA("offset")`
>
> that is symetric for both source and sink.
>
> What do others think?
>
> Regards,
> Timo
>
>
> On 09.09.20 10:09, Jark Wu wrote:
> > Hi everyone,
> >
> > I think we have a conclusion that the writable metadata shouldn't be
> > defined as a computed column, but a normal column.
> >
> > "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the approaches.
> > However, it is not SQL standard compliant, we need to be cautious enough
> > when adding new syntax.
> > Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to
> > resolve the query-sink schema problem if it is read-only metadata. That
> > adds more stuff to learn for users.
> >
> >>From my point of view, the "timestamp", "headers" are something like
> "key"
> > and "value" that stores with the real data. So why not define the
> > "timestamp" in the same way with "key" by using a "timestamp.field"
> > connector option?
> > On the other side, the read-only metadata, such as "offset", shouldn't be
> > defined as a normal column. So why not use the existing computed column
> > syntax for such metadata? Then we don't have the query-sink schema
> problem.
> > So here is my proposal:
> >
> > CREATE TABLE kafka_table (
> >id BIGINT,
> >name STRING,
> >col1 STRING,
> >col2 STRING,
> >ts TIMESTAMP(3) WITH LOCAL TIME ZONE,-- ts is a normal field, so
> can
> > be read and written.
> >offset AS SYSTEM_METADATA("offset")
> > ) WITH (
> >'connector' = 'kafka',
> >'topic' = 'test-topic',
> >'key.fields' = 'id, name',
> >'key.format' = 'csv',
> >'value.format' = 'avro',
> >'timestamp.field' = 'ts'-- define the mapping of Kafka timestamp
> > );
> >
> > INSERT INTO kafka_table
> > SELECT id, name, col1, col2, rowtime FROM another_table;
> >
> > I think this can solve all the problems without introducing any new
> syntax.
> > The only minor disadvantage is that we separate the definition way/syntax
> > of read-only metadata and read-write fields.
> > However, I don't think this is a big problem.
> >
> > Best,
> > Jark
> >
> >
> > On Wed, 9 Sep 2020 at 15:09, Timo Walther  wrote:
> >
> >> Hi Kurt,
> >>
> >> thanks for sharing your opinion. I'm totally up for not reusing computed
> >> columns. I think Jark was a big supporter of this syntax, @Jark are you
> >> fine with this as well? The non-computed column approach was only a
> >> "slightly rejected alternative".
> >>
> >> Furthermore, we would need to think about how such a new design
> >> influences the LIKE clause though.
> >>
> >> However, we should still keep the `PERSISTED` keyword as it influences
> >> the query->sink schema. If you look at the list of metadata for existing
> >> connectors and formats, we currently offer only two writable metadata
> >> fields. Otherwise, one would need to declare two tables whenever a
> >> metadata columns is read (one for the source, one for the sink). This
> >> can be quite inconvientient e.g. for just reading the topic.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 09.09.20 08:52, Kurt Young wrote:
> >>> I also share the concern that reusing the computed column syntax but
> have
> >>> different semantics
> >>> would confuse users a lot.
> >>>
> >>> Besides, I think metadata fields are conceptually not the same with
> >>> computed columns. The metadata
> >>> field is a connector specific thing and it only contains the
> information
> >>> that where does the field come
> >>> from (during source) or where does the field need to write to (during
> >>> sink). It's more similar with normal
> >>> fields, with assumption that all these fields need going to the data
> >> part.
> >>>
> >>> Thus I'm more lean to the rejected alternative that Timo mentioned.
> And I
> >>> think we don't need the
> >>> PERSISTED keyword, SYSTEM_METADATA should be enough.
> >>>
> >>> During imp

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-09 Thread Danny Chan
I think it would bring in much confusion by a different API name just because 
the DataStream generic type is different.
If there are ChangelogMode that only works for Row, can we have a type check 
there ?

Switch to a new API name does not really solve the problem well, people still 
need to declare the ChangelogMode explicitly, and there are some confusions:

• Should DataStream of Row type always use #fromChangelogStream ?
• Does fromChangelogStream works for only INSERT ChangelogMode ?


Best,
Danny Chan
在 2020年9月9日 +0800 PM4:21,Timo Walther ,写道:
> I had this in the inital design, but Jark had concerns at least for the
> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>
> `fromDataStream(dataStream, schema, changelogMode)` would be possible.
>
> But in this case I would vote for a symmetric API. If we keep
> toChangelogStream we should also have a fromChangelogStream.
>
> And if we unify `toChangelogStream` and `toDataStream`, retractions
> cannot be represented for non-Rows and users will experience duplicate
> records with a missing changeflag.
>
> Regards,
> Timo
>
>
> On 09.09.20 09:31, Danny Chan wrote:
> > “But I think the planner needs to
> > know whether the input is insert-only or not.”
> >
> > Does fromDataStream(dataStream, schema, changelogMode)
> >
> > solve your concerns ?  People can pass around whatever ChangelogMode they 
> > like as an optional param.
> > By default: fromDataStream(dataStream, schema), the ChangelogMode is INSERT.
> >
> > Best,
> > Danny Chan
> > 在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:
> > >
> > > But I think the planner needs to
> > > know whether the input is insert-only or not.
> >
>


Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-09 Thread Aljoscha Krettek
I think Kurts concerns/comments are very valid and we need to implement 
such things in the future. However, I also think that we need to get 
started somewhere and I think what's proposed in this FLIP is a good 
starting point that we can build on. So we should not get paralyzed by 
thinking too far ahead into the future. Does that make sense?


Best,
Aljoscha

On 08.09.20 16:59, Dawid Wysakowicz wrote:

Ad. 1

Yes, you are right in principle.

Let me though clarify my proposal a bit. The proposed sort-style
execution aims at a generic KeyedProcessFunction were all the
"aggregations" are actually performed in the user code. It tries to
improve the performance by actually removing the need to use RocksDB e.g.:

     private static final class Summer
     extends KeyedProcessFunction,
Tuple2> {

         

     @Override
     public void processElement(
     Tuple2 value,
     Context ctx,
     Collector> out) throws Exception {
     if (!Objects.equals(timerRegistered.value(), Boolean.TRUE)) {
     ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
     timerRegistered.update(true);
     }
     Integer v = counter.value();
     Integer incomingValue = value.f1;
     if (v != null) {
     v += incomingValue;
     } else {
     v = incomingValue;
     }
     counter.update(v);
     }

         

    }

Therefore I don't think the first part of your reply with separating the
write and read workload applies here. We do not aim to create a
competing API with the Table API. We think operations such as joins or
analytical aggregations should be performed in Table API.

As for the second part I agree it would be nice to fall back to the
sorting approach only if a certain threshold of memory in a State
Backend is used. This has some problems though. We would need a way to
estimate the size of the occupied memory to tell when the threshold is
reached. That is not easily doable by default e.g. in a
MemoryStateBackend, as we do not serialize the values in the state
backend by default. We would have to add that, but this would add the
overhead of the serialization.

This proposal aims at the cases where we do have a large state that will
not fit into the memory and without the change users are forced to use
RocksDB. If the state fits in memory I agree it will be better to do
hash-based aggregations e.g. using the MemoryStateBackend. Therefore I
think it is important to give users the choice to use one or the other
approach. We might discuss which approach should be the default for
RuntimeMode.BATCH proposed in FLIP-134. Should it be hash-based with
user configured state backend or sorting-based with a single key at a
time backend. Moreover we could think if we should let users choose the
sort vs hash "state backend" per operator. Would that suffice?

Ad. 2

I still think we can just use the first X bytes of the serialized form
as the normalized key and fallback to comparing full keys on clashes. It
is because we are actually not interested in a logical order, but we
care only about the "grouping" aspect of the sorting. Therefore I think
its enough to compare only parts of the full key as the normalized key.

Thanks again for the really nice and thorough feedback!

Best,

Dawid

On 08/09/2020 14:47, Kurt Young wrote:

Regarding #1, yes the state backend is definitely hash-based execution.
However there are some differences between
batch hash-based execution. The key difference is *random access &
read/write mixed workload". For example, by using
state backend in streaming execution, one have to mix the read and write
operations and all of them are actually random
access. But in a batch hash execution, we could divide the phases into
write and read. For example, we can build the
hash table first, with only write operations. And once the build is done,
we can start to read and trigger the user codes.
Take hash aggregation which blink planner implemented as an example, during
building phase, as long as the hash map
could fit into memory, we will update the accumulators directly in the hash
map. And once we are running out of memory,
we then fall back to sort based execution. It improves the performance a
lot if the incoming data can be processed in
memory.

Regarding #2, IIUC you are actually describing a binary format of key, not
normalized key which is used in DataSet. I will
take String for example. If we have lots of keys with length all greater
than, let's say 20. In your proposal, you will encode
the whole string in the prefix of your composed data (  + 
+  ). And when you compare
records, you will actually compare the *whole* key of the record. For
normalized key, it's fixed-length in this case, IIRC it will
take 8 bytes to represent the string. And the sorter will store the
normalized key and offset in a dedicated array. When doing
th

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-09 Thread Kurt Young
Yes, I didn't intend to block this FLIP, and some of the comments are
actually implementation details.
And all of them are handled internally, not visible to users, thus we can
also change or improve them
in the future.

Best,
Kurt


On Wed, Sep 9, 2020 at 5:03 PM Aljoscha Krettek  wrote:

> I think Kurts concerns/comments are very valid and we need to implement
> such things in the future. However, I also think that we need to get
> started somewhere and I think what's proposed in this FLIP is a good
> starting point that we can build on. So we should not get paralyzed by
> thinking too far ahead into the future. Does that make sense?
>
> Best,
> Aljoscha
>
> On 08.09.20 16:59, Dawid Wysakowicz wrote:
> > Ad. 1
> >
> > Yes, you are right in principle.
> >
> > Let me though clarify my proposal a bit. The proposed sort-style
> > execution aims at a generic KeyedProcessFunction were all the
> > "aggregations" are actually performed in the user code. It tries to
> > improve the performance by actually removing the need to use RocksDB
> e.g.:
> >
> >  private static final class Summer
> >  extends KeyedProcessFunction,
> > Tuple2> {
> >
> >  
> >
> >  @Override
> >  public void processElement(
> >  Tuple2 value,
> >  Context ctx,
> >  Collector> out) throws Exception {
> >  if (!Objects.equals(timerRegistered.value(), Boolean.TRUE))
> {
> >
> ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
> >  timerRegistered.update(true);
> >  }
> >  Integer v = counter.value();
> >  Integer incomingValue = value.f1;
> >  if (v != null) {
> >  v += incomingValue;
> >  } else {
> >  v = incomingValue;
> >  }
> >  counter.update(v);
> >  }
> >
> >  
> >
> > }
> >
> > Therefore I don't think the first part of your reply with separating the
> > write and read workload applies here. We do not aim to create a
> > competing API with the Table API. We think operations such as joins or
> > analytical aggregations should be performed in Table API.
> >
> > As for the second part I agree it would be nice to fall back to the
> > sorting approach only if a certain threshold of memory in a State
> > Backend is used. This has some problems though. We would need a way to
> > estimate the size of the occupied memory to tell when the threshold is
> > reached. That is not easily doable by default e.g. in a
> > MemoryStateBackend, as we do not serialize the values in the state
> > backend by default. We would have to add that, but this would add the
> > overhead of the serialization.
> >
> > This proposal aims at the cases where we do have a large state that will
> > not fit into the memory and without the change users are forced to use
> > RocksDB. If the state fits in memory I agree it will be better to do
> > hash-based aggregations e.g. using the MemoryStateBackend. Therefore I
> > think it is important to give users the choice to use one or the other
> > approach. We might discuss which approach should be the default for
> > RuntimeMode.BATCH proposed in FLIP-134. Should it be hash-based with
> > user configured state backend or sorting-based with a single key at a
> > time backend. Moreover we could think if we should let users choose the
> > sort vs hash "state backend" per operator. Would that suffice?
> >
> > Ad. 2
> >
> > I still think we can just use the first X bytes of the serialized form
> > as the normalized key and fallback to comparing full keys on clashes. It
> > is because we are actually not interested in a logical order, but we
> > care only about the "grouping" aspect of the sorting. Therefore I think
> > its enough to compare only parts of the full key as the normalized key.
> >
> > Thanks again for the really nice and thorough feedback!
> >
> > Best,
> >
> > Dawid
> >
> > On 08/09/2020 14:47, Kurt Young wrote:
> >> Regarding #1, yes the state backend is definitely hash-based execution.
> >> However there are some differences between
> >> batch hash-based execution. The key difference is *random access &
> >> read/write mixed workload". For example, by using
> >> state backend in streaming execution, one have to mix the read and write
> >> operations and all of them are actually random
> >> access. But in a batch hash execution, we could divide the phases into
> >> write and read. For example, we can build the
> >> hash table first, with only write operations. And once the build is
> done,
> >> we can start to read and trigger the user codes.
> >> Take hash aggregation which blink planner implemented as an example,
> during
> >> building phase, as long as the hash map
> >> could fit into memory, we will update the accumulators directly in the
> hash
> >> map. And once we are running out of memory,
> >> we then fall back to sort based execution. It improves 

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Danny Chan
"offset INT SYSTEM_METADATA("offset")"

This is actually Oracle or MySQL style computed column syntax.

"You are right that one could argue that "timestamp", "headers" are
something like "key" and "value""

I have the same feeling, both key value and headers timestamp are *real* data
stored in the consumed record, they are not computed or generated.

"Trying to solve everything via properties sounds rather like a hack to
me"

Things are not that hack if we can unify the routines or the definitions
(all from the computed column way or all from the table options), i also
think that it is a hacky that we mix in 2 kinds of syntax for different
kinds of metadata (read-only and read-write). In this FLIP, we declare the
Kafka key fields with table options but SYSTEM_METADATA for other metadata,
that is a hacky thing or something in-consistent.

Kurt Young  于2020年9月9日周三 下午4:48写道:

>  I would vote for `offset INT SYSTEM_METADATA("offset")`.
>
> I don't think we can stick with the SQL standard in DDL part forever,
> especially as there are more and more
> requirements coming from different connectors and external systems.
>
> Best,
> Kurt
>
>
> On Wed, Sep 9, 2020 at 4:40 PM Timo Walther  wrote:
>
> > Hi Jark,
> >
> > now we are back at the original design proposed by Dawid :D Yes, we
> > should be cautious about adding new syntax. But the length of this
> > discussion shows that we are looking for a good long-term solution. In
> > this case I would rather vote for a deep integration into the syntax.
> >
> > Computed columns are also not SQL standard compliant. And our DDL is
> > neither, so we have some degree of freedom here.
> >
> > Trying to solve everything via properties sounds rather like a hack to
> > me. You are right that one could argue that "timestamp", "headers" are
> > something like "key" and "value". However, mixing
> >
> > `offset AS SYSTEM_METADATA("offset")`
> >
> > and
> >
> > `'timestamp.field' = 'ts'`
> >
> > looks more confusing to users that an explicit
> >
> > `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
> >
> > or
> >
> > `offset INT SYSTEM_METADATA("offset")`
> >
> > that is symetric for both source and sink.
> >
> > What do others think?
> >
> > Regards,
> > Timo
> >
> >
> > On 09.09.20 10:09, Jark Wu wrote:
> > > Hi everyone,
> > >
> > > I think we have a conclusion that the writable metadata shouldn't be
> > > defined as a computed column, but a normal column.
> > >
> > > "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the
> approaches.
> > > However, it is not SQL standard compliant, we need to be cautious
> enough
> > > when adding new syntax.
> > > Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to
> > > resolve the query-sink schema problem if it is read-only metadata. That
> > > adds more stuff to learn for users.
> > >
> > >>From my point of view, the "timestamp", "headers" are something like
> > "key"
> > > and "value" that stores with the real data. So why not define the
> > > "timestamp" in the same way with "key" by using a "timestamp.field"
> > > connector option?
> > > On the other side, the read-only metadata, such as "offset", shouldn't
> be
> > > defined as a normal column. So why not use the existing computed column
> > > syntax for such metadata? Then we don't have the query-sink schema
> > problem.
> > > So here is my proposal:
> > >
> > > CREATE TABLE kafka_table (
> > >id BIGINT,
> > >name STRING,
> > >col1 STRING,
> > >col2 STRING,
> > >ts TIMESTAMP(3) WITH LOCAL TIME ZONE,-- ts is a normal field, so
> > can
> > > be read and written.
> > >offset AS SYSTEM_METADATA("offset")
> > > ) WITH (
> > >'connector' = 'kafka',
> > >'topic' = 'test-topic',
> > >'key.fields' = 'id, name',
> > >'key.format' = 'csv',
> > >'value.format' = 'avro',
> > >'timestamp.field' = 'ts'-- define the mapping of Kafka timestamp
> > > );
> > >
> > > INSERT INTO kafka_table
> > > SELECT id, name, col1, col2, rowtime FROM another_table;
> > >
> > > I think this can solve all the problems without introducing any new
> > syntax.
> > > The only minor disadvantage is that we separate the definition
> way/syntax
> > > of read-only metadata and read-write fields.
> > > However, I don't think this is a big problem.
> > >
> > > Best,
> > > Jark
> > >
> > >
> > > On Wed, 9 Sep 2020 at 15:09, Timo Walther  wrote:
> > >
> > >> Hi Kurt,
> > >>
> > >> thanks for sharing your opinion. I'm totally up for not reusing
> computed
> > >> columns. I think Jark was a big supporter of this syntax, @Jark are
> you
> > >> fine with this as well? The non-computed column approach was only a
> > >> "slightly rejected alternative".
> > >>
> > >> Furthermore, we would need to think about how such a new design
> > >> influences the LIKE clause though.
> > >>
> > >> However, we should still keep the `PERSISTED` keyword as it influences
> > >> the query->sink schema. If you look at the list of metadata for
> existing
> > >> connectors an

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Jark Wu
Hi Danny,

This is not Oracle and MySQL computed column syntax, because there is no
"AS" after the type.

Hi everyone,

If we want to use "offset INT SYSTEM_METADATA("offset")", then I think we
must further discuss about "PERSISED" or "VIRTUAL" keyword for query-sink
schema problem.
Personally, I think we can use a shorter keyword "METADATA" for
"SYSTEM_METADATA". Because "SYSTEM_METADATA" sounds like a system function
and confuse users this looks like a computed column.


Best,
Jark



On Wed, 9 Sep 2020 at 17:23, Danny Chan  wrote:

> "offset INT SYSTEM_METADATA("offset")"
>
> This is actually Oracle or MySQL style computed column syntax.
>
> "You are right that one could argue that "timestamp", "headers" are
> something like "key" and "value""
>
> I have the same feeling, both key value and headers timestamp are *real*
> data
> stored in the consumed record, they are not computed or generated.
>
> "Trying to solve everything via properties sounds rather like a hack to
> me"
>
> Things are not that hack if we can unify the routines or the definitions
> (all from the computed column way or all from the table options), i also
> think that it is a hacky that we mix in 2 kinds of syntax for different
> kinds of metadata (read-only and read-write). In this FLIP, we declare the
> Kafka key fields with table options but SYSTEM_METADATA for other metadata,
> that is a hacky thing or something in-consistent.
>
> Kurt Young  于2020年9月9日周三 下午4:48写道:
>
> >  I would vote for `offset INT SYSTEM_METADATA("offset")`.
> >
> > I don't think we can stick with the SQL standard in DDL part forever,
> > especially as there are more and more
> > requirements coming from different connectors and external systems.
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Sep 9, 2020 at 4:40 PM Timo Walther  wrote:
> >
> > > Hi Jark,
> > >
> > > now we are back at the original design proposed by Dawid :D Yes, we
> > > should be cautious about adding new syntax. But the length of this
> > > discussion shows that we are looking for a good long-term solution. In
> > > this case I would rather vote for a deep integration into the syntax.
> > >
> > > Computed columns are also not SQL standard compliant. And our DDL is
> > > neither, so we have some degree of freedom here.
> > >
> > > Trying to solve everything via properties sounds rather like a hack to
> > > me. You are right that one could argue that "timestamp", "headers" are
> > > something like "key" and "value". However, mixing
> > >
> > > `offset AS SYSTEM_METADATA("offset")`
> > >
> > > and
> > >
> > > `'timestamp.field' = 'ts'`
> > >
> > > looks more confusing to users that an explicit
> > >
> > > `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
> > >
> > > or
> > >
> > > `offset INT SYSTEM_METADATA("offset")`
> > >
> > > that is symetric for both source and sink.
> > >
> > > What do others think?
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 09.09.20 10:09, Jark Wu wrote:
> > > > Hi everyone,
> > > >
> > > > I think we have a conclusion that the writable metadata shouldn't be
> > > > defined as a computed column, but a normal column.
> > > >
> > > > "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the
> > approaches.
> > > > However, it is not SQL standard compliant, we need to be cautious
> > enough
> > > > when adding new syntax.
> > > > Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to
> > > > resolve the query-sink schema problem if it is read-only metadata.
> That
> > > > adds more stuff to learn for users.
> > > >
> > > >>From my point of view, the "timestamp", "headers" are something like
> > > "key"
> > > > and "value" that stores with the real data. So why not define the
> > > > "timestamp" in the same way with "key" by using a "timestamp.field"
> > > > connector option?
> > > > On the other side, the read-only metadata, such as "offset",
> shouldn't
> > be
> > > > defined as a normal column. So why not use the existing computed
> column
> > > > syntax for such metadata? Then we don't have the query-sink schema
> > > problem.
> > > > So here is my proposal:
> > > >
> > > > CREATE TABLE kafka_table (
> > > >id BIGINT,
> > > >name STRING,
> > > >col1 STRING,
> > > >col2 STRING,
> > > >ts TIMESTAMP(3) WITH LOCAL TIME ZONE,-- ts is a normal field,
> so
> > > can
> > > > be read and written.
> > > >offset AS SYSTEM_METADATA("offset")
> > > > ) WITH (
> > > >'connector' = 'kafka',
> > > >'topic' = 'test-topic',
> > > >'key.fields' = 'id, name',
> > > >'key.format' = 'csv',
> > > >'value.format' = 'avro',
> > > >'timestamp.field' = 'ts'-- define the mapping of Kafka
> timestamp
> > > > );
> > > >
> > > > INSERT INTO kafka_table
> > > > SELECT id, name, col1, col2, rowtime FROM another_table;
> > > >
> > > > I think this can solve all the problems without introducing any new
> > > syntax.
> > > > The only minor disadvantage is that we separate the definition
> > way/syntax
> > > > of read-only met

[jira] [Created] (FLINK-19173) Add Pandas Batch Group Aggregation Function Operator

2020-09-09 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-19173:


 Summary: Add Pandas Batch Group Aggregation Function Operator
 Key: FLINK-19173
 URL: https://issues.apache.org/jira/browse/FLINK-19173
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Huang Xingbo
 Fix For: 1.12.0


Add Pandas Batch Group Aggregation Function Operator



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-141: Intra-Slot Managed Memory Sharing

2020-09-09 Thread Stephan Ewen
Hi!

I read through the FLIP and looks good to me. One suggestion and one
question:

Regarding naming, we could call the ROCKSDB/BATCH_OP category DATAPROC
because this is the memory that goes into holding (and structuring) the
data.

I am a bit confused about the Scope enum (with values Slot and Op). Do we
need to store this in the configuration or can we drop this?
>From my understanding, this is transparent already:
  - When anyone goes to the MemoryManager, they ask for a fraction of the
Slot's budget.
  - RocksDB (which is per slot) goes directly to the MemoryManager
  - Python process (per slot) goes directly to the MemoryManager
  - Batch algorithms apply their local operator weight before going to the
MemoryManager, so by the time the allocate memory, it is already the right
fraction per-slot.

Best,
Stephan


On Fri, Sep 4, 2020 at 3:46 AM Xintong Song  wrote:

> Thanks Till, `taskmanager.memory.managed.consumer-weights` sounds good to
> me.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Sep 3, 2020 at 8:44 PM Till Rohrmann  wrote:
>
> > Thanks for updating the FLIP Xintong. It looks good to me. One minor
> > comment is that we could name the configuration parameter
> > also taskmanager.memory.managed.consumer-weights which might be a bit
> more
> > expressive what this option does.
> >
> > Cheers,
> > Till
> >
> > On Thu, Sep 3, 2020 at 12:44 PM Xintong Song 
> > wrote:
> >
> > > Thanks all for the feedback.
> > >
> > > FYI, I've opened a voting thread[1] on this.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > > [1]
> > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-141-Intra-Slot-Managed-Memory-Sharing-td44358.html
> > >
> > >
> > > On Thu, Sep 3, 2020 at 2:54 PM Zhu Zhu  wrote:
> > >
> > > > Thanks for proposing this improvement! @Xintong
> > > > The proposal looks good to me. Agreed that we should make it as
> simple
> > as
> > > > possible for users to understand.
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > Dian Fu  于2020年9月3日周四 下午2:11写道:
> > > >
> > > > > Thanks for driving this FLIP, Xintong! +1 to the updated version.
> > > > >
> > > > > > 在 2020年9月2日,下午6:09,Xintong Song  写道:
> > > > > >
> > > > > > Thanks for the input, Yu.
> > > > > >
> > > > > > I believe the current proposal should work with RocksDB, or any
> > other
> > > > > state
> > > > > > backend, using memory at either the slot or the scope. With the
> > > > proposed
> > > > > > approach, all we need is an indicator (e.g., a configuration
> > option)
> > > > > > telling us which scope should we calculate the fractions for.
> > > > > >
> > > > > > Thank you~
> > > > > >
> > > > > > Xintong Song
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Sep 2, 2020 at 4:53 PM Yu Li  wrote:
> > > > > >
> > > > > >> Thanks for compiling the FLIP Xintong, and +1 for the updated
> doc.
> > > > > >>
> > > > > >> Just one supplement for the RocksDB state backend part:
> > > > > >>
> > > > > >> It's true that currently we're using managed memory at the slot
> > > scope.
> > > > > >> However, IMHO, we may support setting weights for different
> > stateful
> > > > > >> operators (for advanced usage) in future. For example, users may
> > > > choose
> > > > > to
> > > > > >> set higher weights for join operator over aggregation operator,
> to
> > > > give
> > > > > >> more memory to those with bigger states. In this case, we may
> also
> > > use
> > > > > >> managed memory at the operator scope for state backends. And if
> I
> > > > > >> understand correctly, the current design could cover this case
> > well.
> > > > > >>
> > > > > >> Best Regards,
> > > > > >> Yu
> > > > > >>
> > > > > >>
> > > > > >> On Wed, 2 Sep 2020 at 15:39, Xintong Song <
> tonysong...@gmail.com>
> > > > > wrote:
> > > > > >>
> > > > > >>> Thanks all for the feedback and discussion.
> > > > > >>>
> > > > > >>> I have updated the FLIP, with the following changes.
> > > > > >>>
> > > > > >>>   - Choose the main proposal over the alternative approach
> > > > > >>>   - Combine weights of RocksDB and batch operators
> > > > > >>>   - Expose weights through configuration options, rather than
> via
> > > > > >>>   ExecutionConfig.
> > > > > >>>   - Add implementation plan.
> > > > > >>>
> > > > > >>> Please help take another look.
> > > > > >>>
> > > > > >>> Thank you~
> > > > > >>>
> > > > > >>> Xintong Song
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>> On Wed, Sep 2, 2020 at 2:41 PM Xintong Song <
> > tonysong...@gmail.com
> > > >
> > > > > >> wrote:
> > > > > >>>
> > > > >  Thanks for the inputs, Aljoscha & Till.
> > > > > 
> > > > > 
> > > > >  # Weight Configuration
> > > > > 
> > > > > 
> > > > >  I think exposing the knobs incrementally is a good idea.
> > However,
> > > > I'm
> > > > > >> not
> > > > >  sure about non-configurable as the first step.
> > > > > 
> > > > > 
> > > > >  Currently, users can tune memory for rocksdb
> > > > >  ('taskmanag

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Leonard Xu
Hi everyone,

I’m +1 for "offset INT SYSTEM_METADATA("offset”)” if we have to make a choice.

It’s not a generated column syntax and thus we can get rid of the limitation of 
generated column.

About distinguishing the read-only metadata and writeable metadata, I prefer to 
add keyword after  SYSTEM_METADATA rather than declaring in with option fields.
And for the keyword, I tend to do not use "PERSISTED” or “STORED” which have 
been used in SQL server and Postgresql.  All metadata are readable and only two 
are writeable,
how about simply use “WRITABLE” for “timestamp” and “headers” ?

+1 for Jark’s proposal to make the SYSTEM_METADATA keyword shorter.



Best,
Leonard
 


> 在 2020年9月9日,17:41,Jark Wu  写道:
> 
> Hi Danny,
> 
> This is not Oracle and MySQL computed column syntax, because there is no
> "AS" after the type.
> 
> Hi everyone,
> 
> If we want to use "offset INT SYSTEM_METADATA("offset")", then I think we
> must further discuss about "PERSISED" or "VIRTUAL" keyword for query-sink
> schema problem.
> Personally, I think we can use a shorter keyword "METADATA" for
> "SYSTEM_METADATA". Because "SYSTEM_METADATA" sounds like a system function
> and confuse users this looks like a computed column.
> 
> 
> Best,
> Jark
> 
> 
> 
> On Wed, 9 Sep 2020 at 17:23, Danny Chan  wrote:
> 
>> "offset INT SYSTEM_METADATA("offset")"
>> 
>> This is actually Oracle or MySQL style computed column syntax.
>> 
>> "You are right that one could argue that "timestamp", "headers" are
>> something like "key" and "value""
>> 
>> I have the same feeling, both key value and headers timestamp are *real*
>> data
>> stored in the consumed record, they are not computed or generated.
>> 
>> "Trying to solve everything via properties sounds rather like a hack to
>> me"
>> 
>> Things are not that hack if we can unify the routines or the definitions
>> (all from the computed column way or all from the table options), i also
>> think that it is a hacky that we mix in 2 kinds of syntax for different
>> kinds of metadata (read-only and read-write). In this FLIP, we declare the
>> Kafka key fields with table options but SYSTEM_METADATA for other metadata,
>> that is a hacky thing or something in-consistent.
>> 
>> Kurt Young  于2020年9月9日周三 下午4:48写道:
>> 
>>> I would vote for `offset INT SYSTEM_METADATA("offset")`.
>>> 
>>> I don't think we can stick with the SQL standard in DDL part forever,
>>> especially as there are more and more
>>> requirements coming from different connectors and external systems.
>>> 
>>> Best,
>>> Kurt
>>> 
>>> 
>>> On Wed, Sep 9, 2020 at 4:40 PM Timo Walther  wrote:
>>> 
 Hi Jark,
 
 now we are back at the original design proposed by Dawid :D Yes, we
 should be cautious about adding new syntax. But the length of this
 discussion shows that we are looking for a good long-term solution. In
 this case I would rather vote for a deep integration into the syntax.
 
 Computed columns are also not SQL standard compliant. And our DDL is
 neither, so we have some degree of freedom here.
 
 Trying to solve everything via properties sounds rather like a hack to
 me. You are right that one could argue that "timestamp", "headers" are
 something like "key" and "value". However, mixing
 
 `offset AS SYSTEM_METADATA("offset")`
 
 and
 
 `'timestamp.field' = 'ts'`
 
 looks more confusing to users that an explicit
 
 `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
 
 or
 
 `offset INT SYSTEM_METADATA("offset")`
 
 that is symetric for both source and sink.
 
 What do others think?
 
 Regards,
 Timo
 
 
 On 09.09.20 10:09, Jark Wu wrote:
> Hi everyone,
> 
> I think we have a conclusion that the writable metadata shouldn't be
> defined as a computed column, but a normal column.
> 
> "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the
>>> approaches.
> However, it is not SQL standard compliant, we need to be cautious
>>> enough
> when adding new syntax.
> Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to
> resolve the query-sink schema problem if it is read-only metadata.
>> That
> adds more stuff to learn for users.
> 
>> From my point of view, the "timestamp", "headers" are something like
 "key"
> and "value" that stores with the real data. So why not define the
> "timestamp" in the same way with "key" by using a "timestamp.field"
> connector option?
> On the other side, the read-only metadata, such as "offset",
>> shouldn't
>>> be
> defined as a normal column. So why not use the existing computed
>> column
> syntax for such metadata? Then we don't have the query-sink schema
 problem.
> So here is my proposal:
> 
> CREATE TABLE kafka_table (
>   id BIGINT,
>   name STRING,
>   col1 STRING,
>   col2 STRING,
>   ts TIMESTAMP(3) WITH LOCAL TIME ZO

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Timo Walther

Hi everyone,

"key" and "value" in the properties are a special case because they need 
to configure a format. So key and value are more than just metadata. 
Jark's example for setting a timestamp would work but as the FLIP 
discusses, we have way more metadata fields like headers, epoch-leader, 
etc. Having a property for all of this metadata would mess up the WITH 
section entirely. Furthermore, we also want to deal with metadata from 
the formats. Solving this through properties as well would further 
complicate the property design.


Personally, I still like the computed column design more because it 
allows to have full flexibility to compute the final column:


timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))

Instead of having a helper column and a real column in the table:

helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
realTimestamp AS adjustTimestamp(helperTimestamp)

But I see that the discussion leans towards:

timestamp INT SYSTEM_METADATA("ts")

Which is fine with me. It is the shortest solution, because we don't 
need additional CAST. We can discuss the syntax, so that confusion with 
computed columns can be avoided.


timestamp INT USING SYSTEM_METADATA("ts")
timestamp INT FROM SYSTEM_METADATA("ts")
timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED

We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM 
makes it clearer that it comes magically from the system.


What do you think?

Regards,
Timo



On 09.09.20 11:41, Jark Wu wrote:

Hi Danny,

This is not Oracle and MySQL computed column syntax, because there is no
"AS" after the type.

Hi everyone,

If we want to use "offset INT SYSTEM_METADATA("offset")", then I think we
must further discuss about "PERSISED" or "VIRTUAL" keyword for query-sink
schema problem.
Personally, I think we can use a shorter keyword "METADATA" for
"SYSTEM_METADATA". Because "SYSTEM_METADATA" sounds like a system function
and confuse users this looks like a computed column.


Best,
Jark



On Wed, 9 Sep 2020 at 17:23, Danny Chan  wrote:


"offset INT SYSTEM_METADATA("offset")"

This is actually Oracle or MySQL style computed column syntax.

"You are right that one could argue that "timestamp", "headers" are
something like "key" and "value""

I have the same feeling, both key value and headers timestamp are *real*
data
stored in the consumed record, they are not computed or generated.

"Trying to solve everything via properties sounds rather like a hack to
me"

Things are not that hack if we can unify the routines or the definitions
(all from the computed column way or all from the table options), i also
think that it is a hacky that we mix in 2 kinds of syntax for different
kinds of metadata (read-only and read-write). In this FLIP, we declare the
Kafka key fields with table options but SYSTEM_METADATA for other metadata,
that is a hacky thing or something in-consistent.

Kurt Young  于2020年9月9日周三 下午4:48写道:


  I would vote for `offset INT SYSTEM_METADATA("offset")`.

I don't think we can stick with the SQL standard in DDL part forever,
especially as there are more and more
requirements coming from different connectors and external systems.

Best,
Kurt


On Wed, Sep 9, 2020 at 4:40 PM Timo Walther  wrote:


Hi Jark,

now we are back at the original design proposed by Dawid :D Yes, we
should be cautious about adding new syntax. But the length of this
discussion shows that we are looking for a good long-term solution. In
this case I would rather vote for a deep integration into the syntax.

Computed columns are also not SQL standard compliant. And our DDL is
neither, so we have some degree of freedom here.

Trying to solve everything via properties sounds rather like a hack to
me. You are right that one could argue that "timestamp", "headers" are
something like "key" and "value". However, mixing

`offset AS SYSTEM_METADATA("offset")`

and

`'timestamp.field' = 'ts'`

looks more confusing to users that an explicit

`offset AS CAST(SYSTEM_METADATA("offset") AS INT)`

or

`offset INT SYSTEM_METADATA("offset")`

that is symetric for both source and sink.

What do others think?

Regards,
Timo


On 09.09.20 10:09, Jark Wu wrote:

Hi everyone,

I think we have a conclusion that the writable metadata shouldn't be
defined as a computed column, but a normal column.

"timestamp STRING SYSTEM_METADATA('timestamp')" is one of the

approaches.

However, it is not SQL standard compliant, we need to be cautious

enough

when adding new syntax.
Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to
resolve the query-sink schema problem if it is read-only metadata.

That

adds more stuff to learn for users.

>From my point of view, the "timestamp", "headers" are something like

"key"

and "value" that stores with the real data. So why not define the
"timestamp" in the same way with "key" by using a "timestamp.field"
connector option?
On the other side, the read-only metadata, such as "offset",

shouldn't

be

d

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-09 Thread Timo Walther

I agree with Jark. It reduces confusion.

The DataStream API doesn't know changelog processing at all. A 
DataStream of Row can be used with both `fromDataStream` and 
`fromChangelogStream`. But only the latter API will interpret it as a 
changelog something.


And as I mentioned before, the `toChangelogStream` must work with Row 
otherwise users are confused due to duplicate records with a missing 
changeflag.


I will update the FLIP-136 a last time. I hope we can then continue to a 
vote.


Regards,
Timo


On 09.09.20 10:50, Danny Chan wrote:

I think it would bring in much confusion by a different API name just because 
the DataStream generic type is different.
If there are ChangelogMode that only works for Row, can we have a type check 
there ?

Switch to a new API name does not really solve the problem well, people still 
need to declare the ChangelogMode explicitly, and there are some confusions:

• Should DataStream of Row type always use #fromChangelogStream ?
• Does fromChangelogStream works for only INSERT ChangelogMode ?


Best,
Danny Chan
在 2020年9月9日 +0800 PM4:21,Timo Walther ,写道:

I had this in the inital design, but Jark had concerns at least for the
`toChangelogStream(ChangelogMode)` (see earlier discussion).

`fromDataStream(dataStream, schema, changelogMode)` would be possible.

But in this case I would vote for a symmetric API. If we keep
toChangelogStream we should also have a fromChangelogStream.

And if we unify `toChangelogStream` and `toDataStream`, retractions
cannot be represented for non-Rows and users will experience duplicate
records with a missing changeflag.

Regards,
Timo


On 09.09.20 09:31, Danny Chan wrote:

“But I think the planner needs to
know whether the input is insert-only or not.”

Does fromDataStream(dataStream, schema, changelogMode)

solve your concerns ?  People can pass around whatever ChangelogMode they like 
as an optional param.
By default: fromDataStream(dataStream, schema), the ChangelogMode is INSERT.

Best,
Danny Chan
在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:


But I think the planner needs to
know whether the input is insert-only or not.










Re: [DISCUSS] FLIP-141: Intra-Slot Managed Memory Sharing

2020-09-09 Thread Xintong Song
Thanks for the suggestion, @Stephan.

DATAPROC makes good sense to me. +1 here

Regarding the Scope, it is meant for calculating fractions from the
weights. The idea is that the algorithm looks into the scopes and
calculates fractions without understanding the individual use cases.

I guess I should not have put the Scope in the code block in Declare Use
Cases. This is more an internal implementation detail rather than public
interfaces. Sorry for the confusion. I copied the declaration
of MemoryUseCase from some local trying-out codes.

Thank you~

Xintong Song



On Wed, Sep 9, 2020 at 6:15 PM Stephan Ewen  wrote:

> Hi!
>
> I read through the FLIP and looks good to me. One suggestion and one
> question:
>
> Regarding naming, we could call the ROCKSDB/BATCH_OP category DATAPROC
> because this is the memory that goes into holding (and structuring) the
> data.
>
> I am a bit confused about the Scope enum (with values Slot and Op). Do we
> need to store this in the configuration or can we drop this?
> From my understanding, this is transparent already:
>   - When anyone goes to the MemoryManager, they ask for a fraction of the
> Slot's budget.
>   - RocksDB (which is per slot) goes directly to the MemoryManager
>   - Python process (per slot) goes directly to the MemoryManager
>   - Batch algorithms apply their local operator weight before going to the
> MemoryManager, so by the time the allocate memory, it is already the right
> fraction per-slot.
>
> Best,
> Stephan
>
>
> On Fri, Sep 4, 2020 at 3:46 AM Xintong Song  wrote:
>
>> Thanks Till, `taskmanager.memory.managed.consumer-weights` sounds good to
>> me.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Thu, Sep 3, 2020 at 8:44 PM Till Rohrmann 
>> wrote:
>>
>> > Thanks for updating the FLIP Xintong. It looks good to me. One minor
>> > comment is that we could name the configuration parameter
>> > also taskmanager.memory.managed.consumer-weights which might be a bit
>> more
>> > expressive what this option does.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Thu, Sep 3, 2020 at 12:44 PM Xintong Song 
>> > wrote:
>> >
>> > > Thanks all for the feedback.
>> > >
>> > > FYI, I've opened a voting thread[1] on this.
>> > >
>> > > Thank you~
>> > >
>> > > Xintong Song
>> > >
>> > >
>> > > [1]
>> > >
>> > >
>> >
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-141-Intra-Slot-Managed-Memory-Sharing-td44358.html
>> > >
>> > >
>> > > On Thu, Sep 3, 2020 at 2:54 PM Zhu Zhu  wrote:
>> > >
>> > > > Thanks for proposing this improvement! @Xintong
>> > > > The proposal looks good to me. Agreed that we should make it as
>> simple
>> > as
>> > > > possible for users to understand.
>> > > >
>> > > > Thanks,
>> > > > Zhu
>> > > >
>> > > > Dian Fu  于2020年9月3日周四 下午2:11写道:
>> > > >
>> > > > > Thanks for driving this FLIP, Xintong! +1 to the updated version.
>> > > > >
>> > > > > > 在 2020年9月2日,下午6:09,Xintong Song  写道:
>> > > > > >
>> > > > > > Thanks for the input, Yu.
>> > > > > >
>> > > > > > I believe the current proposal should work with RocksDB, or any
>> > other
>> > > > > state
>> > > > > > backend, using memory at either the slot or the scope. With the
>> > > > proposed
>> > > > > > approach, all we need is an indicator (e.g., a configuration
>> > option)
>> > > > > > telling us which scope should we calculate the fractions for.
>> > > > > >
>> > > > > > Thank you~
>> > > > > >
>> > > > > > Xintong Song
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Wed, Sep 2, 2020 at 4:53 PM Yu Li  wrote:
>> > > > > >
>> > > > > >> Thanks for compiling the FLIP Xintong, and +1 for the updated
>> doc.
>> > > > > >>
>> > > > > >> Just one supplement for the RocksDB state backend part:
>> > > > > >>
>> > > > > >> It's true that currently we're using managed memory at the slot
>> > > scope.
>> > > > > >> However, IMHO, we may support setting weights for different
>> > stateful
>> > > > > >> operators (for advanced usage) in future. For example, users
>> may
>> > > > choose
>> > > > > to
>> > > > > >> set higher weights for join operator over aggregation
>> operator, to
>> > > > give
>> > > > > >> more memory to those with bigger states. In this case, we may
>> also
>> > > use
>> > > > > >> managed memory at the operator scope for state backends. And
>> if I
>> > > > > >> understand correctly, the current design could cover this case
>> > well.
>> > > > > >>
>> > > > > >> Best Regards,
>> > > > > >> Yu
>> > > > > >>
>> > > > > >>
>> > > > > >> On Wed, 2 Sep 2020 at 15:39, Xintong Song <
>> tonysong...@gmail.com>
>> > > > > wrote:
>> > > > > >>
>> > > > > >>> Thanks all for the feedback and discussion.
>> > > > > >>>
>> > > > > >>> I have updated the FLIP, with the following changes.
>> > > > > >>>
>> > > > > >>>   - Choose the main proposal over the alternative approach
>> > > > > >>>   - Combine weights of RocksDB and batch operators
>> > > > > >>>   - Expose weights through configuration options, rather than
>> via
>> > > > > >>>   ExecutionConfig.
>> > > 

[CANCEL][VOTE] FLIP-134: DataStream Semantics for Bounded Input

2020-09-09 Thread Aljoscha Krettek
I'm hereby cancelling this vote. There was more discussion on the 
[DISCUSS] thread for FLIP-134.


Aljoscha

On 24.08.20 11:33, Kostas Kloudas wrote:

Hi all,

After the discussion in [1], I would like to open a voting thread for
FLIP-134 [2] which discusses the semantics that the DataStream API
will expose when applied on a bounded input.

The vote will be open until 27th August (72h), unless there is an
objection or not enough votes.

Cheers,
Kostas

[1] 
https://lists.apache.org/thread.html/reb368f095ec13638b95cd5d885a0aa8e69af06d6e982a5f045f50022%40%3Cdev.flink.apache.org%3E
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522





Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-09-09 Thread Aljoscha Krettek
I updated the FLIP, you can check out the changes here: 
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158871522&selectedPageVersions=16&selectedPageVersions=15


There is still the open question of what IGNORE means for 
getProcessingTime().


Plus, I introduced a setting for ignoring Triggers because I think it 
otherwise doesn't work well with FAILing hard on processing-time API 
calls. I described it in the FLIP, so please have a look at the diff I 
linked above.


Aljoscha


On 08.09.20 11:35, Dawid Wysakowicz wrote:

The only one where I could see that users want different behaviour
BATCH jobs on the DataStream API. I agree that processing-time does
not make much sense in batch jobs. However, if users have written some
business logic using processing-time timers their jobs will silently
not work if we set the default to IGNORE. Setting it to FAIL would at
least make users aware that something is not right.

I see your point. I was also undecided myself which option to use here.
I went with IGNORE for the reason that I thought the common/the most
prominent functions should work just out of the box without much
additional tweaking. I found the case of "running the same program in
BATCH and STREAM" one of such cases and therefore optimized the options
for that case. That's why went with IGNORE instead of FAIL. Again I am
good with either of the two.


I can also see a small group of users wanting processing-time timers
for BATCH. We could, for example, fire all processing-time timers at
the "end of input", then we also set the watermark to +Inf.

I agree. I think this case would be covered with ENABLE + TRIGGER. I do
agree though it makes sense to mention this case explicitly as the
ENABLE option would behave slightly different in BATCH than in STREAM.
Maybe not strictly speaking different, but would be worth explaining
anyway. The way I heard from some people you can think of BATCH
processing happening instantaneously in processing time. Therefore there
can be no timers triggered in between records. In BATCH processing the
only time when timers can be triggered is at the end of input. Or at
least that is how I see it.


Another thing is: what should we do with new triggers that are set
after the end-of-input. If we have TRIGGER and users keep setting new
processing-time timers in the callback, would we continue firing them.
Or should the behaviour bee QUIESCE_AND_TRIGGER, where we work off
remaining timers but don't add new ones? Do we silently ignore adding
new ones?

My take on this issue is that it should be good enough to have the
QUIESCE_AND_TRIGGER behaviour with ignoring timers registered after the
end of input. We can not fail hard in such scenario, unless we expose a
flag saying the timer is after the end of input. Otherwise I can not see
a way to correctly safe guard for this scenario. I can see some use
cases that would benefit from allowing the timers registration, e.g.
periodically checking if some external process finished. In my opinion
this is a bit of a different topic, as it is actually an issue of
inverting the control when an operator can finish. Right now it is the
task that decides that the job/operator finishes at the end of input.


By the way, I assume WAIT means we wait for processing-time to
actually reach the time of pending timers? Or did you have something
else in mind with this?

Yes, that's what I meant. I actually took the options from this
issue[1], where there is some discussion on that topic as well.

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-18647

On 08/09/2020 10:57, Aljoscha Krettek wrote:

I agree with almost all of your points!

The only one where I could see that users want different behaviour
BATCH jobs on the DataStream API. I agree that processing-time does
not make much sense in batch jobs. However, if users have written some
business logic using processing-time timers their jobs will silently
not work if we set the default to IGNORE. Setting it to FAIL would at
least make users aware that something is not right.

I can also see a small group of users wanting processing-time timers
for BATCH. We could, for example, fire all processing-time timers at
the "end of input", then we also set the watermark to +Inf.

Another thing is: what should we do with new triggers that are set
after the end-of-input. If we have TRIGGER and users keep setting new
processing-time timers in the callback, would we continue firing them.
Or should the behaviour bee QUIESCE_AND_TRIGGER, where we work off
remaining timers but don't add new ones? Do we silently ignore adding
new ones?

By the way, I assume WAIT means we wait for processing-time to
actually reach the time of pending timers? Or did you have something
else in mind with this?

Aljoscha

On 08.09.20 09:19, Dawid Wysakowicz wrote:

Hey Aljoscha

A couple of thoughts for the two remaining TODOs in the doc:

# Processing Time Support in BATCH/BOUNDED execution mode

I think th

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Dawid Wysakowicz
Hi,

Sorry for joining so late. First of all, I don't want to distract the
discussion, but I thought maybe my opinion could help a bit, but maybe
it won't ;)

The first observation I got is that I think everyone agrees we need a
way distinguish the read-only from r/w columns. Is that correct?

Secondly if I understand the discussion correctly there are three
competing approaches:

Option 1)

If a metadata column is r/w use the WITH section for declaring such field

If a metadata column is r use computed column e.g.: offset AS
CAST(SYSTEM_METADATA("offset") AS long)

Option 2)

Use the computed column syntax, but add a keyword for marking a column
writable e.g.:

r-only: offset AS CAST(SYSTEM_METADATA("offset") AS long)

r/w: timestamp AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3))
PERSISTED/WRITABLE/STORED

Option 3)

Use a new syntax, not to confuse it with computed columns.

r-only: offset USING/FROM/(blank) SYSTEM_METADATA("offset")

r/w-only: timestamp USING/FROM/(blank) SYSTEM_METADATA("timestamp")
PERSISTED/WRITABLE/STORED

My personal preference is in that order 1>2>3. Let me explain why I
think that.

Ad. 1

I sort of agree with @Jark and @Danny that if a field is readable and
writable than it is actually a *real* data. Moreover I think in Kafka it
is quite common to include a field in all different parts of the record.
Take this code snippet from ksqlDB for example[1].

I understand @Timo's argument that it would not be too generic if we had
more writable columns. But at least the way I see it, so far we have
only a single r/w metadata field: timestamp. I am not sure if we should
make the Kafka's headers writable. As per the motivation in the FLIP it
introduced them, they are mostly for system meta-information, which does
not necessarily contain business logic[2]. There are no more metadata
columns marked as writable in the FLIP, as far as I can tell.

The additional benefit is that the concept of computed columns is
intact. They are only ever computed and you can not store into the columns.

Ad. 2

The option two is more flexible than option 3, because it allows for
computed expressions. In some sense this is also its disadvantage
because computed expressions can not be used for r/w columns. Therefore
we are loosing the flexibility for STORED/PERSISTED/WRITABLE columns.

Ad. 3

The argument that reusing computed columns can be misleading does not
really appeal to me. I think any new syntax that a user needs to learn
is equally misleading. The only benefit I see is that it makes the
situation more symmetric, as you cannot have computed expressions for
both r-only and r/w columns, which at the same time is a disadvantage of
that proposal.

As for the issue of shortening the SYSTEM_METADATA to METADATA. Here I
very much prefer the SYSTEM_ prefix. In my opinion in this case the
clarity is more important than brevity. Moreover personally I never
found a couple of letters that are usually copy-pasted, or
auto-completed a real problem. This might be though my personal preference.

Hope I will not distract the discussion too much.

Best,

Dawid

[1]
https://docs.ksqldb.io/en/latest/developer-guide/create-a-stream/#create-a-stream-with-timestamps

[2]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers#KIP82AddRecordHeaders-Motivation

On 09/09/2020 12:40, Timo Walther wrote:
> Hi everyone,
>
> "key" and "value" in the properties are a special case because they
> need to configure a format. So key and value are more than just
> metadata. Jark's example for setting a timestamp would work but as the
> FLIP discusses, we have way more metadata fields like headers,
> epoch-leader, etc. Having a property for all of this metadata would
> mess up the WITH section entirely. Furthermore, we also want to deal
> with metadata from the formats. Solving this through properties as
> well would further complicate the property design.
>
> Personally, I still like the computed column design more because it
> allows to have full flexibility to compute the final column:
>
> timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))
>
> Instead of having a helper column and a real column in the table:
>
> helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
> realTimestamp AS adjustTimestamp(helperTimestamp)
>
> But I see that the discussion leans towards:
>
> timestamp INT SYSTEM_METADATA("ts")
>
> Which is fine with me. It is the shortest solution, because we don't
> need additional CAST. We can discuss the syntax, so that confusion
> with computed columns can be avoided.
>
> timestamp INT USING SYSTEM_METADATA("ts")
> timestamp INT FROM SYSTEM_METADATA("ts")
> timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
>
> We use `SYSTEM_TIME` for temporal tables. I think prefixing with
> SYSTEM makes it clearer that it comes magically from the system.
>
> What do you think?
>
> Regards,
> Timo
>
>
>
> On 09.09.20 11:41, Jark Wu wrote:
>> Hi Danny,
>>
>> This is not Oracle 

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Danny Chan
“Personally, I still like the computed column design more because it
allows to have full flexibility to compute the final column”

I have the same feeling, the non-standard syntax "timestamp INT
SYSTEM_METADATA("ts")" is neither a computed column nor normal column. It
looks very likely a computed column but it's not (no AS keyword there), we
should be cautious for such syntax because we use a function as a column
constraint. No SQL vendor has such a syntax.

Can we just use the SQL keyword as a constraint to mark the column as
metadata ?

timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]

Note that the "FROM 'field name'" is only needed when the name conflicts
with the declared table column name, when there are no conflicts, we can
simplify it to:

timestamp INT METADATA

By default, the field is non-virtual and can be read and written, users
need to mark the column as virtual when it is only readable.

Timo Walther  于2020年9月9日周三 下午6:41写道:

> Hi everyone,
>
> "key" and "value" in the properties are a special case because they need
> to configure a format. So key and value are more than just metadata.
> Jark's example for setting a timestamp would work but as the FLIP
> discusses, we have way more metadata fields like headers, epoch-leader,
> etc. Having a property for all of this metadata would mess up the WITH
> section entirely. Furthermore, we also want to deal with metadata from
> the formats. Solving this through properties as well would further
> complicate the property design.
>
> Personally, I still like the computed column design more because it
> allows to have full flexibility to compute the final column:
>
> timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))
>
> Instead of having a helper column and a real column in the table:
>
> helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
> realTimestamp AS adjustTimestamp(helperTimestamp)
>
> But I see that the discussion leans towards:
>
> timestamp INT SYSTEM_METADATA("ts")
>
> Which is fine with me. It is the shortest solution, because we don't
> need additional CAST. We can discuss the syntax, so that confusion with
> computed columns can be avoided.
>
> timestamp INT USING SYSTEM_METADATA("ts")
> timestamp INT FROM SYSTEM_METADATA("ts")
> timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
>
> We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM
> makes it clearer that it comes magically from the system.
>
> What do you think?
>
> Regards,
> Timo
>
>
>
> On 09.09.20 11:41, Jark Wu wrote:
> > Hi Danny,
> >
> > This is not Oracle and MySQL computed column syntax, because there is no
> > "AS" after the type.
> >
> > Hi everyone,
> >
> > If we want to use "offset INT SYSTEM_METADATA("offset")", then I think we
> > must further discuss about "PERSISED" or "VIRTUAL" keyword for query-sink
> > schema problem.
> > Personally, I think we can use a shorter keyword "METADATA" for
> > "SYSTEM_METADATA". Because "SYSTEM_METADATA" sounds like a system
> function
> > and confuse users this looks like a computed column.
> >
> >
> > Best,
> > Jark
> >
> >
> >
> > On Wed, 9 Sep 2020 at 17:23, Danny Chan  wrote:
> >
> >> "offset INT SYSTEM_METADATA("offset")"
> >>
> >> This is actually Oracle or MySQL style computed column syntax.
> >>
> >> "You are right that one could argue that "timestamp", "headers" are
> >> something like "key" and "value""
> >>
> >> I have the same feeling, both key value and headers timestamp are *real*
> >> data
> >> stored in the consumed record, they are not computed or generated.
> >>
> >> "Trying to solve everything via properties sounds rather like a hack to
> >> me"
> >>
> >> Things are not that hack if we can unify the routines or the definitions
> >> (all from the computed column way or all from the table options), i also
> >> think that it is a hacky that we mix in 2 kinds of syntax for different
> >> kinds of metadata (read-only and read-write). In this FLIP, we declare
> the
> >> Kafka key fields with table options but SYSTEM_METADATA for other
> metadata,
> >> that is a hacky thing or something in-consistent.
> >>
> >> Kurt Young  于2020年9月9日周三 下午4:48写道:
> >>
> >>>   I would vote for `offset INT SYSTEM_METADATA("offset")`.
> >>>
> >>> I don't think we can stick with the SQL standard in DDL part forever,
> >>> especially as there are more and more
> >>> requirements coming from different connectors and external systems.
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>>
> >>> On Wed, Sep 9, 2020 at 4:40 PM Timo Walther 
> wrote:
> >>>
>  Hi Jark,
> 
>  now we are back at the original design proposed by Dawid :D Yes, we
>  should be cautious about adding new syntax. But the length of this
>  discussion shows that we are looking for a good long-term solution. In
>  this case I would rather vote for a deep integration into the syntax.
> 
>  Computed columns are also not SQL standard compliant. And our DDL is
>  neither, so we have some degree of

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

2020-09-09 Thread Danny Chan
Thanks, i'm fine with that.


Timo Walther  于2020年9月9日周三 下午7:02写道:

> I agree with Jark. It reduces confusion.
>
> The DataStream API doesn't know changelog processing at all. A
> DataStream of Row can be used with both `fromDataStream` and
> `fromChangelogStream`. But only the latter API will interpret it as a
> changelog something.
>
> And as I mentioned before, the `toChangelogStream` must work with Row
> otherwise users are confused due to duplicate records with a missing
> changeflag.
>
> I will update the FLIP-136 a last time. I hope we can then continue to a
> vote.
>
> Regards,
> Timo
>
>
> On 09.09.20 10:50, Danny Chan wrote:
> > I think it would bring in much confusion by a different API name just
> because the DataStream generic type is different.
> > If there are ChangelogMode that only works for Row, can we have a type
> check there ?
> >
> > Switch to a new API name does not really solve the problem well, people
> still need to declare the ChangelogMode explicitly, and there are some
> confusions:
> >
> > • Should DataStream of Row type always use #fromChangelogStream ?
> > • Does fromChangelogStream works for only INSERT ChangelogMode ?
> >
> >
> > Best,
> > Danny Chan
> > 在 2020年9月9日 +0800 PM4:21,Timo Walther ,写道:
> >> I had this in the inital design, but Jark had concerns at least for the
> >> `toChangelogStream(ChangelogMode)` (see earlier discussion).
> >>
> >> `fromDataStream(dataStream, schema, changelogMode)` would be possible.
> >>
> >> But in this case I would vote for a symmetric API. If we keep
> >> toChangelogStream we should also have a fromChangelogStream.
> >>
> >> And if we unify `toChangelogStream` and `toDataStream`, retractions
> >> cannot be represented for non-Rows and users will experience duplicate
> >> records with a missing changeflag.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 09.09.20 09:31, Danny Chan wrote:
> >>> “But I think the planner needs to
> >>> know whether the input is insert-only or not.”
> >>>
> >>> Does fromDataStream(dataStream, schema, changelogMode)
> >>>
> >>> solve your concerns ?  People can pass around whatever ChangelogMode
> they like as an optional param.
> >>> By default: fromDataStream(dataStream, schema), the ChangelogMode is
> INSERT.
> >>>
> >>> Best,
> >>> Danny Chan
> >>> 在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:
> 
>  But I think the planner needs to
>  know whether the input is insert-only or not.
> >>>
> >>
> >
>
>


Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Timo Walther

+1 for:

timestamp INT METADATA [FROM 'my-timestamp-field']

However, I would inverse the default. Because reading is more common 
than writing.


Regards,
Timo


On 09.09.20 14:25, Danny Chan wrote:

“Personally, I still like the computed column design more because it
allows to have full flexibility to compute the final column”

I have the same feeling, the non-standard syntax "timestamp INT
SYSTEM_METADATA("ts")" is neither a computed column nor normal column. It
looks very likely a computed column but it's not (no AS keyword there), we
should be cautious for such syntax because we use a function as a column
constraint. No SQL vendor has such a syntax.

Can we just use the SQL keyword as a constraint to mark the column as
metadata ?

timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]

Note that the "FROM 'field name'" is only needed when the name conflicts
with the declared table column name, when there are no conflicts, we can
simplify it to:

timestamp INT METADATA

By default, the field is non-virtual and can be read and written, users
need to mark the column as virtual when it is only readable.

Timo Walther  于2020年9月9日周三 下午6:41写道:


Hi everyone,

"key" and "value" in the properties are a special case because they need
to configure a format. So key and value are more than just metadata.
Jark's example for setting a timestamp would work but as the FLIP
discusses, we have way more metadata fields like headers, epoch-leader,
etc. Having a property for all of this metadata would mess up the WITH
section entirely. Furthermore, we also want to deal with metadata from
the formats. Solving this through properties as well would further
complicate the property design.

Personally, I still like the computed column design more because it
allows to have full flexibility to compute the final column:

timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))

Instead of having a helper column and a real column in the table:

helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
realTimestamp AS adjustTimestamp(helperTimestamp)

But I see that the discussion leans towards:

timestamp INT SYSTEM_METADATA("ts")

Which is fine with me. It is the shortest solution, because we don't
need additional CAST. We can discuss the syntax, so that confusion with
computed columns can be avoided.

timestamp INT USING SYSTEM_METADATA("ts")
timestamp INT FROM SYSTEM_METADATA("ts")
timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED

We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM
makes it clearer that it comes magically from the system.

What do you think?

Regards,
Timo



On 09.09.20 11:41, Jark Wu wrote:

Hi Danny,

This is not Oracle and MySQL computed column syntax, because there is no
"AS" after the type.

Hi everyone,

If we want to use "offset INT SYSTEM_METADATA("offset")", then I think we
must further discuss about "PERSISED" or "VIRTUAL" keyword for query-sink
schema problem.
Personally, I think we can use a shorter keyword "METADATA" for
"SYSTEM_METADATA". Because "SYSTEM_METADATA" sounds like a system

function

and confuse users this looks like a computed column.


Best,
Jark



On Wed, 9 Sep 2020 at 17:23, Danny Chan  wrote:


"offset INT SYSTEM_METADATA("offset")"

This is actually Oracle or MySQL style computed column syntax.

"You are right that one could argue that "timestamp", "headers" are
something like "key" and "value""

I have the same feeling, both key value and headers timestamp are *real*
data
stored in the consumed record, they are not computed or generated.

"Trying to solve everything via properties sounds rather like a hack to
me"

Things are not that hack if we can unify the routines or the definitions
(all from the computed column way or all from the table options), i also
think that it is a hacky that we mix in 2 kinds of syntax for different
kinds of metadata (read-only and read-write). In this FLIP, we declare

the

Kafka key fields with table options but SYSTEM_METADATA for other

metadata,

that is a hacky thing or something in-consistent.

Kurt Young  于2020年9月9日周三 下午4:48写道:


   I would vote for `offset INT SYSTEM_METADATA("offset")`.

I don't think we can stick with the SQL standard in DDL part forever,
especially as there are more and more
requirements coming from different connectors and external systems.

Best,
Kurt


On Wed, Sep 9, 2020 at 4:40 PM Timo Walther 

wrote:



Hi Jark,

now we are back at the original design proposed by Dawid :D Yes, we
should be cautious about adding new syntax. But the length of this
discussion shows that we are looking for a good long-term solution. In
this case I would rather vote for a deep integration into the syntax.

Computed columns are also not SQL standard compliant. And our DDL is
neither, so we have some degree of freedom here.

Trying to solve everything via properties sounds rather like a hack to
me. You are right that one could argue that "timestamp", "headers" are
something like "key" a

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Leonard Xu
Thanks @Dawid for the nice summary, I think you catch all opinions of the long 
discussion well.  

@Danny
“ timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL] 
 Note that the "FROM 'field name'" is only needed when the name conflict 
 with the declared table column name, when there are no conflicts, we can 
simplify it to
  timestamp INT METADATA"

I really like the proposal, there is no confusion with computed column any 
more,  and it’s concise enough.


@Timo @Dawid
“We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM makes 
it clearer that it comes magically from the system.”
“As for the issue of shortening the SYSTEM_METADATA to METADATA. Here I
very much prefer the SYSTEM_ prefix.”

I think `SYSTEM_TIME` is different with `SYSTEM_METADATA ` a lot, 
First of all,  the word `TIME` has broad meanings but the word `METADATA ` not, 
 `METADATA ` has specific meaning,
Secondly, `FOR SYSTEM_TIME AS OF` exists in SQL standard but `SYSTEM_METADATA ` 
not.
Personally, I like more simplify way,sometimes  less is more. 


Best,
Leonard



> 
> Timo Walther  于2020年9月9日周三 下午6:41写道:
> 
>> Hi everyone,
>> 
>> "key" and "value" in the properties are a special case because they need
>> to configure a format. So key and value are more than just metadata.
>> Jark's example for setting a timestamp would work but as the FLIP
>> discusses, we have way more metadata fields like headers, epoch-leader,
>> etc. Having a property for all of this metadata would mess up the WITH
>> section entirely. Furthermore, we also want to deal with metadata from
>> the formats. Solving this through properties as well would further
>> complicate the property design.
>> 
>> Personally, I still like the computed column design more because it
>> allows to have full flexibility to compute the final column:
>> 
>> timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))
>> 
>> Instead of having a helper column and a real column in the table:
>> 
>> helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
>> realTimestamp AS adjustTimestamp(helperTimestamp)
>> 
>> But I see that the discussion leans towards:
>> 
>> timestamp INT SYSTEM_METADATA("ts")
>> 
>> Which is fine with me. It is the shortest solution, because we don't
>> need additional CAST. We can discuss the syntax, so that confusion with
>> computed columns can be avoided.
>> 
>> timestamp INT USING SYSTEM_METADATA("ts")
>> timestamp INT FROM SYSTEM_METADATA("ts")
>> timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
>> 
>> We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM
>> makes it clearer that it comes magically from the system.
>> 
>> What do you think?
>> 
>> Regards,
>> Timo
>> 
>> 
>> 
>> On 09.09.20 11:41, Jark Wu wrote:
>>> Hi Danny,
>>> 
>>> This is not Oracle and MySQL computed column syntax, because there is no
>>> "AS" after the type.
>>> 
>>> Hi everyone,
>>> 
>>> If we want to use "offset INT SYSTEM_METADATA("offset")", then I think we
>>> must further discuss about "PERSISED" or "VIRTUAL" keyword for query-sink
>>> schema problem.
>>> Personally, I think we can use a shorter keyword "METADATA" for
>>> "SYSTEM_METADATA". Because "SYSTEM_METADATA" sounds like a system
>> function
>>> and confuse users this looks like a computed column.
>>> 
>>> 
>>> Best,
>>> Jark
>>> 
>>> 
>>> 
>>> On Wed, 9 Sep 2020 at 17:23, Danny Chan  wrote:
>>> 
 "offset INT SYSTEM_METADATA("offset")"
 
 This is actually Oracle or MySQL style computed column syntax.
 
 "You are right that one could argue that "timestamp", "headers" are
 something like "key" and "value""
 
 I have the same feeling, both key value and headers timestamp are *real*
 data
 stored in the consumed record, they are not computed or generated.
 
 "Trying to solve everything via properties sounds rather like a hack to
 me"
 
 Things are not that hack if we can unify the routines or the definitions
 (all from the computed column way or all from the table options), i also
 think that it is a hacky that we mix in 2 kinds of syntax for different
 kinds of metadata (read-only and read-write). In this FLIP, we declare
>> the
 Kafka key fields with table options but SYSTEM_METADATA for other
>> metadata,
 that is a hacky thing or something in-consistent.
 
 Kurt Young  于2020年9月9日周三 下午4:48写道:
 
>  I would vote for `offset INT SYSTEM_METADATA("offset")`.
> 
> I don't think we can stick with the SQL standard in DDL part forever,
> especially as there are more and more
> requirements coming from different connectors and external systems.
> 
> Best,
> Kurt
> 
> 
> On Wed, Sep 9, 2020 at 4:40 PM Timo Walther 
>> wrote:
> 
>> Hi Jark,
>> 
>> now we are back at the original design proposed by Dawid :D Yes, we
>> should be cautious about adding new syntax. But the length of this
>> discussion shows that we 

Re: [DISCUSS] FLIP-142: Disentangle StateBackends from Checkpointing

2020-09-09 Thread Yun Tang
Hi Seth

Thanks for bringing this discussion, and I really like this refactor to give 
more cleaner concepts!

When we talk about the relationship between state, state backends, and 
snapshots. The 'CheckpointStorage'
only focus on how to persist the checkpointed state (to JM or to DFS), there 
still exist some concepts related to the
implementation of state backend, e.g. the configuration of 
'state.backend.incremental' and 'state.backend.async'.

What do you think of these configurations? They're still related with 
checkpointing but limits to the feature of state backend.

Moreover, when talk about separating state backend from checkpointing, I also 
want to give another two cents here:
state backend which holds the state is must-to-have when we use state in 
streaming job, however, checkpointing is not a must-to-have
if we do not enable the checkpointing. And 'ExecutionGraph' could live without 
checkpointCoordinator on JM side while
'StreamTask' always initialize the 'checkpointStorage' on task side. I think JM 
knows the inner relationship between state backend and checkpoint
while TM seems mix them together.

Best
Yun Tang

From: Konstantin Knauf 
Sent: Wednesday, September 9, 2020 16:05
To: dev 
Subject: Re: [DISCUSS] FLIP-142: Disentangle StateBackends from Checkpointing

Thanks for the initiative. Big +1. Would be interested to hear if the
proposed interfaces still make sense in the face of the new fault-tolerance
work that is planned. Stephan/Piotr will know.

On Tue, Sep 8, 2020 at 7:05 PM Seth Wiesman  wrote:

> Hi Devs,
>
> I'd like to propose an update to how state backends and checkpoint storage
> are configured to help users better understand Flink.
>
> Apache Flink's durability story is a mystery to many users. One of the most
> common recurring questions from users comes from not understanding the
> relationship between state, state backends, and snapshots. Some of this
> confusion can be abated with learning material but the question is so
> pervasive that we believe Flink’s user APIs should be better communicate
> what different components are responsible for.
>
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-142%3A+Disentangle+StateBackends+from+Checkpointing
>
>
> I look forward to a healthy discussion.
>
>
> Seth
>


--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Jark Wu
I'm also +1 to Danny's proposal: timestamp INT METADATA [FROM
'my-timestamp-field'] [VIRTUAL]
Especially I like the shortcut: timestamp INT METADATA, this makes the most
common case to be supported in the simplest way.

I also think the default should be "PERSISTED", so VIRTUAL is optional when
you are accessing a read-only metadata. Because:
1. The "timestamp INT METADATA" should be a normal column, because
"METADATA" is just a modifier to indicate it is from metadata, a normal
column should be persisted.
If virtual by default, when a user types "timestamp int" ==> persisted
column, then adds a "metadata" after that ==> virtual column, then adds a
"persisted" after that ==> persisted column.
I think this looks reversed several times and makes users confused.
Physical fields are also prefixed with "fieldName TYPE", so "timestamp INT
METADATA" is persisted is very straightforward.
2. From the collected user question [1], we can see that "timestamp" is the
most common use case. "timestamp" is a read-write metadata. Persisted by
default doesn't break the reading behavior.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-15869

On Wed, 9 Sep 2020 at 20:56, Leonard Xu  wrote:

> Thanks @Dawid for the nice summary, I think you catch all opinions of the
> long discussion well.
>
> @Danny
> “ timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
>  Note that the "FROM 'field name'" is only needed when the name conflict
>  with the declared table column name, when there are no conflicts, we can
> simplify it to
>   timestamp INT METADATA"
>
> I really like the proposal, there is no confusion with computed column any
> more,  and it’s concise enough.
>
>
> @Timo @Dawid
> “We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM
> makes it clearer that it comes magically from the system.”
> “As for the issue of shortening the SYSTEM_METADATA to METADATA. Here I
> very much prefer the SYSTEM_ prefix.”
>
> I think `SYSTEM_TIME` is different with `SYSTEM_METADATA ` a lot,
> First of all,  the word `TIME` has broad meanings but the word `METADATA `
> not,  `METADATA ` has specific meaning,
> Secondly, `FOR SYSTEM_TIME AS OF` exists in SQL standard but
> `SYSTEM_METADATA ` not.
> Personally, I like more simplify way,sometimes  less is more.
>
>
> Best,
> Leonard
>
>
>
> >
> > Timo Walther  于2020年9月9日周三 下午6:41写道:
> >
> >> Hi everyone,
> >>
> >> "key" and "value" in the properties are a special case because they need
> >> to configure a format. So key and value are more than just metadata.
> >> Jark's example for setting a timestamp would work but as the FLIP
> >> discusses, we have way more metadata fields like headers, epoch-leader,
> >> etc. Having a property for all of this metadata would mess up the WITH
> >> section entirely. Furthermore, we also want to deal with metadata from
> >> the formats. Solving this through properties as well would further
> >> complicate the property design.
> >>
> >> Personally, I still like the computed column design more because it
> >> allows to have full flexibility to compute the final column:
> >>
> >> timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS
> TIMESTAMP(3)))
> >>
> >> Instead of having a helper column and a real column in the table:
> >>
> >> helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
> >> realTimestamp AS adjustTimestamp(helperTimestamp)
> >>
> >> But I see that the discussion leans towards:
> >>
> >> timestamp INT SYSTEM_METADATA("ts")
> >>
> >> Which is fine with me. It is the shortest solution, because we don't
> >> need additional CAST. We can discuss the syntax, so that confusion with
> >> computed columns can be avoided.
> >>
> >> timestamp INT USING SYSTEM_METADATA("ts")
> >> timestamp INT FROM SYSTEM_METADATA("ts")
> >> timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
> >>
> >> We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM
> >> makes it clearer that it comes magically from the system.
> >>
> >> What do you think?
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >>
> >> On 09.09.20 11:41, Jark Wu wrote:
> >>> Hi Danny,
> >>>
> >>> This is not Oracle and MySQL computed column syntax, because there is
> no
> >>> "AS" after the type.
> >>>
> >>> Hi everyone,
> >>>
> >>> If we want to use "offset INT SYSTEM_METADATA("offset")", then I think
> we
> >>> must further discuss about "PERSISED" or "VIRTUAL" keyword for
> query-sink
> >>> schema problem.
> >>> Personally, I think we can use a shorter keyword "METADATA" for
> >>> "SYSTEM_METADATA". Because "SYSTEM_METADATA" sounds like a system
> >> function
> >>> and confuse users this looks like a computed column.
> >>>
> >>>
> >>> Best,
> >>> Jark
> >>>
> >>>
> >>>
> >>> On Wed, 9 Sep 2020 at 17:23, Danny Chan  wrote:
> >>>
>  "offset INT SYSTEM_METADATA("offset")"
> 
>  This is actually Oracle or MySQL style computed column syntax.
> 
>  "You are right that one could argue that "timestamp", "headers" are
> 

Re: [DISCUSS] FLIP-142: Disentangle StateBackends from Checkpointing

2020-09-09 Thread Aljoscha Krettek

I like it a lot!

I think it makes sense to clean this up despite the planned new 
fault-tolerance mechanisms. In the future, users will decide which 
mechanism to use and I can imagine that a lot of them will keep using 
the current mechanism for quite a while to come. But I'm happy to yield 
to Stephan's opinion here, he knows more about the progress of that work.


The one nitpick I have is about naming: will users understand 
OnHeapStateBackend? I mean, do they know what on-heap/off-heap memory is 
and the tradeoffs? An alternative could be HashMapStateBackend, because 
that's essentially what it is. I wouldn't block anything on this, though.


Aljoscha

On 09.09.20 10:05, Konstantin Knauf wrote:

Thanks for the initiative. Big +1. Would be interested to hear if the
proposed interfaces still make sense in the face of the new fault-tolerance
work that is planned. Stephan/Piotr will know.

On Tue, Sep 8, 2020 at 7:05 PM Seth Wiesman  wrote:


Hi Devs,

I'd like to propose an update to how state backends and checkpoint storage
are configured to help users better understand Flink.

Apache Flink's durability story is a mystery to many users. One of the most
common recurring questions from users comes from not understanding the
relationship between state, state backends, and snapshots. Some of this
confusion can be abated with learning material but the question is so
pervasive that we believe Flink’s user APIs should be better communicate
what different components are responsible for.



https://cwiki.apache.org/confluence/display/FLINK/FLIP-142%3A+Disentangle+StateBackends+from+Checkpointing


I look forward to a healthy discussion.


Seth








Re: [DISCUSS] FLIP-142: Disentangle StateBackends from Checkpointing

2020-09-09 Thread Seth Wiesman
@Yun yes, this is really about making CheckpointStorage an orthogonal
concept. I think we can remain pragmatic and keep state-backend specific
configurations (async, incremental, etc) in the state backend themselves. I
view these as more advanced configurations and by the time someone is
changing the defaults they likely understand what is going on. My goal is
to help on-board users and so long as each state backend has a no-arg
default constructor that works for many users I think we've achieved that
goal.

Regarding the checkpoint coordinator, that makes sense but I will consider
out of the scope of this FLIP. I want to focus on simplifying APIs.

@Aljoscha Krettek 

My feeling is that state backends and checkpointing are going to be
integral to Flink for many years, regardless or other enhancements so this
change is still valuable.

Since this is a FLIP about improving the user api I'm happy to bikeshed the
names a little more than normal. HashMap makes sense, my other thought was
InMemory.

Seth



On Wed, Sep 9, 2020 at 8:04 AM Aljoscha Krettek  wrote:

> I like it a lot!
>
> I think it makes sense to clean this up despite the planned new
> fault-tolerance mechanisms. In the future, users will decide which
> mechanism to use and I can imagine that a lot of them will keep using
> the current mechanism for quite a while to come. But I'm happy to yield
> to Stephan's opinion here, he knows more about the progress of that work.
>
> The one nitpick I have is about naming: will users understand
> OnHeapStateBackend? I mean, do they know what on-heap/off-heap memory is
> and the tradeoffs? An alternative could be HashMapStateBackend, because
> that's essentially what it is. I wouldn't block anything on this, though.
>
> Aljoscha
>
> On 09.09.20 10:05, Konstantin Knauf wrote:
> > Thanks for the initiative. Big +1. Would be interested to hear if the
> > proposed interfaces still make sense in the face of the new
> fault-tolerance
> > work that is planned. Stephan/Piotr will know.
> >
> > On Tue, Sep 8, 2020 at 7:05 PM Seth Wiesman  wrote:
> >
> >> Hi Devs,
> >>
> >> I'd like to propose an update to how state backends and checkpoint
> storage
> >> are configured to help users better understand Flink.
> >>
> >> Apache Flink's durability story is a mystery to many users. One of the
> most
> >> common recurring questions from users comes from not understanding the
> >> relationship between state, state backends, and snapshots. Some of this
> >> confusion can be abated with learning material but the question is so
> >> pervasive that we believe Flink’s user APIs should be better communicate
> >> what different components are responsible for.
> >>
> >>
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-142%3A+Disentangle+StateBackends+from+Checkpointing
> >>
> >>
> >> I look forward to a healthy discussion.
> >>
> >>
> >> Seth
> >>
> >
> >
>
>


[DISCUSS] Deprecate and remove UnionList OperatorState

2020-09-09 Thread Aljoscha Krettek

Hi Devs,

@Users: I'm cc'ing the user ML to see if there are any users that are 
relying on this feature. Please comment here if that is the case.


I'd like to discuss the deprecation and eventual removal of UnionList 
Operator State, aka Operator State with Union Redistribution. If you 
don't know what I'm talking about you can take a look in the 
documentation: [1]. It's not documented thoroughly because it started 
out as mostly an internal feature.


The immediate main reason for removing this is also mentioned in the 
documentation: "Do not use this feature if your list may have high 
cardinality. Checkpoint metadata will store an offset to each list 
entry, which could lead to RPC framesize or out-of-memory errors." The 
insidious part of this limitation is that you will only notice that 
there is a problem when it is too late. Checkpointing will still work 
and a program can continue when the state size is too big. The system 
will only fail when trying to restore from a snapshot that has union 
state that is too big. This could be fixed by working around that issue 
but I think there are more long-term issues with this type of state.


I think we need to deprecate and remove API for state that is not tied 
to a key. Keyed state is easy to reason about, the system can 
re-partition state and also re-partition records and therefore scale the 
system in and out. Operator state, on the other hand is not tied to a 
key but an operator. This is a more "physical" concept, if you will, 
that potentially ties business logic closer to the underlying runtime 
execution model, which in turns means less degrees of freedom for the 
framework, that is Flink. This is future work, though, but we should 
start with deprecating union list state because it is the potentially 
most dangerous type of state.


We currently use this state type internally in at least the 
StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However, 
we're in the process of hopefully getting rid of it there with our work 
on sources and sinks. Before we fully remove it, we should of course 
signal this to users by deprecating it.


What do you think?

Best,
Aljoscha


Re: [DISCUSS] Releasing Flink 1.11.2

2020-09-09 Thread Zhu Zhu
Hi All,

Just an update.
All known blockers are resolved and I'm starting to create RC1 for release
1.11.2.

Thanks,
Zhu

Zhu Zhu  于2020年9月9日周三 上午11:36写道:

> Thanks for reporting this issue and offering to fix it @Jingsong Li
> 
> Agreed it is a reasonable blocker. I will postpone 1.11.2 RC1 creation
> until it is fixed.
>
> Thanks,
> Zhu
>
> Jingsong Li  于2020年9月9日周三 上午11:27写道:
>
>> Hi Zhu Zhu,
>>
>> Replenish its[1] influence:
>> For HiveStreamingSink & FileSystemSink in Table/SQL, partition commit make
>> partition visible for downstream Hive/Spark engines.
>> But due to FLINK-19166, will lose some partitions to commit after Job
>> failover in some cases, especially for short partitions.
>> In the user's opinion, the data is lost, which does not conform to
>> exactly-once.
>>
>> [1]https://issues.apache.org/jira/browse/FLINK-19166
>> [2]
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit
>>
>> Best,
>> Jingsong
>>
>> On Wed, Sep 9, 2020 at 11:08 AM Jingsong Li 
>> wrote:
>>
>> > Hi Zhu Zhu,
>> >
>> > Add a new blocker: https://issues.apache.org/jira/browse/FLINK-19166
>> >
>> > Will fix it soon.
>> >
>> > Best,
>> > Jingsong
>> >
>> > On Tue, Sep 8, 2020 at 12:26 AM Zhu Zhu  wrote:
>> >
>> >> Hi All,
>> >>
>> >> Since there are still two 1.11.2 blockers items in progress, RC1
>> creation
>> >> will be postponed to tomorrow.
>> >>
>> >> Thanks,
>> >> Zhu
>> >>
>> >> Zhu Zhu  于2020年9月4日周五 下午3:50写道:
>> >>
>> >> > @Dawid
>> >> > Thanks for the information and preparing a fix for FLINK-19133!
>> >> > I have made it a blocker for 1.11.2 and will keep tracking its
>> status.
>> >> >
>> >> > @Till
>> >> > Thanks for the updates and efforts for FLINK-18959!
>> >> >
>> >> > Thanks,
>> >> > Zhu
>> >> >
>> >> > Till Rohrmann  于2020年9月4日周五 下午3:41写道:
>> >> >
>> >> >> Fyi, FLINK-18959 has been merged into the release-1.10 branch.
>> >> >>
>> >> >> Cheers,
>> >> >> Till
>> >> >>
>> >> >> On Thu, Sep 3, 2020 at 2:38 PM Dawid Wysakowicz <
>> >> dwysakow...@apache.org>
>> >> >> wrote:
>> >> >>
>> >> >>> User has just reported another issue FLINK-19133 which I think
>> should
>> >> be
>> >> >>> a blocker for the 1.11.2 release. I'll try to prepare a fix as
>> soon as
>> >> >>> possible.
>> >> >>>
>> >> >>> On 03/09/2020 09:36, Zhu Zhu wrote:
>> >> >>> > Thanks for the inputs!
>> >> >>> > I have made FLINK-14942 and FLINK-18641 blockers for 1.11.2.
>> >> >>> >
>> >> >>> > And thanks a lot for offering help, zhijiang!
>> >> >>> >
>> >> >>> > Thanks,
>> >> >>> > Zhu
>> >> >>> >
>> >> >>> > Congxian Qiu  于2020年9月3日周四 下午3:18写道:
>> >> >>> >
>> >> >>> >> Hi
>> >> >>> >> I'd like to include FLINK-14942 into 1.11.2. FLINK-14942
>> (this
>> >> >>> fixes a
>> >> >>> >> bug introduce in 1.11.0), there is a pr for it already.
>> >> >>> >> Best,
>> >> >>> >> Congxian
>> >> >>> >>
>> >> >>> >>
>> >> >>> >> Zhou, Brian  于2020年9月3日周四 上午11:21写道:
>> >> >>> >>
>> >> >>> >>> Hi,
>> >> >>> >>>
>> >> >>> >>> Thanks Becket for addressing the issue. FLINK-18641 is now a
>> >> blocker
>> >> >>> for
>> >> >>> >>> Pravega connector integration, hope we can have it included in
>> >> 1.11.2
>> >> >>> >>> release.
>> >> >>> >>>
>> >> >>> >>> Best Regards,
>> >> >>> >>> Brian
>> >> >>> >>>
>> >> >>> >>> -Original Message-
>> >> >>> >>> From: Becket Qin 
>> >> >>> >>> Sent: Thursday, September 3, 2020 11:18
>> >> >>> >>> To: dev
>> >> >>> >>> Cc: khachatryan.ro...@gmail.com; Till Rohrmann; david;
>> Jingsong
>> >> Li
>> >> >>> >>> Subject: Re: [DISCUSS] Releasing Flink 1.11.2
>> >> >>> >>>
>> >> >>> >>>
>> >> >>> >>> [EXTERNAL EMAIL]
>> >> >>> >>>
>> >> >>> >>> Hi Zhuzhu,
>> >> >>> >>>
>> >> >>> >>> Thanks for starting the discussion.
>> >> >>> >>>
>> >> >>> >>> I'd like to include FLINK-18641 into 1.11.2 as well. It is a
>> >> >>> regression
>> >> >>> >>> from previous versions and is currently blocking the
>> development
>> >> of
>> >> >>> >> Pravega
>> >> >>> >>> source on top of FLIP-27.
>> >> >>> >>>
>> >> >>> >>> Thanks,
>> >> >>> >>>
>> >> >>> >>> Jiangjie (Becket) Qin
>> >> >>> >>>
>> >> >>> >>> On Wed, Sep 2, 2020 at 11:13 PM Zhu Zhu 
>> >> wrote:
>> >> >>> >>>
>> >> >>>  Thank you all for the inputs!
>> >> >>> 
>> >> >>>  I agree with Till that we should set a soft deadline first.
>> >> >>>  I'd like to propose next Monday if no new blocker issue pops
>> up.
>> >> >>>  But feel free to raise your concerns if you feel next Monday
>> as a
>> >> >>>  deadline may not work for fixes which should be a blocker for
>> >> >>> 1.11.2.
>> >> >>> 
>> >> >>>  Here's a summary of the wanted/blocker but still open fixes:
>> >> >>>  - FLINK-19121 Avoid accessing HDFS frequently in
>> >> >>> HiveBulkWriterFactory
>> >> >>>  - FLINK-19109 Split Reader eats chained periodic watermarks
>> >> >>>  - (not a strict blocker) FLINK-18959 Fail to
>> >> archiveExecutionGraph
>> >> >>>  because job is not finished when dispatcher close
>> >> >>

[jira] [Created] (FLINK-19174) idleTimeMsPerSecond can report incorrect values if task is blocked for more then 60 seconds

2020-09-09 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-19174:
--

 Summary: idleTimeMsPerSecond can report incorrect values if task 
is blocked for more then 60 seconds
 Key: FLINK-19174
 URL: https://issues.apache.org/jira/browse/FLINK-19174
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.12.0, 1.11.2
Reporter: Piotr Nowojski


If task is blocked for more than 60 seconds 
({{org.apache.flink.metrics.MeterView#DEFAULT_TIME_SPAN_IN_SECONDS}}), 
{{idleTimeMsPerSecond}} can be reported as zero, despite task being completely 
idle. Once the task is unblocked and the {{idleTimeMsPerSecond}} metric is 
updated, it can for the next 60 seconds exceed 1000ms/second.

Average value over the longer periods of time will be correct and accurate.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Timo Walther

"If virtual by default, when a user types "timestamp int" ==> persisted
column, then adds a "metadata" after that ==> virtual column, then adds 
a "persisted" after that ==> persisted column."


Thanks for this nice mental model explanation, Jark. This makes total 
sense to me. Also making the the most common case as short at just 
adding `METADATA` is a very good idea. Thanks, Danny!


Let me update the FLIP again with all these ideas.

Regards,
Timo


On 09.09.20 15:03, Jark Wu wrote:

I'm also +1 to Danny's proposal: timestamp INT METADATA [FROM
'my-timestamp-field'] [VIRTUAL]
Especially I like the shortcut: timestamp INT METADATA, this makes the most
common case to be supported in the simplest way.

I also think the default should be "PERSISTED", so VIRTUAL is optional when
you are accessing a read-only metadata. Because:
1. The "timestamp INT METADATA" should be a normal column, because
"METADATA" is just a modifier to indicate it is from metadata, a normal
column should be persisted.
 If virtual by default, when a user types "timestamp int" ==> persisted
column, then adds a "metadata" after that ==> virtual column, then adds a
"persisted" after that ==> persisted column.
 I think this looks reversed several times and makes users confused.
Physical fields are also prefixed with "fieldName TYPE", so "timestamp INT
METADATA" is persisted is very straightforward.
2. From the collected user question [1], we can see that "timestamp" is the
most common use case. "timestamp" is a read-write metadata. Persisted by
default doesn't break the reading behavior.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-15869

On Wed, 9 Sep 2020 at 20:56, Leonard Xu  wrote:


Thanks @Dawid for the nice summary, I think you catch all opinions of the
long discussion well.

@Danny
“ timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
  Note that the "FROM 'field name'" is only needed when the name conflict
  with the declared table column name, when there are no conflicts, we can
simplify it to
   timestamp INT METADATA"

I really like the proposal, there is no confusion with computed column any
more,  and it’s concise enough.


@Timo @Dawid
“We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM
makes it clearer that it comes magically from the system.”
“As for the issue of shortening the SYSTEM_METADATA to METADATA. Here I
very much prefer the SYSTEM_ prefix.”

I think `SYSTEM_TIME` is different with `SYSTEM_METADATA ` a lot,
First of all,  the word `TIME` has broad meanings but the word `METADATA `
not,  `METADATA ` has specific meaning,
Secondly, `FOR SYSTEM_TIME AS OF` exists in SQL standard but
`SYSTEM_METADATA ` not.
Personally, I like more simplify way,sometimes  less is more.


Best,
Leonard





Timo Walther  于2020年9月9日周三 下午6:41写道:


Hi everyone,

"key" and "value" in the properties are a special case because they need
to configure a format. So key and value are more than just metadata.
Jark's example for setting a timestamp would work but as the FLIP
discusses, we have way more metadata fields like headers, epoch-leader,
etc. Having a property for all of this metadata would mess up the WITH
section entirely. Furthermore, we also want to deal with metadata from
the formats. Solving this through properties as well would further
complicate the property design.

Personally, I still like the computed column design more because it
allows to have full flexibility to compute the final column:

timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS

TIMESTAMP(3)))


Instead of having a helper column and a real column in the table:

helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
realTimestamp AS adjustTimestamp(helperTimestamp)

But I see that the discussion leans towards:

timestamp INT SYSTEM_METADATA("ts")

Which is fine with me. It is the shortest solution, because we don't
need additional CAST. We can discuss the syntax, so that confusion with
computed columns can be avoided.

timestamp INT USING SYSTEM_METADATA("ts")
timestamp INT FROM SYSTEM_METADATA("ts")
timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED

We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM
makes it clearer that it comes magically from the system.

What do you think?

Regards,
Timo



On 09.09.20 11:41, Jark Wu wrote:

Hi Danny,

This is not Oracle and MySQL computed column syntax, because there is

no

"AS" after the type.

Hi everyone,

If we want to use "offset INT SYSTEM_METADATA("offset")", then I think

we

must further discuss about "PERSISED" or "VIRTUAL" keyword for

query-sink

schema problem.
Personally, I think we can use a shorter keyword "METADATA" for
"SYSTEM_METADATA". Because "SYSTEM_METADATA" sounds like a system

function

and confuse users this looks like a computed column.


Best,
Jark



On Wed, 9 Sep 2020 at 17:23, Danny Chan  wrote:


"offset INT SYSTEM_METADATA("offset")"

This is actually Oracle or MySQL style computed column syntax.


Re: [DISCUSS] Deprecate and remove UnionList OperatorState

2020-09-09 Thread Arvid Heise
+1 to getting rid of non-keyed state as is in general and for union state
in particular. I had a hard time to wrap my head around the semantics of
non-keyed state when designing the rescale of unaligned checkpoint.

The only plausible use cases are legacy source and sinks. Both should also
be reworked in deprecated.

My main question is how to represent state in these two cases. For sources,
state should probably be bound to splits. In that regard, split (id) may
act as a key. More generally, there should be probably a concept that
supersedes keys and includes splits.

For sinks, I can see two cases:
- Either we are in a keyed context, then state should be bound to the key.
- Or we are in a non-keyed context, then state might be bound to the split
(?) in case of a source->sink chaining.
- Maybe it should also be a new(?) concept like output partition.

It's not clear to me if there are more cases and if we can always find a
good way to bind state to some sort of key, especially for arbitrary
communication patterns (which we may need to replace as well potentially).

On Wed, Sep 9, 2020 at 4:09 PM Aljoscha Krettek  wrote:

> Hi Devs,
>
> @Users: I'm cc'ing the user ML to see if there are any users that are
> relying on this feature. Please comment here if that is the case.
>
> I'd like to discuss the deprecation and eventual removal of UnionList
> Operator State, aka Operator State with Union Redistribution. If you
> don't know what I'm talking about you can take a look in the
> documentation: [1]. It's not documented thoroughly because it started
> out as mostly an internal feature.
>
> The immediate main reason for removing this is also mentioned in the
> documentation: "Do not use this feature if your list may have high
> cardinality. Checkpoint metadata will store an offset to each list
> entry, which could lead to RPC framesize or out-of-memory errors." The
> insidious part of this limitation is that you will only notice that
> there is a problem when it is too late. Checkpointing will still work
> and a program can continue when the state size is too big. The system
> will only fail when trying to restore from a snapshot that has union
> state that is too big. This could be fixed by working around that issue
> but I think there are more long-term issues with this type of state.
>
> I think we need to deprecate and remove API for state that is not tied
> to a key. Keyed state is easy to reason about, the system can
> re-partition state and also re-partition records and therefore scale the
> system in and out. Operator state, on the other hand is not tied to a
> key but an operator. This is a more "physical" concept, if you will,
> that potentially ties business logic closer to the underlying runtime
> execution model, which in turns means less degrees of freedom for the
> framework, that is Flink. This is future work, though, but we should
> start with deprecating union list state because it is the potentially
> most dangerous type of state.
>
> We currently use this state type internally in at least the
> StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However,
> we're in the process of hopefully getting rid of it there with our work
> on sources and sinks. Before we fully remove it, we should of course
> signal this to users by deprecating it.
>
> What do you think?
>
> Best,
> Aljoscha
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


[jira] [Created] (FLINK-19175) Tests in JoinITCase do not test BroadcastHashJoin

2020-09-09 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-19175:


 Summary: Tests in JoinITCase do not test BroadcastHashJoin
 Key: FLINK-19175
 URL: https://issues.apache.org/jira/browse/FLINK-19175
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner, Tests
Reporter: Dawid Wysakowicz


The tests in JoinITCase claim to test the {{BroadcastHashJoin}}, but they 
actually do not. None of the tables used in the tests have proper statistics 
therefore, none of the tables meet the threshold for the broadcast join. At the 
same time the {{ShuffleHashJoin}} is not disabled, therefore they silently 
fallback to {{ShuffleHashJoin}}.

In summary none (or at least not all of the tests) are executed for 
BroadcastHashJoin, but are executed twice for ShuffleHashJoin.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Deprecate and remove UnionList OperatorState

2020-09-09 Thread Seth Wiesman
Generally +1

The one use case I've seen of union state I've seen in production (outside
of sources and sinks) is as a "poor mans" broadcast state. This was
obviously before that feature was added which is now a few years ago so I
don't know if those pipelines still exist. FWIW, if they do the state
processor api can provide a migration path as it supports rewriting union
state as broadcast state.

Seth

On Wed, Sep 9, 2020 at 10:21 AM Arvid Heise  wrote:

> +1 to getting rid of non-keyed state as is in general and for union state
> in particular. I had a hard time to wrap my head around the semantics of
> non-keyed state when designing the rescale of unaligned checkpoint.
>
> The only plausible use cases are legacy source and sinks. Both should also
> be reworked in deprecated.
>
> My main question is how to represent state in these two cases. For sources,
> state should probably be bound to splits. In that regard, split (id) may
> act as a key. More generally, there should be probably a concept that
> supersedes keys and includes splits.
>
> For sinks, I can see two cases:
> - Either we are in a keyed context, then state should be bound to the key.
> - Or we are in a non-keyed context, then state might be bound to the split
> (?) in case of a source->sink chaining.
> - Maybe it should also be a new(?) concept like output partition.
>
> It's not clear to me if there are more cases and if we can always find a
> good way to bind state to some sort of key, especially for arbitrary
> communication patterns (which we may need to replace as well potentially).
>
> On Wed, Sep 9, 2020 at 4:09 PM Aljoscha Krettek 
> wrote:
>
> > Hi Devs,
> >
> > @Users: I'm cc'ing the user ML to see if there are any users that are
> > relying on this feature. Please comment here if that is the case.
> >
> > I'd like to discuss the deprecation and eventual removal of UnionList
> > Operator State, aka Operator State with Union Redistribution. If you
> > don't know what I'm talking about you can take a look in the
> > documentation: [1]. It's not documented thoroughly because it started
> > out as mostly an internal feature.
> >
> > The immediate main reason for removing this is also mentioned in the
> > documentation: "Do not use this feature if your list may have high
> > cardinality. Checkpoint metadata will store an offset to each list
> > entry, which could lead to RPC framesize or out-of-memory errors." The
> > insidious part of this limitation is that you will only notice that
> > there is a problem when it is too late. Checkpointing will still work
> > and a program can continue when the state size is too big. The system
> > will only fail when trying to restore from a snapshot that has union
> > state that is too big. This could be fixed by working around that issue
> > but I think there are more long-term issues with this type of state.
> >
> > I think we need to deprecate and remove API for state that is not tied
> > to a key. Keyed state is easy to reason about, the system can
> > re-partition state and also re-partition records and therefore scale the
> > system in and out. Operator state, on the other hand is not tied to a
> > key but an operator. This is a more "physical" concept, if you will,
> > that potentially ties business logic closer to the underlying runtime
> > execution model, which in turns means less degrees of freedom for the
> > framework, that is Flink. This is future work, though, but we should
> > start with deprecating union list state because it is the potentially
> > most dangerous type of state.
> >
> > We currently use this state type internally in at least the
> > StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However,
> > we're in the process of hopefully getting rid of it there with our work
> > on sources and sinks. Before we fully remove it, we should of course
> > signal this to users by deprecating it.
> >
> > What do you think?
> >
> > Best,
> > Aljoscha
> >
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


[DISCUSS] Support source/sink parallelism config in Flink sql

2020-09-09 Thread admin
Hi devs:
Currently,Flink sql does not support source/sink parallelism config.So,it will 
result in wasting or lacking resources in some cases.
I think it is necessary to introduce configuration of source/sink parallelism 
in sql.
From my side,i have the solution for this feature.Add parallelism config in 
‘with’ properties of DDL.

Before 1.11,we can get parallelism and then set it to 
StreamTableSink#consumeDataStream or StreamTableSource#getDataStream
After 1.11,we can get parallelism from catalogTable and then set it to 
transformation in CommonPhysicalTableSourceScan or CommonPhysicalSink.

What do you think?






[jira] [Created] (FLINK-19176) Support ScalaPB as a message payload serializer in Stateful Functions

2020-09-09 Thread Galen Warren (Jira)
Galen Warren created FLINK-19176:


 Summary: Support ScalaPB as a message payload serializer in 
Stateful Functions
 Key: FLINK-19176
 URL: https://issues.apache.org/jira/browse/FLINK-19176
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Affects Versions: 2.0.0
Reporter: Galen Warren
 Fix For: statefun-2.1.0


Currently, Stateful Functions supports four options for serialization of 
message payloads:
 * Protobuf (based on code generated for Java)
 * Kryo
 * Multilanguage 
 * Raw

This proposal is to add a fifth option to this list, to support serialization 
of message payloads based on [ScalaPB|[https://scalapb.github.io/docs/]]. This 
would allow Scala developers to use ScalaPB-generated classes as message types 
in Stateful Functions directly as message types, without having to convert 
to/from code generated for Java and/or raw byte[] messages.

This would be a simple implementation, as there is already a 
[MessagePayloadSerializer 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java]]
 interface that is implemented for each of the existing serialization methods; 
supporting ScalaPB would require a new class implementing 
MessagePayloadSerializer in terms of ScalaPB-generated code, and a means to 
select it, by adding a new value to the [MessageFactoryType 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryType.java]]enumeration.
 Plus testing :)

I've done this already locally, the implementation is very similar to the 
existing [MessagePayloadSerializerPb 
|[https://github.com/apache/flink-statefun/blob/8ffe619a94917d676cf1054c8a0e60de544663db/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializerPb.java]]implementation.
 It uses a reference to ScalaPB in "provided" scope.

Would you be interested in a pull request to add this? Primary benefit is to 
make it easy to use Stateful Functions in a Scala environment. Thanks.

 

 

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


add support for ScalaPB-based message-payload serialization to Stateful Functions?

2020-09-09 Thread Galen Warren
Hi all -- I created a ticket
regarding a proposal to
add a new message-payload serialization method to StateFul Functions, based
on ScalaPB. This would be very similar to the existing support for protobuf
serialization based on code generated for Java. Would this be something the
community would be interested in? Any feedback is welcome.

My apologies for the formatting in the ticket, I added hyperlinks to some
code that would be involved in the implementation, but the links don't seem
to be rendering properly.

Thanks,

Galen Warren


Re: [VOTE] FLIP-141: Intra-Slot Managed Memory Sharing

2020-09-09 Thread Xintong Song
Thanks everyone,

I'm closing this vote now in a separate email.

Concerning the naming, I will use DATAPROC, as @Stephan suggested in the
discussion thread [1], for now. If there are any other opinions, feel free
to reach out to me anytime before the release.

Thank you~

Xintong Song


[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-141-Intra-Slot-Managed-Memory-Sharing-tp44146p44533.html

On Wed, Sep 9, 2020 at 4:02 PM Andrey Zagrebin  wrote:

> For the option name, maybe:
> *flink.main*
> or
> *flink.managed* (this may be a bit confusing for existing users as we said
> that the overall managed memory is managed by Flink)
>
> On Wed, Sep 9, 2020 at 9:56 AM Andrey Zagrebin 
> wrote:
>
> > +1
> >
> > Best,
> > Andrey
> >
> > On Tue, Sep 8, 2020 at 2:16 PM Yu Li  wrote:
> >
> >> +1
> >>
> >> Best Regards,
> >> Yu
> >>
> >>
> >> On Tue, 8 Sep 2020 at 17:03, Aljoscha Krettek 
> >> wrote:
> >>
> >> > +1
> >> >
> >> > We just need to make sure to find a good name before the release but
> >> > shouldn't block any work on this.
> >> >
> >> > Aljoscha
> >> >
> >> > On 08.09.20 07:59, Xintong Song wrote:
> >> > > Thanks for the vote, @Jincheng.
> >> > >
> >> > >
> >> > > Concerning the namings, the original idea was, as you suggested, to
> >> have
> >> > > separate configuration names for batch and rocksdb while only one of
> >> them
> >> > > will take effect at a time.
> >> > >
> >> > >
> >> > > It was then in the discussion thread [1] that @Stepahn suggested to
> >> > combine
> >> > > these two.
> >> > >
> >> > >>  We never have batch algos and RocksDB mixed, having this as
> >> > separate
> >> > >> options is confusing as it suggests this can be combined
> >> arbitrarily. I
> >> > >> also think that a slim possibility that we may ever combine this in
> >> the
> >> > >> future is not enough reason to make it more complex/confusing.
> >> > >>
> >> > >
> >> > > This suggestion was also supported by others in the discussion
> thread.
> >> > > That's why we are trying to come up with a name that covers both
> batch
> >> > and
> >> > > rocksdb memory consumers.
> >> > >
> >> > >
> >> > > Thank you~
> >> > >
> >> > > Xintong Song
> >> > >
> >> > >
> >> > > [1]
> >> > >
> >> >
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-141-Intra-Slot-Managed-Memory-Sharing-tp44146p44253.html
> >> > >
> >> > > On Tue, Sep 8, 2020 at 1:37 PM jincheng sun <
> sunjincheng...@gmail.com
> >> >
> >> > > wrote:
> >> > >
> >> > >> +1 for the proposal!
> >> > >>
> >> > >> Regarding the name of `BATCH_OP/ROCKSDB`, we can reserve the
> >> > configuration
> >> > >> names for batch and rocksdb respectively, ` batch_ OP` for batch,
> >> > "ROCKSDB"
> >> > >> for roockdb. and the default value as follows:
> >> > >>
> >> > >> {
> >> > >>  BATCH_OP: 70,
> >> > >>  ROCKSDB : 70,
> >> > >>  PYTHON : 30
> >> > >> }
> >> > >>
> >> > >> Only one of `BATCH_ OP` and `ROCKSDB` will work. What do you think?
> >> > >>
> >> > >> Best,
> >> > >> Jincheng
> >> > >>
> >> > >>
> >> > >> Xintong Song  于2020年9月7日周一 下午1:46写道:
> >> > >>
> >> > >>> Thanks for the votes.
> >> > >>>
> >> > >>> Concerning the name for batch/RocksDB memory consumer, how about
> >> > >> "execution
> >> > >>> memory"?
> >> > >>> We can further explain in docs and config option description that
> >> this
> >> > is
> >> > >>> used for job execution, which is currently dedicated to rocksdb in
> >> > >>> streaming and batch algorithms in batch.
> >> > >>>
> >> > >>> Thank you~
> >> > >>>
> >> > >>> Xintong Song
> >> > >>>
> >> > >>>
> >> > >>>
> >> > >>> On Mon, Sep 7, 2020 at 11:43 AM Yangze Guo 
> >> wrote:
> >> > >>>
> >> >  +1
> >> > 
> >> >  Best,
> >> >  Yangze Guo
> >> > 
> >> >  On Mon, Sep 7, 2020 at 10:54 AM Zhu Zhu 
> wrote:
> >> > >
> >> > > +1
> >> > >
> >> > > Thanks,
> >> > > Zhu
> >> > >
> >> > > Dian Fu  于2020年9月7日周一 上午10:34写道:
> >> > >
> >> > >> +1
> >> > >>
> >> > >>> 在 2020年9月3日,下午8:46,Till Rohrmann  写道:
> >> > >>>
> >> > >>> Hi Xintong,
> >> > >>>
> >> > >>> thanks for starting the vote.
> >> > >>>
> >> > >>> +1 for the proposal given that we find a proper name for the
> >> > >>> different memory consumers (specifically the batch/RocksDB
> >> > >>> consumer)
> >> >  and
> >> > >>> their corresponding weights.
> >> > >>>
> >> > >>> Cheers,
> >> > >>> Till
> >> > >>>
> >> > >>> On Thu, Sep 3, 2020 at 12:43 PM Xintong Song <
> >> > >>> tonysong...@gmail.com>
> >> > >> wrote:
> >> > >>>
> >> >  Hi devs,
> >> > 
> >> >  I'd like to start a voting thread on FLIP-141[1], which
> >> proposes
> >> > >>> how
> >> >  managed memory should be shared by various use cases within a
> >> > >>> slot.
> >> >  The
> >> >  proposal has been discussed in [2].
> >> > 
> >> >  The vote will be open for at least 72h + weekends.

[RESULT][VOTE] FLIP-141: Intra-Slot Managed Memory Sharing

2020-09-09 Thread Xintong Song
Hi devs,

I'm happy to announce that FLIP-141[1] is officially approved.

The vote [2] has been opened for more than 72h + weekends, and we have
received 9 +1s, 8 of which are binding, and no vetos. Thanks everyone for
participating.

* Xintong (binding)
* Till (binding)
* Dian (binding)
* Zhu (binding)
* Yangze (non-binding)
* Jincheng (binding)
* Aljoscha (binding)
* Yu (binding)
* Andrey (binding)

Thank you~

Xintong Song


Re: [VOTE] FLIP-139: General Python User-Defined Aggregate Function on Table API

2020-09-09 Thread Dian Fu
+1(binding)

Regards,
Dian

> 在 2020年9月8日,上午7:43,jincheng sun  写道:
> 
> +1(binding)
> 
> Best,
> Jincheng
> 
> Xingbo Huang  于2020年9月7日周一 下午5:45写道:
> 
>> Hi,
>> 
>> +1 (non-binding)
>> 
>> Best,
>> Xingbo
>> 
>> Wei Zhong  于2020年9月7日周一 下午2:37写道:
>> 
>>> Hi all,
>>> 
>>> I would like to start the vote for FLIP-139[1] which is discussed and
>>> reached consensus in the discussion thread[2].
>>> 
>>> The vote will be open for at least 72 hours. I'll try to close it by
>>> 2020-09-10 07:00 UTC, unless there is an objection or not enough votes.
>>> 
>>> Best,
>>> Wei
>>> 
>>> [1]
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-139%3A+General+Python+User-Defined+Aggregate+Function+Support+on+Table+API
>>> [2]
>>> 
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-139-General-Python-User-Defined-Aggregate-Function-on-Table-API-td44139.html
>>> 
>>> 
>> 



Re: [VOTE] FLIP-139: General Python User-Defined Aggregate Function on Table API

2020-09-09 Thread Hequn Cheng
+1 (binding)

Best,
Hequn

On Thu, Sep 10, 2020 at 10:03 AM Dian Fu  wrote:

> +1(binding)
>
> Regards,
> Dian
>
> > 在 2020年9月8日,上午7:43,jincheng sun  写道:
> >
> > +1(binding)
> >
> > Best,
> > Jincheng
> >
> > Xingbo Huang  于2020年9月7日周一 下午5:45写道:
> >
> >> Hi,
> >>
> >> +1 (non-binding)
> >>
> >> Best,
> >> Xingbo
> >>
> >> Wei Zhong  于2020年9月7日周一 下午2:37写道:
> >>
> >>> Hi all,
> >>>
> >>> I would like to start the vote for FLIP-139[1] which is discussed and
> >>> reached consensus in the discussion thread[2].
> >>>
> >>> The vote will be open for at least 72 hours. I'll try to close it by
> >>> 2020-09-10 07:00 UTC, unless there is an objection or not enough votes.
> >>>
> >>> Best,
> >>> Wei
> >>>
> >>> [1]
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-139%3A+General+Python+User-Defined+Aggregate+Function+Support+on+Table+API
> >>> [2]
> >>>
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-139-General-Python-User-Defined-Aggregate-Function-on-Table-API-td44139.html
> >>>
> >>>
> >>
>
>


[jira] [Created] (FLINK-19177) FLIP-141: Intra-Slot Managed Memory Sharing

2020-09-09 Thread Xintong Song (Jira)
Xintong Song created FLINK-19177:


 Summary: FLIP-141: Intra-Slot Managed Memory Sharing
 Key: FLINK-19177
 URL: https://issues.apache.org/jira/browse/FLINK-19177
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Xintong Song
Assignee: Xintong Song
 Fix For: 1.12.0


This is the umbrella ticket of [FLIP-141: Intra-Slot Managed Memory 
Sharing|https://cwiki.apache.org/confluence/display/FLINK/FLIP-141%3A+Intra-Slot+Managed+Memory+Sharing].
 

[FLIP-53|https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management]
 introduced a fraction based approach for sharing managed memory within a slot. 
This approach needs to be extended as python operators, which also use managed 
memory, are introduced. This FLIP proposes a design for extending intra-slot 
managed memory sharing for python operators and other potential future managed 
memory use cases.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19178) Introduce the memory weights configuration option

2020-09-09 Thread Xintong Song (Jira)
Xintong Song created FLINK-19178:


 Summary: Introduce the memory weights configuration option
 Key: FLINK-19178
 URL: https://issues.apache.org/jira/browse/FLINK-19178
 Project: Flink
  Issue Type: Sub-task
Reporter: Xintong Song






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19179) Implement the new fraction calculation logic

2020-09-09 Thread Xintong Song (Jira)
Xintong Song created FLINK-19179:


 Summary: Implement the new fraction calculation logic
 Key: FLINK-19179
 URL: https://issues.apache.org/jira/browse/FLINK-19179
 Project: Flink
  Issue Type: Sub-task
Reporter: Xintong Song


This also means migrating the batch operator use cases.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19180) Make RocksDB respect the calculated fraction

2020-09-09 Thread Xintong Song (Jira)
Xintong Song created FLINK-19180:


 Summary: Make RocksDB respect the calculated fraction
 Key: FLINK-19180
 URL: https://issues.apache.org/jira/browse/FLINK-19180
 Project: Flink
  Issue Type: Sub-task
Reporter: Xintong Song






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19182) Update document for intra-slot managed memory sharing

2020-09-09 Thread Xintong Song (Jira)
Xintong Song created FLINK-19182:


 Summary: Update document for intra-slot managed memory sharing
 Key: FLINK-19182
 URL: https://issues.apache.org/jira/browse/FLINK-19182
 Project: Flink
  Issue Type: Sub-task
Reporter: Xintong Song






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19181) Make python processes respect the calculated fraction

2020-09-09 Thread Xintong Song (Jira)
Xintong Song created FLINK-19181:


 Summary: Make python processes respect the calculated fraction
 Key: FLINK-19181
 URL: https://issues.apache.org/jira/browse/FLINK-19181
 Project: Flink
  Issue Type: Sub-task
Reporter: Xintong Song






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19183) flink-connector-hive module compile failed with "cannot find symbol: variable TableEnvUtil"

2020-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-19183:
---

 Summary: flink-connector-hive module compile failed with "cannot 
find symbol: variable TableEnvUtil"
 Key: FLINK-19183
 URL: https://issues.apache.org/jira/browse/FLINK-19183
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.12.0
Reporter: Dian Fu
 Fix For: 1.12.0


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6416&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=9b1a0f88-517b-5893-fc93-76f4670982b4]

{code}
[ERROR] COMPILATION ERROR : 
[INFO] -
[ERROR] 
/home/vsts/work/1/s/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java:[584,33]
 cannot find symbol
  symbol:   variable TableEnvUtil
  location: class org.apache.flink.connectors.hive.TableEnvHiveConnectorITCase
[ERROR] 
/home/vsts/work/1/s/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java:[589,33]
 cannot find symbol
  symbol:   variable TableEnvUtil
  location: class org.apache.flink.connectors.hive.TableEnvHiveConnectorITCase
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19184) Add Batch Physical Pandas Group Aggregate Rule and RelNode

2020-09-09 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-19184:


 Summary: Add Batch Physical Pandas Group Aggregate Rule and RelNode
 Key: FLINK-19184
 URL: https://issues.apache.org/jira/browse/FLINK-19184
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Huang Xingbo
 Fix For: 1.12.0


Add Batch Physical Pandas Group Aggregate Rule and RelNode



--
This message was sent by Atlassian Jira
(v8.3.4#803005)