Re: Support local aggregate push down for Blink batch planner

2021-01-05 Thread Jark Wu
Thanks for the update.

The proposal looks good to me now.

Best,
Jark

On Tue, 5 Jan 2021 at 14:44, Jingsong Li  wrote:

> Thanks for your proposal! Sebastian.
>
> +1 for SupportsAggregatePushDown. The above wonderful discussion has
> solved many of my concerns.
>
> ## Semantic problems
>
> We may need to add some mechanisms or comments, because as far as I know,
> the semantics of each database is actually different, which may need to be
> reflected in your specific implementation.
>
> For example, the AVG output types of various databases may be different.
> For example, MySQL outputs double, which is different from Flink. What
> should we do? (Luckily, avg will be split into sum and count, but we also
> need to care about decimal and others.)
>
> ## The phase of push-down rule
>
> I strongly recommend that you do not put it in the Volcano phase, which
> may make the cost calculation very troublesome.
> So in PHYSICAL_REWRITE?
>
> ## About interface
>
> For scalability, I slightly recommend that we introduce an `Aggregate`
> interface which contains `List<CallExpression> aggregateExpressions, int[]
> groupingFields, DataType producedDataType` fields. In this way, we can add
> fields easily without breaking compatibility.
>
> I think the current design is very good, just put forward some ideas.
>
> Best,
> Jingsong
>
> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu 
> wrote:
>
>> Hi Jark,
>>
>> Thx for your further feedback and help. The interface of
>> SupportsAggregatePushDown may indeed need some adjustments.
>>
>> For (1) Agree: Yeah, the upstream only needs to know whether the TableSource
>> can handle all of the aggregates.
>> It's better to just return a boolean to indicate whether the push-down of all
>> aggregates was successful or not. [Resolved in proposal]
>>
>> For (2) Agree: The aggOutputDataType represents the produced data type of
>> the new table source, to make sure that the new table source can
>> connect with the related exchange node. The format of this
>> aggOutputDataType is the groupedFields' types + the agg functions' return types.
>> The reason for adding this parameter in this function is also to
>> facilitate
>> the user to build the final output type. I have changed this parameter
>> to be producedDataType. [Resolved in proposal]
>>
>> For (3) Agree: Indeed, groupSet may mislead users, I have changed to use
>> groupingFields. [Resolved in proposal]
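>>
>> Taken together, the adjusted interface would roughly look like the following
>> (just a sketch of the points above, not the final wording of the proposal):
>>
>> boolean applyAggregates(List<CallExpression> aggregateExpressions,
>>  int[] groupingFields, DataType producedDataType);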
>>
>> Thx again for the suggestion, looking for the further discussion.
>>
>> Jark Wu  wrote on Tue, Jan 5, 2021 at 12:05 PM:
>>
>> > I'm also +1 for idea#2.
>> >
>> > Regarding to the updated interface,
>> >
>> > Result applyAggregates(List<CallExpression> aggregateExpressions,
>> >  int[] groupSet, DataType aggOutputDataType);
>> >
>> > final class Result {
>> >private final List<CallExpression> acceptedAggregates;
>> >private final List<CallExpression> remainingAggregates;
>> > }
>> >
>> > I have following comments:
>> >
>> > 1) Do we need the composite Result return type? Is a boolean return type
>> > enough?
>> > From my understanding, all of the aggregates should be accepted,
>> > otherwise the pushdown should fail.
>> > Therefore, users don't need to distinguish which aggregates are
>> > "accepted".
>> >
>> > 2) Does the `aggOutputDataType` represent the produced data type of the
>> > new source, or just the return type of all the agg functions?
>> > I would prefer to `producedDataType` just like
>> > `SupportsReadingMetadata` to reduce the effort for users to concat a
>> final
>> > output type.
>> > The return type of each agg function can be obtained from the
>> > `CallExpression`.
>> >
>> > 3) What do you think about renaming `groupSet` to `grouping` or
>> > `groupedFields` ?
>> > The `groupSet` may confuse users that it relates to "grouping sets".
>> >
>> >
>> > What do you think?
>> >
>> > Best,
>> > Jark
>> >
>> >
>> >
>> > On Tue, 5 Jan 2021 at 11:04, Kurt Young  wrote:
>> >
>> >> Sorry for the typo -_-!
>> >> I meant idea #2.
>> >>
>> >> Best,
>> >> Kurt
>> >>
>> >>
>> >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu 
>> >> wrote:
>> >>
>> >>> Hi Kurt,
>> >>>
>> >>> Thx a lot for your feedback. If local aggregation is more like a
>> >>> physical operator rather than logical
>> >>> operator, I think your suggestion should be idea #2 which handle all
>> in
>> >>> the physical optimization phase?
>> >>>
>> >>> Looking forward for the further discussion.
>> >>>
>> >>>
>> >>> Kurt Young  wrote on Tue, Jan 5, 2021 at 9:52 AM:
>> >>>
>>  Local aggregation is more like a physical operator rather than
>> logical
>>  operator. I would suggest going with idea #1.
>> 
>>  Best,
>>  Kurt
>> 
>> 
>>  On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu > >
>>  wrote:
>> 
>>  > Hi Jark, Thx a lot for your quick reply and valuable suggestions.
>>  > For (1): Agree: Since we are in the period of upgrading the new
>> table
>>  > source api,
>>  > we really should consider the new interface for the new optimize
>>  rule. If
>>  > the new rule
>>  > doe

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-05 Thread Arvid Heise
Hi Yun,

1. I'd think that this is an orthogonal issue, which I'd solve separately.
My gut feeling says that this is something we should only address for new
sinks where we decouple the semantics of commits and checkpoints
anyways. @Aljoscha
Krettek  any idea on this one?

2. I'm not sure I get it completely. Let's assume we have a source
partition that is finished before the first checkpoint. Then, we would need
to store the finished state of the subtask somehow. So I'm assuming, we
still need to trigger some checkpointing code on finished subtasks.

3. Do we really want to store the finished flag in OperatorState? I was
assuming we want to have it more fine-grained on OperatorSubtaskState.
Maybe we can store the flag inside managed or raw state without changing
the format?



On Fri, Dec 25, 2020 at 8:39 AM Yun Gao  wrote:

>Hi all,
>
>  I tested the previous PoC with the current tests and I found some
> new issues that might cause divergence, and sorry for there might also be
> some reversal for some previous problems:
>
>
>  1. Which operators should wait for one more checkpoint before close ?
>
> One motivation for this FLIP is to ensure the 2PC sink commits the
> last part of data before closed, which makes the sink operator need to wait
> for one more checkpoint like onEndOfInput() -> waitForCheckpoint() ->
> notifyCheckpointComplete() -> close(). This lead to the issue which
> operators should wait for checkpoint? Possible options are
>  a. Make all the operators (or UDF) implemented
> notifyCheckpointCompleted method wait for one more checkpoint. One
> exception is that since we can only snapshot one or all tasks for a legacy
> source operator to avoid data repetition[1], we could not support legacy
> operators and its chained operators to wait for checkpoints since there
> will be deadlock if part of the tasks are finished, this would finally be
> solved after legacy source are deprecated. The PoC used this option for now.
> b. Make operators (or UDF) implemented a special marker
> interface to wait for one more checkpoint.
>
>
>2. Do we need to solve the case that tasks finished before triggered ?
>
>   Previously I think we could postpone it, however, during testing I
> found that it might cause some problems since by default checkpoint failure
> would cause job failover, and the job would also need wait for another
> interval to trigger the next checkpoint. To pass the tests, I updated the
> PoC to include this part, and we may have a double think on if we need to
> include it or use some other options.
>
> 3. How to extend a new format for checkpoint meta ?
>
> Sorry previously I gave a wrong estimation, after I extract a
> sub-component for (de)serialize operator state, I found the problem just
> goes to the new OperatorStateSerializer. The problem seems to be that v2,
> v3 and v4 have different fields, thus they use different process when
> (de)serialize, which is a bit different from the case that we have a fixed
> steps and each step has different logic. Thus we might either
>  a. Use base classes for each two version.
>  b. Or have a unified framework contains all the possible fields
> across all version, and use empty field serializer to skip some fields in
> each version.
>
> Best,
> Yun
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish&Checkpointsdonotcontainthefinalstatesfromfinishedtasks
>
> --
> From:Yun Gao 
> Send Time:2020 Dec. 16 (Wed.) 11:07
> To:Aljoscha Krettek ; dev ;
> user 
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
>  Hi Aljoscha,
>
> Very thanks for the feedbacks! For the remaining issues:
>
>
>   > 1. You mean we would insert "artificial" barriers for barrier 2 in 
> case we receive  EndOfPartition while other inputs have already received 
> barrier 2? I think that makes sense, yes.
>
>
>   Yes, exactly, I would like to  insert "artificial" barriers for in case 
> we receive  EndOfPartition while other inputs have already received barrier 
> 2, and also for the similar cases that some input channels received 
> EndOfPartition during checkpoint 2 is ongoing and when the task receive 
> directly checkpoint triggering after all the precedent tasks are finished but 
> not received their EndOfPartition yet.
>
>
>  > 3. This indeed seems complex. Maybe we could switch to using 
> composition instead of inheritance to make this more extensible?
>
>
> I re-checked the code and now I think composition would be better to 
> avoid complex inheritance hierarchy by exposing the changed part 
> `(de)serializeOperatorState` out, and I'll update the PoC to change this 
> part. Very thanks for the suggestions!
>
>
>> 4. Don't we currently have the sam

Re: [DISCUSS] Allow streaming operators to use managed memory

2021-01-05 Thread Till Rohrmann
+1 for Jark's and Xintong's proposal.

Would the default weight for OPERATOR and STATE_BACKEND be the same value?

Cheers,
Till

On Tue, Jan 5, 2021 at 6:39 AM Jingsong Li  wrote:

> +1 for allowing streaming operators to use managed memory.
>
> The memory use of streams requires some hierarchy, and the bottom layer is
> undoubtedly the current StateBackend.
> Letting the stream operators freely use the managed memory will make the
> memory management model unified and give the operators free space.
>
> Xintong's proposal looks good to me. +1 to split `DATAPROC` into
> `STATE_BACKEND` or `OPERATOR`.
>
> Best,
> Jingsong
>
> On Tue, Jan 5, 2021 at 12:33 PM Jark Wu  wrote:
>
> > +1 to Xintong's proposal!
> >
> > Best,
> > Jark
> >
> > On Tue, 5 Jan 2021 at 12:13, Xintong Song  wrote:
> >
> > > +1 for allowing streaming operators to use managed memory.
> > >
> > > As for the consumer names, I'm afraid using `DATAPROC` for both
> streaming
> > > ops and state backends will not work. Currently, RocksDB state backend
> > uses
> > > a shared piece of memory for all the states within that slot. It's not
> > the
> > > operator's decision how much memory it uses for the states.
> > >
> > > I would suggest the following. (IIUC, the same as what Jark proposed)
> > > * `OPERATOR` for both streaming and batch operators
> > > * `STATE_BACKEND` for state backends
> > > * `PYTHON` for python processes
> > > * `DATAPROC` as a legacy key for state backend or batch operators if
> > > `STATE_BACKEND` or `OPERATOR` are not specified.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Tue, Jan 5, 2021 at 11:23 AM Jark Wu  wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > I think we may need to divide `DATAPROC` into `OPERATOR` and
> > > > `STATE_BACKEND`, because they have different scope (slot vs.
> operator).
> > > > But @Xintong Song  may have more insights on
> > it.
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > >
> > > > On Mon, 4 Jan 2021 at 20:44, Aljoscha Krettek 
> > > wrote:
> > > >
> > > >> I agree, we should allow streaming operators to use managed memory
> for
> > > >> other use cases.
> > > >>
> > > >> Do you think we need an additional "consumer" setting or that they
> > would
> > > >> just use `DATAPROC` and decide by themselves what to use the memory
> > for?
> > > >>
> > > >> Best,
> > > >> Aljoscha
> > > >>
> > > >> On 2020/12/22 17:14, Jark Wu wrote:
> > > >> >Hi all,
> > > >> >
> > > >> >I found that currently the managed memory can only be used in 3
> > > workloads
> > > >> >[1]:
> > > >> >- state backends for streaming jobs
> > > >> >- sorting, hash tables for batch jobs
> > > >> >- python UDFs
> > > >> >
> > > >> >And the configuration option
> > > >> `taskmanager.memory.managed.consumer-weights`
> > > >> >only allows values: PYTHON and DATAPROC (state in streaming or
> > > algorithms
> > > >> >in batch).
> > > >> >I'm confused why it doesn't allow streaming operators to use
> managed
> > > >> memory
> > > >> >for purposes other than state backends.
> > > >> >
> > > >> >The background is that we are planning to use some batch algorithms
> > > >> >(sorting & bytes hash table) to improve the performance of
> streaming
> > > SQL
> > > >> >operators, especially for the mini-batch operators.
> > > >> >Currently, the mini-batch operators are buffering input records and
> > > >> >accumulators in heap (i.e. Java HashMap) which is not efficient and
> > > there
> > > >> >are potential risks of full GC and OOM.
> > > >> >With the managed memory, we can fully use the memory to buffer more
> > > data
> > > >> >without worrying about OOM and improve the performance a lot.
> > > >> >
> > > >> >What do you think about allowing streaming operators to use managed
> > > >> memory
> > > >> >and exposing it in configuration.
> > > >> >
> > > >> >Best,
> > > >> >Jark
> > > >> >
> > > >> >[1]:
> > > >> >
> > > >>
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/deployment/memory/mem_setup_tm.html#managed-memory
> > > >>
> > > >
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


[PSA] Configure "Save Actions" only for Java files

2021-01-05 Thread Aljoscha Krettek
If you're using "Save Actions" to auto-format your Java code, as 
recommended in [1], you should add a regex in the settings to make sure 
that this only formats Java code. Otherwise you will get weird results 
when IntelliJ also formats XML, Markdown or Scala files for you.
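(For example, a file-path inclusion pattern such as .*\.java in the plugin's 
settings should do the trick; the exact name of that setting may differ 
between plugin versions.)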


Best,
Aljoscha

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/ide_setup.html#code-formatting


Re: [PSA] Configure "Save Actions" only for Java files

2021-01-05 Thread Till Rohrmann
This is very helpful. Thanks a lot Aljoscha!

Cheers,
Till

On Tue, Jan 5, 2021 at 10:59 AM Aljoscha Krettek 
wrote:

> If you're using "Save Actions" to auto-format your Java code, as
> recommended in [1], you should add a regex in the settings to make sure
> that this only formats Java code. Otherwise you will get weird results
> when IntelliJ also formats XML, Markdown or Scala files for you.
>
> Best,
> Aljoscha
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/ide_setup.html#code-formatting
>


Re: [DISCUSS] Allow streaming operators to use managed memory

2021-01-05 Thread Xintong Song
>
> Would the default weight for OPERATOR and STATE_BACKEND be the same value?
>

I would say yes, to align with previous behaviors.
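
As an illustration (the exact defaults are of course up for discussion), the
configuration under the proposal might then look like:

taskmanager.memory.managed.consumer-weights: OPERATOR:70,STATE_BACKEND:70,PYTHON:30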


Thank you~

Xintong Song



On Tue, Jan 5, 2021 at 5:51 PM Till Rohrmann  wrote:

> +1 for Jark's and Xintong's proposal.
>
> Would the default weight for OPERATOR and STATE_BACKEND be the same value?
>
> Cheers,
> Till
>
> On Tue, Jan 5, 2021 at 6:39 AM Jingsong Li  wrote:
>
> > +1 for allowing streaming operators to use managed memory.
> >
> > The memory use of streams requires some hierarchy, and the bottom layer
> is
> > undoubtedly the current StateBackend.
> > Let the stream operators freely use the managed memory, which will make
> the
> > memory management model to be unified and give the operator free space.
> >
> > Xintong's proposal looks good to me. +1 to split `DATAPROC` into
> > `STATE_BACKEND` or `OPERATOR`.
> >
> > Best,
> > Jingsong
> >
> > On Tue, Jan 5, 2021 at 12:33 PM Jark Wu  wrote:
> >
> > > +1 to Xintong's proposal!
> > >
> > > Best,
> > > Jark
> > >
> > > On Tue, 5 Jan 2021 at 12:13, Xintong Song 
> wrote:
> > >
> > > > +1 for allowing streaming operators to use managed memory.
> > > >
> > > > As for the consumer names, I'm afraid using `DATAPROC` for both
> > streaming
> > > > ops and state backends will not work. Currently, RocksDB state
> backend
> > > uses
> > > > a shared piece of memory for all the states within that slot. It's
> not
> > > the
> > > > operator's decision how much memory it uses for the states.
> > > >
> > > > I would suggest the following. (IIUC, the same as what Jark proposed)
> > > > * `OPERATOR` for both streaming and batch operators
> > > > * `STATE_BACKEND` for state backends
> > > > * `PYTHON` for python processes
> > > > * `DATAPROC` as a legacy key for state backend or batch operators if
> > > > `STATE_BACKEND` or `OPERATOR` are not specified.
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > >
> > > >
> > > > On Tue, Jan 5, 2021 at 11:23 AM Jark Wu  wrote:
> > > >
> > > > > Hi Aljoscha,
> > > > >
> > > > > I think we may need to divide `DATAPROC` into `OPERATOR` and
> > > > > `STATE_BACKEND`, because they have different scope (slot vs.
> > operator).
> > > > > But @Xintong Song  may have more insights
> on
> > > it.
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > >
> > > > > On Mon, 4 Jan 2021 at 20:44, Aljoscha Krettek  >
> > > > wrote:
> > > > >
> > > > >> I agree, we should allow streaming operators to use managed memory
> > for
> > > > >> other use cases.
> > > > >>
> > > > >> Do you think we need an additional "consumer" setting or that they
> > > would
> > > > >> just use `DATAPROC` and decide by themselves what to use the
> memory
> > > for?
> > > > >>
> > > > >> Best,
> > > > >> Aljoscha
> > > > >>
> > > > >> On 2020/12/22 17:14, Jark Wu wrote:
> > > > >> >Hi all,
> > > > >> >
> > > > >> >I found that currently the managed memory can only be used in 3
> > > > workloads
> > > > >> >[1]:
> > > > >> >- state backends for streaming jobs
> > > > >> >- sorting, hash tables for batch jobs
> > > > >> >- python UDFs
> > > > >> >
> > > > >> >And the configuration option
> > > > >> `taskmanager.memory.managed.consumer-weights`
> > > > >> >only allows values: PYTHON and DATAPROC (state in streaming or
> > > > algorithms
> > > > >> >in batch).
> > > > >> >I'm confused why it doesn't allow streaming operators to use
> > managed
> > > > >> memory
> > > > >> >for purposes other than state backends.
> > > > >> >
> > > > >> >The background is that we are planning to use some batch
> algorithms
> > > > >> >(sorting & bytes hash table) to improve the performance of
> > streaming
> > > > SQL
> > > > >> >operators, especially for the mini-batch operators.
> > > > >> >Currently, the mini-batch operators are buffering input records
> and
> > > > >> >accumulators in heap (i.e. Java HashMap) which is not efficient
> and
> > > > there
> > > > >> >are potential risks of full GC and OOM.
> > > > >> >With the managed memory, we can fully use the memory to buffer
> more
> > > > data
> > > > >> >without worrying about OOM and improve the performance a lot.
> > > > >> >
> > > > >> >What do you think about allowing streaming operators to use
> managed
> > > > >> memory
> > > > >> >and exposing it in configuration.
> > > > >> >
> > > > >> >Best,
> > > > >> >Jark
> > > > >> >
> > > > >> >[1]:
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/deployment/memory/mem_setup_tm.html#managed-memory
> > > > >>
> > > > >
> > > >
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>


[CVE-2020-17519] Apache Flink directory traversal attack: reading remote files through the REST API

2021-01-05 Thread Robert Metzger
CVE-2020-17519: Apache Flink directory traversal attack: reading remote
files through the REST API

Vendor:
The Apache Software Foundation

Versions Affected:
1.11.0, 1.11.1, 1.11.2

Description:
A change introduced in Apache Flink 1.11.0 (and released in 1.11.1 and
1.11.2 as well) allows attackers to read any file on the local filesystem
of the JobManager through the REST interface of the JobManager process.
Access is restricted to files accessible by the JobManager process.

Mitigation:
All users should upgrade to Flink 1.11.3 or 1.12.0 if their Flink
instance(s) are exposed.
The issue was fixed in commit b561010b0ee741543c3953306037f00d7a9f0801 from
apache/flink:master.

Credits:
This issue was discovered by 0rich1 of Ant Security FG Lab


[CVE-2020-17518] Apache Flink directory traversal attack: remote file writing through the REST API

2021-01-05 Thread Robert Metzger
CVE-2020-17518: Apache Flink directory traversal attack: remote file
writing through the REST API

Vendor:
The Apache Software Foundation

Versions Affected:
1.5.1 to 1.11.2

Description:
Flink 1.5.1 introduced a REST handler that allows you to write an uploaded
file to an arbitrary location on the local file system, through a
maliciously modified HTTP HEADER. The files can be written to any location
accessible by Flink 1.5.1.

Mitigation:
All users should upgrade to Flink 1.11.3 or 1.12.0 if their Flink
instance(s) are exposed.
The issue was fixed in commit a5264a6f41524afe8ceadf1d8ddc8c80f323ebc4 from
apache/flink:master.

Credits:
This issue was discovered by 0rich1 of Ant Security FG Lab


[jira] [Created] (FLINK-20850) Analyze whether CoLocationConstraints and CoLocationGroup can be removed

2021-01-05 Thread Matthias (Jira)
Matthias created FLINK-20850:


 Summary: Analyze whether CoLocationConstraints and CoLocationGroup 
can be removed
 Key: FLINK-20850
 URL: https://issues.apache.org/jira/browse/FLINK-20850
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias


It appears that {{CoLocationGroup}} and {{CoLocationConstraint}} are not used by 
{{LocalInputPreferredSlotSharingStrategy}} anymore. Instead, 
{{CoLocationGroupDesc}} and {{CoLocationConstraintDesc}} serve as lightweight 
versions of these concepts.

Does this mean that we can remove the former ones?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Apache Pinot Sink

2021-01-05 Thread Poerschke, Mats
Hi all,

we want to contribute a sink connector for Apache Pinot. The following briefly 
describes the planned control flow. Please feel free to comment on any of its 
aspects.

Background
Apache Pinot is a large-scale real-time data ingestion engine working on data 
segments internally. The controller exposes an external API which allows 
posting new segments via REST call. Each posted segment must contain an id 
(called the segment name).

Control Flow
The Flink sink will collect data tuples locally. When creating a checkpoint, 
all those tuples are grouped into one segment which is then pushed to the Pinot 
controller. We will assign each pushed segment a unique incrementing identifier.
After receiving a success response from the Pinot controller, the latest 
segment name is persisted within the Flink checkpoint.
In case we have to recover from a failure, the latest successfully pushed 
segment name can be reconstructed from the Flink checkpoint. At this point the 
system might be in an inconsistent state: the Pinot controller might have 
already stored a newer segment (whose name was, due to the failure, not 
persisted in the Flink checkpoint).
This inconsistency is resolved with the next successful checkpoint creation. 
The segment pushed at that point will get the same segment name assigned as the 
inconsistent segment. Thus, Pinot replaces the old segment with the new one, 
which prevents introducing duplicate entries.
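
To make this a bit more concrete, a rough sketch of the sink could look as
follows (PinotControllerClient and the segment-push call are placeholders for
the actual Pinot interaction, not existing APIs):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class PinotSinkFunction<T> extends RichSinkFunction<T>
        implements CheckpointedFunction {

    private final List<T> buffer = new ArrayList<>();
    private transient ListState<Long> latestSegmentIdState;
    private long latestSegmentId;

    @Override
    public void invoke(T value, Context context) {
        buffer.add(value); // collect tuples locally
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        long segmentId = latestSegmentId + 1;
        // Group the buffered tuples into one segment and push it under a
        // deterministic, incrementing segment name (placeholder call):
        // pinotControllerClient.pushSegment("segment-" + segmentId, buffer);
        latestSegmentId = segmentId;
        latestSegmentIdState.clear();
        latestSegmentIdState.add(latestSegmentId); // persisted with the checkpoint
        buffer.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        latestSegmentIdState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("latest-segment-id", Long.class));
        for (Long id : latestSegmentIdState.get()) {
            // On recovery the next push reuses id + 1, so a segment pushed after
            // the last successful checkpoint is overwritten rather than duplicated.
            latestSegmentId = id;
        }
    }
}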


Best regards
Mats Pörschke



Re: Apache Pinot Sink

2021-01-05 Thread Poerschke, Mats
Just as a short addition: We plan to contribute the sink to Apache Bahir.

Best regards
Mats Pörschke

> On 5. Jan 2021, at 13:21, Poerschke, Mats 
>  wrote:
> 
> Hi all,
> 
> we want to contribute a sink connector for Apache Pinot. The following 
> briefly describes the planned control flow. Please feel free to comment on 
> any of its aspects.
> 
> Background
> Apache Pinot is a large-scale real-time data ingestion engine working on data 
> segments internally. The controller exposes an external API which allows 
> posting new segments via REST call. A thereby posted segment must contain an 
> id (called segment name).
> 
> Control Flow
> The Flink sink will collect data tuples locally. When creating a checkpoint, 
> all those tuples are grouped into one segment which is then pushed to the 
> Pinot controller. We will assign each pushed segment a unique incrementing 
> identifier.
> After receiving a success response from the Pinot controller, the latest 
> segment name is persisted within the Flink checkpoint.
> In case we have to recover from a failure, the latest successfully pushed 
> segment name can be reconstructed from the Flink checkpoint. At this point 
> the system might be in an inconsistent state. The Pinot controller might have 
> already stored a newer segment (which’s name was, due to the failure, not 
> persisted in the flink checkpoint).
> This inconsistency is resolved with the next successful checkpoint creation. 
> The there pushed segment will get the same segment name assigned as the 
> inconsistent segment. Thus, Pinot replaces the old with the new segment which 
> prevents introducing duplicate entries.
> 
> 
> Best regards
> Mats Pörschke
> 



[jira] [Created] (FLINK-20851) flink datagen produce NULL value

2021-01-05 Thread appleyuchi (Jira)
appleyuchi created FLINK-20851:
--

 Summary: flink datagen produce NULL value
 Key: FLINK-20851
 URL: https://issues.apache.org/jira/browse/FLINK-20851
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.12.0
 Environment: 
CREATE TABLE orders (
order_uid  BIGINT,
product_id BIGINT,
price  DECIMAL(32, 2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen'
);

SELECT * FROM orders;


This DDL will produce NULL for column price.

but it works in
https://github.com/knaufk/flink-sql-cookbook/blob/master/recipes/01/01_create_table.md

① If it's the flink-sql-cookbook's fault, could you please tell me the right
syntax for it?

② If it's a Flink bug, please fix it.

Thanks for your help.


Reporter: appleyuchi
 Attachments: datagen報錯.png





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20852) Enrich back pressure stats per subtask in the WebUI

2021-01-05 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-20852:
--

 Summary: Enrich back pressure stats per subtask in the WebUI
 Key: FLINK-20852
 URL: https://issues.apache.org/jira/browse/FLINK-20852
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Web Frontend
Affects Versions: 1.12.0
Reporter: Piotr Nowojski
 Fix For: 1.13.0


We can enrich the back pressure tab in the WebUI with a couple more metrics 
that can help us diagnose the problem, like:
* backPressuredTimeMsPerSecond
* busyTimeMsPerSecond
* idleTimeMsPerSecond
* inPoolUsage
* outPoolUsage



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20853) Add reader schema null check for AvroDeserializationSchema when recordClazz is GenericRecord

2021-01-05 Thread hailong wang (Jira)
hailong wang created FLINK-20853:


 Summary: Add reader schema null check for 
AvroDeserializationSchema when recordClazz is GenericRecord 
 Key: FLINK-20853
 URL: https://issues.apache.org/jira/browse/FLINK-20853
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.12.0
Reporter: hailong wang
 Fix For: 1.13.0


The reader schema must not be null when recordClazz is GenericRecord.

Although its constructor has default visibility, this will cause an NPE when 
the reader schema is null and recordClazz is GenericRecord for classes that 
extend it, such as RegistryAvroDeserializationSchema.
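
A minimal sketch of the intended check (variable names are illustrative, the 
actual constructor may differ):

{code:java}
if (GenericRecord.class.isAssignableFrom(recordClazz)) {
    Preconditions.checkNotNull(
        reader, "Reader schema must not be null when recordClazz is GenericRecord.");
}
{code}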



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-05 Thread Aljoscha Krettek

On 2021/01/05 10:16, Arvid Heise wrote:

1. I'd think that this is an orthogonal issue, which I'd solve separately.
My gut feeling says that this is something we should only address for new
sinks where we decouple the semantics of commits and checkpoints
anyways. @Aljoscha
Krettek  any idea on this one?


I also think it's somewhat orthogonal, let's see where we land here once 
the other issues are hammered out.



2. I'm not sure I get it completely. Let's assume we have a source
partition that is finished before the first checkpoint. Then, we would need
to store the finished state of the subtask somehow. So I'm assuming, we
still need to trigger some checkpointing code on finished subtasks.


What he's talking about here is the race condition between a) checkpoint 
coordinator decides to do a checkpoint and b) a source operator shuts 
down.


Normally, the checkpoint coordinator only needs to trigger sources, and 
not intermediate operators. When we allow sources to shut down, 
intermediate operators now can become the "head" of a pipeline and 
become the things that need to be triggered.


One thought here is this: will there ever be intermediate operators that 
should be running that are not connected to at least one source? The 
only case I can think of right now is async I/O. Or are there others? If 
we think that there will never be intermediate operators that are not 
connected to at least one source we might come up with a simpler 
solution.



3. Do we really want to store the finished flag in OperatorState? I was
assuming we want to have it more fine-grained on OperatorSubtaskState.
Maybe we can store the flag inside managed or raw state without changing
the format?


I think we cannot store it in `OperatorSubtaskState` because of how 
operator state (the actual `ListState` that operators use) is reshuffled 
on restore to all operators. So normally it doesn't make sense to say 
that one of the subtasks is done when operator state is involved. Only 
when all subtasks are done can we record this operator as done, I think.
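
To illustrate with the regular operator state APIs (just a sketch, assuming 
`context` is a FunctionInitializationContext; this is not a proposal for how 
the flag should actually be stored):

// With getListState(), the elements of all subtasks are merged and re-split
// across the (possibly rescaled) subtasks on restore, so a flag written by
// subtask 3 may be handed to subtask 0 and no longer identifies "its" subtask.
// With getUnionListState(), every subtask sees all elements, which only allows
// conclusions about the operator as a whole.
ListStateDescriptor<Boolean> finishedDesc =
        new ListStateDescriptor<>("finished", Types.BOOLEAN);
ListState<Boolean> evenSplit =
        context.getOperatorStateStore().getListState(finishedDesc);
ListState<Boolean> union =
        context.getOperatorStateStore().getUnionListState(finishedDesc);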


Best,
Aljoscha


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-05 Thread Yun Gao
 Hi Arvid, 

 Very thanks for the feedbacks!

 For the second issue, sorry, I think I might not have made it very clear. 
I was initially thinking of the case where, for example, for a job with graph A -> B -> 
C, A is still running when we compute which tasks to trigger, so we trigger A 
to start the checkpoint. However, before the triggering message reaches A, A 
finishes and the trigger message fails because the task cannot be found. If we 
do not handle this case, the checkpoint would fail due to timeout. 
By default a failed checkpoint causes a job failure, and we would also 
need to wait for another checkpoint interval for the next checkpoint. One solution 
would be to check all the pending checkpoints and trigger B instead when the JM is 
notified that A is finished.

   For the third issue, it should work if we store a special value for some 
field in OperatorState or OperatorSubtaskState; for example, we might store a 
special subtaskState map inside the OperatorState to mark it as finished, since 
a finished operator should always have an empty state. Thanks a lot for the 
advice! I'll try this method. 

Best,
 Yun



--
From:Arvid Heise 
Send Time:2021 Jan. 5 (Tue.) 17:16
To:Yun Gao 
Cc:Aljoscha Krettek ; dev ; user 

Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun,

1. I'd think that this is an orthogonal issue, which I'd solve separately. My 
gut feeling says that this is something we should only address for new sinks 
where we decouple the semantics of commits and checkpoints anyways. @Aljoscha 
Krettek any idea on this one?

2. I'm not sure I get it completely. Let's assume we have a source partition 
that is finished before the first checkpoint. Then, we would need to store the 
finished state of the subtask somehow. So I'm assuming, we still need to 
trigger some checkpointing code on finished subtasks.

3. Do we really want to store the finished flag in OperatorState? I was 
assuming we want to have it more fine-grained on OperatorSubtaskState. Maybe we 
can store the flag inside managed or raw state without changing the format?



On Fri, Dec 25, 2020 at 8:39 AM Yun Gao  wrote:

   Hi all,

 I tested the previous PoC with the current tests and I found some new 
issues that might cause divergence, and sorry for there might also be some 
reversal for some previous problems:

 1. Which operators should wait for one more checkpoint before close ?

One motivation for this FLIP is to ensure the 2PC sink commits the last 
part of data before closed, which makes the sink operator need to wait for one 
more checkpoint like onEndOfInput() -> waitForCheckpoint() -> 
notifyCheckpointComplete() -> close(). This lead to the issue which operators 
should wait for checkpoint? Possible options are 
 a. Make all the operators (or UDF) implemented 
notifyCheckpointCompleted method wait for one more checkpoint. One exception is 
that since we can only snapshot one or all tasks for a legacy source operator 
to avoid data repetition[1], we could not support legacy operators and its 
chained operators to wait for checkpoints since there will be deadlock if part 
of the tasks are finished, this would finally be solved after legacy source are 
deprecated. The PoC used this option for now.
b. Make operators (or UDF) implemented a special marker 
interface to wait for one more checkpoint.  


   2. Do we need to solve the case that tasks finished before triggered ?

  Previously I think we could postpone it, however, during testing I found 
that it might cause some problems since by default checkpoint failure would 
cause job failover, and the job would also need wait for another interval to 
trigger the next checkpoint. To pass the tests, I updated the PoC to include 
this part, and we may have a double think on if we need to include it or use 
some other options.

3. How to extend a new format for checkpoint meta ?

Sorry previously I gave a wrong estimation, after I extract a sub-component 
for (de)serialize operator state, I found the problem just goes to the new 
OperatorStateSerializer. The problem seems to be that v2, v3 and v4 have 
different fields, thus they use different process when (de)serialize, which is 
a bit different from the case that we have a fixed steps and each step has 
different logic. Thus we might either
 a. Use base classes for each two version.
 b. Or have a unified framework contains all the possible fields across all 
version, and use empty field serializer to skip some fields in each version.

Best,
Yun

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish&Checkpointsdonotcontainthefinalstatesfromfinishedtasks

--

Re: [DISCUSS] Drop Scala 2.11

2021-01-05 Thread Aljoscha Krettek
There is some new enthusiasm for bringing Scala 2.13 support to Flink: 
https://issues.apache.org/jira/browse/FLINK-13414.


One of the assumed prerequisites for this is dropping support for Scala 
2.11 because it will be too hard (impossible) to try and support three 
Scala versions at the same time.


What do you all think now that some time has passed?

I would still be in favour of dropping Scala 2.11 but I can see Theo's 
point of people being tied to distro releases. On the other hand, Scala 
2.11 is quite old now and some projects (such as Kafka) don't release 
newer versions with Scala 2.11 support.


Best,
Aljoscha

On 2020/09/13 21:48, Jeff Zhang wrote:

I would hope we can make scala shell with scala 2.12 before dropping 2.11,
many users are still using scala shell to try out new features of flink,
especially for new users.


Chesnay Schepler  wrote on Sun, Sep 13, 2020 at 7:59 PM:


Are we then also dropping the scala-shell, since it still doesn't work
on 2.12?

On 9/11/2020 2:12 PM, Timo Walther wrote:
> Big +1 to drop Scala 2.11
>
> This would mean that we can finally use Java 8 language features that
> are integrated with Scala.
>
> Regards,
> Timo
>
> On 11.09.20 13:15, Igal Shilman wrote:
>> @Galen  FYI: the upcoming StateFun release would use Scala2.12
>>
>> On Thu, Sep 10, 2020 at 5:14 PM Seth Wiesman 
>> wrote:
>>
>>> @glen
>>>
>>> Yes, we would absolutely migrate statefun. StateFun can be compiled
>>> with
>>> Scala 2.12 today, I'm not sure why it's not cross released.
>>>
>>> @aljoscha :)
>>>
>>> @mathieu Its on the roadmap but it's non-trivial and I'm not aware of
>>> anyone actively working on it.
>>>
>>> On Thu, Sep 10, 2020 at 10:09 AM Matthieu Bonneviot
>>>  wrote:
>>>
 That makes sense.
 We are using 2.12 for our production
 Also, for flink scala 2.12 support, it is in fact limited to scala
 2.12.7.
 It is binary incompatible with version 2.12 above (
 https://issues.apache.org/jira/browse/FLINK-12461 )
 That would be great to at least move to a more recent 2.12 version,
 and
 ideally to 2.13.

 Is there any scala support plan available?

 Matthieu


 On Thu, Sep 10, 2020 at 5:00 PM Aljoscha Krettek 
 wrote:

> Yes! I would be in favour of this since it's blocking us from
> upgrading
> certain dependencies.
>
> I would also be in favour of dropping Scala completely but that's a
> different story.
>
> Aljoscha
>
> On 10.09.20 16:51, Seth Wiesman wrote:
>> Hi Everyone,
>>
>> Think of this as a pre-flip, but what does everyone think about
 dropping
>> Scala 2.11 support from Flink.
>>
>> The last patch release was in 2017 and in that time the scala
 community
> has
>> released 2.13 and is working towards a 3.0 release. Apache Kafka and
> Spark
>> have both dropped 2.11 support in recent versions. In fact, Flink's
>> universal Kafka connector is stuck on 2.4 because that is the last
> version
>> with scala 2.11 support.
>>
>> What are people's thoughts on dropping Scala 2.11? How many are
>> still
> using
>> it in production?
>>
>> Seth
>>
>
>

 --
 Matthieu Bonneviot
 Senior R&D Engineer, DataDome
 M +33 7 68 29 79 34
 E matthieu.bonnev...@datadome.co
 W www.datadome.co

>>>
>>
>
>




--
Best Regards

Jeff Zhang


[jira] [Created] (FLINK-20854) Introduce BytesMultiMap to support buffering records

2021-01-05 Thread Jark Wu (Jira)
Jark Wu created FLINK-20854:
---

 Summary: Introduce BytesMultiMap to support buffering records
 Key: FLINK-20854
 URL: https://issues.apache.org/jira/browse/FLINK-20854
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Jark Wu
Assignee: Jark Wu
 Fix For: 1.13.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Drop Scala 2.11

2021-01-05 Thread Jeff Zhang
Glad to see someone in the community would like to drive this effort.
If Scala 2.13 can do whatever Scala 2.11 can do in Flink (such as supporting
the scala-shell, Scala lambda UDFs, etc.), then I would be 100% in support of
dropping Scala 2.11.

Aljoscha Krettek  wrote on Tue, Jan 5, 2021 at 11:01 PM:

> There is some new enthusiasm for bringing Scala 2.13 support to Flink:
> https://issues.apache.org/jira/browse/FLINK-13414.
>
> One of the assumed prerequisites for this is dropping support for Scala
> 2.11 because it will be too hard (impossible) to try and support three
> Scala versions at the same time.
>
> What do you all think now that some time has passed?
>
> I would still be in favour of dropping Scala 2.11 but I can see Theo's
> point of people being tied to distro releases. On the other hand, Scala
> 2.11 is quite old now and some projects (such as Kafka) don't release
> newer versions with Scala 2.11 support.
>
> Best,
> Aljoscha
>
> On 2020/09/13 21:48, Jeff Zhang wrote:
> >I would hope we can make scala shell with scala 2.12 before dropping 2.11,
> >many users are still using scala shell to try out new features of flink,
> >especially for new users.
> >
> >
> >Chesnay Schepler  wrote on Sun, Sep 13, 2020 at 7:59 PM:
> >
> >> Are we then also dropping the scala-shell, since it still doesn't work
> >> on 2.12?
> >>
> >> On 9/11/2020 2:12 PM, Timo Walther wrote:
> >> > Big +1 to drop Scala 2.11
> >> >
> >> > This would mean that we can finally use Java 8 language features that
> >> > are integrated with Scala.
> >> >
> >> > Regards,
> >> > Timo
> >> >
> >> > On 11.09.20 13:15, Igal Shilman wrote:
> >> >> @Galen  FYI: the upcoming StateFun release would use Scala2.12
> >> >>
> >> >> On Thu, Sep 10, 2020 at 5:14 PM Seth Wiesman 
> >> >> wrote:
> >> >>
> >> >>> @glen
> >> >>>
> >> >>> Yes, we would absolutely migrate statefun. StateFun can be compiled
> >> >>> with
> >> >>> Scala 2.12 today, I'm not sure why it's not cross released.
> >> >>>
> >> >>> @aljoscha :)
> >> >>>
> >> >>> @mathieu Its on the roadmap but it's non-trivial and I'm not aware
> of
> >> >>> anyone actively working on it.
> >> >>>
> >> >>> On Thu, Sep 10, 2020 at 10:09 AM Matthieu Bonneviot
> >> >>>  wrote:
> >> >>>
> >>  That makes sense.
> >>  We are using 2.12 for our production
> >>  Also, for flink scala 2.12 support, it is in fact limited to scala
> >>  2.12.7.
> >>  It is binary incompatible with version 2.12 above (
> >>  https://issues.apache.org/jira/browse/FLINK-12461 )
> >>  That would be great to at least move to a more recent 2.12 version,
> >>  and
> >>  ideally to 2.13.
> >> 
> >>  Is there any scala support plan available?
> >> 
> >>  Matthieu
> >> 
> >> 
> >>  On Thu, Sep 10, 2020 at 5:00 PM Aljoscha Krettek <
> aljos...@apache.org
> >> >
> >>  wrote:
> >> 
> >> > Yes! I would be in favour of this since it's blocking us from
> >> > upgrading
> >> > certain dependencies.
> >> >
> >> > I would also be in favour of dropping Scala completely but that's
> a
> >> > different story.
> >> >
> >> > Aljoscha
> >> >
> >> > On 10.09.20 16:51, Seth Wiesman wrote:
> >> >> Hi Everyone,
> >> >>
> >> >> Think of this as a pre-flip, but what does everyone think about
> >>  dropping
> >> >> Scala 2.11 support from Flink.
> >> >>
> >> >> The last patch release was in 2017 and in that time the scala
> >>  community
> >> > has
> >> >> released 2.13 and is working towards a 3.0 release. Apache Kafka
> and
> >> > Spark
> >> >> have both dropped 2.11 support in recent versions. In fact,
> Flink's
> >> >> universal Kafka connector is stuck on 2.4 because that is the
> last
> >> > version
> >> >> with scala 2.11 support.
> >> >>
> >> >> What are people's thoughts on dropping Scala 2.11? How many are
> >> >> still
> >> > using
> >> >> it in production?
> >> >>
> >> >> Seth
> >> >>
> >> >
> >> >
> >> 
> >>  --
> >>  Matthieu Bonneviot
> >>  Senior R&D Engineer, DataDome
> >>  M +33 7 68 29 79 34
> >>  E matthieu.bonnev...@datadome.co
> >>  W www.datadome.co
> >> 

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-05 Thread Yun Gao
 Hi Aljoscha, 

 Very thanks for the feedbacks!

 For the second issue, I am indeed thinking of the race condition between 
deciding to trigger and the operator getting finished. And for this point,
 > One thought here is this: will there ever be intermediate operators that 
> should be running that are not connected to at least once source? The 
> only case I can think of right now is async I/O. Or are there others? If 
> we think that there will never be intermediate operators that are not 
> connected to at least once source we might come up with a simpler 
> solution.
 I think there are still cases where an intermediate operator keeps running 
after all of its sources have finished, for example, source -> sink writer -> 
sink committer -> sink global committer. Since the sink committer needs to wait 
for one more checkpoint between endOfInput and close, 
it would continue to run after the source and sink writer are finished, until 
we could finish one checkpoint. And since the four operators could also be 
chained in one task, we may also need to consider the case that part of the 
operators are finished when taking a snapshot in one 
of the tasks.
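
Just to make the intended ordering concrete (purely an illustration; 
waitForCheckpoint() is not an existing interface method, the names follow the 
first mail in this thread):

interface TwoPhaseCommittingLifecycle {
    void onEndOfInput();                     // all input consumed, last data pre-committed
    void waitForCheckpoint();                // wait until one more checkpoint is triggered
    void notifyCheckpointComplete(long id);  // commit the final part of the data
    void close();                            // only safe after the final commit
}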

   Best,
Yun



--
From:Aljoscha Krettek 
Send Time:2021 Jan. 5 (Tue.) 22:34
To:dev 
Cc:Yun Gao 
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

On 2021/01/05 10:16, Arvid Heise wrote:
>1. I'd think that this is an orthogonal issue, which I'd solve separately.
>My gut feeling says that this is something we should only address for new
>sinks where we decouple the semantics of commits and checkpoints
>anyways. @Aljoscha
>Krettek  any idea on this one?

I also think it's somewhat orthogonal, let's see where we land here once 
the other issues are hammered out.

>2. I'm not sure I get it completely. Let's assume we have a source
>partition that is finished before the first checkpoint. Then, we would need
>to store the finished state of the subtask somehow. So I'm assuming, we
>still need to trigger some checkpointing code on finished subtasks.

What he's talking about here is the race condition between a) checkpoint 
coordinator decides to do a checkpoint and b) a source operator shuts 
down.

Normally, the checkpoint coordinator only needs to trigger sources, and 
not intermediate operators. When we allow sources to shut down, 
intermediate operators now can become the "head" of a pipeline and 
become the things that need to be triggered.

One thought here is this: will there ever be intermediate operators that 
should be running that are not connected to at least once source? The 
only case I can think of right now is async I/O. Or are there others? If 
we think that there will never be intermediate operators that are not 
connected to at least once source we might come up with a simpler 
solution.

>3. Do we really want to store the finished flag in OperatorState? I was
>assuming we want to have it more fine-grained on OperatorSubtaskState.
>Maybe we can store the flag inside managed or raw state without changing
>the format?

I think we cannot store it in `OperatorSubtaskState` because of how 
operator state (the actual `ListState` that operators use) is reshuffled 
on restore to all operators. So normally it doesn't make sense to say 
that one of the subtasks is done when operator state is involved. Only 
when all subtasks are done can we record this operator as done, I think.

Best,
Aljoscha



Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-05 Thread Arvid Heise
For 2) the race condition, I was more thinking of still injecting the
barrier at the source in all cases, but having some kind of short-cut to
immediately execute the RPC inside the respective taskmanager. However,
that may prove hard in case of dynamic scale-ins. Nevertheless, because of
this race condition, we should still take some time to think about it as it
effectively means we need to support handling a barrier in a finished task
anyways. Maybe a finished task is still assigned to a TM with JM as a
fallback?

For your question: will there ever be intermediate operators that should be
running that are not connected to at least one source?
I think there are plenty of examples if you go beyond chained operators and
fully connected exchanges. Think of any fan-in, let's assume you have
sources S1...S4, with S1+S2->M1, and S3+S4->M2. If S1 is finished, S2 and M1
are still running. Or I didn't get your question ;).

On Tue, Jan 5, 2021 at 5:00 PM Yun Gao  wrote:

>  Hi Aljoscha,
>
>  Very thanks for the feedbacks!
>
>  For the second issue, I'm indeed thinking the race condition
> between deciding to trigger and operator get finished. And for this point,
>
>  >
> One thought here is this: will there ever be intermediate operators that
> > should be running that are not connected to at least once source? The
> > only case I can think of right now is async I/O. Or are there others? If
> > we think that there will never be intermediate operators that are not
> > connected to at least once source we might come up with a simpler
> > solution.
>
>  I think there are still cases that the intermediate operators runs
> with all its sources have finished, for example, source -> sink writer ->
> sink committer -> sink global committer,  since sink committer need to wait
> for one more checkpoint between endOfInput and close,
> it would continue to run after the source and sink writer are finished,
> until we could finish one checkpoint. And since the four operators could
> also be chained in one task, we may also need to consider the case that
> part of operators are finished when taking snapshot in
> of the tasks.
>
>Best,
> Yun
>
>
> --
> From:Aljoscha Krettek 
> Send Time:2021 Jan. 5 (Tue.) 22:34
> To:dev 
> Cc:Yun Gao 
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
> On 2021/01/05 10:16, Arvid Heise wrote:
> >1. I'd think that this is an orthogonal issue, which I'd solve separately.
> >My gut feeling says that this is something we should only address for new
> >sinks where we decouple the semantics of commits and checkpoints
> >anyways. @Aljoscha
> >Krettek  any idea on this one?
>
> I also think it's somewhat orthogonal, let's see where we land here once
> the other issues are hammered out.
>
> >2. I'm not sure I get it completely. Let's assume we have a source
>
> >partition that is finished before the first checkpoint. Then, we would need
> >to store the finished state of the subtask somehow. So I'm assuming, we
> >still need to trigger some checkpointing code on finished subtasks.
>
> What he's talking about here is the race condition between a) checkpoint
> coordinator decides to do a checkpoint and b) a source operator shuts
> down.
>
> Normally, the checkpoint coordinator only needs to trigger sources, and
> not intermediate operators. When we allow sources to shut down,
> intermediate operators now can become the "head" of a pipeline and
> become the things that need to be triggered.
>
> One thought here is this: will there ever be intermediate operators that
> should be running that are not connected to at least once source? The
> only case I can think of right now is async I/O. Or are there others? If
> we think that there will never be intermediate operators that are not
> connected to at least once source we might come up with a simpler
> solution.
>
> >3. Do we really want to store the finished flag in OperatorState? I was
> >assuming we want to have it more fine-grained on OperatorSubtaskState.
> >Maybe we can store the flag inside managed or raw state without changing
> >the format?
>
> I think we cannot store it in `OperatorSubtaskState` because of how
> operator state (the actual `ListState` that operators use) is reshuffled
> on restore to all operators. So normally it doesn't make sense to say
> that one of the subtasks is done when operator state is involved. Only
> when all subtasks are done can we record this operator as done, I think.
>
> Best,
> Aljoscha
>
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert,

Re: Support local aggregate push down for Blink batch planner

2021-01-05 Thread Sebastian Liu
Hi Jinsong,

Thx a lot for your suggestion. These points really need to be clear in the
proposal.

For the semantic problem, I think the main point is the difference between the
return data types of the target aggregate functions and the row format returned
by the underlying storage. That's why we provide the producedDataType in the
SupportsAggregatePushDown interface. We need to let developers know that the
semantic differences between the underlying storage system and Flink have to be
handled in the related connectors. [Supplemented in proposal]

The phase of the new PushLocalAggIntoTableSourceScanRule is also a key point.
As you suggested, we should put it into the PHYSICAL_REWRITE rule set, and it
is better to put it after the EnforceLocalXXAggRule. [Supplemented in proposal]

For the scalability of the interface, I don't exactly understand your
suggestion. Is it to add an abstract class that implements the
SupportsAggregatePushDown interface and holds the
`List<CallExpression> aggregateExpressions, int[] groupingFields,
DataType producedDataType`
fields?
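
If so, something roughly like the following is what I would read into it (just 
a sketch to confirm my understanding, not a final API):

public final class Aggregate {
    private final List<CallExpression> aggregateExpressions;
    private final int[] groupingFields;
    private final DataType producedDataType;
    // constructor and getters omitted; new fields could be added here later
    // without changing the applyAggregates signature
}

boolean applyAggregates(Aggregate aggregate);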

Looking forward to your further feedback or guidance.

Jingsong Li  wrote on Tue, Jan 5, 2021 at 2:44 PM:

> Thanks for your proposal! Sebastian.
>
> +1 for SupportsAggregatePushDown. The above wonderful discussion has solved
> many of my concerns.
>
> ## Semantic problems
>
> We may need to add some mechanisms or comments, because as far as I know,
> the semantics of each database is actually different, which may need to be
> reflected in your specific implementation.
>
> For example, the AVG output types of various databases may be different.
> For example, MySQL outputs double, which is different from Flink. What
> should we do? (Luckily, avg will be split into sum and count, but we also
> need to care about decimal and others.)
>
> ## The phase of push-down rule
>
> I strongly recommend that you do not put it in the Volcano phase, which may
> make the cost calculation very troublesome.
> So in PHYSICAL_REWRITE?
>
> ## About interface
>
> For scalability, I slightly recommend that we introduce an `Aggregate`
> interface which contains `List<CallExpression> aggregateExpressions, int[]
> groupingFields, DataType producedDataType` fields. In this way, we can add
> fields easily without breaking compatibility.
>
> I think the current design is very good, just put forward some ideas.
>
> Best,
> Jingsong
>
> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu 
> wrote:
>
> > Hi Jark,
> >
> > Thx for your further feedback and help. The interface of
> > SupportsAggregatePushDown may indeed need some adjustments.
> >
> > For (1) Agree: Yeah, the upstream only need to know if the TableSource
> can
> > handle all of the aggregates.
> > It's better to just return a boolean type to indicate whether all of
> > aggregates push down was successful or not. [Resolved in proposal]
> >
> > For (2) Agree: The aggOutputDataType represent the produced data type of
> > the new table source to make sure that the new table source can
> > connect with the related exchange node. The format of this
> > aggOutputDataType is groupedFields's type + agg function's return type.
> > The reason for adding this parameter in this function is also to
> facilitate
> > the user to build the final output type. I have changed this parameter
> > to be producedDataType. [Resolved in proposal]
> >
> > For (3) Agree: Indeed, groupSet may mislead users, I have changed to use
> > groupingFields. [Resolved in proposal]
> >
> > Thx again for the suggestion, looking for the further discussion.
> >
> > Jark Wu  wrote on Tue, Jan 5, 2021 at 12:05 PM:
> >
> > > I'm also +1 for idea#2.
> > >
> > > Regarding to the updated interface,
> > >
> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
> > >  int[] groupSet, DataType aggOutputDataType);
> > >
> > > final class Result {
> > >private final List<CallExpression> acceptedAggregates;
> > >private final List<CallExpression> remainingAggregates;
> > > }
> > >
> > > I have following comments:
> > >
> > > 1) Do we need the composite Result return type? Is a boolean return
> type
> > > enough?
> > > From my understanding, all of the aggregates should be accepted,
> > > otherwise the pushdown should fail.
> > > Therefore, users don't need to distinguish which aggregates are
> > > "accepted".
> > >
> > > 2) Does the `aggOutputDataType` represent the produced data type of the
> > > new source, or just the return type of all the agg functions?
> > > I would prefer to `producedDataType` just like
> > > `SupportsReadingMetadata` to reduce the effort for users to concat a
> > final
> > > output type.
> > > The return type of each agg function can be obtained from the
> > > `CallExpression`.
> > >
> > > 3) What do you think about renaming `groupSet` to `grouping` or
> > > `groupedFields` ?
> > > The `groupSet` may confuse users that it relates to "grouping
> sets".
> > >
> > >
> > > What do you think?
> > >
> > > Best,
> > > Jark
> > >
> > >
> > >
> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young  wrote:
> > >
> > >>

[jira] [Created] (FLINK-20855) Calculating numBuckets exceeds the maximum value of int and got a negative number

2021-01-05 Thread JieFang.He (Jira)
JieFang.He created FLINK-20855:
--

 Summary: Calculating numBuckets exceeds the maximum value of int 
and got a negative number
 Key: FLINK-20855
 URL: https://issues.apache.org/jira/browse/FLINK-20855
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.11.1, 1.12.0
Reporter: JieFang.He


When I run the 500 GB TPC-DS benchmark, I get an exception:
{code:java}
Caused by: java.lang.IllegalArgumentException
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
at 
org.apache.flink.table.runtime.hashtable.LongHashPartition.setNewBuckets(LongHashPartition.java:223)
at 
org.apache.flink.table.runtime.hashtable.LongHashPartition.<init>(LongHashPartition.java:176)
at 
org.apache.flink.table.runtime.hashtable.LongHybridHashTable.buildTableFromSpilledPartition(LongHybridHashTable.java:432)
at 
org.apache.flink.table.runtime.hashtable.LongHybridHashTable.prepareNextPartition(LongHybridHashTable.java:354)
at 
org.apache.flink.table.runtime.hashtable.LongHybridHashTable.nextMatching(LongHybridHashTable.java:145)
at LongHashJoinOperator$40166.endInput2$(Unknown Source)
at LongHashJoinOperator$40166.endInput(Unknown Source)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:101)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.endHeadOperatorInput(OperatorChain.java:278)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.checkFinished(StreamTwoInputProcessor.java:211)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:183)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:349)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:564)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
{code}
The reason is that when calculating numBuckets in LongHashPartition, the intermediate
result exceeds the maximum value of int and becomes negative:
{code:java}
LongHashPartition(
  LongHybridHashTable longTable,
  int partitionNum,
  BinaryRowDataSerializer buildSideSerializer,
  int bucketNumSegs,
  int recursionLevel,
  List<MemorySegment> buffers,
  int lastSegmentLimit) {
   this(longTable, buildSideSerializer, listToArray(buffers));
   this.partitionNum = partitionNum;
   this.recursionLevel = recursionLevel;

   int numBuckets = MathUtils.roundDownToPowerOf2(bucketNumSegs * segmentSize / 
16);
   MemorySegment[] buckets = new MemorySegment[bucketNumSegs];
   for (int i = 0; i < bucketNumSegs; i++) {
  buckets[i] = longTable.nextSegment();
   }
   setNewBuckets(buckets, numBuckets);
   this.finalBufferLimit = lastSegmentLimit;
}
{code}
One way to avoid the exception is to adjust the calculation order:

change
{code:java}
int numBuckets = MathUtils.roundDownToPowerOf2(bucketNumSegs * segmentSize / 
16);
{code}
to
{code:java}
int numBuckets = MathUtils.roundDownToPowerOf2(bucketNumSegs / 16 * 
segmentSize);
{code}
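
For illustration, a small self-contained example of the overflow (the concrete numbers below are only assumptions for demonstration; 32 KB is the default memory segment size):
{code:java}
public class NumBucketsOverflowDemo {
    public static void main(String[] args) {
        int segmentSize = 32 * 1024; // 32 KB memory segments
        int bucketNumSegs = 131072;  // a large spilled partition, chosen for illustration

        // Current order: the multiplication is evaluated first in int arithmetic.
        // 131072 * 32768 = 2^32 wraps around to 0 (and to a negative value for
        // slightly smaller inputs), so numBuckets becomes invalid.
        int current = bucketNumSegs * segmentSize / 16;

        // Proposed order: dividing first keeps the intermediate result in range.
        int proposed = bucketNumSegs / 16 * segmentSize;

        System.out.println(current);  // 0
        System.out.println(proposed); // 268435456
    }
}
{code}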





Re: Support local aggregate push down for Blink batch planner

2021-01-05 Thread Jingsong Li
Hi Sebastian,

Well, I mean:

`boolean applyAggregates(int[] groupingFields, List<CallExpression>
aggregateExpressions, DataType producedDataType);`
VS
```
boolean applyAggregates(Aggregation agg);

interface Aggregation {
  int[] groupingFields();
  List<CallExpression> aggregateExpressions();
  DataType producedDataType();
}
```

Maybe I've over-thought this, but I think aggregation is a complicated
thing. We may need to extend its parameters in the future, so making the
parameters an interface is conducive to future extension without breaking
the compatibility of user implementations. With the former approach, users
would need to modify their code.
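
For illustration, a minimal sketch of what a connector-side implementation of the flat-parameter variant could look like (the interface shape follows the proposal in this thread and is not an existing API; the class and helper names are made up):
```
// Sketch only: SupportsAggregatePushDown is the interface proposed in this
// thread; MyAggregatingTableSource and canTranslate(...) are hypothetical.
class MyAggregatingTableSource implements ScanTableSource, SupportsAggregatePushDown {

    private int[] groupingFields;
    private List<CallExpression> aggregates;
    private DataType producedDataType;

    @Override
    public boolean applyAggregates(
            int[] groupingFields,
            List<CallExpression> aggregateExpressions,
            DataType producedDataType) {
        // Reject the push-down as a whole if any aggregate cannot be
        // translated, e.g. an AVG whose result type differs between Flink
        // and the external system.
        for (CallExpression call : aggregateExpressions) {
            if (!canTranslate(call)) {
                return false;
            }
        }
        this.groupingFields = groupingFields;
        this.aggregates = aggregateExpressions;
        this.producedDataType = producedDataType;
        return true;
    }

    private boolean canTranslate(CallExpression call) {
        // Connector-specific check, e.g. based on call.getFunctionDefinition().
        return true;
    }

    // getChangelogMode(), getScanRuntimeProvider(), copy() and
    // asSummaryString() from ScanTableSource are omitted here.
}
```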

Best,
Jingsong

On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu  wrote:

> Hi Jinsong,
>
> Thx a lot for your suggestion. These points really need to be clear in the
> proposal.
>
> For the semantic problem, I think the main point is the different returned
> data types
> for the target aggregate function and the row format returned by the
> underlying storage.
> That's why we provide the producedDataType in the
> SupportsAggregatePushDown interface.
> Need to let developers know that we need to handle the semantic
> differences between
> the underlying storage system and Flink in related connectors.
> [Supplemented in proposal]
>
> For the phase of the new PushLocalAggIntoTableSourceScanRule rule, it's also
> a key point.
> As you suggested, we should put it into the PHYSICAL_REWRITE rule set,
> and better to put it
> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>
> For the scalability of the interface, actually I don't exactly understand
> your suggestion. Is it to add
> an abstract class, to implement the SupportsAggregatePushDown interface,
> and holds the
> `List < CallExpression > aggregateExpressions, int[] GroupingFields,
> DataType producedDataType`
> fields?
>
> Looking forward to your further feedback or guidance.
>
> On Tue, Jan 5, 2021 at 2:44 PM Jingsong Li wrote:
>
>> Thanks for your proposal! Sebastian.
>>
>> +1 for SupportsAggregatePushDown. The above wonderful discussion has
>> solved
>> many of my concerns.
>>
>> ## Semantic problems
>>
>> We may need to add some mechanisms or comments, because as far as I know,
>> the semantics of each database is actually different, which may need to be
>> reflected in your specific implementation.
>>
>> For example, the AVG output types of various databases may be different.
>> For example, MySQL outputs double, this is different from Flink. What
>> should we do? (Lucky, avg will be splitted into sum and count, But we also
>> need care about decimal and others)
>>
>> ## The phase of push-down rule
>>
>> I strongly recommend that you do not put it in the Volcano phase, which
>> may
>> make the cost calculation very troublesome.
>> So in PHYSICAL_REWRITE?
>>
>> ## About interface
>>
>> For scalability, I slightly recommend that we introduce an `Aggregate`
>> interface, it contains `List aggregateExpressions, int[]
>> groupingFields, DataType producedDataType` fields. In this way, we can add
>> fields easily without breaking compatibility.
>>
>> I think the current design is very good, just put forward some ideas.
>>
>> Best,
>> Jingsong
>>
>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu 
>> wrote:
>>
>> > Hi Jark,
>> >
>> > Thx for your further feedback and help. The interface of
>> > SupportsAggregatePushDown may indeed need some adjustments.
>> >
>> > For (1) Agree: Yeah, the upstream only need to know if the TableSource
>> can
>> > handle all of the aggregates.
>> > It's better to just return a boolean type to indicate whether all of
>> > aggregates push down was successful or not. [Resolved in proposal]
>> >
>> > For (2) Agree: The aggOutputDataType represent the produced data type of
>> > the new table source to make sure that the new table source can
>> > connect with the related exchange node. The format of this
>> > aggOutputDataType is groupedFields's type + agg function's return type.
>> > The reason for adding this parameter in this function is also to
>> facilitate
>> > the user to build the final output type. I have changed this parameter
>> > to be producedDataType. [Resolved in proposal]
>> >
>> > For (3) Agree: Indeed, groupSet may mislead users, I have changed to use
>> > groupingFields. [Resolved in proposal]
>> >
>> > Thx again for the suggestion, looking for the further discussion.
>> >
>> > Jark Wu  于2021年1月5日周二 下午12:05写道:
>> >
>> > > I'm also +1 for idea#2.
>> > >
>> > > Regarding to the updated interface,
>> > >
>> > > Result applyAggregates(List aggregateExpressions,
>> > >  int[] groupSet, DataType aggOutputDataType);
>> > >
>> > > final class Result {
>> > >private final List acceptedAggregates;
>> > >private final List remainingAggregates;
>> > > }
>> > >
>> > > I have following comments:
>> > >
>> > > 1) Do we need the composite Result return type? Is a boolean return
>> type
>> > > enough?
>> > > From my understanding, all of the aggregates should be accepted,
>> > > otherwise the pus

[jira] [Created] (FLINK-20856) Separate the implementation of stream window aggregate nodes

2021-01-05 Thread godfrey he (Jira)
godfrey he created FLINK-20856:
--

 Summary: Separate the implementation of stream window aggregate 
nodes
 Key: FLINK-20856
 URL: https://issues.apache.org/jira/browse/FLINK-20856
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.13.0


Stream window aggregate nodes include:
StreamExecGroupWindowAggregate
StreamExecGroupWindowTableAggregate
StreamExecPythonGroupWindowAggregate





[jira] [Created] (FLINK-20858) Port StreamExecPythonGroupWindowAggregate and BatchExecPythonGroupWindowAggregate to Java

2021-01-05 Thread godfrey he (Jira)
godfrey he created FLINK-20858:
--

 Summary: Port StreamExecPythonGroupWindowAggregate and 
BatchExecPythonGroupWindowAggregate to Java
 Key: FLINK-20858
 URL: https://issues.apache.org/jira/browse/FLINK-20858
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.13.0








[jira] [Created] (FLINK-20857) Separate the implementation of batch window aggregate nodes

2021-01-05 Thread godfrey he (Jira)
godfrey he created FLINK-20857:
--

 Summary: Separate the implementation of batch window aggregate 
nodes
 Key: FLINK-20857
 URL: https://issues.apache.org/jira/browse/FLINK-20857
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he
 Fix For: 1.13.0


Batch window aggregate nodes include:
BatchExecHashWindowAggregate
BatchExecLocalHashWindowAggregate
BatchExecSortWindowAggregate
BatchExecLocalSortWindowAggregate
BatchExecPythonGroupWindowAggregate





Re: Support local aggregate push down for Blink batch planner

2021-01-05 Thread Jark Wu
I think this may be over-designed. We should have confidence in the
interface we design; the interface should be stable.
Wrapping things in a big context comes at the cost of user convenience.
Moreover, we don't see any parameters to add in the future. Do you know of
any potential parameters?

Best,
Jark

On Wed, 6 Jan 2021 at 10:28, Jingsong Li  wrote:

> Hi Sebastian,
>
> Well, I mean:
>
> `boolean applyAggregates(int[] groupingFields, List
> aggregateExpressions, DataType producedDataType);`
> VS
> ```
> boolean applyAggregates(Aggregation agg);
>
> interface Aggregation {
>   int[] groupingFields();
>   List aggregateExpressions();
>   DataType producedDataType();
> }
> ```
>
> Maybe I've over considered it, but I think Aggregation is a complicated
> thing. Maybe we need to extend its parameters in the future, so make the
> parameters interface, which is conducive to the future expansion without
> destroying the compatibility of user implementation. If it is the way
> before, users need to modify the code.
>
> Best,
> Jingsong
>
> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu 
> wrote:
>
>> Hi Jinsong,
>>
>> Thx a lot for your suggestion. These points really need to be clear in
>> the proposal.
>>
>> For the semantic problem, I think the main point is the different
>> returned data types
>> for the target aggregate function and the row format returned by the
>> underlying storage.
>> That's why we provide the producedDataType in the
>> SupportsAggregatePushDown interface.
>> Need to let developers know that we need to handle the semantic
>> differences between
>> the underlying storage system and Flink in related connectors.
>> [Supplemented in proposal]
>>
>> For the phase of the new PushLocalAggIntoTableSourceScanRule rule, it's also
>> a key point.
>> As you suggested, we should put it into the PHYSICAL_REWRITE rule set,
>> and better to put it
>> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>
>> For the scalability of the interface, actually I don't exactly understand
>> your suggestion. Is it to add
>> an abstract class, to implement the SupportsAggregatePushDown interface,
>> and holds the
>> `List < CallExpression > aggregateExpressions, int[] GroupingFields,
>> DataType producedDataType`
>> fields?
>>
>> Looking forward to your further feedback or guidance.
>>
>> Jingsong Li  于2021年1月5日周二 下午2:44写道:
>>
>>> Thanks for your proposal! Sebastian.
>>>
>>> +1 for SupportsAggregatePushDown. The above wonderful discussion has
>>> solved
>>> many of my concerns.
>>>
>>> ## Semantic problems
>>>
>>> We may need to add some mechanisms or comments, because as far as I know,
>>> the semantics of each database is actually different, which may need to
>>> be
>>> reflected in your specific implementation.
>>>
>>> For example, the AVG output types of various databases may be different.
>>> For example, MySQL outputs double, this is different from Flink. What
>>> should we do? (Lucky, avg will be splitted into sum and count, But we
>>> also
>>> need care about decimal and others)
>>>
>>> ## The phase of push-down rule
>>>
>>> I strongly recommend that you do not put it in the Volcano phase, which
>>> may
>>> make the cost calculation very troublesome.
>>> So in PHYSICAL_REWRITE?
>>>
>>> ## About interface
>>>
>>> For scalability, I slightly recommend that we introduce an `Aggregate`
>>> interface, it contains `List aggregateExpressions, int[]
>>> groupingFields, DataType producedDataType` fields. In this way, we can
>>> add
>>> fields easily without breaking compatibility.
>>>
>>> I think the current design is very good, just put forward some ideas.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu 
>>> wrote:
>>>
>>> > Hi Jark,
>>> >
>>> > Thx for your further feedback and help. The interface of
>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>> >
>>> > For (1) Agree: Yeah, the upstream only need to know if the TableSource
>>> can
>>> > handle all of the aggregates.
>>> > It's better to just return a boolean type to indicate whether all of
>>> > aggregates push down was successful or not. [Resolved in proposal]
>>> >
>>> > For (2) Agree: The aggOutputDataType represent the produced data type
>>> of
>>> > the new table source to make sure that the new table source can
>>> > connect with the related exchange node. The format of this
>>> > aggOutputDataType is groupedFields's type + agg function's return type.
>>> > The reason for adding this parameter in this function is also to
>>> facilitate
>>> > the user to build the final output type. I have changed this parameter
>>> > to be producedDataType. [Resolved in proposal]
>>> >
>>> > For (3) Agree: Indeed, groupSet may mislead users, I have changed to
>>> use
>>> > groupingFields. [Resolved in proposal]
>>> >
>>> > Thx again for the suggestion, looking for the further discussion.
>>> >
>>> > Jark Wu  于2021年1月5日周二 下午12:05写道:
>>> >
>>> > > I'm also +1 for idea#2.
>>> > >
>>> > > Regarding to the updated i

[jira] [Created] (FLINK-20859) java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter

2021-01-05 Thread jack sun (Jira)
jack sun created FLINK-20859:


 Summary: java.lang.NoClassDefFoundError: 
org/apache/parquet/avro/AvroParquetWriter
 Key: FLINK-20859
 URL: https://issues.apache.org/jira/browse/FLINK-20859
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.12.0
Reporter: jack sun
 Attachments: 1.png, 2.png

Following the official Flink 1.12 StreamingFileSink example, a runtime error occurs:

java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
at 
org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
at 
org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
at 
org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:35)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:226)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:207)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
Caused by: java.lang.ClassNotFoundException: 
org.apache.parquet.avro.AvroParquetWriter
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
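
The missing class is provided by the {{org.apache.parquet:parquet-avro}} artifact. A small check like the following (illustrative only) shows whether it is available on the job classpath:
{code:java}
public class ParquetAvroClasspathCheck {
    public static void main(String[] args) {
        try {
            // AvroParquetWriter lives in the org.apache.parquet:parquet-avro artifact.
            Class.forName("org.apache.parquet.avro.AvroParquetWriter");
            System.out.println("parquet-avro found on the classpath");
        } catch (ClassNotFoundException e) {
            System.out.println("parquet-avro is missing from the classpath");
        }
    }
}
{code}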





Re: [DISCUSS] FLIP-155: Introduce a few convenient operations in Table API

2021-01-05 Thread Dian Fu
Hi all,

I have updated the FLIP about temporal join, SQL hints and window TVF.

Regards,
Dian

> On Jan 5, 2021, at 11:58 AM, Dian Fu wrote:
> 
> Thanks a lot for your comments!
> 
> Regarding the Python Table API examples: I thought it should be
> straightforward about how to use these operations in Python Table API and so 
> have not added them. However, the suggestions make sense to me and I have 
> added some examples about how to use them in Python Table API to make it more 
> clear.
> 
> Regarding dropDuplicates vs deduplicate: +1 to use deduplicate. It's more
> consistent with the feature/concept which is already documented clearly in 
> Flink.
> 
> Regarding `myTable.coalesce($("a"), 1).as("a")`: I'm still in favor of
> fillna for now. Compared to coalesce, fillna could handle multiple columns in 
> one method call. For the naming convention, the name "fillna/dropna/replace" 
> comes from Pandas [1][2][3].
> 
> Regarding `event-time/processing-time temporal join, SQL hints, window
> TVF`: Good catch! Definitely we should support them in Table API. I will 
> update the FLIP about these functionalities.
> 
> [1] https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.fillna.html 
> 
> [2] https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.dropna.html 
> 
> [3] 
> https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.replace.html 
> 
>> On Jan 4, 2021, at 10:59 PM, Timo Walther wrote:
>> 
>> Hi Dian,
>> 
>> thanks for the proposed FLIP. I haven't taken a deep look at the proposal 
>> yet but will do so shortly. In general, we should aim to make the Table API 
>> as concise and self-explaining as possible. E.g. `dropna` does not sound 
>> obvious to me.
>> 
>> Regarding `myTable.coalesce($("a"), 1).as("a")`: Instead of introducing more 
>> top-level functions, maybe we should also consider introducing more building 
>> blocks e.g. for applying an expression to every column. A more functional 
>> approach (e.g. with lamba function) could solve more use cases.
>> 
>> Regards,
>> Timo
>> 
>> On 04.01.21 15:35, Seth Wiesman wrote:
>>> This makes sense, I have some questions about method names.
>>> What do you think about renaming `dropDuplicates` to `deduplicate`? I don't
>>> think that drop is the right word to use for this operation, it implies
>>> records are filtered where this operator actually issues updates and
>>> retractions. Also, deduplicate is already how we talk about this feature in
>>> the docs so I think it would be easier for users to find.
>>> For null handling, I don't know how close we want to stick with SQL
>>> conventions but what about making `coalesce` a top-level method? Something
>>> like:
>>> myTable.coalesce($("a"), 1).as("a")
>>> We can require the next method to be an `as`. There is already precedent
>>> for this sort of thing, `GroupedTable#aggregate` can only be followed by
>>> `select`.
>>> Seth
>>> On Mon, Jan 4, 2021 at 6:27 AM Wei Zhong wrote:
 Hi Dian,
 
 Big +1 for making the Table API easier to use. Java users and Python users
 can both benefit from it. I think it would be better if we add some Python
 API examples.
 
 Best,
 Wei
 
 
> On Jan 4, 2021, at 8:03 PM, Dian Fu wrote:
> 
> Hi all,
> 
> I'd like to start a discussion about introducing a few convenient
 operations in Table API from the perspective of ease of use.
> 
> Currently some tasks are not easy to express in Table API e.g.
 deduplication, topn, etc, or not easy to express when there are hundreds of
 columns in a table, e.g. null data handling, etc.
> 
> I'd like to propose to introduce a few operations in Table API with the
 following purposes:
> - Make Table API users to easily leverage the powerful features already
 in SQL, e.g. deduplication, topn, etc
> - Provide some convenient operations, e.g. introducing a series of
 operations for null data handling (it may become a problem when there are
 hundreds of columns), data sampling and splitting (which is a very common
 use case in ML which usually needs to split a table into multiple tables
 for training and validation separately).
> 
> Please refer to FLIP-155 [1] for more details.
> 
> Looking forward to your feedback!
> 
> Regards,
> Dian
> 
> [1]
 https://cwiki.apache.org/confluence/display/FLINK/FLIP-155%3A+Introduce+a+few+convenient+operations+in+Table+API
  
 
 
 
>> 
> 



[VOTE] FLIP-153: Support state access in Python DataStream API

2021-01-05 Thread Shuiqiang Chen
Hi devs,

The discussion of FLIP-153 [1] seems to have reached a consensus through
the mailing thread [2]. I would like to start a vote for it.

The vote will be open until 11th January (72h), unless there is an
objection or not enough votes.

Best,
Shuiqiang

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API

[2]:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-153-Support-state-access-in-Python-DataStream-API-td47127.html


Re: Support local aggregate push down for Blink batch planner

2021-01-05 Thread Jingsong Li
Hi Jark,

I don't want to limit this interface to local-aggregate push down. Actually,
sometimes we can push the whole aggregation down to the source too.

So this rule can do something more advanced. For example, we can also push
grouping sets down to the source, for SQL like "GROUP BY GROUPING SETS (f1, f2)".
Then we need to pass more information in the push down.

Best,
Jingsong

On Wed, Jan 6, 2021 at 11:02 AM Jark Wu  wrote:

> I think this may be over designed. We should have confidence in the
> interface we design, the interface should be stable.
> Wrapping things in a big context has a cost of losing user convenience.
> Foremost, we don't see any parameters to add in the future. Do you know
> any potential parameters?
>
> Best,
> Jark
>
> On Wed, 6 Jan 2021 at 10:28, Jingsong Li  wrote:
>
>> Hi Sebastian,
>>
>> Well, I mean:
>>
>> `boolean applyAggregates(int[] groupingFields, List
>> aggregateExpressions, DataType producedDataType);`
>> VS
>> ```
>> boolean applyAggregates(Aggregation agg);
>>
>> interface Aggregation {
>>   int[] groupingFields();
>>   List aggregateExpressions();
>>   DataType producedDataType();
>> }
>> ```
>>
>> Maybe I've over considered it, but I think Aggregation is a complicated
>> thing. Maybe we need to extend its parameters in the future, so make the
>> parameters interface, which is conducive to the future expansion without
>> destroying the compatibility of user implementation. If it is the way
>> before, users need to modify the code.
>>
>> Best,
>> Jingsong
>>
>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu 
>> wrote:
>>
>>> Hi Jinsong,
>>>
>>> Thx a lot for your suggestion. These points really need to be clear in
>>> the proposal.
>>>
>>> For the semantic problem, I think the main point is the different
>>> returned data types
>>> for the target aggregate function and the row format returned by the
>>> underlying storage.
>>> That's why we provide the producedDataType in the
>>> SupportsAggregatePushDown interface.
>>> Need to let developers know that we need to handle the semantic
>>> differences between
>>> the underlying storage system and Flink in related connectors.
>>> [Supplemented in proposal]
>>>
>>> For the phase of the new PushLocalAggIntoTableSourceScanRule rule, it's also
>>> a key point.
>>> As you suggested, we should put it into the PHYSICAL_REWRITE rule set,
>>> and better to put it
>>> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>
>>> For the scalability of the interface, actually I don't exactly
>>> understand your suggestion. Is it to add
>>> an abstract class, to implement the SupportsAggregatePushDown interface,
>>> and holds the
>>> `List < CallExpression > aggregateExpressions, int[] GroupingFields,
>>> DataType producedDataType`
>>> fields?
>>>
>>> Looking forward to your further feedback or guidance.
>>>
>>> Jingsong Li  于2021年1月5日周二 下午2:44写道:
>>>
 Thanks for your proposal! Sebastian.

 +1 for SupportsAggregatePushDown. The above wonderful discussion has
 solved
 many of my concerns.

 ## Semantic problems

 We may need to add some mechanisms or comments, because as far as I
 know,
 the semantics of each database is actually different, which may need to
 be
 reflected in your specific implementation.

 For example, the AVG output types of various databases may be different.
 For example, MySQL outputs double, this is different from Flink. What
 should we do? (Lucky, avg will be splitted into sum and count, But we
 also
 need care about decimal and others)

 ## The phase of push-down rule

 I strongly recommend that you do not put it in the Volcano phase, which
 may
 make the cost calculation very troublesome.
 So in PHYSICAL_REWRITE?

 ## About interface

 For scalability, I slightly recommend that we introduce an `Aggregate`
 interface, it contains `List aggregateExpressions, int[]
 groupingFields, DataType producedDataType` fields. In this way, we can
 add
 fields easily without breaking compatibility.

 I think the current design is very good, just put forward some ideas.

 Best,
 Jingsong

 On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu 
 wrote:

 > Hi Jark,
 >
 > Thx for your further feedback and help. The interface of
 > SupportsAggregatePushDown may indeed need some adjustments.
 >
 > For (1) Agree: Yeah, the upstream only need to know if the
 TableSource can
 > handle all of the aggregates.
 > It's better to just return a boolean type to indicate whether all of
 > aggregates push down was successful or not. [Resolved in proposal]
 >
 > For (2) Agree: The aggOutputDataType represent the produced data type
 of
 > the new table source to make sure that the new table source can
 > connect with the related exchange node. The format of this
 > aggOutputDataType is groupedFields's type + agg function's return
 type

Re: [VOTE] FLIP-153: Support state access in Python DataStream API

2021-01-05 Thread Dian Fu
+1 (binding)

> On Jan 6, 2021, at 1:12 PM, Shuiqiang Chen wrote:
> 
> Hi devs,
> 
> The discussion of the FLIP-153 [1] seems has reached a consensus through
> the mailing thread [2]. I would like to start a vote for it.
> 
> The vote will be opened until 11th January (72h), unless there is an
> objection or no enough votes.
> 
> Best,
> Shuiqiang
> 
> [1]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
> 
> [2]:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-153-Support-state-access-in-Python-DataStream-API-td47127.html



[jira] [Created] (FLINK-20860) Allow streaming operators to use managed memory

2021-01-05 Thread Jark Wu (Jira)
Jark Wu created FLINK-20860:
---

 Summary: Allow streaming operators to use managed memory
 Key: FLINK-20860
 URL: https://issues.apache.org/jira/browse/FLINK-20860
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration, Runtime / Task
Reporter: Jark Wu
 Fix For: 1.13.0


We are planning to use some batch algorithms
(sorting & bytes hash table) to improve the performance of streaming SQL
operators, especially for the mini-batch operators introduced by FLIP-145.

Currently, we have to buffer input records and
accumulators in heap (i.e. Java HashMap) which is not efficient and there
are potential risks of full GC and OOM.
With the managed memory, we can fully use the memory to buffer more data
without worrying about OOM and improve the performance a lot. However, the 
managed memory is not allowed to be used in streaming operators. 

As discussed on the mailing list [1], we have reached a consensus that we can
extend the configuration {{taskmanager.memory.managed.consumer-weights}} to
have 2 more options, {{OPERATOR}} and {{STATE_BACKEND}}. The available consumer
options will be:

* `OPERATOR` for both streaming and batch operators
* `STATE_BACKEND` for state backends
* `PYTHON` for python processes
* `DATAPROC` as a legacy key for state backend or batch operators if
`STATE_BACKEND` or `OPERATOR` are not specified.

The previous default value is {{DATAPROC:70,PYTHON:30}}; the new default value
will be {{OPERATOR:70,STATE_BACKEND:70,PYTHON:30}}.

The weight for OPERATOR and STATE_BACKEND will be the same value to align with 
previous behaviors.


[1]: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Allow-streaming-operators-to-use-managed-memory-td47327.html





Re: [DISCUSS] Allow streaming operators to use managed memory

2021-01-05 Thread Jark Wu
Thanks all for the discussion.

I have created an issue FLINK-20860 [1] to support this.

In conclusion, we will extend the configuration
`taskmanager.memory.managed.consumer-weights` to have 2 more consumer
kinds: OPERATOR and STATE_BACKEND. The available consumer kinds will be:

* `OPERATOR` for both streaming and batch operators
* `STATE_BACKEND` for state backends
* `PYTHON` for python processes
* `DATAPROC` as a legacy key for state backend or batch operators if
`STATE_BACKEND` or `OPERATOR` are not specified.

The previous default value is DATAPROC:70,PYTHON:30; the new default value
will be OPERATOR:70,STATE_BACKEND:70,PYTHON:30.

The weight for OPERATOR and STATE_BACKEND will be the same value to align
with previous behaviors.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-20860

On Tue, 5 Jan 2021 at 18:35, Xintong Song  wrote:

> >
> > Would the default weight for OPERATOR and STATE_BACKEND be the same
> value?
> >
>
> I would say yes, to align with previous behaviors.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jan 5, 2021 at 5:51 PM Till Rohrmann  wrote:
>
> > +1 for Jark's and Xintong's proposal.
> >
> > Would the default weight for OPERATOR and STATE_BACKEND be the same
> value?
> >
> > Cheers,
> > Till
> >
> > On Tue, Jan 5, 2021 at 6:39 AM Jingsong Li 
> wrote:
> >
> > > +1 for allowing streaming operators to use managed memory.
> > >
> > > The memory use of streams requires some hierarchy, and the bottom layer
> > is
> > > undoubtedly the current StateBackend.
> > > Let the stream operators freely use the managed memory, which will make
> > the
> > > memory management model to be unified and give the operator free space.
> > >
> > > Xingtong's proposal looks good to me. +1 to split `DATAPROC` into
> > > `STATE_BACKEND` or `OPERATOR`.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, Jan 5, 2021 at 12:33 PM Jark Wu  wrote:
> > >
> > > > +1 to Xingtong's proposal!
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Tue, 5 Jan 2021 at 12:13, Xintong Song 
> > wrote:
> > > >
> > > > > +1 for allowing streaming operators to use managed memory.
> > > > >
> > > > > As for the consumer names, I'm afraid using `DATAPROC` for both
> > > streaming
> > > > > ops and state backends will not work. Currently, RocksDB state
> > backend
> > > > uses
> > > > > a shared piece of memory for all the states within that slot. It's
> > not
> > > > the
> > > > > operator's decision how much memory it uses for the states.
> > > > >
> > > > > I would suggest the following. (IIUC, the same as what Jark
> proposed)
> > > > > * `OPERATOR` for both streaming and bath operators
> > > > > * `STATE_BACKEND` for state backends
> > > > > * `PYTHON` for python processes
> > > > > * `DATAPROC` as a legacy key for state backend or batch operators
> if
> > > > > `STATE_BACKEND` or `OPERATOR` are not specified.
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Jan 5, 2021 at 11:23 AM Jark Wu  wrote:
> > > > >
> > > > > > Hi Aljoscha,
> > > > > >
> > > > > > I think we may need to divide `DATAPROC` into `OPERATOR` and
> > > > > > `STATE_BACKEND`, because they have different scope (slot vs.
> > > operator).
> > > > > > But @Xintong Song  may have more insights
> > on
> > > > it.
> > > > > >
> > > > > > Best,
> > > > > > Jark
> > > > > >
> > > > > >
> > > > > > On Mon, 4 Jan 2021 at 20:44, Aljoscha Krettek <
> aljos...@apache.org
> > >
> > > > > wrote:
> > > > > >
> > > > > >> I agree, we should allow streaming operators to use managed
> memory
> > > for
> > > > > >> other use cases.
> > > > > >>
> > > > > >> Do you think we need an additional "consumer" setting or that
> they
> > > > would
> > > > > >> just use `DATAPROC` and decide by themselves what to use the
> > memory
> > > > for?
> > > > > >>
> > > > > >> Best,
> > > > > >> Aljoscha
> > > > > >>
> > > > > >> On 2020/12/22 17:14, Jark Wu wrote:
> > > > > >> >Hi all,
> > > > > >> >
> > > > > >> >I found that currently the managed memory can only be used in 3
> > > > > workloads
> > > > > >> >[1]:
> > > > > >> >- state backends for streaming jobs
> > > > > >> >- sorting, hash tables for batch jobs
> > > > > >> >- python UDFs
> > > > > >> >
> > > > > >> >And the configuration option
> > > > > >> `taskmanager.memory.managed.consumer-weights`
> > > > > >> >only allows values: PYTHON and DATAPROC (state in streaming or
> > > > > algorithms
> > > > > >> >in batch).
> > > > > >> >I'm confused why it doesn't allow streaming operators to use
> > > managed
> > > > > >> memory
> > > > > >> >for purposes other than state backends.
> > > > > >> >
> > > > > >> >The background is that we are planning to use some batch
> > algorithms
> > > > > >> >(sorting & bytes hash table) to improve the performance of
> > > streaming
> > > > > SQL
> > > > > >> >operators, especially for the mini-batch operators.
> > > > > >> >Currently, the mini-batch operators are buffering input records
> > and
> > > > > >> >accumulat

Re: Support local aggregate push down for Blink batch planner

2021-01-05 Thread Jingsong Li
Hi,

I'm also curious about aggregates with a filter (COUNT(1) FILTER(WHERE d >
1)). Can we push them down? I'm not sure whether a single CallExpression can
express this, or how we should represent it and convey it to users.

Best,
Jingsong

On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li  wrote:

> Hi Jark,
>
> I don't want to limit this interface to LocalAgg Push down. Actually,
> sometimes, we can push whole aggregation to source too.
>
> So, this rule can do something more advanced. For example, we can push
> down group sets to source too, for the SQL: "GROUP BY GROUPING SETS (f1,
> f2)". Then, we need to add more information to push down.
>
> Best,
> Jingsong
>
> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu  wrote:
>
>> I think this may be over designed. We should have confidence in the
>> interface we design, the interface should be stable.
>> Wrapping things in a big context has a cost of losing user convenience.
>> Foremost, we don't see any parameters to add in the future. Do you know
>> any potential parameters?
>>
>> Best,
>> Jark
>>
>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li  wrote:
>>
>>> Hi Sebastian,
>>>
>>> Well, I mean:
>>>
>>> `boolean applyAggregates(int[] groupingFields, List
>>> aggregateExpressions, DataType producedDataType);`
>>> VS
>>> ```
>>> boolean applyAggregates(Aggregation agg);
>>>
>>> interface Aggregation {
>>>   int[] groupingFields();
>>>   List aggregateExpressions();
>>>   DataType producedDataType();
>>> }
>>> ```
>>>
>>> Maybe I've over considered it, but I think Aggregation is a complicated
>>> thing. Maybe we need to extend its parameters in the future, so make the
>>> parameters interface, which is conducive to the future expansion without
>>> destroying the compatibility of user implementation. If it is the way
>>> before, users need to modify the code.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu 
>>> wrote:
>>>
 Hi Jinsong,

 Thx a lot for your suggestion. These points really need to be clear in
 the proposal.

 For the semantic problem, I think the main point is the different
 returned data types
 for the target aggregate function and the row format returned by the
 underlying storage.
 That's why we provide the producedDataType in the
 SupportsAggregatePushDown interface.
 Need to let developers know that we need to handle the semantic
 differences between
 the underlying storage system and Flink in related connectors.
 [Supplemented in proposal]

 For the phase of the new PushLocalAggIntoTableSourceScanRule rule, it's 
 also
 a key point.
 As you suggested, we should put it into the PHYSICAL_REWRITE rule set,
 and better to put it
 behind the EnforceLocalXXAggRule. [Supplemented in proposal]

 For the scalability of the interface, actually I don't exactly
 understand your suggestion. Is it to add
 an abstract class, to implement the SupportsAggregatePushDown
 interface, and holds the
 `List < CallExpression > aggregateExpressions, int[] GroupingFields,
 DataType producedDataType`
 fields?

 Looking forward to your further feedback or guidance.

 Jingsong Li  于2021年1月5日周二 下午2:44写道:

> Thanks for your proposal! Sebastian.
>
> +1 for SupportsAggregatePushDown. The above wonderful discussion has
> solved
> many of my concerns.
>
> ## Semantic problems
>
> We may need to add some mechanisms or comments, because as far as I
> know,
> the semantics of each database is actually different, which may need
> to be
> reflected in your specific implementation.
>
> For example, the AVG output types of various databases may be
> different.
> For example, MySQL outputs double, this is different from Flink. What
> should we do? (Lucky, avg will be splitted into sum and count, But we
> also
> need care about decimal and others)
>
> ## The phase of push-down rule
>
> I strongly recommend that you do not put it in the Volcano phase,
> which may
> make the cost calculation very troublesome.
> So in PHYSICAL_REWRITE?
>
> ## About interface
>
> For scalability, I slightly recommend that we introduce an `Aggregate`
> interface, it contains `List aggregateExpressions,
> int[]
> groupingFields, DataType producedDataType` fields. In this way, we can
> add
> fields easily without breaking compatibility.
>
> I think the current design is very good, just put forward some ideas.
>
> Best,
> Jingsong
>
> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu 
> wrote:
>
> > Hi Jark,
> >
> > Thx for your further feedback and help. The interface of
> > SupportsAggregatePushDown may indeed need some adjustments.
> >
> > For (1) Agree: Yeah, the upstream only need to know if the
> TableSource can
> > handle all of the aggregates.
> > It's

Task scheduling of Flink

2021-01-05 Thread penguin.
Hello! Do you know how to modify the task scheduling method of Flink?

Re: [VOTE] FLIP-153: Support state access in Python DataStream API

2021-01-05 Thread Xingbo Huang
+1 (non-binding)

Best,
Xingbo

On Wed, Jan 6, 2021 at 1:38 PM Dian Fu wrote:

> +1 (binding)
>
> > 在 2021年1月6日,下午1:12,Shuiqiang Chen  写道:
> >
> > Hi devs,
> >
> > The discussion of the FLIP-153 [1] seems has reached a consensus through
> > the mailing thread [2]. I would like to start a vote for it.
> >
> > The vote will be opened until 11th January (72h), unless there is an
> > objection or no enough votes.
> >
> > Best,
> > Shuiqiang
> >
> > [1]:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
> >
> > [2]:
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-153-Support-state-access-in-Python-DataStream-API-td47127.html
>
>


Re: [VOTE] FLIP-153: Support state access in Python DataStream API

2021-01-05 Thread Wei Zhong
+1 (non-binding)

Best,
Wei

> On Jan 6, 2021, at 2:05 PM, Xingbo Huang wrote:
> 
> +1 (non-binding)
> 
> Best,
> Xingbo
> 
> On Wed, Jan 6, 2021 at 1:38 PM Dian Fu wrote:
> 
>> +1 (binding)
>> 
>>> 在 2021年1月6日,下午1:12,Shuiqiang Chen  写道:
>>> 
>>> Hi devs,
>>> 
>>> The discussion of the FLIP-153 [1] seems has reached a consensus through
>>> the mailing thread [2]. I would like to start a vote for it.
>>> 
>>> The vote will be opened until 11th January (72h), unless there is an
>>> objection or no enough votes.
>>> 
>>> Best,
>>> Shuiqiang
>>> 
>>> [1]:
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>> 
>>> [2]:
>>> 
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-153-Support-state-access-in-Python-DataStream-API-td47127.html
>> 
>> 



Re: Support local aggregate push down for Blink batch planner

2021-01-05 Thread Jark Wu
I think filter expressions and grouping sets are semantic arguments instead
of utilities.
If we want to push them into sources, the connector developers should be
aware of them.
Wrapping them in a context implicitly is error-prone: an existing connector
could produce wrong results when upgrading to a new Flink version (as we
would be pushing grouping sets / filter arguments that the connector ignores).
I think for these cases, providing a new default method to override might
be a better choice.
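
For illustration, such an extension could look roughly like this (a sketch only, assuming the applyAggregates variant discussed above; applyGroupingSets is a hypothetical future addition, not part of the proposal):
```
// Sketch only: applyAggregates(...) is the method discussed in this thread;
// applyGroupingSets(...) illustrates how a default method keeps existing
// connector implementations source-compatible.
public interface SupportsAggregatePushDown {

    boolean applyAggregates(
            int[] groupingFields,
            List<CallExpression> aggregateExpressions,
            DataType producedDataType);

    // Added later: connectors that can evaluate GROUPING SETS override this;
    // existing implementations keep the default and simply do not get
    // grouping sets pushed into them.
    default boolean applyGroupingSets(
            List<int[]> groupingSets,
            List<CallExpression> aggregateExpressions,
            DataType producedDataType) {
        return false;
    }
}
```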

Best,
Jark

On Wed, 6 Jan 2021 at 13:56, Jingsong Li  wrote:

> Hi,
>
> I'm also curious about aggregate with filter (COUNT(1) FILTER(WHERE d >
> 1)). Can we push it down? I'm not sure that a single call expression can
> express it, and how we should embody it and convey it to users.
>
> Best,
> Jingsong
>
> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li  wrote:
>
>> Hi Jark,
>>
>> I don't want to limit this interface to LocalAgg Push down. Actually,
>> sometimes, we can push whole aggregation to source too.
>>
>> So, this rule can do something more advanced. For example, we can push
>> down group sets to source too, for the SQL: "GROUP BY GROUPING SETS (f1,
>> f2)". Then, we need to add more information to push down.
>>
>> Best,
>> Jingsong
>>
>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu  wrote:
>>
>>> I think this may be over designed. We should have confidence in the
>>> interface we design, the interface should be stable.
>>> Wrapping things in a big context has a cost of losing user convenience.
>>> Foremost, we don't see any parameters to add in the future. Do you know
>>> any potential parameters?
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li  wrote:
>>>
 Hi Sebastian,

 Well, I mean:

 `boolean applyAggregates(int[] groupingFields, List
 aggregateExpressions, DataType producedDataType);`
 VS
 ```
 boolean applyAggregates(Aggregation agg);

 interface Aggregation {
   int[] groupingFields();
   List aggregateExpressions();
   DataType producedDataType();
 }
 ```

 Maybe I've over considered it, but I think Aggregation is a complicated
 thing. Maybe we need to extend its parameters in the future, so make the
 parameters interface, which is conducive to the future expansion without
 destroying the compatibility of user implementation. If it is the way
 before, users need to modify the code.

 Best,
 Jingsong

 On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu 
 wrote:

> Hi Jinsong,
>
> Thx a lot for your suggestion. These points really need to be clear in
> the proposal.
>
> For the semantic problem, I think the main point is the different
> returned data types
> for the target aggregate function and the row format returned by the
> underlying storage.
> That's why we provide the producedDataType in the
> SupportsAggregatePushDown interface.
> Need to let developers know that we need to handle the semantic
> differences between
> the underlying storage system and Flink in related connectors.
> [Supplemented in proposal]
>
> For the phase of the new PushLocalAggIntoTableSourceScanRule rule,
> it's also a key point.
> As you suggested, we should put it into the PHYSICAL_REWRITE rule set,
> and better to put it
> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>
> For the scalability of the interface, actually I don't exactly
> understand your suggestion. Is it to add
> an abstract class, to implement the SupportsAggregatePushDown
> interface, and holds the
> `List < CallExpression > aggregateExpressions, int[] GroupingFields,
> DataType producedDataType`
> fields?
>
> Looking forward to your further feedback or guidance.
>
> On Tue, Jan 5, 2021 at 2:44 PM Jingsong Li wrote:
>
>> Thanks for your proposal! Sebastian.
>>
>> +1 for SupportsAggregatePushDown. The above wonderful discussion has
>> solved
>> many of my concerns.
>>
>> ## Semantic problems
>>
>> We may need to add some mechanisms or comments, because as far as I
>> know,
>> the semantics of each database is actually different, which may need
>> to be
>> reflected in your specific implementation.
>>
>> For example, the AVG output types of various databases may be
>> different.
>> For example, MySQL outputs double, this is different from Flink. What
>> should we do? (Lucky, avg will be splitted into sum and count, But we
>> also
>> need care about decimal and others)
>>
>> ## The phase of push-down rule
>>
>> I strongly recommend that you do not put it in the Volcano phase,
>> which may
>> make the cost calculation very troublesome.
>> So in PHYSICAL_REWRITE?
>>
>> ## About interface
>>
>> For scalability, I slightly recommend that we introduce an `Aggregate`
>> interface, it

Re: Support local aggregate push down for Blink batch planner

2021-01-05 Thread Jingsong Li
> I think filter expressions and grouping sets are semantic arguments
instead of utilities. If we want to push them into sources, the connector
developers should be aware of them.Wrapping them in a context implicitly is
error-prone that the existing connector will produce wrong results when
upgrading to new Flink versions.

We could have some mechanism to check this when upgrading.

> I think for these cases, providing a new default method to override might
be a better choice.

Then we will end up with three or more methods. At the API level, I really don't
like that...

Best,
Jingsong

On Wed, Jan 6, 2021 at 2:10 PM Jark Wu  wrote:

> I think filter expressions and grouping sets are semantic arguments
> instead of utilities.
> If we want to push them into sources, the connector developers should be
> aware of them.
> Wrapping them in a context implicitly is error-prone that the existing
> connector will produce wrong results
>  when upgrading to new Flink versions (as we are pushing
> grouping_sets/filter_args, but connector ignores it).
> I think for these cases, providing a new default method to override might
> be a better choice.
>
> Best,
> Jark
>
> On Wed, 6 Jan 2021 at 13:56, Jingsong Li  wrote:
>
>> Hi,
>>
>> I'm also curious about aggregate with filter (COUNT(1) FILTER(WHERE d >
>> 1)). Can we push it down? I'm not sure that a single call expression can
>> express it, and how we should embody it and convey it to users.
>>
>> Best,
>> Jingsong
>>
>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li 
>> wrote:
>>
>>> Hi Jark,
>>>
>>> I don't want to limit this interface to LocalAgg Push down. Actually,
>>> sometimes, we can push whole aggregation to source too.
>>>
>>> So, this rule can do something more advanced. For example, we can push
>>> down group sets to source too, for the SQL: "GROUP BY GROUPING SETS (f1,
>>> f2)". Then, we need to add more information to push down.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu  wrote:
>>>
 I think this may be over designed. We should have confidence in the
 interface we design, the interface should be stable.
 Wrapping things in a big context has a cost of losing user convenience.
 Foremost, we don't see any parameters to add in the future. Do you know
 any potential parameters?

 Best,
 Jark

 On Wed, 6 Jan 2021 at 10:28, Jingsong Li 
 wrote:

> Hi Sebastian,
>
> Well, I mean:
>
> `boolean applyAggregates(int[] groupingFields, List
> aggregateExpressions, DataType producedDataType);`
> VS
> ```
> boolean applyAggregates(Aggregation agg);
>
> interface Aggregation {
>   int[] groupingFields();
>   List aggregateExpressions();
>   DataType producedDataType();
> }
> ```
>
> Maybe I've over considered it, but I think Aggregation is a
> complicated thing. Maybe we need to extend its parameters in the future, 
> so
> make the parameters interface, which is conducive to the future expansion
> without destroying the compatibility of user implementation. If it is the
> way before, users need to modify the code.
>
> Best,
> Jingsong
>
> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu 
> wrote:
>
>> Hi Jinsong,
>>
>> Thx a lot for your suggestion. These points really need to be clear
>> in the proposal.
>>
>> For the semantic problem, I think the main point is the different
>> returned data types
>> for the target aggregate function and the row format returned by the
>> underlying storage.
>> That's why we provide the producedDataType in the
>> SupportsAggregatePushDown interface.
>> Need to let developers know that we need to handle the semantic
>> differences between
>> the underlying storage system and Flink in related connectors.
>> [Supplemented in proposal]
>>
>> For the phase of the new PushLocalAggIntoTableSourceScanRule rule,
>> it's also a key point.
>> As you suggested, we should put it into the PHYSICAL_REWRITE rule
>> set, and better to put it
>> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>
>> For the scalability of the interface, actually I don't exactly
>> understand your suggestion. Is it to add
>> an abstract class, to implement the SupportsAggregatePushDown
>> interface, and holds the
>> `List < CallExpression > aggregateExpressions, int[] GroupingFields,
>> DataType producedDataType`
>> fields?
>>
>> Looking forward to your further feedback or guidance.
>>
>> Jingsong Li  于2021年1月5日周二 下午2:44写道:
>>
>>> Thanks for your proposal! Sebastian.
>>>
>>> +1 for SupportsAggregatePushDown. The above wonderful discussion has
>>> solved
>>> many of my concerns.
>>>
>>> ## Semantic problems
>>>
>>> We may need to add some mechanisms or comments, because as far as I
>>> know,
>>> the semantics of 

Re: [DISCUSS] Allow streaming operators to use managed memory

2021-01-05 Thread Yun Tang
I think using managed memory within streaming operators is a good idea, and I
just have a question about the last conclusion:

If both OPERATOR and STATE_BACKEND are set to 70 to align with the previous
behavior, what will happen if one slot has both a managed-memory streaming
operator and a state backend as consumers?

As you can see, previously DATAPROC + PYTHON = 100, which describes the
situation where one slot has both Python and state-backend consumers.
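
For reference, if the shares are derived by normalizing the weights of the consumer kinds that are actually present in the slot (my assumption, based on how DATAPROC and PYTHON interact today), the arithmetic would be:
```
// Illustration only; assumes weights are normalized over the consumer kinds
// that are actually present in the slot.
public class WeightShares {
    public static void main(String[] args) {
        int operator = 70, stateBackend = 70, python = 30;

        // Slot with a managed-memory operator and a state backend, no Python:
        System.out.println(operator / (double) (operator + stateBackend));          // 0.5 each

        // Slot where all three consumer kinds are present:
        System.out.println(operator / (double) (operator + stateBackend + python)); // ~0.41
        System.out.println(python / (double) (operator + stateBackend + python));   // ~0.18
    }
}
```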

Best
Yun Tang

From: Jark Wu 
Sent: Wednesday, January 6, 2021 13:51
To: dev 
Subject: Re: [DISCUSS] Allow streaming operators to use managed memory

Thanks all for the discussion.

I have created an issue FLINK-20860 [1] to support this.

In conclusion, we will extend the configuration
`taskmanager.memory.managed.consumer-weights` to have 2 more consumer
kinds: OPERATOR and STATE_BACKEND, the available consumer kinds will be :

* `OPERATOR` for both streaming and batch operators
* `STATE_BACKEND` for state backends
* `PYTHON` for python processes
* `DATAPROC` as a legacy key for state backend or batch operators if
`STATE_BACKEND` or `OPERATOR` are not specified.

The previous default value is DATAPROC:70,PYTHON:30, the new default value
will be OPERATOR:70,STATE_BACKEND:70,PYTHON:30.

The weight for OPERATOR and STATE_BACKEND will be the same value to align
with previous behaviors.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-20860

On Tue, 5 Jan 2021 at 18:35, Xintong Song  wrote:

> >
> > Would the default weight for OPERATOR and STATE_BACKEND be the same
> value?
> >
>
> I would say yes, to align with previous behaviors.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jan 5, 2021 at 5:51 PM Till Rohrmann  wrote:
>
> > +1 for Jark's and Xintong's proposal.
> >
> > Would the default weight for OPERATOR and STATE_BACKEND be the same
> value?
> >
> > Cheers,
> > Till
> >
> > On Tue, Jan 5, 2021 at 6:39 AM Jingsong Li 
> wrote:
> >
> > > +1 for allowing streaming operators to use managed memory.
> > >
> > > The memory use of streams requires some hierarchy, and the bottom layer
> > > is undoubtedly the current StateBackend.
> > > Letting the stream operators freely use managed memory will unify the
> > > memory management model and give the operators more freedom.
> > >
> > > Xintong's proposal looks good to me. +1 to split `DATAPROC` into
> > > `STATE_BACKEND` and `OPERATOR`.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, Jan 5, 2021 at 12:33 PM Jark Wu  wrote:
> > >
> > > > +1 to Xintong's proposal!
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Tue, 5 Jan 2021 at 12:13, Xintong Song 
> > wrote:
> > > >
> > > > > +1 for allowing streaming operators to use managed memory.
> > > > >
> > > > > As for the consumer names, I'm afraid using `DATAPROC` for both
> > > streaming
> > > > > ops and state backends will not work. Currently, RocksDB state
> > backend
> > > > uses
> > > > > a shared piece of memory for all the states within that slot. It's
> > not
> > > > the
> > > > > operator's decision how much memory it uses for the states.
> > > > >
> > > > > I would suggest the following. (IIUC, the same as what Jark
> proposed)
> > > > > > * `OPERATOR` for both streaming and batch operators
> > > > > * `STATE_BACKEND` for state backends
> > > > > * `PYTHON` for python processes
> > > > > * `DATAPROC` as a legacy key for state backend or batch operators
> if
> > > > > `STATE_BACKEND` or `OPERATOR` are not specified.
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Jan 5, 2021 at 11:23 AM Jark Wu  wrote:
> > > > >
> > > > > > Hi Aljoscha,
> > > > > >
> > > > > > I think we may need to divide `DATAPROC` into `OPERATOR` and
> > > > > > `STATE_BACKEND`, because they have different scope (slot vs.
> > > operator).
> > > > > > But @Xintong Song  may have more insights
> > on
> > > > it.
> > > > > >
> > > > > > Best,
> > > > > > Jark
> > > > > >
> > > > > >
> > > > > > On Mon, 4 Jan 2021 at 20:44, Aljoscha Krettek <
> aljos...@apache.org
> > >
> > > > > wrote:
> > > > > >
> > > > > >> I agree, we should allow streaming operators to use managed
> memory
> > > for
> > > > > >> other use cases.
> > > > > >>
> > > > > >> Do you think we need an additional "consumer" setting or that
> they
> > > > would
> > > > > >> just use `DATAPROC` and decide by themselves what to use the
> > memory
> > > > for?
> > > > > >>
> > > > > >> Best,
> > > > > >> Aljoscha
> > > > > >>
> > > > > >> On 2020/12/22 17:14, Jark Wu wrote:
> > > > > >> >Hi all,
> > > > > >> >
> > > > > >> >I found that currently the managed memory can only be used in 3
> > > > > workloads
> > > > > >> >[1]:
> > > > > >> >- state backends for streaming jobs
> > > > > >> >- sorting, hash tables for batch jobs
> > > > > >> >- python UDFs
> > > > > >> >
> > > > > >> >And the configuration option
> > > > > >> `taskmanager.memory.managed.consumer-weights`
> > > > > >> >only allows values:

Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

2021-01-05 Thread Yun Tang
The design looks good to me now. +1 to start the vote if there are no more
comments.

Best
Yun Tang

From: Dian Fu 
Sent: Tuesday, January 5, 2021 13:32
To: dev@flink.apache.org 
Subject: Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

Thanks Shuiqiang for driving this.

The design looks good to me. +1 to start the vote if there are no more comments.

Regards,
Dian

> On Jan 4, 2021, at 7:40 PM, Shuiqiang Chen  wrote:
>
> Hi Yu,
>
> Thanks a lot for your suggestions.
>
> I have addressed your inlined comments in the FLIP and also added a new
> section "State backed access synchronization" that explains the way to make
> sure there is no concurrent access to the state backend. Please have a look.
>
> Best,
> Shuiqiang
>
>
> Yu Li  wrote on Mon, Jan 4, 2021 at 4:15 PM:
>
>> Thanks for driving the discussion Shuiqiang, and sorry for chiming in late.
>>
>> *bq. However, all the state access will be synchronized in the Java
>> operator and so there will be no concurrent access to the state backend.*
>> Could you add a section to explicitly mention this in the FLIP document? I
>> think single-threaded state access is an important prerequisite and it's
>> important for later contributors to know about this clearly, from both the
>> design doc and source codes.
>>
>> The other parts LGTM, added some minor inline comments in the FLIP, please
>> take a look.
>>
>> Thanks.
>>
>> Best Regards,
>> Yu
>>
>>
>> On Fri, 18 Dec 2020 at 15:10, Shuiqiang Chen  wrote:
>>
>>> Hi wei,
>>>
>>> Big thanks for pointing out the mistakes! I have updated the FLIP
>>> according to your suggestions.
>>>
>>> Best,
>>> Shuiqiang
>>>
 On Dec 18, 2020, at 2:37 PM, Wei Zhong  wrote:

 Hi Shuiqiang,

 Thanks for driving this. +1 for this feature, just a minor comment to
>>> the design doc.

 The interface of the `AppendingState` should be:

 from abc import abstractmethod
 from typing import Generic, Iterable, Iterator, List, TypeVar

 # The State base class is defined elsewhere in the FLIP; type variables below.
 IN = TypeVar('IN')
 OUT = TypeVar('OUT')
 T = TypeVar('T')

 class AppendingState(State, Generic[IN, OUT]):

     @abstractmethod
     def get(self) -> OUT:
         pass

     @abstractmethod
     def add(self, value: IN) -> None:
         pass

 The output type and the input type of the `AppendingState` may be
>>> different. And the definitions of the child classes should be:

 class MergingState(AppendingState[IN, OUT]):
     pass


 class ListState(MergingState[T, Iterable[T]]):

     @abstractmethod
     def update(self, values: List[T]) -> None:
         pass

     @abstractmethod
     def add_all(self, values: List[T]) -> None:
         pass

     def __iter__(self) -> Iterator[T]:
         return iter(self.get())

 Best,
 Wei

> On Dec 17, 2020, at 21:06, Shuiqiang Chen  wrote:
>
> Hi Yun,
>
> I highly appreciate your questions! I have the corresponding answers
>>> as below:
>
> Re 1: You are right that the state access occurs in an async thread.
>>> However, all the state access will be synchronized in the Java operator
>>> and so there will be no concurrent access to the state backend.
>
> Re 2: I think it could be handled well in the Python DataStream API. In
>>> this case, there will be two operators and therefore also two keyed state
>>> backends.
>
> Re 3: Sure, you are right. We will store the current key which may be
>>> used by the timer.
>
> Re 4: Good point. State migration is still not covered in the current
>>> FLIP. I'd like to cover it in a separate FLIP as it should be orthogonal
>>> to this FLIP. I have updated the FLIP and added a clear description of this.
>
> Re 5: Good point. We may need to introduce a Python queryable state
>>> client if we want to support queryable state for Python operators. I'd
>>> like to cover it in a separate FLIP. I have updated the FLIP and added it
>>> as future work.
>
> Best,
> Shuiqiang
>
>> On Dec 17, 2020, at 12:08 PM, Yun Tang  wrote:
>>
>> Hi Shuiqiang,
>>
>> Thanks for driving this. I have several questions below:
>>
>>
>> 1.  Thread safety of state write-access. As you might know, state
>>> access is not thread-safe [1] in Flink; we depend on the task's single-threaded
>>> access. Since you change the state access to another async thread, can we
>>> still ensure this? It also means not allowing users to access state in their
>>> Java operator along with the bundled Python operator.
>> 2.  Number of keyed state backends per task. Flink would only have one
>>> keyed state backend per operator and would only have one keyed state
>>> backend per operator chain (in the head operator if possible). However,
>>> once we use experimental features such as reinterpretAsKeyedStream [2],
>>> we could have two keyed state backends in one operator chain within one
>>> task. Can the Python DataStream API handle this well?
>> 3.  Time to set the current key. As we still need the current key when
>>> registering a timer [3], we need some place to hold the current key even
>>> if it is not registered in the keyed state backend

[jira] [Created] (FLINK-20861) Provide an option for serializing DECIMALs in JSON as plain number instead of scientific notation

2021-01-05 Thread Q Kang (Jira)
Q Kang created FLINK-20861:
--

 Summary: Provide an option for serializing DECIMALs in JSON as 
plain number instead of scientific notation
 Key: FLINK-20861
 URL: https://issues.apache.org/jira/browse/FLINK-20861
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Q Kang


When using DECIMAL types in Flink SQL along with the JSON format, it is quite
common to see some large values written out in scientific notation.
For example:

Definition: `orderId DECIMAL(20, 0)`

Input: `{"orderId":454621864049246170}`

Output (without transformations): `{"orderId":4.5462186404924617E+17}`

However, plain-number values are easier to understand and more convenient
for the case shown above. So we can provide a boolean option (say
`json.use-plain-decimals`?) to make this behavior tunable.
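
For reference, here is a minimal standalone Jackson sketch (not Flink connector
code) of the mechanism such an option could toggle; WRITE_BIGDECIMAL_AS_PLAIN is
an existing Jackson generator feature that switches BigDecimal serialization
from toString() to toPlainString():

import java.math.BigDecimal;
import java.util.Collections;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PlainDecimalExample {
    public static void main(String[] args) throws Exception {
        BigDecimal orderId = new BigDecimal("4.5462186404924617E+17");

        // Default behavior may emit scientific notation: {"orderId":4.5462186404924617E+17}
        ObjectMapper defaultMapper = new ObjectMapper();
        System.out.println(defaultMapper.writeValueAsString(
                Collections.singletonMap("orderId", orderId)));

        // With the feature enabled, the value is written as a plain number:
        // {"orderId":454621864049246170}
        ObjectMapper plainMapper = new ObjectMapper()
                .enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN);
        System.out.println(plainMapper.writeValueAsString(
                Collections.singletonMap("orderId", orderId)));
    }
}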



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-153: Support state access in Python DataStream API

2021-01-05 Thread Yun Tang
+1 (binding)

Best
Yun Tang

From: Wei Zhong 
Sent: Wednesday, January 6, 2021 14:07
To: dev 
Subject: Re: [VOTE] FLIP-153: Support state access in Python DataStream API

+1 (non-binding)

Best,
Wei

> On Jan 6, 2021, at 14:05, Xingbo Huang  wrote:
>
> +1 (non-binding)
>
> Best,
> Xingbo
>
> Dian Fu  wrote on Wed, Jan 6, 2021 at 1:38 PM:
>
>> +1 (binding)
>>
>>> On Jan 6, 2021, at 1:12 PM, Shuiqiang Chen  wrote:
>>>
>>> Hi devs,
>>>
>>> The discussion of FLIP-153 [1] seems to have reached a consensus through
>>> the mailing thread [2]. I would like to start a vote for it.
>>>
>>> The vote will be open until January 11th (72h), unless there is an
>>> objection or not enough votes.
>>>
>>> Best,
>>> Shuiqiang
>>>
>>> [1]:
>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>>
>>> [2]:
>>>
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-153-Support-state-access-in-Python-DataStream-API-td47127.html
>>
>>



Re: [DISCUSS] Allow streaming operators to use managed memory

2021-01-05 Thread Xintong Song
Thanks for driving the discussion, @Jark. The conclusion LGTM.

@Yun,
Since streaming operators did not use managed memory previously, I don't
think it's possible for any use case with managed-memory streaming operators
to fully align with the previous behavior.
No matter how the consumer weights are configured, some of the managed
memory has to be transferred from the previous consumers to the new
managed-memory streaming operators.
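
To make this concrete, a purely illustrative snippet (assuming, as with the
existing weights, that a slot's managed memory is split proportionally among
the consumer kinds actually present in that slot):

public class ManagedMemoryWeightsExample {
    public static void main(String[] args) {
        // Proposed default weights from the conclusion above.
        int operator = 70, stateBackend = 70, python = 30;
        double total = operator + stateBackend + python;
        // With all three consumer kinds in one slot, the state backend no longer
        // gets 70% of the slot's managed memory as before, but roughly 41%.
        System.out.printf("operator=%.0f%%, stateBackend=%.0f%%, python=%.0f%%%n",
                100 * operator / total, 100 * stateBackend / total, 100 * python / total);
    }
}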

Thank you~

Xintong Song



On Wed, Jan 6, 2021 at 2:38 PM Yun Tang  wrote:

> I think using managed memory within streaming operators is a good idea and
> I just have a question about the last conclusion:
>
> If both OPERATOR and STATE_BACKEND are set to 70 to align with the previous
> behavior, what will happen if one slot has both a managed-memory streaming
> operator and a state backend as consumers?
>
> As you can see, previously DATAPROC + PYTHON = 100, which describes the
> situation where one slot has both a managed Python process and a state
> backend as consumers.
>
> Best
> Yun Tang
> 
> From: Jark Wu 
> Sent: Wednesday, January 6, 2021 13:51
> To: dev 
> Subject: Re: [DISCUSS] Allow streaming operators to use managed memory
>
> Thanks all for the discussion.
>
> I have created an issue FLINK-20860 [1] to support this.
>
> In conclusion, we will extend the configuration
> `taskmanager.memory.managed.consumer-weights` to have two more consumer
> kinds, OPERATOR and STATE_BACKEND; the available consumer kinds will be:
>
> * `OPERATOR` for both streaming and batch operators
> * `STATE_BACKEND` for state backends
> * `PYTHON` for python processes
> * `DATAPROC` as a legacy key for state backend or batch operators if
> `STATE_BACKEND` or `OPERATOR` are not specified.
>
> The previous default value is DATAPROC:70,PYTHON:30; the new default value
> will be OPERATOR:70,STATE_BACKEND:70,PYTHON:30.
>
> The weight for OPERATOR and STATE_BACKEND will be the same value to align
> with previous behaviors.
>
> Best,
> Jark
>
> [1]: https://issues.apache.org/jira/browse/FLINK-20860
>
> On Tue, 5 Jan 2021 at 18:35, Xintong Song  wrote:
>
> > >
> > > Would the default weight for OPERATOR and STATE_BACKEND be the same
> > value?
> > >
> >
> > I would say yes, to align with previous behaviors.
> >
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Jan 5, 2021 at 5:51 PM Till Rohrmann 
> wrote:
> >
> > > +1 for Jark's and Xintong's proposal.
> > >
> > > Would the default weight for OPERATOR and STATE_BACKEND be the same
> > value?
> > >
> > > Cheers,
> > > Till
> > >
> > > On Tue, Jan 5, 2021 at 6:39 AM Jingsong Li 
> > wrote:
> > >
> > > > +1 for allowing streaming operators to use managed memory.
> > > >
> > > > The memory use of streams requires some hierarchy, and the bottom
> > > > layer is undoubtedly the current StateBackend.
> > > > Letting the stream operators freely use managed memory will unify the
> > > > memory management model and give the operators more freedom.
> > > >
> > > > Xintong's proposal looks good to me. +1 to split `DATAPROC` into
> > > > `STATE_BACKEND` and `OPERATOR`.
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Tue, Jan 5, 2021 at 12:33 PM Jark Wu  wrote:
> > > >
> > > > > +1 to Xintong's proposal!
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > > On Tue, 5 Jan 2021 at 12:13, Xintong Song 
> > > wrote:
> > > > >
> > > > > > +1 for allowing streaming operators to use managed memory.
> > > > > >
> > > > > > As for the consumer names, I'm afraid using `DATAPROC` for both
> > > > streaming
> > > > > > ops and state backends will not work. Currently, RocksDB state
> > > backend
> > > > > uses
> > > > > > a shared piece of memory for all the states within that slot.
> It's
> > > not
> > > > > the
> > > > > > operator's decision how much memory it uses for the states.
> > > > > >
> > > > > > I would suggest the following. (IIUC, the same as what Jark
> > proposed)
> > > > > > > * `OPERATOR` for both streaming and batch operators
> > > > > > * `STATE_BACKEND` for state backends
> > > > > > * `PYTHON` for python processes
> > > > > > * `DATAPROC` as a legacy key for state backend or batch operators
> > if
> > > > > > `STATE_BACKEND` or `OPERATOR` are not specified.
> > > > > >
> > > > > > Thank you~
> > > > > >
> > > > > > Xintong Song
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, Jan 5, 2021 at 11:23 AM Jark Wu 
> wrote:
> > > > > >
> > > > > > > Hi Aljoscha,
> > > > > > >
> > > > > > > I think we may need to divide `DATAPROC` into `OPERATOR` and
> > > > > > > `STATE_BACKEND`, because they have different scope (slot vs.
> > > > operator).
> > > > > > > But @Xintong Song  may have more
> insights
> > > on
> > > > > it.
> > > > > > >
> > > > > > > Best,
> > > > > > > Jark
> > > > > > >
> > > > > > >
> > > > > > > On Mon, 4 Jan 2021 at 20:44, Aljoscha Krettek <
> > aljos...@apache.org
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > >> I agree, we should allow streaming operators to use managed
>