Re: Kafka producer exactly once

2021-01-08 Thread Till Rohrmann
Hi Pramod,

Flink's Kafka connector uses transactions in order to support exactly-once
semantics.
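
For reference, a minimal sketch of what this looks like on the user side
(Flink 1.12-era API; the topic name and properties below are placeholders):

```
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// must not exceed the broker's transaction.max.timeout.ms
props.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "output-topic",               // placeholder topic
        new SimpleStringSchema(),
        props,
        Optional.empty(),             // default partitioner
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE, // transactional writes
        FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
```

Note that exactly-once delivery additionally requires checkpointing to be
enabled, since the transactions are committed on checkpoint completion.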

Cheers,
Till

On Thu, Jan 7, 2021 at 11:17 PM Pramod Immaneni  wrote:

> Is there a Kafka producer that can provide exactly-once semantics without
> the use of transactions?
>
> Thanks
>


Re: Kafka producer exactly once

2021-01-08 Thread Piotr Nowojski
Hi Pramod,

Moreover, I don't think there is a way to implement an exactly-once producer
without some use of transactions, one way or another.

Best,
Piotrek

On Fri, Jan 8, 2021 at 9:34 AM Till Rohrmann wrote:

> Hi Pramod,
>
> Flink's Kafka connector uses transactions in order to support exactly-once
> semantics.
>
> Cheers,
> Till
>
> On Thu, Jan 7, 2021 at 11:17 PM Pramod Immaneni  wrote:
>
> > Is there a Kafka producer that can provide exactly-once semantics without
> > the use of transactions?
> >
> > Thanks
> >
>


[jira] [Created] (FLINK-20894) [Local Agg Pushdown] Introduce SupportsAggregatePushDown interface

2021-01-08 Thread Sebastian Liu (Jira)
Sebastian Liu created FLINK-20894:
-

 Summary: [Local Agg Pushdown] Introduce SupportsAggregatePushDown 
interface
 Key: FLINK-20894
 URL: https://issues.apache.org/jira/browse/FLINK-20894
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Sebastian Liu


Will introduce the SupportsAggregatePushDown interface for local aggregate pushdown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20895) [Local Agg Pushdown] Support LocalAggregatePushDown in Blink planner

2021-01-08 Thread Sebastian Liu (Jira)
Sebastian Liu created FLINK-20895:
-

 Summary: [Local Agg Pushdown] Support LocalAggregatePushDown in 
Blink planner
 Key: FLINK-20895
 URL: https://issues.apache.org/jira/browse/FLINK-20895
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Sebastian Liu


Will add the related rule to support LocalAggregatePushDown in the Blink planner.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20896) [Local Agg Pushdown] Support SupportsAggregatePushDown for JDBC TableSource

2021-01-08 Thread Sebastian Liu (Jira)
Sebastian Liu created FLINK-20896:
-

 Summary: [Local Agg Pushdown] Support SupportsAggregatePushDown 
for JDBC TableSource
 Key: FLINK-20896
 URL: https://issues.apache.org/jira/browse/FLINK-20896
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Ecosystem
Reporter: Sebastian Liu


Will add a SupportsAggregatePushDown implementation for the JDBC TableSource.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Support local aggregate push down for Blink batch planner

2021-01-08 Thread Sebastian Liu
Hi Jark,

Cool, following your suggestions I have created three related subtasks
under FLINK-20791.
Could you assign these subtasks to me as well when you have time? I will
then push forward the relevant implementation.

On Fri, Jan 8, 2021 at 12:30 PM Jark Wu wrote:

> Hi Sebastian,
>
> I assigned the issue to you. But I suggest creating sub-tasks under this
> issue. Because I think this would be a big contribution.
> For example, you can split it into:
> 1. Introduce SupportsAggregatePushDown interface
> 2. Support SupportsAggregatePushDown in planner
> 3. Support SupportsAggregatePushDown for JDBC source
> 4. ...
>
> Best,
> Jark
>
> On Thu, 7 Jan 2021 at 23:27, Sebastian Liu  wrote:
>
>> Hi Jark,
>>
>> It seems that we have reached agreement on the proposal. Could you
>> please help to assign the below jira ticket to me?
>> https://issues.apache.org/jira/browse/FLINK-20791
>>
>> On Thu, Jan 7, 2021 at 10:25 AM Jark Wu wrote:
>>
>>> Thanks for updating the design doc.
>>> It looks good to me.
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 7 Jan 2021 at 10:16, Jingsong Li  wrote:
>>>
 Sounds good to me.

 We don't have to worry about future changes, because it covers all
 the capabilities of Calcite aggregation.

 Best,
 Jingsong

 On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu 
 wrote:

> Hi Jark,
>
> Sounds good to me. For better scalability in the future, we could add
> the AggregateExpression.
> ```
>
> public class AggregateExpression implements ResolvedExpression {
>
>    private final FunctionDefinition functionDefinition;
>
>    private final List<ResolvedExpression> args;
>
>    private final @Nullable CallExpression filterExpression;
>
>    private final DataType resultType;
>
>    private final boolean distinct;
>
>    private final boolean approximate;
>
>    private final boolean ignoreNulls;
>
> }
> ```
>
> And we really only need a single groupingSets parameter for grouping. I
> have updated the related interface in the proposal.
> Appreciate the continued feedback and help.
>
On Wed, Jan 6, 2021 at 9:34 PM Jark Wu wrote:
>
>> Hi Liu, Jingsong,
>>
>> Regarding the agg with filter, I think in theory we can support
>> pushing such a pattern into source.
>> We don't need to support it in the first version, but in the long
>> term, we can support it.
>> The designed interface should be future proof.
>>
>> Considering that the filter arg and distinct flag should be part of the
>> aggregate expression, I'm wondering if CallExpression is a good
>> representation for it.
>> What do you think about proposing the following `AggregateExpression`
>> to replace the `CallExpression`?
>>
>> class AggregateExpression implements ResolvedExpression {
>>     private final FunctionDefinition functionDefinition;
>>     private final List<ResolvedExpression> args;
>>     private final @Nullable CallExpression filterExpr;
>>     private final boolean distinct;
>> }
>>
>> Besides, we don't need both groupingFields and groupingSets.
>> `groupingSets` should be a superset of groupingFields.
>> Then the interface of SupportsAggregatePushDown can be:
>>
>> interface SupportsAggregatePushDown {
>>
>>   boolean applyAggregates(
>>       List<int[]> groupingSets,
>>       List<AggregateExpression> aggregates,
>>       DataType producedDataType);
>> }
>>
>> What do you think?
>>
>> Best,
>> Jark
>>
>> On Wed, 6 Jan 2021 at 19:56, Sebastian Liu 
>> wrote:
>>
>>> Hi Jingsong, Jark,
>>>
>>> Thanks so much for the discussion; the cases mentioned above are
>>> really worth discussing further.
>>>
>>> 1. For aggregate with filter expressions: eg: select COUNT(1)
>>> FILTER(WHERE cc_call_center_sk > 3) from call_center;
>>> For the current Blink Planner, the optimized plan will be:
>>> TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) ->
>>> LocalAgg -> Exchange -> FinalAgg.
>>> As there is a Calc above the TableSource, this pattern can't match
>>> the LocalAggPushDownRule in the current design.
>>>
>>> 2. For the grouping set or rollup use case: eg: select COUNT(1) from
>>> call_center group by rollup(cc_class, cc_employees);
>>> For the current Blink Planner, the optimized plan will be:
>>> TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg ->
>>> Calc.
>>> It's also not covered by the current LocalAggPushDownRule design.
>>>
>>> 3. I want to add a case which we haven't discussed yet:
>>> aggregate with a HAVING clause.
>>> eg: select COUNT(1) from call_center group by cc_class having
>>> max(cc_tax_percentage) > 0.2;
>>> For the current Blink Planner, the optimized plan will be:
>>> TableSourceScan -> LocalAgg -> Exchange -> FinalAgg ->
>>> Calc(where=[>($f2, 0.2:DECIMAL(2, 1))]).
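
For illustration, a source could implement the interface proposed in this
thread roughly as follows (all names are hypothetical; this assumes the
List<int[]> / List<AggregateExpression> signature sketched above is adopted):

```
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only: SupportsAggregatePushDown, AggregateExpression
// and DataType refer to the types proposed/discussed in this thread.
public class MyJdbcDynamicTableSource implements SupportsAggregatePushDown {

    private List<AggregateExpression> pushedAggregates = new ArrayList<>();

    @Override
    public boolean applyAggregates(
            List<int[]> groupingSets,
            List<AggregateExpression> aggregates,
            DataType producedDataType) {
        // Accept only aggregates this source can translate into its dialect;
        // returning false keeps the local aggregation in the Flink plan.
        this.pushedAggregates = aggregates;
        return true;
    }
}
```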
>>>

[jira] [Created] (FLINK-20897) Support DataStream batch mode in StreamTableEnvironment

2021-01-08 Thread Timo Walther (Jira)
Timo Walther created FLINK-20897:


 Summary: Support DataStream batch mode in StreamTableEnvironment
 Key: FLINK-20897
 URL: https://issues.apache.org/jira/browse/FLINK-20897
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


The batch mode of the DataStream API should also trigger the batch mode in
StreamTableEnvironment, so that both APIs have consistent streaming/batch
behavior.
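
For illustration, a sketch of the intended behavior (DataStream API as of 1.12):

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// With this change, creating the table environment from a batch-mode
// DataStream environment should put the table side into batch mode as well.
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
{code}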



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20898) Code of BatchExpand & LocalNoGroupingAggregateWithoutKeys grows beyond 64 KB

2021-01-08 Thread Sebastian Liu (Jira)
Sebastian Liu created FLINK-20898:
-

 Summary: Code of BatchExpand & LocalNoGroupingAggregateWithoutKeys 
grows beyond 64 KB 
 Key: FLINK-20898
 URL: https://issues.apache.org/jira/browse/FLINK-20898
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Sebastian Liu


When we write a complex batch aggregation SQL query, the generated code can
easily exceed the 64KB size limit for the BatchExpand and
LocalNoGroupingAggregateWithoutKeys operators, especially in the
analyze-table scenario.

For a simple SQL statement such as
{code:java}
analyze table tpc_ds.call_center compute statistics for all columns{code}
the underlying SQL to execute will be:
{code:java}
SELECT CAST(COUNT(1) AS BIGINT),
CAST(COUNT(DISTINCT `cc_call_center_sk`) AS BIGINT),
CAST(
(COUNT(1) - COUNT(`cc_call_center_sk`)) AS BIGINT
),
CAST(8.0 AS DOUBLE),
CAST(8.0 AS INTEGER),
CAST(MAX(`cc_call_center_sk`) AS BIGINT),
CAST(MIN(`cc_call_center_sk`) AS BIGINT),
CAST(COUNT(DISTINCT `cc_call_center_id`) AS BIGINT),
CAST(
(COUNT(1) - COUNT(`cc_call_center_id`)) AS BIGINT
),
CAST(
AVG(CAST(CHAR_LENGTH(`cc_call_center_id`) AS DOUBLE)) AS DOUBLE
),
CAST(MAX(CHAR_LENGTH(`cc_call_center_id`)) AS INTEGER),
CAST(MAX(`cc_call_center_id`) AS VARCHAR),
CAST(MIN(`cc_call_center_id`) AS VARCHAR),
CAST(COUNT(DISTINCT `cc_rec_start_date`) AS BIGINT),
CAST(
(COUNT(1) - COUNT(`cc_rec_start_date`)) AS BIGINT
),
CAST(12.0 AS DOUBLE),
CAST(12.0 AS INTEGER),
CAST(MAX(`cc_rec_start_date`) AS DATE),
CAST(MIN(`cc_rec_start_date`) AS DATE),
CAST(COUNT(DISTINCT `cc_rec_end_date`) AS BIGINT),
CAST((COUNT(1) - COUNT(`cc_rec_end_date`)) AS BIGINT),
CAST(12.0 AS DOUBLE),
CAST(12.0 AS INTEGER),
CAST(MAX(`cc_rec_end_date`) AS DATE),
CAST(MIN(`cc_rec_end_date`) AS DATE),
CAST(COUNT(DISTINCT `cc_closed_date_sk`) AS BIGINT),
CAST(
(COUNT(1) - COUNT(`cc_closed_date_sk`)) AS BIGINT
),
CAST(8.0 AS DOUBLE),
CAST(8.0 AS INTEGER),
CAST(MAX(`cc_closed_date_sk`) AS BIGINT),
CAST(MIN(`cc_closed_date_sk`) AS BIGINT),
CAST(COUNT(DISTINCT `cc_open_date_sk`) AS BIGINT),
CAST((COUNT(1) - COUNT(`cc_open_date_sk`)) AS BIGINT),
CAST(8.0 AS DOUBLE),
CAST(8.0 AS INTEGER),
CAST(MAX(`cc_open_date_sk`) AS BIGINT),
CAST(MIN(`cc_open_date_sk`) AS BIGINT),
CAST(COUNT(DISTINCT `cc_name`) AS BIGINT),
CAST((COUNT(1) - COUNT(`cc_name`)) AS BIGINT),
CAST(
AVG(CAST(CHAR_LENGTH(`cc_name`) AS DOUBLE)) AS DOUBLE
),
CAST(MAX(CHAR_LENGTH(`cc_name`)) AS INTEGER),
CAST(MAX(`cc_name`) AS VARCHAR),
CAST(MIN(`cc_name`) AS VARCHAR),
CAST(COUNT(DISTINCT `cc_class`) AS BIGINT),
CAST((COUNT(1) - COUNT(`cc_class`)) AS BIGINT),
CAST(
AVG(CAST(CHAR_LENGTH(`cc_class`) AS DOUBLE)) AS DOUBLE
),
CAST(MAX(CHAR_LENGTH(`cc_class`)) AS INTEGER),
CAST(MAX(`cc_class`) AS VARCHAR),
CAST(MIN(`cc_class`) AS VARCHAR),
CAST(COUNT(DISTINCT `cc_employees`) AS BIGINT),
CAST((COUNT(1) - COUNT(`cc_employees`)) AS BIGINT),
CAST(4.0 AS DOUBLE),
CAST(4.0 AS INTEGER),
CAST(MAX(`cc_employees`) AS INTEGER),
CAST(MIN(`cc_employees`) AS INTEGER),
CAST(COUNT(DISTINCT `cc_sq_ft`) AS BIGINT),
CAST((COUNT(1) - COUNT(`cc_sq_ft`)) AS BIGINT),
CAST(4.0 AS DOUBLE),
CAST(4.0 AS INTEGER),
CAST(MAX(`cc_sq_ft`) AS INTEGER),
CAST(MIN(`cc_sq_ft`) AS INTEGER),
CAST(COUNT(DISTINCT `cc_hours`) AS BIGINT),
CAST((COUNT(1) - COUNT(`cc_hours`)) AS BIGINT),
CAST(
AVG(CAST(CHAR_LENGTH(`cc_hours`) AS DOUBLE)) AS DOUBLE
),
CAST(MAX(CHAR_LENGTH(`cc_hours`)) AS INTEGER),
CAST(MAX(`cc_hours`) AS VARCHAR),
CAST(MIN(`cc_hours`) AS VARCHAR),
CAST(COUNT(DISTINCT `cc_manager`) AS BIGINT),
CAST((COUNT(1) - COUNT(`cc_manager`)) AS BIGINT),
CAST(
AVG(CAST(CHAR_LENGTH(`cc_manager`) AS DOUBLE)) AS DOUBLE
),
CAST(MAX(CHAR_LENGTH(`cc_manager`)) AS INTEGER),
CAST(MAX(`cc_manager`) AS VARCHAR),
CAST(MIN(`cc_manager`) AS VARCHAR),
CAST(COUNT(DISTINCT `cc_mkt_id`) AS BIGINT),
CAST((COUNT(1) - COUNT(`cc_mkt_id`)) AS BIGINT),
CAST(4.0 AS DOUBLE),
CAST(4.0 AS INTEGER),
CAST(MAX(`cc_mkt_id`) AS INTEGER),
CAST(MIN(`cc_mkt_id`) AS INTEGER),
CAST(COUNT(DISTINCT `cc_mkt_class`) AS BIGINT),
CAST((COUNT(1) - COUNT(`cc_mkt_class`)) AS BIGINT),
CAST(
AVG(CAST(CHAR_LENGTH(`cc_mkt_class`) AS DOUBLE)) AS DOUBLE
),
CAST(MAX(CHAR_LENGTH(`cc_mkt_class`)) AS INTEGER),
CAST(MAX(`cc_mkt_class`) AS VARCHAR),
CAST(MIN(`cc_mkt_class`) AS VARCHAR),
CAST(COUNT(DISTINCT `cc_mkt_desc`) AS BIGINT),
CAST((COUNT(1) - COUNT(`cc_mkt_desc`)) AS BIGINT),
CAST(
AVG(CAST(CHAR_LENGTH(`cc_mkt_desc`) AS DOUBLE)) AS DOUBLE
),
CAST(MAX(C

Re: Kafka producer exactly once

2021-01-08 Thread Aljoscha Krettek

On 2021/01/07 14:17, Pramod Immaneni wrote:

Is there a Kafka producer that can provide exactly-once semantics without the
use of transactions?


I'm afraid not right now. There are some ideas about using a WAL (write
ahead log) and then periodically "shipping" that to Kafka, but nothing
concrete.


Best,
Aljoscha


Re: Kafka producer exactly once

2021-01-08 Thread Aljoscha Krettek

On 2021/01/08 10:00, Piotr Nowojski wrote:

Moreover, I don't think there is a way to implement an exactly-once producer
without some use of transactions, one way or another.


There are some ways I can think of. If messages have consistent IDs, we 
could check whether a message is already in Kafka before writing it.  
That way, you could achieve exactly-once semantics with an 
"at-least-once" system.


I'm not saying that it would be good, or high-performance, but I think 
there are ways. 😅
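
For concreteness, a rough sketch of that approach with the plain Kafka
clients (`consumerProps`, `producerProps`, `topic` and `pendingRecords` are
assumed to be in scope; this is illustrative only, not production code):

```
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Skip records whose ID (here: the record key) is already in the topic.
// Assumes stable, unique keys and a single writer; an empty poll is naively
// treated as "caught up".
Set<String> seen = new HashSet<>();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
    List<TopicPartition> parts = new ArrayList<>();
    for (PartitionInfo p : consumer.partitionsFor(topic)) {
        parts.add(new TopicPartition(topic, p.partition()));
    }
    consumer.assign(parts);
    consumer.seekToBeginning(parts);
    ConsumerRecords<String, String> batch;
    while (!(batch = consumer.poll(Duration.ofSeconds(1))).isEmpty()) {
        batch.forEach(r -> seen.add(r.key()));
    }
}
try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
    for (ProducerRecord<String, String> record : pendingRecords) {
        if (seen.add(record.key())) { // only write IDs not yet in Kafka
            producer.send(record);
        }
    }
}
```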


Best,
Aljoscha


Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

2021-01-08 Thread Aljoscha Krettek

Hi Nicholas,

Thanks for starting the discussion!

I think we might be able to simplify this a bit and re-use existing 
functionality.


There is already `Source.restoreEnumerator()` and
`SplitEnumerator.snapshotState()`. This seems to be roughly what the
Hybrid Source needs. When the initial source finishes, we can take a 
snapshot (which should include data that the follow-up sources need for 
initialization). Then we need a function that maps the enumerator 
checkpoint types between initial source and new source and we are good 
to go. We wouldn't need to introduce any additional interfaces for 
sources to implement, which would fragment the ecosystem between sources 
that can be used in a Hybrid Source and sources that cannot be used in a 
Hybrid Source.
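
To make that concrete, the extra piece could be as small as this
(hypothetical interface; nothing like it exists yet):

```
// Hypothetical: maps the final enumerator checkpoint of the finished source
// (e.g. the file source's end state) to the initial enumerator checkpoint
// of the next source (e.g. Kafka start offsets).
@FunctionalInterface
interface EnumeratorCheckpointConverter<FromCheckpointT, ToCheckpointT> {
    ToCheckpointT convert(FromCheckpointT finalCheckpointOfPreviousSource);
}
```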


What do you think?

Best,
Aljoscha

On 2020/11/03 02:34, Nicholas Jiang wrote:

Hi devs,

I'd like to start a new FLIP to introduce the Hybrid Source. The hybrid
source is a source that contains a list of concrete sources. The hybrid
source reads from each contained source in the defined order. It switches
from source A to the next source B when source A finishes.

In practice, many Flink jobs need to read data from multiple sources in
sequential order. Change Data Capture (CDC) and machine learning feature
backfill are two concrete scenarios of this consumption pattern. Users may
have to either run two different Flink jobs or have some hacks in the
SourceFunction to address such use cases.

To support the above scenarios smoothly, a Flink job needs to first read the
historical data from HDFS and then switch to Kafka for real-time records. The
hybrid source has several benefits from the user's perspective:

- Switching among multiple sources is easy, based on the switchable source
implementations of the different connectors.
- Automatic switching is supported for user-defined switchable sources that
constitute the hybrid source.
- There is a complete and effective mechanism to support smooth source
migration between historical and real-time data.

Therefore, in this discussion, we propose to introduce a “Hybrid Source” API
built on top of the new Source API (FLIP-27) to help users to smoothly
switch sources. For more detail, please refer to the FLIP design doc[1].

I'm looking forward to your feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source


Best,
Nicholas Jiang



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/


[jira] [Created] (FLINK-20899) encounter ClassCastException when calculating cost in HepPlanner

2021-01-08 Thread godfrey he (Jira)
godfrey he created FLINK-20899:
--

 Summary: encounter ClassCastException when calculating cost in 
HepPlanner
 Key: FLINK-20899
 URL: https://issues.apache.org/jira/browse/FLINK-20899
 Project: Flink
  Issue Type: Bug
Reporter: godfrey he
Assignee: godfrey he
 Fix For: 1.13.0


After TRACE level is enabled for the logger, we will encounter a
ClassCastException when executing the following query:
{code:sql}
SELECT COUNT(*) FROM MyTable
{code}

The exception detail is:

{code:java}
java.lang.ClassCastException: org.apache.calcite.plan.RelOptCostImpl$Factory 
cannot be cast to org.apache.flink.table.planner.plan.cost.FlinkCostFactory

at 
org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalExchange.computeSelfCost(CommonPhysicalExchange.scala:53)
at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCost.getNonCumulativeCost(FlinkRelMdNonCumulativeCost.scala:41)
at 
GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown 
Source)
at 
GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:288)
at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:38)
at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown 
Source)
at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown 
Source)
at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown 
Source)
at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown 
Source)
at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:269)
at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost$$anonfun$getCumulativeCost$1.apply(FlinkRelMdCumulativeCost.scala:41)
at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost$$anonfun$getCumulativeCost$1.apply(FlinkRelMdCumulativeCost.scala:40)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:39)
at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown 
Source)
at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown 
Source)
at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:269)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.getCost(AbstractRelOptPlanner.java:247)
at 
org.apache.calcite.plan.hep.HepPlanner.dumpGraph(HepPlanner.java:1023)
at org.apache.calcite.plan.hep.HepPlanner.setRoot(HepPlanner.java:159)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:60)
{code}

The reason is that the HepPlanner will calculate the cost of the optimized
plan when TRACE is enabled, while the cost factory in the HepPlanner is
{{RelOptCostImpl.Factory}}, not a subclass of FlinkCostFactory.

The solution is to create the HepPlanner with the cost factory from the
RelOptCluster.
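
A sketch of that fix, using Calcite's five-argument {{HepPlanner}}
constructor (the variable names are illustrative):

{code:java}
// Use the cost factory of the input's cluster (a FlinkCostFactory) instead
// of falling back to the default RelOptCostImpl.Factory.
RelOptCostFactory costFactory = input.getCluster().getPlanner().getCostFactory();
HepPlanner planner = new HepPlanner(hepProgram, context, false, null, costFactory);
{code}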



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

2021-01-08 Thread Aljoscha Krettek
Till or Chesnay (cc'ed), have you thought about adding a hook on the
JobMaster/JobManager to allow external systems to get push notifications
about submitted jobs?


If they are OK with such a feature, would you maybe be interested in
implementing it yourself, Wenhao?


Best,
Aljoscha

On 2020/09/28 11:14, 季文昊 wrote:

Hi Aljoscha,

Yes, that is not enough, since the `JobListener`s are called only once when
`execute()` or `executeAsync()` is called. And in order to sync the status,
I also have to call `JobClient#getJobStatus` periodically.

On Fri, Sep 25, 2020 at 8:12 PM Aljoscha Krettek 
wrote:


Hi,

I understand from your email that
`StreamExecutionEnvironment.registerJobListener()` would not be enough
for you because you want to be notified of changes on the cluster side,
correct? That is when the job status changes on the master.

Best,
Aljoscha

On 23.09.20 14:31, 季文昊 wrote:
> Hi there,
>
> I'm working on a Flink platform in my corp, which provides a service to
> provision and manage multiple dedicated Flink clusters. The problem is
that
> we want to sync a job status without delay after its submission through
our
> platform as long as it has been changed.
>
> Since we want to update this in-time and make our services stateless,
> pulling a job's status periodically is not a good solution. I do not find
> any proper way to achieve this by letting a job manager push changes
> directly to our platform except changing the source code, which registers
> an additional `JobStatusListener` in the method
> `org.apache.flink.runtime.jobmaster.JobMaster#startScheduling`.
>
> I wonder if we can enhance `JobStatusListener` a little bit so that a
Flink
> user can register his custom JobStatusListener at the startup.
>
> To be specific, we can have a `JobStatusListenerFactory` interface and
its
> corresponding `ServiceLoader`, where
> the JobStatusListenerFactory will have the following method:
>   - JobStatusListener createJobStatusListener(Properties properties);
>
> Custom listeners will be created during the JobMaster#startScheduling
> method.
>
> If someone would like to implement his own JobStatusListener, he will
> package all the related classes into a standalone jar with a
>
`META-INF/services/org.apache.flink.runtime.executiongraph.JobStatusListener`
> file and place it under the `lib/` directory.
>
> In addition, I find that there is a Jira ticket similar to what I'm
> asking: FLINK-17104 but I do not see any comment or update yet. Hope
anyone
> could help me move on this feature or give me some suggestions about it.
>
> Thanks,
> Wenhao
>
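
For illustration, the proposal above boils down to something like the
following (hypothetical interface, not an existing Flink API):

```
import java.util.Properties;
import java.util.ServiceLoader;
import org.apache.flink.runtime.executiongraph.JobStatusListener;

// Hypothetical factory, discovered via ServiceLoader when the JobMaster
// starts scheduling.
public interface JobStatusListenerFactory {
    JobStatusListener createJobStatusListener(Properties properties);
}

// Illustrative wiring inside JobMaster#startScheduling:
// for (JobStatusListenerFactory factory :
//         ServiceLoader.load(JobStatusListenerFactory.class)) {
//     listeners.add(factory.createJobStatusListener(listenerProperties));
// }
```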




[jira] [Created] (FLINK-20900) Extend documentation guidelines to cover formatting of commands

2021-01-08 Thread Matthias (Jira)
Matthias created FLINK-20900:


 Summary: Extend documentation guidelines to cover formatting of 
commands
 Key: FLINK-20900
 URL: https://issues.apache.org/jira/browse/FLINK-20900
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Matthias


As part of refactoring the deployment documentation we came up with some 
agreements that we could add to the docs guidelines to get a more consistent 
look & feel:
* long parameter names for commands
* one parameter per line
* parameter line indented
* command preceded by `$`
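
For illustration, a command following these agreements could be rendered as
(the {{run}} options are just an example):

{code:bash}
$ ./bin/flink run \
    --detached \
    --target yarn-per-job \
    ./examples/streaming/TopSpeedWindowing.jar
{code}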



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

2021-01-08 Thread Till Rohrmann
At the moment, this requirement has not come up very often. In general, I
am always a bit cautious when adding functionality which executes user code
in the JobManager because it can easily become a stability problem. On the
other hand, I can't think of a different solution other than polling the
job status atm.

Cheers,
Till

On Fri, Jan 8, 2021 at 1:23 PM Aljoscha Krettek  wrote:

> Till or Chesnay (cc'ed), have you thought about adding a hook on the
> JobMaster/JobManager to allow external systems to get push notifications
> about submitted jobs?
>
> If they are OK with such a feature, would you maybe be interested in
> implementing it yourself, Wenhao?
>
> Best,
> Aljoscha
>
> On 2020/09/28 11:14, 季文昊 wrote:
> >Hi Aljoscha,
> >
> >Yes, that is not enough, since the `JobListener`s are called only once
> when
> >`execute()` or `executeAsync()` is called. And in order to sync the status,
> >I also have to call `JobClient#getJobStatus` periodically.
> >
> >On Fri, Sep 25, 2020 at 8:12 PM Aljoscha Krettek 
> >wrote:
> >
> >> Hi,
> >>
> >> I understand from your email that
> >> `StreamExecutionEnvironment.registerJobListener()` would not be enough
> >> for you because you want to be notified of changes on the cluster side,
> >> correct? That is when the job status changes on the master.
> >>
> >> Best,
> >> Aljoscha
> >>
> >> On 23.09.20 14:31, 季文昊 wrote:
> >> > Hi there,
> >> >
> >> > I'm working on a Flink platform in my corp, which provides a service
> to
> >> > provision and manage multiple dedicated Flink clusters. The problem is
> >> that
> >> > we want to sync a job status without delay after its submission
> through
> >> our
> >> > platform as long as it has been changed.
> >> >
> >> > Since we want to update this in-time and make our services stateless,
> >> > pulling a job's status periodically is not a good solution. I do not
> find
> >> > any proper way to achieve this by letting a job manager push changes
> >> > directly to our platform except changing the source code, which
> registers
> >> > an additional `JobStatusListener` in the method
> >> > `org.apache.flink.runtime.jobmaster.JobMaster#startScheduling`.
> >> >
> >> > I wonder if we can enhance `JobStatusListener` a little bit so that a
> >> Flink
> >> > user can register his custom JobStatusListener at the startup.
> >> >
> >> > To be specific, we can have a `JobStatusListenerFactory` interface and
> >> its
> >> > corresponding `ServiceLoader`, where
> >> > the JobStatusListenerFactory will have the following method:
> >> >   - JobStatusListener createJobStatusListener(Properties properties);
> >> >
> >> > Custom listeners will be created during the JobMaster#startScheduling
> >> > method.
> >> >
> >> > If someone would like to implement his own JobStatusListener, he will
> >> > package all the related classes into a standalone jar with a
> >> >
> >>
> `META-INF/services/org.apache.flink.runtime.executiongraph.JobStatusListener`
> >> > file and place it under the `lib/` directory.
> >> >
> >> > In addition, I find that there is a Jira ticket similar to what I'm
> >> > asking: FLINK-17104 but I do not see any comment or update yet. Hope
> >> anyone
> >> > could help me move on this feature or give me some suggestions about
> it.
> >> >
> >> > Thanks,
> >> > Wenhao
> >> >
> >>
> >>
>


[jira] [Created] (FLINK-20901) Introduce DeclarativeSlotPool methods to set resource requirements to absolute values

2021-01-08 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-20901:
-

 Summary: Introduce DeclarativeSlotPool methods to set resource 
requirements to absolute values
 Key: FLINK-20901
 URL: https://issues.apache.org/jira/browse/FLINK-20901
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.13.0


I propose to add {{DeclarativeSlotPool.setResourceRequirements(ResourceCounter
resourceRequirements)}}, which sets the resource requirements to
{{resourceRequirements}}. This will make operating this component easier.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Support local aggregate push down for Blink batch planner

2021-01-08 Thread Jark Wu
Great! Thanks for pushing this work.
Looking forward to the pull requests.

Best,
Jark

On Fri, 8 Jan 2021 at 17:57, Sebastian Liu  wrote:

> Hi Jark,
>
> Cool, following your suggestions I have created three related subtasks
> under FLINK-20791.
> Could you assign these subtasks to me as well when you have time? I will
> then push forward the relevant implementation.
>
> On Fri, Jan 8, 2021 at 12:30 PM Jark Wu wrote:
>
>> Hi Sebastian,
>>
>> I assigned the issue to you. But I suggest creating sub-tasks under this
>> issue. Because I think this would be a big contribution.
>> For example, you can split it into:
>> 1. Introduce SupportsAggregatePushDown interface
>> 2. Support SupportsAggregatePushDown in planner
>> 3. Support SupportsAggregatePushDown for JDBC source
>> 4. ...
>>
>> Best,
>> Jark
>>
>> On Thu, 7 Jan 2021 at 23:27, Sebastian Liu  wrote:
>>
>>> Hi Jark,
>>>
>>> It seems that we have reached agreement on the proposal. Could you
>>> please help to assign the below jira ticket to me?
>>> https://issues.apache.org/jira/browse/FLINK-20791
>>>
>>> On Thu, Jan 7, 2021 at 10:25 AM Jark Wu wrote:
>>>
 Thanks for updating the design doc.
 It looks good to me.

 Best,
 Jark

 On Thu, 7 Jan 2021 at 10:16, Jingsong Li 
 wrote:

> Sounds good to me.
>
> We don't have to worry about future changes, because it covers
> all the capabilities of Calcite aggregation.
>
> Best,
> Jingsong
>
> On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu 
> wrote:
>
>> Hi Jark,
>>
>> Sounds good to me. For better scalability in the future, we could add
>> the AggregateExpression.
>> ```
>>
>> public class AggregateExpression implements ResolvedExpression {
>>
>>    private final FunctionDefinition functionDefinition;
>>
>>    private final List<ResolvedExpression> args;
>>
>>    private final @Nullable CallExpression filterExpression;
>>
>>    private final DataType resultType;
>>
>>    private final boolean distinct;
>>
>>    private final boolean approximate;
>>
>>    private final boolean ignoreNulls;
>>
>> }
>> ```
>>
>> And we really only need a single groupingSets parameter for grouping. I
>> have updated the related interface in the proposal.
>> Appreciate the continued feedback and help.
>>
>> On Wed, Jan 6, 2021 at 9:34 PM Jark Wu wrote:
>>
>>> Hi Liu, Jingsong,
>>>
>>> Regarding the agg with filter, I think in theory we can support
>>> pushing such a pattern into source.
>>> We don't need to support it in the first version, but in the long
>>> term, we can support it.
>>> The designed interface should be future proof.
>>>
>>> Considering that the filter arg and distinct flag should be part of the
>>> aggregate expression, I'm wondering if CallExpression is a good
>>> representation for it.
>>> What do you think about proposing the following
>>> `AggregateExpression` to replace the `CallExpression`?
>>>
>>> class AggregateExpression implements ResolvedExpression {
>>>     private final FunctionDefinition functionDefinition;
>>>     private final List<ResolvedExpression> args;
>>>     private final @Nullable CallExpression filterExpr;
>>>     private final boolean distinct;
>>> }
>>>
>>> Besides, we don't need both groupingFields and groupingSets.
>>> `groupingSets` should be a superset of groupingFields.
>>> Then the interface of SupportsAggregatePushDown can be:
>>>
>>> interface SupportsAggregatePushDown {
>>>
>>>   boolean applyAggregates(
>>>       List<int[]> groupingSets,
>>>       List<AggregateExpression> aggregates,
>>>       DataType producedDataType);
>>> }
>>>
>>> What do you think?
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 6 Jan 2021 at 19:56, Sebastian Liu 
>>> wrote:
>>>
 Hi Jingsong, Jark,

 Thanks so much for the discussion; the cases mentioned above are
 really worth discussing further.

 1. For aggregate with filter expressions: eg: select COUNT(1)
 FILTER(WHERE cc_call_center_sk > 3) from call_center;
 For the current Blink Planner, the optimized plan will be:
 TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) ->
 LocalAgg -> Exchange -> FinalAgg.
 As there is a Calc above the TableSource, this pattern can't match
 the LocalAggPushDownRule in the current design.

 2. For the grouping set or rollup use case: eg: select COUNT(1)
 from call_center group by rollup(cc_class, cc_employees);
 For the current Blink Planner, the optimized plan will be:
 TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg ->
 Calc.
 It's also not covered by the current LocalAggPushDownRule design.

 3. I want to add a case which we haven't discussed yet:
 aggregate with a HAVING clause.
>

[jira] [Created] (FLINK-20902) Please remove my email id

2021-01-08 Thread Suryanarayana Murthy Maganti (Jira)
Suryanarayana Murthy Maganti created FLINK-20902:


 Summary: Please remove my email id 
 Key: FLINK-20902
 URL: https://issues.apache.org/jira/browse/FLINK-20902
 Project: Flink
  Issue Type: Wish
  Components: API / Core
Reporter: Suryanarayana Murthy Maganti


Hey Admin, I am getting a bunch of emails from Apache Flink; please remove my
subscription. This is the second time I am requesting, and I am not sure how
to unsubscribe.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Core limit exceeded in GridGain Control Center

2021-01-08 Thread Carsten

Hello all,

After an install marathon yesterday night, I was ready to test Ignite
2.9.1 + GridGain Control Center 2020.12.


But when starting everything (4 nodes + control center) I got the 
message "Core limit has been exceeded for the current license".


I was just wondering what the core limit is and where to find information
on it (I did search but came up empty).


Any advice is greatly appreciated


All the best,

Carsten




Re: Kafka producer exactly once

2021-01-08 Thread Piotr Nowojski
Yes, I meant true general-purpose exactly-once :)

> There are some ideas about using a WAL (write ahead log) and then
periodically "shipping" that to Kafka, but nothing concrete.

But that would still need to use Kafka transactions for "shipping" the
records. That's what I meant: one way or another, we need transactions :)

Piotrek

On Fri, Jan 8, 2021 at 12:44 PM Aljoscha Krettek wrote:

> On 2021/01/08 10:00, Piotr Nowojski wrote:
> >Moreover, I don't think there is a way to implement an exactly-once producer
> >without some use of transactions, one way or another.
>
> There are some ways I can think of. If messages have consistent IDs, we
> could check whether a message is already in Kafka before writing it.
> That way, you could achieve exactly-once semantics with an
> "at-least-once" system.
>
> I'm not saying that it would be good, or high-performance, but I think
> there are ways. 😅
>
> Best,
> Aljoscha
>


[jira] [Created] (FLINK-20903) Remove SchedulerNG.initialize method

2021-01-08 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-20903:
-

 Summary: Remove SchedulerNG.initialize method
 Key: FLINK-20903
 URL: https://issues.apache.org/jira/browse/FLINK-20903
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.13.0


The {{SchedulerNG}} no longer needs the {{initialize}} method because the 
{{JobMaster}} is now started with a valid {{MainThreadExecutor}}. Hence, we can 
simplify the {{SchedulerNG}} by passing the {{ComponentMainThreadExecutor}} to 
the constructor of the implementations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

2021-01-08 Thread Jeff Zhang
Hi Till,

IIUC, for application mode we already allow running user code in the job manager.

On Fri, Jan 8, 2021 at 9:53 PM Till Rohrmann wrote:

> At the moment, this requirement has not come up very often. In general, I
> am always a bit cautious when adding functionality which executes user code
> in the JobManager because it can easily become a stability problem. On the
> other hand, I can't think of a different solution other than polling the
> job status atm.
>
> Cheers,
> Till
>
> On Fri, Jan 8, 2021 at 1:23 PM Aljoscha Krettek 
> wrote:
>
> > Till or Chesnay (cc'ed), have you thought about adding a hook on the
> > JobMaster/JobManager to allow external systems to get push notifications
> > about submitted jobs?
> >
> > If they are OK with such a feature, would you maybe be interested in
> > implementing it yourself, Wenhao?
> >
> > Best,
> > Aljoscha
> >
> > On 2020/09/28 11:14, 季文昊 wrote:
> > >Hi Aljoscha,
> > >
> > >Yes, that is not enough, since the `JobListener`s are called only once
> > when
> > >`execute()` or `executeAsync()` is called. And in order to sync the
> status,
> > >I also have to call `JobClient#getJobStatus` periodically.
> > >
> > >On Fri, Sep 25, 2020 at 8:12 PM Aljoscha Krettek 
> > >wrote:
> > >
> > >> Hi,
> > >>
> > >> I understand from your email that
> > >> `StreamExecutionEnvironment.registerJobListener()` would not be
> enough
> > >> for you because you want to be notified of changes on the cluster
> side,
> > >> correct? That is when the job status changes on the master.
> > >>
> > >> Best,
> > >> Aljoscha
> > >>
> > >> On 23.09.20 14:31, 季文昊 wrote:
> > >> > Hi there,
> > >> >
> > >> > I'm working on a Flink platform in my corp, which provides a service
> > to
> > >> > provision and manage multiple dedicated Flink clusters. The problem
> is
> > >> that
> > >> > we want to sync a job status without delay after its submission
> > through
> > >> our
> > >> > platform as long as it has been changed.
> > >> >
> > >> > Since we want to update this in-time and make our services
> stateless,
> > >> > pulling a job's status periodically is not a good solution. I do not
> > find
> > >> > any proper way to achieve this by letting a job manager push changes
> > >> > directly to our platform except changing the source code, which
> > registers
> > >> > an additional `JobStatusListener` in the method
> > >> > `org.apache.flink.runtime.jobmaster.JobMaster#startScheduling`.
> > >> >
> > >> > I wonder if we can enhance `JobStatusListener` a little bit so that
> a
> > >> Flink
> > >> > user can register his custom JobStatusListener at the startup.
> > >> >
> > >> > To be specific, we can have a `JobStatusListenerFactory` interface
> and
> > >> its
> > >> > corresponding `ServiceLoader`, where
> > >> > the JobStatusListenerFactory will have the following method:
> > >> >   - JobStatusListener createJobStatusListener(Properties
> properties);
> > >> >
> > >> > Custom listeners will be created during the
> JobMaster#startScheduling
> > >> > method.
> > >> >
> > >> > If someone would like to implement his own JobStatusListener, he
> will
> > >> > package all the related classes into a standalone jar with a
> > >> >
> > >>
> >
> `META-INF/services/org.apache.flink.runtime.executiongraph.JobStatusListener`
> > >> > file and place it under the `lib/` directory.
> > >> >
> > >> > In addition, I find that there is a Jira ticket similar to what I'm
> > >> > asking: FLINK-17104 but I do not see any comment or update yet. Hope
> > >> anyone
> > >> > could help me move on this feature or give me some suggestions about
> > it.
> > >> >
> > >> > Thanks,
> > >> > Wenhao
> > >> >
> > >>
> > >>
> >
>


-- 
Best Regards

Jeff Zhang


Re: Core limit exceeded in GridGain Control Center

2021-01-08 Thread Arvid Heise
Hi Carsten,

You probably picked the wrong dev list; this question belongs on the
Ignite user list instead.

Best,

Arvid

On Fri, Jan 8, 2021 at 3:53 PM Carsten 
wrote:

> Hello all,
>
> After an install marathon yesterday night, I was ready to test Ignite
> 2.9.1 + GridGain Control Center 2020.12.
>
> But when starting everything (4 nodes + control center) I got the
> message "Core limit has been exceeded for the current license".
>
> I was just wondering what the core limit is and where to find information
> on it (I did search but came up empty).
>
> Any advice is greatly appreciated
>
>
> All the best,
>
> Carsten
>
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Core limit exceeded in GridGain Control Center

2021-01-08 Thread Carsten

Hi Arvid,

I saw that as soon as I sent the mail :)


Sorry all !!

Carsten


On Jan 8, 2021 at 13:10, Arvid Heise wrote:

Hi Carsten,

You probably picked the wrong dev list; this question belongs on the
Ignite user list instead.

Best,

Arvid

On Fri, Jan 8, 2021 at 3:53 PM Carsten 
wrote:


Hello all,

After an install marathon yesterday night, I was ready to test Ignite
2.9.1 + GridGain Control Center 2020.12.

But when starting everything (4 nodes + control center) I got the
message "Core limit has been exceeded for the current license".

I was just wondering what the core limit is and where to find information
on it (I did search but came up empty).

Any advice is greatly appreciated


All the best,

Carsten







[jira] [Created] (FLINK-20904) Maven enforce goal dependency-convergence failed on flink-avro-glue-schema-registry

2021-01-08 Thread Linyu Yao (Jira)
Linyu Yao created FLINK-20904:
-

 Summary: Maven enforce goal dependency-convergence failed on 
flink-avro-glue-schema-registry
 Key: FLINK-20904
 URL: https://issues.apache.org/jira/browse/FLINK-20904
 Project: Flink
  Issue Type: New Feature
  Components: Build System / CI
Affects Versions: 1.13.0
Reporter: Linyu Yao


I'm creating a pull request to add an integration with the AWS Glue Schema
Registry; more details of this new feature can be found at
https://issues.apache.org/jira/browse/FLINK-19667

The package builds successfully locally and passes the end-to-end test, but
the newly added module fails to compile in CI. See more at
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=11770&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb]

How can I replicate this issue in my local environment and fix it?
{code:java}
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (dependency-convergence) @ 
flink-avro-glue-schema-registry ---
[WARNING] 
Dependency convergence error for software.amazon.awssdk:http-client-spi:2.15.32 
paths to dependency are:
+-org.apache.flink:flink-avro-glue-schema-registry:1.13-SNAPSHOT
  +-software.amazon.glue:schema-registry-serde:1.0.0
+-software.amazon.glue:schema-registry-common:1.0.0
  +-software.amazon.awssdk:glue:2.15.32
+-software.amazon.awssdk:protocol-core:2.15.32
  +-software.amazon.awssdk:http-client-spi:2.15.32
and
+-org.apache.flink:flink-avro-glue-schema-registry:1.13-SNAPSHOT
  +-software.amazon.glue:schema-registry-serde:1.0.0
+-software.amazon.glue:schema-registry-common:1.0.0
  +-software.amazon.awssdk:glue:2.15.32
+-software.amazon.awssdk:auth:2.15.32
  +-software.amazon.awssdk:http-client-spi:2.15.32
and
+-org.apache.flink:flink-avro-glue-schema-registry:1.13-SNAPSHOT
  +-software.amazon.glue:schema-registry-serde:1.0.0
+-software.amazon.glue:schema-registry-common:1.0.0
  +-software.amazon.awssdk:glue:2.15.32
+-software.amazon.awssdk:http-client-spi:2.15.32
and
+-org.apache.flink:flink-avro-glue-schema-registry:1.13-SNAPSHOT
  +-software.amazon.glue:schema-registry-serde:1.0.0
+-software.amazon.glue:schema-registry-common:1.0.0
  +-software.amazon.awssdk:glue:2.15.32
+-software.amazon.awssdk:aws-core:2.15.32
  +-software.amazon.awssdk:http-client-spi:2.15.32
and
+-org.apache.flink:flink-avro-glue-schema-registry:1.13-SNAPSHOT
  +-software.amazon.glue:schema-registry-serde:1.0.0
+-software.amazon.glue:schema-registry-common:1.0.0
  +-software.amazon.awssdk:glue:2.15.32
+-software.amazon.awssdk:apache-client:2.15.32
  +-software.amazon.awssdk:http-client-spi:2.15.32
and
+-org.apache.flink:flink-avro-glue-schema-registry:1.13-SNAPSHOT
  +-software.amazon.glue:schema-registry-serde:1.0.0
+-software.amazon.glue:schema-registry-common:1.0.0
  +-software.amazon.awssdk:glue:2.15.32
+-software.amazon.awssdk:netty-nio-client:2.15.32
  +-software.amazon.awssdk:http-client-spi:2.15.32
and
+-org.apache.flink:flink-avro-glue-schema-registry:1.13-SNAPSHOT
  +-software.amazon.glue:schema-registry-serde:1.0.0
+-software.amazon.glue:schema-registry-common:1.0.0
  +-software.amazon.awssdk:cloudwatch:2.15.30
+-software.amazon.awssdk:aws-query-protocol:2.15.30
  +-software.amazon.awssdk:http-client-spi:2.15.30
...{code}
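
Running the same Maven build locally (e.g. {{mvn clean verify}} on the
module) should reproduce the enforcer failure. One common way to resolve
such convergence errors, sketched here only as a suggestion, is to import
the AWS SDK BOM in the module's {{pom.xml}} so that all
{{software.amazon.awssdk}} artifacts resolve to a single version:

{code:xml}
<dependencyManagement>
  <dependencies>
    <!-- Pins every software.amazon.awssdk artifact to one version so that
         the dependency-convergence rule passes. -->
    <dependency>
      <groupId>software.amazon.awssdk</groupId>
      <artifactId>bom</artifactId>
      <version>2.15.32</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
{code}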



--
This message was sent by Atlassian Jira
(v8.3.4#803005)