Renaming the metrics

2020-06-22 Thread Ori Popowski
I have two Flink clusters sending metrics via Prometheus and they share all
the metric names (i.e.
flink_taskmanager_job_task_operator_currentOutputWatermark).

I want to change the flink_ prefix to something else to distinguish between
the clusters (maybe the job-name).

How can I do it?

Thanks.


[ANNOUNCE] Weekly Community Update 2020/25

2020-06-22 Thread Konstantin Knauf
Dear community,

happy to share this week's community update: release testing for Flink
1.11.0 is slowly converging, and the first feature discussions for the
upcoming release cycle are coming up.

Flink Development
=================

* [releases] The community has published another non-voting release
candidate for Flink 1.11.0 to facilitate further release testing. [1]

* [connectors] Jacky Lau proposes to drop support for Elasticsearch 5.x to
facilitate the implementation of FLIP-127, a source connector for
Elasticsearch. [2]

* [connectors] Gyula proposes to upgrade the HBase connector to 2.2.x and
to drop support for 1.4.x. The discussion indicates that there are still a
lot of HBase 1.x installations out there and hence there are reservations
about dropping support for it now. [3]

* [docs] Flink Master has been re-renamed to JobManager as part of a larger
discussion about the usage of the terms slave/master in Apache Flink. [4,5]

* [sql] Fabian has started a discussion on a SQL syntax for a Table API's
StatementSet. A StatementSet allows users to group multiple INSERT INTO
statements, so that they are compiled into a single Flink job. This
functionality has been added in FLIP-84, but is not exposed in SQL yet. [6]

* [sql] Jingsong Li has started a discussion on how to ensure that new SQL
features are also supported by the Flink SQL client going forward. So far,
new features are often only available in the Flink SQL client with a delay
as developers forget to explicitly add support for them directly. [7]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-11-0-release-candidate-2-tp42620.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Re-DISCUSS-Drop-connectors-for-5-x-and-restart-the-flink-es-source-connector-tp42671.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tp42657.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Re-renaming-Flink-Master-back-to-JobManager-tp42510p42512.html
[5] https://issues.apache.org/jira/browse/FLINK-18209
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Sql-client-lack-support-for-new-features-tp42640.html

flink-packages.org
==================

* lukess has published a MetricsReporter for SignalFX. [7]

[7] https://flink-packages.org/packages/flink-metrics-signalfx

Notable Bugs
============

Of course, there is still a lot of activity around release testing, but
nothing exciting for any of the released versions.

Events, Blog Posts, Misc
========================

* Yu Li joined the Apache Flink PMC. Congratulations! [8]

* On the Ververica blog, Qian Yu explains how Weibo uses Apache Flink for
real time feature extraction and online model training. [9]

* Jeff Zhang has published the first part of a blog post series on Flink on
Zeppelin on the Apache Flink blog. [10]

* The Call for Proposals of Flink Forward Global taking place virtually,
October 19-21, has been extended by 1 week until the 28th of June. We are
looking forward to your submissions. If you feel unsure about submitting,
please do not hesitate to reach out to me or anyone at Ververica. We are
happy to help. [11]

[8]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Yu-Li-is-now-part-of-the-Flink-PMC-tp42567.html

[9]
https://www.ververica.com/blog/flink-for-online-machine-learning-and-real-time-processing-at-weibo
[10] https://flink.apache.org/news/2020/06/15/flink-on-zeppelin-part1.html
[11] https://www.flink-forward.org/global-2020/call-for-presentations


Cheers,

Konstantin

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Flink(1.8.3) UI exception is overwritten, and the cause of the failure is not displayed

2020-06-22 Thread Andrew
version: 1.8.3
graph: source -> map -> sink


Scenario:
 A source subtask failure causes the graph to restart, but the exception 
displayed on the Flink UI is not the cause of the task failure.


Displayed:
JM log:
2020-06-22 14:29:01.087 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job baseInfoAdapter_20601 (20601159280210484110080369520601) switched from state RUNNING to FAILING.
java.lang.Exception: Could not perform checkpoint 87 for operator Sink: adapterOutput (19/30).
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:597)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.notifyCheckpoint(BarrierTracker.java:270)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.processBarrier(BarrierTracker.java:186)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:105)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:769)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 87 for operator Sink: adapterOutput (19/30).
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1115)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1057)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:731)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:643)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:588)
        ... 8 common frames omitted
Caused by: java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:375)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:363)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:395)
        ... 13 common frames omitted





TM log: RUNNING to CANCELING
2020-06-22 15:39:19.816 INFO  com.xxx.client.consumer.GroupConsumer  - consumer xxx to jmq1230:xxx,READ,xxx,NONE is stopped.
2020-06-22 15:39:19.816 INFO  org.apache.flink.runtime.taskmanager.Task  - Source: baseInfo (79/90) (4e62a84f251d9c68a54e464cff51171e) switched from RUNNING to CANCELING.





Is this a known issue?

Re:Re: Can the PyFlink Table API insert into different sink tables based on the value of a field?

2020-06-22 Thread jack
Hello Jincheng, I have verified the split-processing logic you suggested and it solves my problem. Thank you very much for clearing this up.




Best,
Jack







On 2020-06-22 14:28:04, "jincheng sun" wrote:

Hello jack,


The Table API does not need if/else; the same logic can be expressed directly, along these lines:


val t1 = table.filter('x  > 2).groupBy(..)
val t2 = table.filter('x <= 2).groupBy(..)
t1.insert_into("sink1")
t2.insert_into("sink2")





Best,
Jincheng






On Fri, Jun 19, 2020 at 10:35 AM, jack wrote:





I tested with the following structure:
table = t_env.from_path("source")


if table.filter("logType=syslog"):
    table.filter("logType=syslog").insert_into("sink1")
elif table.filter("logType=alarm"):
    table.filter("logType=alarm").insert_into("sink2")




From my test, it seems that table.filter("logType=syslog").insert_into("sink1") takes effect, while the elif branch below it does not. Is that because table.filter("logType=syslog") (or table.where) already filters the data as part of evaluating the condition, so the branch below is never reached?








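On 2020-06-19 10:08:25, "jack" wrote:
>Can the PyFlink Table API insert into different sink tables based on the value of a field?
>
>
>Scenario: use PyFlink to filter records with filter and then insert them into a sink.
>For example, the following two messages have different logType values; the filter API can be used to filter them before inserting into a sink table:
>{
>"logType":"syslog",
>"message":"sla;flkdsjf"
>}
>{
>"logType":"alarm",
>"message":"sla;flkdsjf"
>}
>  t_env.from_path("source")\
>  .filter("logType=syslog")\
>  .insert_into("sink1")
>Is there a way to implement if/else-like logic directly in the code above by checking the value of the logType field:
>if logType=="syslog":
>   insert_into(sink1)
>elif logType=="alarm":
>   insert_into(sink2)
>
>
>If insert_into returned something that offered .filter, .select and similar methods, this would be easy: I could keep filtering further down the chain, with code like the following:
>
>
>  t_env.from_path("source")\
>  .filter("logType=syslog")\
>  .insert_into("sink1")\
>  .filter("logType=alarm")\
>  .insert_into("sink2")
>Any pointers from the experts would be appreciated. Thanks.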
>
>
>
>
>


what to consider when testing a data stream application using the TPC-H benchmark data?

2020-06-22 Thread Felipe Gutierrez
Hi all,

I would like to create some data stream query tests using the TPC-H
benchmark. I saw that there are some examples of TPC-H Q3[1] and Q10[2],
however, they are using DataSet. If I implement these queries
with DataStream instead, what caveats do I have to keep in mind when
implementing the source function? I mean, the frequency of emitting
items is certainly the first. I suppose that I would change the
frequency of the workload globally for all data sources. Is that all, or
are there other things to consider?

[1] 
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
[2] 
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Problems with type erasure

2020-06-22 Thread Vincenzo Pronestì

Hi there,

I need to execute the following code:

 72: KeyedStream<Tuple2<String, Double>, String> keyedDelays = delays
 73:   .flatMap(new Query1FlatMap())
 74:   .keyBy(item -> item.f0);

but I keep getting this error message:

The program finished with the following exception:

The return type of function 'Custom Source' could not be determined 
automatically, due to type erasure. You can give type information hints by 
using the returns(...) method on the result of the transformation call, or by 
letting your function implement the 'ResultTypeQueryable' interface.

org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:451)

org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:178)

org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:635)
org.apache.flink.nyschoolbuses.Query2.main(Query2.java:73)

I've read this guide 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html 
(there's an example with Tuple2<String, Double>, which is the same type I need) and I 
think I have two options:
1 - Implement ResultTypeQueryable<Tuple2<String, Double>> in my Query1FlatMap
class. I did this by adding:
@Override
public TypeInformation<Tuple2<String, Double>> getProducedType() {
    return TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
}

2 - Use the returns method right after the flatMap(new Query1FlatMap()), like 
this:
TypeInformation<Tuple2<String, Double>> tInfo =
    TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
KeyedStream<Tuple2<String, Double>, String> keyedDelays = delays
    .flatMap(new Query1FlatMap()).returns(tInfo).keyBy(item -> item.f0);
Actually I've also tried with:
TypeHint<Tuple2<String, Double>> tHint = new TypeHint<Tuple2<String, Double>>(){};
KeyedStream<Tuple2<String, Double>, String> keyedDelays = delays
    .flatMap(new Query1FlatMap()).returns(tHint).keyBy(item -> item.f0);

The problem is that none of these approaches works and the error message is always 
the same as above. Does anyone know how I can fix this?
I'm also having the same issue in other code where the keyed stream has two nested Tuple2 types (i.e. 
Tuple2<Tuple2<String, String>, Integer>, Tuple>). Would the solution work even 
in this last case? Or do I need to change something because of the double Tuple2?

Thank you for your attention.
Best regards,
Vincenzo



Re: Problems with type erasure

2020-06-22 Thread Yun Gao
Hi Vincenzo:

Could you also attach the code before line 72, namely how `delays` is 
defined? The exception says that the return type of "Custom Source" could not 
be determined, so I think it refers to `delays`: the exception is 
thrown when an operator is called on `delays` and Flink tries to create a new 
transformation based on the type information of `delays`.

Best,
 Yun



Re: Problems with type erasure

2020-06-22 Thread Arvid Heise
Hi Vincenzo,

the preferred way to get the type information for tuples is to use
org.apache.flink.api.common.typeinfo.Types. For Tuple2<Tuple2<String, String>, Integer>, you'd use

Types.TUPLE(Types.TUPLE(Types.STRING, Types.STRING), Types.INT)

Nested tuples are not an issue in general.
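
As background on why a hint is needed at all: Java erases generic type arguments at runtime, so an ordinary object cannot report its own type parameters. An anonymous subclass, however, keeps its generic superclass in class metadata, and as far as I understand that is the mechanism hint classes of the `new TypeHint<...>(){}` shape build on. The sketch below shows the mechanism with the JDK only. `ErasureDemo` and `TypeToken` are hypothetical names made up for this illustration, they are not Flink classes, and java.util.List stands in for Tuple2 so that no Flink dependency is required.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    // Hypothetical stand-in for a type hint class (NOT Flink's TypeHint).
    // Being abstract forces callers to create an anonymous subclass, whose
    // generic superclass is recorded in the class file and survives erasure.
    public abstract static class TypeToken<T> {
        public final Type captured;

        protected TypeToken() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            this.captured = superType.getActualTypeArguments()[0];
        }
    }

    // What an ordinary instance knows about itself: only the raw class.
    public static String erasedView() {
        List<String> values = new ArrayList<>();
        return values.getClass().getName();
    }

    // What the anonymous subclass remembers: the full parameterized type.
    public static String capturedView() {
        TypeToken<List<String>> token = new TypeToken<List<String>>() {};
        return token.captured.getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(erasedView());   // java.util.ArrayList
        System.out.println(capturedView()); // java.util.List<java.lang.String>
    }
}
```

Note that in this thread the error message names 'Custom Source', so the type information would presumably have to be supplied where `delays` is created, not only after the flatMap call.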

On Mon, Jun 22, 2020 at 2:18 PM Yun Gao  wrote:

> Hi Vincenzo:
>
> Could you also attach the codes before line 72, namely how `delays` is
> defined ? Since the exception says the return type of "Custom Source" could
> not be defined, and I think it should refer to `delays`, and the exception
> is thrown when an operator is called on `delays` and Flink tries to create
> a new transformation based on the information of `delays`.
>
> Best,
>  Yun
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: what to consider when testing a data stream application using the TPC-H benchmark data?

2020-06-22 Thread Arvid Heise
Hi Felipe,

The examples are pretty old (6 years), hence they still use DataSet.

You should be fine by mostly replacing the sources with file sources (no need
to write your own source, unless you want to write generators) and using global
windows for joining.

However, why not use SQL for TPC-H? We have an e2e test [1], where some
TPC-H queries are used (in slightly modified form) [2].
We also have TPC-DS queries as e2e tests [3].

[1]
https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-tpch-test
[2]
https://github.com/apache/flink/tree/master/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query
[3]
https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-tpcds-test

On Mon, Jun 22, 2020 at 12:35 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi all,
>
> I would like to create some data stream queries tests using the TPC-H
> benchmark. I saw that there are some examples of TPC Q3[1] and Q10[2],
> however, they are using DataSet. If I consider creating these queries
> but using DataStream what are the caveats that I have to ensure when
> implementing the source function? I mean, the frequency of emitting
> items is certainly the first. I suppose that I would change the
> frequency of the workload globally for all data sources. Is only it or
> do you have other things to consider?
>
> [1]
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
> [2]
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
>
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>


-- 

Arvid Heise | Senior Java Developer





Re: Flink(1.8.3) UI exception is overwritten, and the cause of the failure is not displayed

2020-06-22 Thread Arvid Heise
Hi Andrew,

this looks like your Flink cluster has a flaky connection to the Kafka
cluster or your Kafka cluster was down.

Since the operator failed in the sync part of the snapshot, it resorted to
failing to avoid inconsistent operator state. If you configured
restarts, the job simply restarts from your last checkpoint (86) and
recomputes the data.

What would be your expectation? That the checkpoint fails but the job
continues without restart?

In general, the issue with Kafka is that the transactions used for
exactly-once eventually time out. So if too many checkpoints cannot be taken,
you'd ultimately have incorrect data. Hence, failing and restarting is
cleaner as it guarantees consistent data.

On Mon, Jun 22, 2020 at 10:16 AM Andrew <874269...@qq.com> wrote:

> version: 1.8.3
> graph: source -> map -> sink
>
> Scenario:
>  source subtask failed causes the graph to restart, but the exception
> displayed on the flink UI is not the cause of the task failure
>
> displayed:
> JM log:
> 2020-06-22 14:29:01.087 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job
> baseInfoAdapter_20601 (20601159280210484110080369520601) switched from
> state RUNNING to FAILING.
> java.lang.Exception: Could not perform checkpoint 87 for operator Sink:
> adapterOutput (19/30).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:597)
> at
> org.apache.flink.streaming.runtime.io.BarrierTracker.notifyCheckpoint(BarrierTracker.java:270)
> at
> org.apache.flink.streaming.runtime.io.BarrierTracker.processBarrier(BarrierTracker.java:186)
> at
> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:105)
> at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:769)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not complete snapshot 87 for
> operator Sink: adapterOutput (19/30).
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1115)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1057)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:731)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:643)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:588)
> ... 8 common frames omitted
> Caused by: java.lang.Exception: Failed to send data to Kafka: The server
> disconnected before a response was received.
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:375)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:363)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:395)
> ... 13 common frames omitted
>
>
> TM log: RUNNING to CANCELING
> 2020-06-22 15:39:19.816 INFO  com.xxx.client.consumer.GroupConsumer  -
> consumer xxx to jmq1230:xxx,READ,xxx,NONE is stopped.
> 2020-06-22 15:39:19.816 INFO  org.apache.flink.runtime.taskmanager.Task  -
> Source: baseInfo (79/90) (4e62a84f251d9c68a54e464cff51171e) switched from
> RUNNING to CANCELING.
>
>
> Is this a known issue?
>


-- 

Arvid Heise | Senior Java Developer





Re: Renaming the metrics

2020-06-22 Thread Arvid Heise
Hi Ori,

I see that the PrometheusPushGatewayReporter [1] has an option for a job
name, maybe you can use that.

I'm also including Chesnay who probably has more ideas.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter

On Mon, Jun 22, 2020 at 9:01 AM Ori Popowski  wrote:

> I have two Flink clusters sending metrics via Prometheus and they share
> all the metric names (i.e.
> flink_taskmanager_job_task_operator_currentOutputWatermark).
>
> I want to change the flink_ prefix to something else to distinguish
> between the clusters (maybe the job-name).
>
> How can I do it?
>
> Thanks.
>


-- 

Arvid Heise | Senior Java Developer





Re: Unaligned Checkpoint and Exactly Once

2020-06-22 Thread Arvid Heise
Hi Lu,

Thank you for your interest in unaligned checkpoints!

I just published some PRs that will warn you if you set both unaligned
checkpoints and AT_LEAST_ONCE. It's indeed not possible or even meaningful
to use them at the same time. AT_LEAST_ONCE has no alignment phase, so it's
faster than both EXACTLY_ONCE options (aligned and unaligned).



On Mon, Jun 22, 2020 at 5:20 AM Zhijiang  wrote:

> From an implementation or logic complexity perspective, AT_LEAST_ONCE
> is somewhat simpler than EXACTLY_ONCE without unaligned checkpoints, since
> it can always process data without blocking any channels.
>
> --
> From:Lu Weizheng 
> Send Time:2020-06-22 (Monday) 10:53
> To:Zhijiang ; user@flink.apache.org <
> user@flink.apache.org>
> Subject:Re: Unaligned Checkpoint and Exactly Once
>
> Thank you Zhijiang! The second question about the config came up only because I
> found a method in InputProcessorUtil. I guess AT_LEAST_ONCE mode is a
> simpler way to handle the checkpoint barrier?
>
> private static CheckpointBarrierHandler createCheckpointBarrierHandler(
>         StreamConfig config,
>         InputGate[] inputGates,
>         SubtaskCheckpointCoordinator checkpointCoordinator,
>         String taskName,
>         AbstractInvokable toNotifyOnCheckpoint) {
>     switch (config.getCheckpointMode()) {
>         case EXACTLY_ONCE:
>             if (config.isUnalignedCheckpointsEnabled()) {
>                 return new AlternatingCheckpointBarrierHandler(
>                     new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates),
>                     new CheckpointBarrierUnaligner(checkpointCoordinator, taskName, toNotifyOnCheckpoint, inputGates),
>                     toNotifyOnCheckpoint);
>             }
>             return new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates);
>         case AT_LEAST_ONCE:
>             int numInputChannels = Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels).sum();
>             return new CheckpointBarrierTracker(numInputChannels, toNotifyOnCheckpoint);
>         default:
>             throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + config.getCheckpointMode());
>     }
> }
>
>
> --
> *From:* Zhijiang 
> *Sent:* 2020-06-22 10:41
> *To:* Lu Weizheng ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Unaligned Checkpoint and Exactly Once
>
> Hi Weizheng,
>
> The unaligned checkpoint (UC) only supports exactly-once mode in Flink
> 1.11, and it is not supported for savepoints. Savepoints are typically used
> for job rescaling, and we plan to support that in a future release. Of
> course, UC satisfies the exactly-once semantics as promised.
>
> Regarding the config issue, I am not sure I get your point here. The first
> config describes whether the current mode (actually only
> exactly-once) enables UC or not, and the second config selects the
> mode (exactly-once or at-least-once). I guess you mean
> merging them into the first config form. But they are two
> different dimensions of configuring the checkpoint. One is the semantics
> of the data processing guarantee. The other is which of two
> different mechanisms we use to guarantee one of those semantics (exactly-once).
>
>
> Best,
> Zhijiang
>
> --
> From:Lu Weizheng 
> Send Time:2020-06-22 (Monday) 07:20
> To:user@flink.apache.org 
> Subject:Unaligned Checkpoint and Exactly Once
>
> Hi there,
>
> The new feature in Flink 1.11 will provide us with Unaligned Checkpoints,
> which means an operator subtask does not need to wait for all the checkpoint
> barriers and will not block channels. As the checkpoint barrier is the
> key mechanism for the Exactly Once guarantee, I am not sure whether Unaligned
> Checkpoints can still achieve the Exactly Once guarantee or only At Least Once?
>
> FLIP-76:
> Unaligned checkpoints will initially be an optional feature. After
> collecting experience and implementing all necessary extensions, unaligned
> checkpoint will probably be enabled by default for exactly once.
>
> What's more, in the following two configs,
>
> Config 1
> env.getCheckpointConfig().enableUnalignedCheckpoints();
>
> Config 2
>
> checkpointCfg.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
>
>
> Does Config 2 use an even simpler way of checkpointing than Unaligned
> Checkpoint?
>
> Hope for replies!
>
> Weizheng
>
>
>

-- 

Arvid Heise | Senior Java Developer




Re: [DISCUSS] Upgrade HBase connector to 2.2.x

2020-06-22 Thread Arvid Heise
If we support both HBase 1 and 2, maybe it's a good time to pull them out
to Bahir and list them in flink-packages to avoid adding even more modules
to Flink core?

On Mon, Jun 22, 2020 at 4:05 AM OpenInx  wrote:

> Hi
>
> According to my observation in the HBase community, there are still lots
> of HBase users running their production clusters with version 1.x (1.4.x or
> 1.5.x), so I'd like to suggest
> supporting both HBase 1.x and HBase 2.x connectors.
>
> Thanks.
>
> On Sat, Jun 20, 2020 at 2:41 PM Ming Li  wrote:
>
>> +1 to support both HBase 2.x and HBase 1.4.x, just as we are doing
>> for Kafka.
>>
>> On Fri, Jun 19, 2020 at 4:02 PM Yu Li  wrote:
>>
>>> One supplement:
>>>
>>> I noticed that there are discussions in HBase ML this March about
>>> removing stable-1 pointer and got consensus [1], and will follow up in
>>> HBase community about why we didn't take real action. However, this doesn't
>>> change my previous statement / stand due to the number of 1.x usages in
>>> production.
>>>
>>> Best Regards,
>>> Yu
>>>
>>> [1]
>>> http://mail-archives.apache.org/mod_mbox/hbase-dev/202003.mbox/%3c30180be2-bd93-d414-a158-16c9c8d01...@apache.org%3E
>>>
>>> On Fri, 19 Jun 2020 at 15:54, Yu Li  wrote:
>>>
>>>> +1 on upgrading the HBase version of the connector, and 1.4.3 is indeed
>>>> an old version.
>>>>
>>>> OTOH, AFAIK there are still quite a few 1.x HBase clusters in production.
>>>> We can also see that the HBase community is still maintaining 1.x release
>>>> lines (with the "stable-1 release" pointing to 1.4.13) [1]
>>>>
>>>> Please also note that HBase follows semantic versioning [2] [3] and thus
>>>> doesn't promise any kind of compatibility (source/binary/wire, etc.) between
>>>> major versions. So if we only maintain a 2.x connector, it would not be able
>>>> to work with 1.x HBase clusters.
>>>>
>>>> I totally understand the additional effort of maintaining two modules,
>>>> but since we're also keeping multiple versions of the Kafka connector,
>>>> and considering the current HBase in-production status, I'd still
>>>> suggest getting both 1.4.13 and 2.2.5 supported.
>>>>
>>>> Best Regards,
>>>> Yu
>>>>
>>>> [1] http://hbase.apache.org/downloads.html
>>>> [2] https://hbase.apache.org/book.html#hbase.versioning
>>>> [3] https://semver.org/
>>>>
>>>>
>>>> On Fri, 19 Jun 2020 at 14:58, Leonard Xu wrote:
>>>>
>>>>> +1 to support HBase 2.2.x, and +1 to retain HBase 1.4.3 until the
>>>>> deprecation is finished (maybe one version is enough).
>>>>>
>>>>> Currently we only support HBase 1.4.3, which is pretty old, and I'm
>>>>> making a flink-sql-connector-hbase [1] shaded jar for pure SQL users; the
>>>>> dependencies are a little more complex.
>>>>>
>>>>>
>>>>> On June 19, 2020, at 14:20, jackylau wrote:
>>>>>
>>>>> +1 to support HBase 2.x; the HBase 2.x client dependencies are
>>>>> simple and clear. The HBase project shades them all.
>>>>>
>>>>>
>>>>> Best,
>>>>> Leonard Xu
>>>>> [1] https://github.com/apache/flink/pull/12687
>>>>>
>>>>>
>>
>> --
>> Best Regards
>> Michael Li
>>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Why side-outputs are only supported by Process functions?

2020-06-22 Thread Arvid Heise
Hi Ivneet,

Q1) you can read about the deprecation of split in FLINK-11084 [1]. In
general side-outputs subsume the functionality and allow some advanced
cases (like emitting the same record into two outputs).

Q2) It's simply a matter of API design. The basic idea is to keep most
interfaces as sleek as possible (MapFunction) to not overload new users
completely.

Now for your actual problem. You can use the same recipe as on
Stack Overflow, but use Tuple2 instead of Either.

[1] https://issues.apache.org/jira/browse/FLINK-11084
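
To make the Tuple2-instead-of-Either recipe a bit more concrete, here is a minimal sketch in plain Java without any Flink dependency. The Tagged class, the validate function and the two filter helpers are hypothetical stand-ins: in a real job the pair would presumably be Flink's Tuple2 with exactly one field set, emitted by the async function and split downstream with two filter/map steps.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Tuple2 used as an Either: exactly one field is non-null. */
final class Tagged<L, R> {
    final L f0; // set for valid records
    final R f1; // set for invalid records

    private Tagged(L f0, R f1) {
        this.f0 = f0;
        this.f1 = f1;
    }

    static <L, R> Tagged<L, R> valid(L value) {
        return new Tagged<>(value, null);
    }

    static <L, R> Tagged<L, R> invalid(R error) {
        return new Tagged<>(null, error);
    }
}

final class SideOutputSketch {
    /** Plays the role of the async function: tags each message instead of using a side output. */
    static Tagged<Integer, String> validate(String message) {
        try {
            return Tagged.valid(Integer.parseInt(message.trim()));
        } catch (NumberFormatException e) {
            return Tagged.invalid("not a number: " + message);
        }
    }

    /** Downstream step that keeps only the valid branch. */
    static List<Integer> validOnly(List<Tagged<Integer, String>> in) {
        List<Integer> out = new ArrayList<>();
        for (Tagged<Integer, String> t : in) {
            if (t.f0 != null) {
                out.add(t.f0);
            }
        }
        return out;
    }

    /** Downstream step that keeps only the invalid branch. */
    static List<String> invalidOnly(List<Tagged<Integer, String>> in) {
        List<String> out = new ArrayList<>();
        for (Tagged<Integer, String> t : in) {
            if (t.f1 != null) {
                out.add(t.f1);
            }
        }
        return out;
    }
}
```

The trade-off compared to a real side output is that every record travels through the main stream and is inspected twice, once per branch.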

On Mon, Jun 22, 2020 at 3:21 AM ivneet kaur  wrote:

> Hi folks,
> I want to split my stream for some invalid message handling, and need help
> understanding a few things.
> Question 1: Why is the *split* operator deprecated?
> Question 2: Why are side outputs only supported for ProcessFunction,
> KeyedProcessFunction etc.?
>
> The doc on side-outputs says: "*You can use the Context parameter, which
> is exposed to users in the above functions, to emit data to a side output
> identified by an OutputTag*",
> my question really is why is Context parameter only available to these
> functions?
>
> My understanding is that the process functions are meant to allow dealing
> with timers, timestamps and watermarks. Is there an inherent connection
> between side outputs and timers that I am missing? I don't think I need any
> such handling.
>
> For use instead of a RichAsyncFunction etc., there does not seem to exist
> a dedicated async flavor of process functions.
>
> The "Side-outputs from RichAsyncFunction" thread suggests
> using Either. For Java developers, there is no standard way
> of achieving this; any other suggestions?
>
> Best,
> Ivneet
>
>
>



Re: State backend considerations

2020-06-22 Thread Arvid Heise
Hi Nick,

Both questions are hard to answer given that it depends on your hardware,
access patterns (read/update), record size/structure, parallelism, and
probably a ton of other parameters.

The usual approach is to simply evaluate it in your setting. Since it's a
matter of configuration, you can do some A/B testing.

In general, you need RocksDB if you want to have incremental checkpoints,
which is recommended if you have rather few updates and big state.

On Mon, Jun 22, 2020 at 2:13 AM Nick Bendtner  wrote:

> Hi guys,
> I have a few questions on state backends.
> Is there a guideline on how big the state has to be before it makes sense
> to use RocksDB rather than FsStateBackend? Is there an analysis of the
> latency of a full checkpoint with FsStateBackend as the state size
> grows?
>
>
> Best,
> Nick.
>




Re: [DISCUSS] Upgrade HBase connector to 2.2.x

2020-06-22 Thread Gyula Fóra
If we were to go the Bahir route, I don't see the point in migrating the
1.4.x version there since that's already available in Flink. To me that is
almost the same as dropping explicit support for 1.4 and telling users to
use older connector versions if they wish to keep using it.

If we want to keep 1.4 around for legacy users and slowly deprecate that,
we can do that inside Flink and only push the 2.2.x version to Bahir.

What do you think?

Gyula

On Mon, Jun 22, 2020 at 3:16 PM Arvid Heise  wrote:

> If we support both HBase 1 and 2, maybe it's a good time to pull them out
> to Bahir and list them in flink-packages to avoid adding even more modules
> to Flink core?
>


Re: what to consider when testing a data stream application using the TPC-H benchmark data?

2020-06-22 Thread Felipe Gutierrez
Hi Arvid,

thanks for the references. I didn't find those tests before. I will
definitely consider them to test my application.

The thing is that I am testing a pre-aggregation stream operator that I
have implemented. Particularly I need a high workload to create
backpressure on the shuffle phase, after the keyBy transformation is done.
And I am monitoring the throughput only of this operator. So, I will stick
with the source function but consider what there is on the other references.

I know that the Table API already has a pre-agg [2]. However, mine works a
little bit differently.

[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


On Mon, Jun 22, 2020 at 2:54 PM Arvid Heise  wrote:

> Hi Felipe,
>
> The examples are pretty old (6 years), hence they still use DataSet.
>
> You should be fine by mostly replacing sources with file sources (no need
> to write your own source, unless you want generators) and using global
> windows for joining.
>
> However, why not use SQL for TPC-H? We have an e2e test [1], where some
> TPC-H queries are used (in slightly modified form) [2].
> We also have TPC-DS queries as e2e tests [3].
>
> [1]
> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-tpch-test
> [2]
> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query
> [3]
> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-tpcds-test
>
> On Mon, Jun 22, 2020 at 12:35 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi all,
>>
>> I would like to create some data stream queries tests using the TPC-H
>> benchmark. I saw that there are some examples of TPC Q3[1] and Q10[2],
>> however, they are using DataSet. If I consider creating these queries
>> but using DataStream what are the caveats that I have to ensure when
>> implementing the source function? I mean, the frequency of emitting
>> items is certainly the first. I suppose that I would change the
>> frequency of the workload globally for all data sources. Is that all, or
>> are there other things to consider?
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
>> [2]
>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
>>
>> Thanks,
>> Felipe
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com
>>
>
>
>


Re: Renaming the metrics

2020-06-22 Thread Chesnay Schepler

There's currently no way to change this.

A related enhancement was proposed on FLINK-17495 that would at least 
allow you to attach a custom label, but the initial implementation 
wasn't general enough.


On 22/06/2020 15:08, Arvid Heise wrote:

Hi Ori,

I see that the PrometheusPushGatewayReporter [1] has an option for a 
job name, maybe you can use that.


I'm also including Chesnay who probably has more ideas.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter


On Mon, Jun 22, 2020 at 9:01 AM Ori Popowski wrote:


I have two Flink clusters sending metrics via Prometheus and they
share all the metric names (i.e.
flink_taskmanager_job_task_operator_currentOutputWatermark).

I want to change the flink_ prefix to something else to
distinguish between the clusters (maybe the job-name).

How can I do it?

Thanks.








Re: what is the "Flink" recommended way of assigning a backfill to an average on an event time keyed windowed stream?

2020-06-22 Thread Arvid Heise
Hi Marco,

That's a lot of code to digest. So I'm sorry if I did get something wrong.

From your example, it looks like you want to use the average within a
tumble window. If no record for a particular key has been emitted in that
time, you want to repeat the last value.

I'd use a dummy record to force the windows to be triggered and then ignore
it on aggregation. Here is a sketch:

aggregateTimeSeriesStream.keyBy(1)
  .process()
  .window()
  .reduce()
  .process()

This approach needs more functions, but most of them can be chained.
They are also smaller pieces of work that should be easier to
maintain and test. In particular, you save yourself the trouble of writing
the window logic.
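
To illustrate the per-key logic only, here is a small sketch in plain Java with no Flink dependency. It assumes the dummy record has already made every window fire, so an empty list stands for a window that contained nothing but the dummy. The class and method names are hypothetical, and in a real job the last value would presumably live in keyed state inside the final process step.

```java
import java.util.ArrayList;
import java.util.List;

final class BackfillSketch {
    /**
     * windows: one list of readings per tumbling window, in time order. An empty
     * list models a window that only contained the dummy record.
     * Returns one value per window: the average of the readings, or the value of
     * the previous window when there were no real readings. Windows before the
     * first real reading yield null.
     */
    static List<Double> averageWithBackfill(List<List<Double>> windows) {
        List<Double> result = new ArrayList<>();
        Double last = null; // last emitted average for this key
        for (List<Double> readings : windows) {
            if (!readings.isEmpty()) {
                double sum = 0.0;
                for (double r : readings) {
                    sum += r;
                }
                last = sum / readings.size();
            }
            // Empty window: repeat the last value (the backfill).
            result.add(last);
        }
        return result;
    }
}
```

For example, three windows holding the readings (1.0, 3.0), nothing, and (10.0) would produce an average for the first window, the same value again for the second, and a fresh average for the third.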

On Thu, Jun 18, 2020 at 7:11 PM Marco Villalobos wrote:

> I came up with a solution for backfills. However, at this moment, I am not
> happy with my solution.
> I think there might be other facilities within Flink which allow me to
> implement a better more efficient or more scalable solution.
>
> In another post, rmetz...@apache.org suggested that I use a process
> function and a timer. He was right in that I should use that approach. I
> want to thank him.
>
> The averages are computed by a ProcessWindowFunction that keys by the name
> and window size and uses a tumbling event time window.
>
> However, after that average is complete, I then use a KeyedProcessFunction
> that is keyed by window size. I then use a somewhat brute force approach
> with a ValueState<…> to track names that need a value and a MapState
> to determine which values exist and which ones are backfilled.
> It also cleans up stale values.
>
> I committed my code to a branch
> https://github.com/minmay/flink-patterns/tree/feature/backfill , and I
> also created a pull request
> https://github.com/minmay/flink-patterns/pull/1/files to share my
> experience.
>
> I am open to critical comments on my approach, lack of understanding of
> Flink, algorithms and data-structures used. Please refrain from comments on
> my code style though.
>
> I'll also copy and paste my solution below.
>
> package mvillalobos.flink.patterns.timeseries.average;
>
> import com.google.common.collect.ImmutableList;
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.io.jdbc.JDBCOptions;
> import org.apache.flink.api.java.io.jdbc.JDBCUpsertTableSink;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple7;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.Collector;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import picocli.CommandLine;
>
> import java.io.File;
> import java.sql.Connection;
> import java.sql.DriverManager;
> import java.sql.ResultSet;
> import java.sql.Statement;
> import java.sql.Timestamp;
> import java.time.Instant;
> import java.time.temporal.ChronoUnit;
> import java.util.Comparator;
> import java.util.List;
> import java.util.Map;
> import java.util.Set;
> import java.util.TreeSet;
> import java.util.concurrent.Callable;
> import java.util.stream.Collectors;
> import java.util.stream.StreamSupport;
>
> @CommandLine.Command(name = "Time Series Average",
> mixinStandardHelpOptions = true,
> description = "Compute the average of the time series with a 15
> minute tumbling event time window and upsert the results into an Apache
> Derby database.")
> public class TimeSeriesAverageApp implements Callable {
>
> private final static Logger logger =
> LoggerFactory.getLogger(TimeSeriesAverageApp.class);
>
> @CommandLine.Option(names = {"-f", "--input-file"}, description = "The
> CSV input file of time series data. Each line must be in the format:
> String, double, Instant.")
> private File inputFile;
>
> @Override
> public In

Re: Problems with type erasure

2020-06-22 Thread Vincenzo Pronestì

Hi Yun,

after reading your message I checked the source and managed to fix the 
problem. So thank you Yun.


In case someone has the same problem: the source is a Kafka consumer and
as such it needs a class that implements DeserializationSchema. One of
the required methods is getProducedType. In my case:


@Override
public TypeInformation<Delay> getProducedType() {
    return TypeInformation.of(Delay.class);
}

After implementing this method I was able to remove the TypeInformation 
from the flatMap function call.
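
For anyone wondering why the type information has to be supplied by hand at all, here is a small plain-Java sketch of type erasure and of the anonymous-subclass trick. As far as I understand, Flink's TypeHint relies on the same idea, but the Hint class below is a hypothetical stand-in and not Flink code.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a type hint: subclass it anonymously to capture T. */
abstract class Hint<T> {
    /** Reads T back from the generic superclass recorded for the anonymous subclass. */
    Type captured() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }
}

final class ErasureSketch {
    /** At runtime both lists have the same class: the element type is erased. */
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        return strings.getClass() == ints.getClass();
    }

    /** The anonymous subclass keeps the full generic type in its class metadata. */
    static String capturedTypeName() {
        return new Hint<List<String>>() {}.captured().getTypeName();
    }
}
```

This is also why the trailing {} in new TypeHint<...>(){} matters: without the anonymous subclass there is no class that records the type argument.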


Thank you.

Best regards,

Vincenzo

On 22/06/20 14:17, Yun Gao wrote:


Hi Vincenzo:

    Could you also attach the codes before line 72, namely how 
`delays` is defined ? Since the exception says the return type of 
"Custom Source" could not be defined, and I think it should refer to 
`delays`, and the exception is thrown when an operator is called on 
`delays` and Flink tries to create a new transformation based on the 
information of `delays`.


Best,
 Yun

--Original Mail --
*Sender:*Vincenzo Pronestì 
*Send Date:*Mon Jun 22 19:02:05 2020
*Recipients:*flink-user 
*Subject:*Problems with type erasure

Hi there,

I need to execute the following code:

  72: KeyedStream<…, String> keyedDelays = delays
  73:   .flatMap(new Query1FlatMap())
  74:   .keyBy(item -> item.f0);

but I keep getting this error message:

The program finished with the following exception:

The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:451)
org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:178)
org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:635)
org.apache.flink.nyschoolbuses.Query2.main(Query2.java:73)

I've read this guide
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html
(there's an example with Tuple2<…> which is the same I need) and I think I have two options:

1 - Implement ResultTypeQueryable<…> in my Query1FlatMap class. I did this by adding:

@Override
public TypeInformation<…> getProducedType() {
    return TypeInformation.of(new TypeHint<…>(){});
}

2 - Use the returns method right after the flatMap(new Query1FlatMap()), like this:

TypeInformation<…> tInfo = TypeInformation.of(new TypeHint<…>(){});
KeyedStream<…, String> keyedDelays = delays
    .flatMap(new Query1FlatMap())
    .returns(tInfo)
    .keyBy(item -> item.f0);

Actually I've also tried with:

TypeHint<…> tHint = new TypeHint<…>(){};
KeyedStream<…, String> keyedDelays = delays
    .flatMap(new Query1FlatMap())
    .returns(tHint)
    .keyBy(item -> item.f0);

The problem is none of all these things works and the error message is always the same as above. Does any of you know how I can fix this?

Also I'm having the same issue with another code where the keyed stream has two Tuple2 (i.e. Tuple2<…, Integer>, Tuple>). Would the solution work even in this last case? Or, do I need to change something because of the double Tuple2?

Thank you for your attention.

Best regards,
Vincenzo



Re: Submitted Flink Jobs EMR are failing (Could not start rest endpoint on any port in port range 8081)

2020-06-22 Thread Arvid Heise
Hi Sateesh,

The solution still applies; not all entries are listed in the conf
template.

From what you have written, it is most likely that the first jobs have not
finished (hence the port is taken). Make sure you don't use the detached mode
when submitting.
You can see the status of the jobs in the YARN resource manager, which also
links to the respective Flink JobManagers.

And yes, by default, each job creates a new YARN session unless you
explicitly use an existing one [1].

If you need more help, please post your steps.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#flink-yarn-session
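
To show why a port range for rest.bind-port (the fix suggested in the JIRA ticket quoted below) avoids the BindException, here is a rough plain-Java sketch. It is not Flink's actual implementation: the class, the method names and the isFree check are hypothetical, and the range values are only illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
import java.util.function.IntPredicate;

final class PortRangeSketch {
    /** Expands a spec such as "8081" or "50100-50200,50300" into candidate ports. */
    static List<Integer> parse(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String[] bounds = part.trim().split("-");
            int from = Integer.parseInt(bounds[0].trim());
            int to = bounds.length > 1 ? Integer.parseInt(bounds[1].trim()) : from;
            for (int p = from; p <= to; p++) {
                ports.add(p);
            }
        }
        return ports;
    }

    /** Returns the first candidate port that the isFree check accepts, if any. */
    static OptionalInt firstFree(String spec, IntPredicate isFree) {
        for (int port : parse(spec)) {
            if (isFree.test(port)) {
                return OptionalInt.of(port);
            }
        }
        return OptionalInt.empty();
    }
}
```

With a single port, a second JobManager on the same host has nothing to fall back to; with a range, it can move on to the next free candidate.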

On Thu, Jun 18, 2020 at 4:15 PM sk_ac...@yahoo.com wrote:

> I am using EMR 5.30.0 and trying to submit a Flink (1.10.0) job using the
> following command
>
> flink run -m yarn-cluster /home/hadoop/flink--test-0.0.1-SNAPSHOT.jar
>
> and i am getting the following error:
>
> Caused by:
> org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
> YARN application unexpectedly switched to state FAILED during deployment.
>
> After going through the logs on the worker nodes and job manager logs it
> looks like there is a port conflict
>
> 2020-06-17 21:40:51,199 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Could not
> start cluster entrypoint YarnJobClusterEntrypoint.
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed
> to initialize the cluster entrypoint YarnJobClusterEntrypoint.
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
> at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
> Caused by: org.apache.flink.util.FlinkException: Could not create the
> DispatcherResourceManagerComponent.
> at
> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:261)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:215)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
> ... 2 more
> Caused by: java.net.BindException: Could not start rest endpoint on
> any port in port range 8081
> at
> org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:219)
> at
> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:165)
> ... 9 more
>
> There seems to be a JIRA ticket (
> https://issues.apache.org/jira/browse/FLINK-15394) open for this (though
> it is for the 1.9 version of Flink) and the suggested solution is to use a port
> range for **rest.bind-port** in the Flink config file.
>
> However, in the 1.10 version of Flink we only have the following in the conf
> YAML file
>
> rest.port: 8081
>
> Another issue I am facing is that I have submitted multiple Flink jobs (the same
> job multiple times) using the AWS Console and via the Add Step UI. Only one of the
> jobs succeeded and the rest failed with the error posted above. And
> when I go to the Flink UI it doesn't show any jobs at all.
>
> I am wondering whether each of the submitted jobs is trying to create a Flink YARN
> session instead of using the existing one.
>
> Thanks
> Sateesh
>
>



Re: Interact with different S3 buckets from a shared Flink cluster

2020-06-22 Thread Arvid Heise
Hi Ricardo,

one option is to use s3p for checkpointing (Presto) and s3a for custom
applications and attach different configurations.

In general, I'd recommend to use a cluster per application to exactly avoid
such issues. I'd use K8s and put the respective IAM roles on each
application pod (e.g. with kiam).

On Thu, Jun 18, 2020 at 1:46 AM Ricardo Cardante <
ricardocarda...@tutanota.com> wrote:

> Hi!
>
>
> We are working in a use case where we have a shared Flink cluster to
> deploy multiple jobs from different teams. With this strategy, we are
> facing a challenge regarding the interaction with S3. Given that we already
> configured S3 for the state backend (through flink-conf.yaml) every time we
> use API functions that communicate with the file system (e.g., DataStream
> readFile) the application configurations appear to be overridden by those
> of the cluster while attempting to communicate with external S3 buckets.
> What we've thought so far:
>
>
> 1. Provide a core-site.xml resource file targeting the external S3 buckets
> we want to interact with. We've tested, and the credentials ultimately seem
> to be ignored in favor of the IAM roles that are pre-loaded with the
> instances;
>
> 2. Load the cluster instances with multiple IAM roles. The problem with
> this is that we would allow each job to interact with out-of-scope buckets;
>
> 3. Spin multiple clusters with different configurations - we would like to
> avoid this since we started from the premise of sharing a single cluster
> per context;
>
>
> What would be a clean/recommended solution to interact with multiple S3
> buckets with different security policies from a shared Flink cluster?
>
> Thanks in advance.
>




Re: Unable to run flink job in dataproc cluster with jobmanager provided

2020-06-22 Thread Chesnay Schepler
Is your user-jar packaging and relocating Flink classes? If so, then
your job does not actually operate against the classes provided by the cluster,
which, well, just wouldn't work.


On 18/06/2020 09:34, Sourabh Mehta wrote:

Hi ,
application is using 1.10.0 but cluster is setup on 1.9.0.

Yes I do have access. please find below starting logs from cluster


2020-06-17 11:28:18,989 INFO 
 org.apache.shaded.flink.table.module.ModuleManager  - Got 
FunctionDefinition equals from module core
2020-06-17 11:28:20,538 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: jobmanager.rpc.address, localhost
2020-06-17 11:28:20,538 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: jobmanager.rpc.port, 6123
2020-06-17 11:28:20,538 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: jobmanager.heap.size, 1024m
2020-06-17 11:28:20,538 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: taskmanager.heap.size, 1024m
2020-06-17 11:28:20,538 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-17 11:28:20,538 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: parallelism.default, 1
2020-06-17 11:28:20,539 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: jobmanager.execution.failover-strategy, region
2020-06-17 11:28:20,539 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: jobmanager.rpc.address, cluster-flink-poc-m
2020-06-17 11:28:20,539 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: jobmanager.heap.mb, 12288
2020-06-17 11:28:20,539 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: taskmanager.heap.mb, 12288
2020-06-17 11:28:20,540 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: taskmanager.numberOfTaskSlots, 4
2020-06-17 11:28:20,540 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: parallelism.default, 28
2020-06-17 11:28:20,540 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: taskmanager.network.numberOfBuffers, 2048
2020-06-17 11:28:20,540 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: fs.hdfs.hadoopconf, /etc/hadoop/conf
2020-06-17 11:28:20,550 INFO 
 org.apache.shaded.flink.runtime.taskexecutor.TaskExecutorResourceUtils 
 - The configuration option Key: 'taskmanager.cpu.cores' , default: 
null (fallback keys: []) required for local execution is not set, 
setting it to its default value 1.7976931348623157E308
2020-06-17 11:28:20,552 INFO 
 org.apache.shaded.flink.runtime.taskexecutor.TaskExecutorResourceUtils 
 - The configuration option Key: 'taskmanager.memory.task.heap.size' , 
default: null (fallback keys: []) required for local execution is not 
set, setting it to its default value 9223372036854775807 bytes
2020-06-17 11:28:20,552 INFO 
 org.apache.shaded.flink.runtime.taskexecutor.TaskExecutorResourceUtils 
 - The configuration option Key: 
'taskmanager.memory.task.off-heap.size' , default: 0 bytes (fallback 
keys: []) required for local execution is not set, setting it to its 
default value 9223372036854775807 bytes
2020-06-17 11:28:20,552 INFO 
 org.apache.shaded.flink.runtime.taskexecutor.TaskExecutorResourceUtils 
 - The configuration option Key: 'taskmanager.memory.network.min' , 
default: 64 mb (fallback keys: [{key=taskmanager.network.memory.min, 
isDeprecated=true}]) required for local execution is not set, setting 
it to its default value 64 mb
2020-06-17 11:28:20,553 INFO 
 org.apache.shaded.flink.runtime.taskexecutor.TaskExecutorResourceUtils 
 - The configuration option Key: 'taskmanager.memory.network.max' , 
default: 1 gb (fallback keys: [{key=taskmanager.network.memory.max, 
isDeprecated=true}]) required for local execution is not set, setting 
it to its default value 64 mb
2020-06-17 11:28:20,553 INFO 
 org.apache.shaded.flink.runtime.taskexecutor.TaskExecutorResourceUtils 
 - The configuration option Key: 'taskmanager.memory.managed.size' , 
default: null (fallback keys: [{key=taskmanager.memory.size, 
isDeprecated=true}]) required for local execution is not set, setting 
it to its default value 128 mb
2020-06-17 11:28:20,558 INFO 
 org.apache.shaded.flink.runtime.minicluster.MiniCluster - Starting 
Flink Mini Cluster
2020-06-17 11:28:20,561 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: jobmanager.rpc.address, localhost
2020-06-17 11:28:20,561 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
confi

Re: what to consider when testing a data stream application using the TPC-H benchmark data?

2020-06-22 Thread Arvid Heise
If you are interested in measuring performance, you should also take a look
at our benchmark repo [1] and in particular the Throughput job [2].

[1] https://github.com/dataArtisans/performance
[2]
https://github.com/dataArtisans/performance/blob/master/flink-jobs/src/main/java/com/github/projectflink/streaming/Throughput.java

On Mon, Jun 22, 2020 at 3:36 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi Arvid,
>
> thanks for the references. I didn't find those tests before. I will
> definitely consider them to test my application.
>
> The thing is that I am testing a pre-aggregation stream operator that I
> have implemented. Particularly I need a high workload to create
> backpressure on the shuffle phase, after the keyBy transformation is done.
> And I am monitoring the throughput only of this operator. So, I will stick
> with the source function but consider what there is on the other references.
>
> I know that the Table API already has a pre-agg [2]. However, mine works a
> little bit differently.
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
>
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
>
> On Mon, Jun 22, 2020 at 2:54 PM Arvid Heise  wrote:
>
>> Hi Felipe,
>>
>> The examples are pretty old (6 years), hence they still use DataSet.
>>
>> You should be fine by mostly replacing sources with file sources (no need
>> to write your own source, unless you want generators) and using global
>> windows for joining.
>>
>> However, why not use SQL for TPC-H? We have an e2e test [1], where some
>> TPC-H queries are used (in slightly modified form) [2].
>> We also have TPC-DS queries as e2e tests [3].
>>
>> [1]
>> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-tpch-test
>> [2]
>> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/test-scripts/test-data/tpch/modified-query
>> [3]
>> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-tpcds-test
>>
>> On Mon, Jun 22, 2020 at 12:35 PM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I would like to create some data stream queries tests using the TPC-H
>>> benchmark. I saw that there are some examples of TPC Q3[1] and Q10[2],
>>> however, they are using DataSet. If I consider creating these queries
>>> but using DataStream what are the caveats that I have to ensure when
>>> implementing the source function? I mean, the frequency of emitting
>>> items is certainly the first. I suppose that I would change the
>>> frequency of the workload globally for all data sources. Is that all, or
>>> are there other things to consider?
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
>>> [2]
>>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
>>>
>>> Thanks,
>>> Felipe
>>> --
>>> -- Felipe Gutierrez
>>> -- skype: felipe.o.gutierrez
>>> -- https://felipeogutierrez.blogspot.com
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>

-- 

Arvid Heise | Senior Java Developer





Re: Renaming the metrics

2020-06-22 Thread Ori Popowski
Thanks for answering.

Unrelated to Flink, but if anyone knows a way to rename the metrics inside
Prometheus I'd appreciate if you can share.

About the push gateway - I think I'll stick with the pull option because
it looks like a better fit for the use case.

On Mon, Jun 22, 2020 at 4:47 PM Chesnay Schepler  wrote:

> There's currently no way to change this.
>
> A related enhancement was proposed on FLINK-17495 that would at least
> allow you to attach a custom label, but the initial implementation wasn't
> general enough.
>
> On 22/06/2020 15:08, Arvid Heise wrote:
>
> Hi Ori,
>
> I see that the PrometheusPushGatewayReporter [1] has an option for a job
> name, maybe you can use that.
>
> I'm also including Chesnay who probably has more ideas.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter
>
> On Mon, Jun 22, 2020 at 9:01 AM Ori Popowski  wrote:
>
>> I have two Flink clusters sending metrics via Prometheus and they share
>> all the metric names (i.e.
>> flink_taskmanager_job_task_operator_currentOutputWatermark).
>>
>> I want to change the flink_ prefix to something else to distinguish
>> between the clusters (maybe the job-name).
>>
>> How can I do it?
>>
>> Thanks.
>>
>
>
>
>
>
>


Re: [EXTERNAL] Re: Renaming the metrics

2020-06-22 Thread Slotterback, Chris
Hi Ori,

Another, more temporary brute-force option, while not officially Flink, could be 
building a modified version of the metrics plugin into Flink where you 
manipulate the prefixes yourself. It’s actually pretty easy to build the jar, 
and to test it you drop the jar into the plugin path. I’ve done something 
similar where I filter out a lot of the prefixes that I don’t want, 
because too many metric points were being generated from some custom metrics. 
The config for the filter is loaded from the Flink conf; you could possibly 
implement something similar where you pass the job name in each cluster’s config:

https://github.com/cslotterback/flink/commit/fd8e1f77a83a3ae1253da53596d22471bb6fe902
and
https://github.com/cslotterback/flink/commit/ce3797ea46f3321885c4352ecc36b9385b7ca0ce

This isn’t what I’d call ideal, but it gets the job done. I would love a 
generic flink-approved method of configuring Prometheus metrics.
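
As a rough illustration of the prefix manipulation described above, here is a
minimal sketch in plain Java. MetricPrefixRewriter and its methods are
hypothetical names, not part of Flink or of the linked commits; in a modified
reporter, something like rewrite() would be applied to each metric name before
it is registered, with the cluster name read from that cluster's Flink conf.
The character replacement assumes Prometheus' metric-name rule
[a-zA-Z_:][a-zA-Z0-9_:]*.

```java
public class MetricPrefixRewriter {
    // Prefix the original question wants to replace.
    private static final String DEFAULT_PREFIX = "flink_";

    // Replaces characters that are not valid in a Prometheus metric name with '_'
    // and guards against a leading digit (hypothetical helper).
    public static String sanitize(String s) {
        String cleaned = s.replaceAll("[^a-zA-Z0-9_:]", "_");
        if (!cleaned.isEmpty() && Character.isDigit(cleaned.charAt(0))) {
            return "_" + cleaned;
        }
        return cleaned;
    }

    // Swaps a leading "flink_" for "<clusterName>_"; other names pass through unchanged.
    public static String rewrite(String metricName, String clusterName) {
        if (!metricName.startsWith(DEFAULT_PREFIX)) {
            return metricName;
        }
        return sanitize(clusterName) + "_" + metricName.substring(DEFAULT_PREFIX.length());
    }

    public static void main(String[] args) {
        System.out.println(rewrite(
                "flink_taskmanager_job_task_operator_currentOutputWatermark", "cluster-a"));
    }
}
```

Keeping the rewrite in one small function makes it easy to unit test outside of
a running cluster before building the jar.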

Chris


From: Ori Popowski 
Date: Monday, June 22, 2020 at 12:22 PM
Cc: user 
Subject: [EXTERNAL] Re: Renaming the metrics

Thanks for answering.

Unrelated to Flink, but if anyone knows a way to rename the metrics inside 
Prometheus I'd appreciate if you can share.

About the push gateway - I think I'll stick with the pull options because it 
looks like a better fit to the use case

On Mon, Jun 22, 2020 at 4:47 PM Chesnay Schepler <ches...@apache.org> wrote:
There's currently no way to change this.

A related enhancement was proposed on FLINK-17495 that would at least allow you 
to attach a custom label, but the initial implementation wasn't general enough.

On 22/06/2020 15:08, Arvid Heise wrote:
Hi Ori,

I see that the PrometheusPushGatewayReporter [1] has an option for a job name, 
maybe you can use that.

I'm also including Chesnay who probably has more ideas.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter

On Mon, Jun 22, 2020 at 9:01 AM Ori Popowski <ori@gmail.com> wrote:
I have two Flink clusters sending metrics via Prometheus and they share all the 
metric names (i.e. flink_taskmanager_job_task_operator_currentOutputWatermark).

I want to change the flink_ prefix to something else to distinguish between the 
clusters (maybe the job-name).

How can I do it?

Thanks.






Rocksdb state directory path in EMR

2020-06-22 Thread Sudan S
Hi,

I have enabled RocksDB (state store) with S3 (external checkpoints) on EMR.

I am using RocksDB as the state store with ValueState, and checkpoints are
stored in S3.

I am able to see checkpoints in S3, and the state store functionality is
working fine.

I am trying to inspect the RocksDB folder structure, but I am not able to
find it on EMR. I am checking the /mnt/yarn folder.

Can you please help me find where Flink persists the store on EMR when
RocksDB is used as the state backend?

-- 
*"The information contained in this e-mail and any accompanying documents 
may contain information that is confidential or otherwise protected from 
disclosure. If you are not the intended recipient of this message, or if 
this message has been addressed to you in error, please immediately alert 
the sender by replying to this e-mail and then delete this message, 
including any attachments. Any dissemination, distribution or other use of 
the contents of this message by anyone other than the intended recipient is 
strictly prohibited. All messages sent to and from this e-mail address may 
be monitored as permitted by applicable law and regulations to ensure 
compliance with our internal policies and to protect our business."*


Re: Trouble with large state

2020-06-22 Thread Jeff Henrikson

Bhaskar,

I think I am unstuck.  The performance numbers I sent after throttling 
were due to a one character error in business logic.  I think I now have 
something good enough to work with for now.  I will repost if I 
encounter further unexpected issues.


Adding application-level throttling ends up resolving both my symptom of 
slow/failing checkpoints, and also my symptom of crashes during long runs.


Many thanks!


Jeff


On 6/20/20 11:46 AM, Jeff Henrikson wrote:

Bhaskar,

 > Glad to know some progress.

Yeah, some progress.  Yet overnight run didn't look as good as I hoped.

The throttling required to not crash during snapshots seems to be quite 
different from the throttling required to not crash between snapshots. So 
the lowest common denominator is quite a large performance penalty.


What's worse, the rate of input that makes the snapshot performance go 
from good to bad seems to change significantly as the state size grows. 
Here is checkpoint history from an overnight run.


Parameters:

     - 30 minutes minimum between snapshots
     - incremental snapshot mode
     - inputs throttled to 100 events per sec per input per slot,
   which is around 1/4 of the unthrottled throughput

Checkpoint history:

 ID  Status     Acknowledged  Trigger Time  Latest Acknowledgement  End to End Duration  State Size  Buffered During Alignment
 12  COMPLETED  304/304       8:52:22       10:37:18                1h 44m 55s           60.5 GB     0 B
 11  COMPLETED  304/304       6:47:03       8:22:19                 1h 35m 16s           53.3 GB     0 B
 10  COMPLETED  304/304       5:01:20       6:17:00                 1h 15m 39s           41.0 GB     0 B
  9  COMPLETED  304/304       3:47:43       4:31:19                 43m 35s              34.1 GB     0 B
  8  COMPLETED  304/304       2:40:58       3:17:42                 36m 43s              27.8 GB     0 B
  7  COMPLETED  304/304       1:39:15       2:10:57                 31m 42s              23.1 GB     0 B
  6  COMPLETED  304/304       0:58:02       1:09:13                 11m 11s              17.4 GB     0 B
  5  COMPLETED  304/304       0:23:27       0:28:01                 4m 33s               14.3 GB     0 B
  4  COMPLETED  304/304       23:52:29      23:53:26                56s                  12.7 GB     0 B
  3  COMPLETED  304/304       23:20:59      23:22:28                1m 29s               10.8 GB     0 B
  2  COMPLETED  304/304       22:46:17      22:50:58                4m 40s               7.40 GB     0 B


As you can see, GB/minute varies drastically.  GB/minute also varies 
drastically with full checkpoint mode.
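
To put rough numbers on that variation, here is a minimal sketch that divides
the State Size column by the End to End Duration column for two rows of the
checkpoint history. CheckpointRate and its methods are hypothetical names. In
incremental mode the reported state size is not necessarily the number of bytes
uploaded, so treat the result as a crude proxy only.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CheckpointRate {
    private static final Pattern PART = Pattern.compile("(\\d+)([hms])");

    // Parses durations such as "1h 44m 55s", "43m 35s" or "56s" into seconds.
    public static long parseSeconds(String duration) {
        Matcher m = PART.matcher(duration);
        long total = 0;
        while (m.find()) {
            long n = Long.parseLong(m.group(1));
            switch (m.group(2)) {
                case "h": total += n * 3600; break;
                case "m": total += n * 60; break;
                default:  total += n; break;
            }
        }
        return total;
    }

    // State size in GB divided by the end-to-end duration in minutes.
    public static double gbPerMinute(double stateSizeGb, String duration) {
        return stateSizeGb / (parseSeconds(duration) / 60.0);
    }

    public static void main(String[] args) {
        // Rows 4 and 12 of the checkpoint history.
        System.out.println("ID 4:  " + gbPerMinute(12.7, "56s") + " GB/min");
        System.out.println("ID 12: " + gbPerMinute(60.5, "1h 44m 55s") + " GB/min");
    }
}
```

By this measure checkpoint 4 runs at well over 10 GB/minute while checkpoint 12
runs at under 1 GB/minute.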


I'm pleased that it hasn't crashed yet.  Yet I'm concerned that with the 
checkpoint GB/minute getting so slow, it will crash soon.


I'm really wishing state.backend.async=false worked for 
RocksDbStateBackend.


I'm also wondering if my throttler would improve if I just connected to 
the REST api to ask if any checkpoint is in progress, and then paused 
inputs accordingly.  Effectively state.backend.async=false via hacked 
application code.
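
A minimal sketch of that REST polling idea, under stated assumptions: it
assumes the JobManager exposes checkpoint statistics at
/jobs/<jobid>/checkpoints and that the response carries an "in_progress"
counter, which should be verified against the REST API documentation of the
Flink version in use. CheckpointProbe, the base URL and the job id are
hypothetical, and the JSON is matched with a regular expression only to keep
the example free of dependencies.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CheckpointProbe {
    // Assumed field name in the checkpoint statistics response.
    private static final Pattern IN_PROGRESS =
            Pattern.compile("\"in_progress\"\\s*:\\s*(\\d+)");

    // Returns the in-progress checkpoint count, or -1 if the field is absent.
    public static int inProgressCount(String json) {
        Matcher m = IN_PROGRESS.matcher(json);
        return m.find() ? Integer.parseInt(m.group(1)) : -1;
    }

    // The throttler would pause inputs while at least one checkpoint is running.
    public static boolean shouldPauseInputs(String json) {
        return inProgressCount(json) > 0;
    }

    // Fetches the statistics for one job (requires Java 11+ for java.net.http).
    public static String fetchCheckpointStats(String baseUrl, String jobId)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest
                .newBuilder(URI.create(baseUrl + "/jobs/" + jobId + "/checkpoints"))
                .GET()
                .build();
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
    }
}
```

Polling adds latency between the checkpoint trigger and the pause, so the
inputs would still run unthrottled for part of each snapshot.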


 > Where are you updating your state here? I
 > couldn't find any flink managed state here.

The only updates to state I make are through the built-in 
DataStream.cogroup.  A unit test (without RocksDB loaded) of the way I 
use .cogroup shows exactly two ways that .cogroup calls an 
implementation of AppendingState.add.  I summarize those below.


The two AppendingState subclasses invoked are HeapListState and 
HeapReducingState.  Neither have a support attribute on them, such as 
MapState's @PublicEvolving.


 > I suggested updating the flink managed state using onTimer over an
 > interval equal to the checkpoint interval.

So the onTimer method, with interval set to the checkpoint interval. 
Interesting.


It looks like the closest subclass for my use case would be 
KeyedCoProcessFunction.  Let me see if I understand concretely the idea:


1) between checkpoints, read join input and write join output, by 
loading any state reads from external state, but buffering all state 
changes in memory in some kind of data structure.


2) whenever a checkpoint arrives or the memory consumed by buffered 
writes gets too big, flush the writes to state.


Is that the gist of the idea about .onTimer?
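
Steps 1) and 2) above can be sketched in plain Java, with an ordinary map
standing in for Flink managed state. BufferedStateWriter and all of its members
are hypothetical; in a real KeyedCoProcessFunction the backing map would be
keyed state and onTimer() would be driven by a registered timer. One caveat
under this sketch's assumptions: anything still sitting in the in-memory buffer
is not part of a checkpoint, so it would have to be flushed before a snapshot
is taken.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BufferedStateWriter {
    // Stand-in for Flink managed state (hypothetical; a plain map here).
    public final Map<String, List<String>> backingState = new HashMap<>();
    // Writes accumulated since the last flush.
    private final Map<String, List<String>> pending = new HashMap<>();
    private final int maxBuffered;
    private int buffered = 0;

    public BufferedStateWriter(int maxBuffered) {
        this.maxBuffered = maxBuffered;
    }

    // Step 1: writes only touch the buffer; flush early if it grows too big.
    public void add(String key, String value) {
        pending.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        buffered++;
        if (buffered >= maxBuffered) {
            flush();
        }
    }

    // Step 1: reads combine flushed state with writes that are still buffered.
    public List<String> read(String key) {
        List<String> out = new ArrayList<>(
                backingState.getOrDefault(key, Collections.emptyList()));
        out.addAll(pending.getOrDefault(key, Collections.emptyList()));
        return out;
    }

    // Step 2: called on the timer, at an interval matching the checkpoint interval.
    public void onTimer() {
        flush();
    }

    public int bufferedCount() {
        return buffered;
    }

    // Moves every buffered write into the backing state and empties the buffer.
    private void flush() {
        for (Map.Entry<String, List<String>> e : pending.entrySet()) {
            backingState.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                    .addAll(e.getValue());
        }
        pending.clear();
        buffered = 0;
    }
}
```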


Jeff



There are two paths from .coGroup to AppendingState.add

     path 1 of 2: .coGroup to HeapListState

     add:90, HeapListState {org.apache.flink.runtime.state.heap}
     processElement:203, EvictingWindowOperator 
{org.apache.flink.streaming.runtime.operators.windowing}
     processElement:164, StreamOneInputProcessor 
{org.apache.flink.streaming.runtime.io}
     processInput:143, StreamOneInputProcessor 
{org.apache.flink.streaming.runtime.io}



org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator#processElement 



   (windowAssigner is an instance of GlobalWindows)

     @Override
     public void processElement(StreamRecord element) 
throws Exception {
     final Collection elementWindows = 
windowAssigner.assignWindows(
     element.getValue(), element.getTimesta

[no subject]

2020-06-22 Thread 王宇
Hi all,
Some errors occurred when I ran Flink in a MiniCluster
(Flink version 1.11, Scala version 2.12.0).

Error:(33, 41) could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val solutionInput = env.fromElements((1, "1"))
Error:(33, 41) not enough arguments for method fromElements: (implicit
evidence$14: scala.reflect.ClassTag[(Int, String)], implicit evidence$15:
org.apache.flink.api.common.typeinfo.TypeInformation[(Int,
String)])org.apache.flink.api.scala.DataSet[(Int, String)].
Unspecified value parameter evidence$15.
val solutionInput = env.fromElements((1, "1"))
Error:(34, 40) could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val worksetInput = env.fromElements((2, "2"))
Error:(34, 40) not enough arguments for method fromElements: (implicit
evidence$14: scala.reflect.ClassTag[(Int, String)], implicit evidence$15:
org.apache.flink.api.common.typeinfo.TypeInformation[(Int,
String)])org.apache.flink.api.scala.DataSet[(Int, String)].
Unspecified value parameter evidence$15.
val worksetInput = env.fromElements((2, "2"))
Error:(47, 41) could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val solutionInput = env.fromElements((1, "1"))

I have tried
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html#type-information-in-the-scala-api


thanks


Re: Submitted Flink Jobs EMR are failing (Could not start rest endpoint on any port in port range 8081)

2020-06-22 Thread Yang Wang
Hi Sateesh, if "rest.port" or "rest.bind-port" is configured
explicitly, it will be used to start the rest server. So you need to
remove them from flink-conf.yaml or set them to "0" or to a port
range (e.g. 50100-50200).
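
Why a range helps can be illustrated with a small sketch of the general
"try each port until one binds" technique. This is not Flink's
RestServerEndpoint code; PortRangeBinder is a hypothetical name. With a single
fixed port such as 8081, the second process on the same host has nowhere else
to go, which matches the BindException in the report.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;

public class PortRangeBinder {
    // Tries each port in [from, to] and returns the first socket that binds.
    public static ServerSocket bindFirstFree(int from, int to) throws IOException {
        for (int port = from; port <= to; port++) {
            try {
                return new ServerSocket(port);
            } catch (IOException e) {
                // Port is taken or otherwise unusable; try the next one.
            }
        }
        throw new BindException("Could not bind any port in range " + from + "-" + to);
    }
}
```

Passing 0 as the port asks the operating system to pick any free port, which is
the other option mentioned above.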

By default, "flink run" will always start a dedicated Flink cluster for
each job. If you want to use session mode, you need to start a session with
"yarn-session.sh" first and then use "flink run ... -yid application_id"
to submit a Flink job to the existing cluster.


Best,
Yang

On Mon, Jun 22, 2020 at 9:58 PM Arvid Heise wrote:

> Hi Sateesh,
>
> the solution still applies; not all entries are listed in the conf
> template.
>
> From what you have written, it's most likely that the first jobs are
> not finished (hence the port is taken). Make sure you don't use the detached
> mode when submitting.
> You can see the status of the jobs in YARN resource manager which also
> links to the respective Flink JobManagers.
>
> And yes, by default, each job creates a new YARN session unless you use
> them explicitly [1].
>
> If you need more help, please post your steps.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#flink-yarn-session
>
> On Thu, Jun 18, 2020 at 4:15 PM sk_ac...@yahoo.com 
> wrote:
>
>> I am using EMR 5.30.0 and trying to submit a Flink (1.10.0) job using the
>> following command
>>
>> flink run -m yarn-cluster /home/hadoop/flink--test-0.0.1-SNAPSHOT.jar
>>
>> and i am getting the following error:
>>
>> Caused by:
>> org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
>> YARN application unexpectedly switched to state FAILED during deployment.
>>
>> After going through the logs on the worker nodes and job manager logs it
>> looks like there is a port conflict
>>
>> 2020-06-17 21:40:51,199 ERROR
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Could not
>> start cluster entrypoint YarnJobClusterEntrypoint.
>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException:
>> Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint.
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
>> at
>> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
>> Caused by: org.apache.flink.util.FlinkException: Could not create the
>> DispatcherResourceManagerComponent.
>> at
>> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:261)
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:215)
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
>> ... 2 more
>> Caused by: java.net.BindException: Could not start rest endpoint on
>> any port in port range 8081
>> at
>> org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:219)
>> at
>> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:165)
>> ... 9 more
>>
>> There seems to be a JIRA ticket (
>> https://issues.apache.org/jira/browse/FLINK-15394) open for this (though
>> it is for version 1.9 of Flink) and the suggested solution is to use a port
>> range for **rest.bind-port** in the Flink config file.
>>
>> However, in version 1.10 of Flink we only have the following in the Yarn
>> conf YML file:
>>
>> rest.port: 8081
>>
>> Another issue I am facing is that I have submitted multiple Flink jobs (the
>> same job multiple times) using the AWS Console and via the Add Step UI. Only
>> one of the jobs succeeded and the rest failed with the error posted above.
>> And when I go to the Flink UI it doesn't show any jobs at all.
>>
>> I am wondering whether each of the submitted jobs is trying to create a
>> Flink YARN session instead of using the existing one.
>> Thanks
>> Sateesh
>>
>>
>

Re: TypeInformation not found

2020-06-22 Thread Yun Gao
Hi yu,

Have you added "import org.apache.flink.api.scala._"? It should work 
once the import has been added to the program:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object Test {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val solutionInput = env.fromElements((1, "1"))
    solutionInput.print()
    env.execute()
  }
}


Best,
Yun





 --Original Mail --
Sender:王宇 
Send Date:Tue Jun 23 09:42:47 2020
Recipients:User 
Subject:No Subject

Hi all,
Some errors occurred when I ran Flink in a MiniCluster 
(Flink version 1.11, Scala version 2.12.0).

Error:(33, 41) could not find implicit value for evidence parameter of type 
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val solutionInput = env.fromElements((1, "1"))
Error:(33, 41) not enough arguments for method fromElements: (implicit 
evidence$14: scala.reflect.ClassTag[(Int, String)], implicit evidence$15: 
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, 
String)])org.apache.flink.api.scala.DataSet[(Int, String)].
Unspecified value parameter evidence$15.
val solutionInput = env.fromElements((1, "1"))
Error:(34, 40) could not find implicit value for evidence parameter of type 
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val worksetInput = env.fromElements((2, "2"))
Error:(34, 40) not enough arguments for method fromElements: (implicit 
evidence$14: scala.reflect.ClassTag[(Int, String)], implicit evidence$15: 
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, 
String)])org.apache.flink.api.scala.DataSet[(Int, String)].
Unspecified value parameter evidence$15.
val worksetInput = env.fromElements((2, "2"))
Error:(47, 41) could not find implicit value for evidence parameter of type 
org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val solutionInput = env.fromElements((1, "1"))

have tried 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html#type-information-in-the-scala-api

thanks

Re: Flink Stream job to parquet sink

2020-06-22 Thread aj
I am stuck on this. Please give some suggestions.

On Tue, Jun 9, 2020, 21:40 aj  wrote:

> Please help with this. Any suggestions?
>
> On Sat, Jun 6, 2020 at 12:20 PM aj  wrote:
>
>> Hello All,
>>
>> I am receiving a set of events in Avro format on different topics. I want
>> to consume these and write them to S3 in Parquet format.
>> I have written the job below, which creates a separate stream for each event
>> type and fetches its schema from the Confluent Schema Registry to create a
>> Parquet sink for that event.
>> This is working fine, but the only problem is that whenever a new event
>> type starts arriving, I have to change the YAML config and restart the job
>> every time. Is there any way to avoid restarting the job and have it start
>> consuming the new set of events?
>>
>>
>> YAML config :
>>
>> !com.bounce.config.EventTopologyConfig
>> eventsType:
>>   - !com.bounce.config.EventConfig
>>     event_name: "search_list_keyless"
>>     schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
>>     topic: "search_list_keyless"
>>
>>   - !com.bounce.config.EventConfig
>>     event_name: "bike_search_details"
>>     schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
>>     topic: "bike_search_details"
>>
>>   - !com.bounce.config.EventConfig
>>     event_name: "keyless_bike_lock"
>>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
>>     topic: "analytics-keyless"
>>
>>   - !com.bounce.config.EventConfig
>>     event_name: "keyless_bike_unlock"
>>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
>>     topic: "analytics-keyless"
>>
>> checkPointInterval: 120
>>
>> topics: ["search_list_keyless","bike_search_details","analytics-keyless"]
>>
>>
>>
>>
>>
>> *Sink code :*
>>
>> YamlReader reader = new YamlReader(topologyConfig);
>> EventTopologyConfig eventTopologyConfig = reader.read(EventTopologyConfig.class);
>>
>> long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
>> topics = eventTopologyConfig.getTopics();
>>
>> List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();
>>
>> CachedSchemaRegistryClient registryClient =
>>     new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
>>
>> FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
>>     topics,
>>     new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>     properties);
>>
>> DataStream<GenericRecord> dataStream =
>>     streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");
>>
>> try {
>>     for (EventConfig eventConfig : eventTypesList) {
>>
>>         // "{}" placeholder added so the event name is actually logged
>>         LOG.info("creating a stream for {}", eventConfig.getEvent_name());
>>
>>         // one Parquet sink per event type, using the schema from the registry
>>         final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>>             .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
>>                 SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
>>             .withBucketAssigner(new EventTimeBucketAssigner())
>>             .build();
>>
>>         // keep only the records that belong to this event type
>>         DataStream<GenericRecord> outStream = dataStream.filter(
>>             (FilterFunction<GenericRecord>) genericRecord ->
>>                 genericRecord != null
>>                     && genericRecord.get(EVENT_NAME).toString()
>>                         .equals(eventConfig.getEvent_name()));
>>
>>         outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
>>     }
>> } catch (Exception e) {
>>     e.printStackTrace();
>> }
>>
>>
>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>>
>> 
>>
>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> 
>
>
> 
>


Re: Trouble with large state

2020-06-22 Thread Vijay Bhaskar
Jeff,
Glad to know that you are able to make progress and that the issue got resolved.

Regards
Bhaskar

On Tue, Jun 23, 2020 at 12:24 AM Jeff Henrikson 
wrote:

> Bhaskar,
>
> I think I am unstuck.  The performance numbers I sent after throttling
> were due to a one character error in business logic.  I think I now have
> something good enough to work with for now.  I will repost if I
> encounter further unexpected issues.
>
> Adding application-level throttling ends up resolving both my symptom of
> slow/failing checkpoints, and also my symptom of crashes during long runs.
>
> Many thanks!
>
>
> Jeff
>
>
> On 6/20/20 11:46 AM, Jeff Henrikson wrote:
> > Bhaskar,
> >
> >  > Glad to know some progress.
> >
> > Yeah, some progress.  Yet overnight run didn't look as good as I hoped.
> >
> > The throttling required to not crash during snapshots seems to be quite
> > different from the throttling required to not crash between snapshots. So
> > the lowest common denominator is quite a large performance penalty.
> >
> > What's worse, the rate of input that makes the snapshot performance go
> > from good to bad seems to change significantly as the state size grows.
> > Here is checkpoint history from an overnight run.
> >
> > Parameters:
> >
> >  - 30 minutes minimum between snapshots
> >  - incremental snapshot mode
> >  - inputs throttled to 100 events per sec per input per slot,
> >which is around 1/4 of the unthrottled throughput
> >
> > Checkpoint history:
> >
> >  ID  Status     Acknowledged  Trigger Time  Latest Acknowledgement  End to End Duration  State Size  Buffered During Alignment
> >  12  COMPLETED  304/304       8:52:22       10:37:18                1h 44m 55s           60.5 GB     0 B
> >  11  COMPLETED  304/304       6:47:03       8:22:19                 1h 35m 16s           53.3 GB     0 B
> >  10  COMPLETED  304/304       5:01:20       6:17:00                 1h 15m 39s           41.0 GB     0 B
> >  9   COMPLETED  304/304       3:47:43       4:31:19                 43m 35s              34.1 GB     0 B
> >  8   COMPLETED  304/304       2:40:58       3:17:42                 36m 43s              27.8 GB     0 B
> >  7   COMPLETED  304/304       1:39:15       2:10:57                 31m 42s              23.1 GB     0 B
> >  6   COMPLETED  304/304       0:58:02       1:09:13                 11m 11s              17.4 GB     0 B
> >  5   COMPLETED  304/304       0:23:27       0:28:01                 4m 33s               14.3 GB     0 B
> >  4   COMPLETED  304/304       23:52:29      23:53:26                56s                  12.7 GB     0 B
> >  3   COMPLETED  304/304       23:20:59      23:22:28                1m 29s               10.8 GB     0 B
> >  2   COMPLETED  304/304       22:46:17      22:50:58                4m 40s               7.40 GB     0 B
> >
> > As you can see, GB/minute varies drastically.  GB/minute also varies
> > drastically with full checkpoint mode.
> >
> > I'm pleased that it hasn't crashed yet.  Yet I'm concerned that with the
> > checkpoint GB/minute getting so slow, it will crash soon.
> >
> > I'm really wishing state.backend.async=false worked for
> > RocksDBStateBackend.
> >
> > I'm also wondering if my throttler would improve if I just connected to
> > the REST API to ask if any checkpoint is in progress, and then paused
> > inputs accordingly.  Effectively state.backend.async=false via hacked
> > application code.
> >
> >  > Where are you updating your state here? I
> >  > couldn't find any flink managed state here.
> >
> > The only updates to state I make are through the built-in
> > DataStream.coGroup.  A unit test (without RocksDB loaded) of the way I
> > use .coGroup shows exactly two ways that .coGroup calls an
> > implementation of AppendingState.add.  I summarize those below.
> >
> > The two AppendingState implementations invoked are HeapListState and
> > HeapReducingState.  Neither has a stability annotation on it, such as
> > MapState's @PublicEvolving.
> >
> >  > I suggested updating the flink managed state using onTimer over an
> >  > interval equal to the checkpoint interval.
> >
> > So the onTimer method, with interval set to the checkpoint interval.
> > Interesting.
> >
> > It looks like the closest class for my use case would be
> > KeyedCoProcessFunction.  Let me see if I understand concretely the idea:
> >
> > 1) between checkpoints, read join input and write join output, by
> > loading any state reads from external state, but buffering all state
> > changes in memory in some kind of data structure.
> >
> > 2) whenever a checkpoint arrives or the memory consumed by buffered
> > writes gets too big, flush the writes to state.
> >
> > Is that the gist of the idea about .onTimer?
> >
> >
> > Jeff
> >
> >
> >
> > There are two paths from .coGroup to AppendingState.add
> >
> >  path 1 of 2: .coGroup to HeapListState
> >
> >  add:90, HeapListState {org.apache.flink.runtime.state.heap}
> >  processElement:203, EvictingWindowOperator
> > {org.apache.flink.streaming.runtime.operators.windowing}
> >  processElement:164, StreamOneInputProcessor
> > {org.apache.flink.streaming.runtime.io}
> >   
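
As a rough check of the "GB/minute varies drastically" observation in the
quoted message above, the sketch below divides each checkpoint's reported
state size by its end-to-end duration. The numbers are copied from the
checkpoint history; the class and method names are illustrative only, and
with incremental snapshots the reported state size may not equal the bytes
actually written, so treat the ratio as the same rough measure used in the
message rather than a precise throughput.

```java
class CheckpointThroughput {

    /** Parses durations such as "1h 44m 55s", "43m 35s" or "56s" into seconds. */
    static long parseSeconds(String duration) {
        long seconds = 0;
        for (String part : duration.trim().split("\\s+")) {
            long value = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            switch (unit) {
                case 'h': seconds += value * 3600; break;
                case 'm': seconds += value * 60; break;
                case 's': seconds += value; break;
                default: throw new IllegalArgumentException("Unknown unit in: " + part);
            }
        }
        return seconds;
    }

    /** Reported state size in GB divided by end-to-end duration in minutes. */
    static double gbPerMinute(double stateSizeGb, String duration) {
        return stateSizeGb / (parseSeconds(duration) / 60.0);
    }

    public static void main(String[] args) {
        // Values copied from the checkpoint history (IDs 2 through 12).
        int[] ids = {2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
        double[] sizesGb = {7.40, 10.8, 12.7, 14.3, 17.4, 23.1, 27.8, 34.1, 41.0, 53.3, 60.5};
        String[] durations = {
            "4m 40s", "1m 29s", "56s", "4m 33s", "11m 11s", "31m 42s",
            "36m 43s", "43m 35s", "1h 15m 39s", "1h 35m 16s", "1h 44m 55s"
        };
        for (int i = 0; i < ids.length; i++) {
            System.out.printf("checkpoint %2d: %.2f GB/min%n",
                    ids[i], gbPerMinute(sizesGb[i], durations[i]));
        }
    }
}
```

By this measure checkpoint 4 ran at roughly 13.6 GB/min while checkpoint 12
ran at roughly 0.58 GB/min, which matches the slowdown described in the
message.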

two phase aggregation

2020-06-22 Thread Fanbin Bu
Hi,

Does over window aggregation support two-phase mode?
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html#table-optimizer-agg-phase-strategy

SELECT
  user_id
  , event_time
  , listagg(event_type, '*') over w as names
FROM table
WINDOW w AS
( PARTITION BY user_id
  ORDER BY event_time
  ROWS BETWEEN 256 PRECEDING AND CURRENT ROW
)


Re: two phase aggregation

2020-06-22 Thread Jark Wu
Hi Fanbin,

Currently, over window aggregation doesn't support two-phase optimization.

Best,
Jark

On Tue, 23 Jun 2020 at 12:14, Fanbin Bu wrote:

> Hi,
>
> Does over window aggregation support two-phase mode?
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html#table-optimizer-agg-phase-strategy
>
> SELECT
>   user_id
>   , event_time
>   , listagg(event_type, '*') over w as names
> FROM table
> WINDOW w AS
> ( PARTITION BY user_id
>   ORDER BY event_time
>   ROWS BETWEEN 256 PRECEDING AND CURRENT ROW
> )
>
>


Re: two phase aggregation

2020-06-22 Thread Fanbin Bu
Jark,
thanks for the reply. Do you know whether it's on the roadmap or what's the
plan?

On Mon, Jun 22, 2020 at 9:36 PM Jark Wu wrote:

> Hi Fanbin,
>
> Currently, over window aggregation doesn't support two-phase optimization.
>
> Best,
> Jark
>
> On Tue, 23 Jun 2020 at 12:14, Fanbin Bu wrote:
>
>> Hi,
>>
>> Does over window aggregation support two-phase mode?
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html#table-optimizer-agg-phase-strategy
>>
>> SELECT
>>   user_id
>>   , event_time
>>   , listagg(event_type, '*') over w as names
>> FROM table
>> WINDOW w AS
>> ( PARTITION BY user_id
>>   ORDER BY event_time
>>   ROWS BETWEEN 256 PRECEDING AND CURRENT ROW
>> )
>>
>>


Re: two phase aggregation

2020-06-22 Thread Jark Wu
AFAIK, this is not on the roadmap.

The problem is that two-phase aggregation would bring little improvement
for over window aggregates.
If we supported two-phase mode for over window aggregates, the local over
operator would not reduce any data: it has to emit the same number of
records it received, so it cannot reduce the pressure on the global
operator.

Best,
Jark
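
To illustrate the point, here is a minimal plain-Java sketch (not Flink
code; every class and method name is made up for illustration) that compares
what a local pre-aggregation step can emit for a group aggregate with what
an over aggregate has to emit. For a group aggregate such as a count per
user_id, the local step emits one partial result per key. For an over
aggregate, every input row needs its own output row, so a local step cannot
shrink the stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TwoPhaseSketch {

    /** Local phase of a group aggregate: one partial count per key. */
    static Map<String, Long> localGroupCount(List<String> userIds) {
        Map<String, Long> partial = new LinkedHashMap<>();
        for (String userId : userIds) {
            partial.merge(userId, 1L, Long::sum);
        }
        return partial;
    }

    /** Over aggregate (running count per key): one output row per input row. */
    static List<String> overRunningCount(List<String> userIds) {
        Map<String, Long> running = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (String userId : userIds) {
            long count = running.merge(userId, 1L, Long::sum);
            out.add(userId + ":" + count);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "a", "b", "a", "b");
        System.out.println("input rows:            " + input.size());
        System.out.println("local group agg emits: " + localGroupCount(input).size());
        System.out.println("over agg emits:        " + overRunningCount(input).size());
    }
}
```

With the five sample rows, the local group aggregate forwards two partial
results downstream, while the over aggregate still produces five rows.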

On Tue, 23 Jun 2020 at 13:09, Fanbin Bu wrote:

> Jark,
> thanks for the reply. Do you know whether it's on the roadmap or what's
> the plan?
>
> On Mon, Jun 22, 2020 at 9:36 PM Jark Wu wrote:
>
>> Hi Fanbin,
>>
>> Currently, over window aggregation doesn't support two-phase optimization.
>>
>> Best,
>> Jark
>>
>> On Tue, 23 Jun 2020 at 12:14, Fanbin Bu wrote:
>>
>>> Hi,
>>>
>>> Does over window aggregation support two-phase mode?
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html#table-optimizer-agg-phase-strategy
>>>
>>> SELECT
>>>   user_id
>>>   , event_time
>>>   , listagg(event_type, '*') over w as names
>>> FROM table
>>> WINDOW w AS
>>> ( PARTITION BY user_id
>>>   ORDER BY event_time
>>>   ROWS BETWEEN 256 PRECEDING AND CURRENT ROW
>>> )
>>>
>>>


Re: Rocksdb state directory path in EMR

2020-06-22 Thread Dawid Wysakowicz
Hi,

If I understand you correctly, you want to check the local RocksDB
files, right? They are stored locally on each TaskManager in a temporary
directory.

This can be configured via "state.backend.rocksdb.localdir" [1]. If not
specified, it will use the globally defined temporary directory set via
"io.tmp.dirs" [2], which defaults to: "'LOCAL_DIRS' on Yarn.
'_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in
standalone."

Best,

Dawid


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#state-backend-rocksdb-localdir

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#io-tmp-dirs
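
The lookup order described above can be sketched in plain Java. This is not
Flink's implementation: the helper resolveRocksDbLocalDir, the map standing
in for the loaded configuration, and the example paths are all hypothetical.
Only the two option keys come from the documentation linked above, and the
Yarn and Mesos defaults are left out, so the last fallback shown is the
standalone one.

```java
import java.util.HashMap;
import java.util.Map;

class RocksDbDirLookup {

    static final String ROCKSDB_LOCAL_DIR = "state.backend.rocksdb.localdir";
    static final String IO_TMP_DIRS = "io.tmp.dirs";

    /** Hypothetical helper mirroring the fallback order described in the reply. */
    static String resolveRocksDbLocalDir(Map<String, String> conf) {
        String rocksDbDir = conf.get(ROCKSDB_LOCAL_DIR);
        if (rocksDbDir != null && !rocksDbDir.isEmpty()) {
            return rocksDbDir; // explicit RocksDB directory wins
        }
        String tmpDirs = conf.get(IO_TMP_DIRS);
        if (tmpDirs != null && !tmpDirs.isEmpty()) {
            return tmpDirs; // globally configured temporary directories
        }
        // Standalone default only; Yarn (LOCAL_DIRS) and Mesos are not modelled.
        return System.getProperty("java.io.tmpdir");
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println("nothing set:   " + resolveRocksDbLocalDir(conf));

        conf.put(IO_TMP_DIRS, "/mnt/flink/tmp");          // example path, an assumption
        System.out.println("io.tmp.dirs:   " + resolveRocksDbLocalDir(conf));

        conf.put(ROCKSDB_LOCAL_DIR, "/mnt/flink/rocksdb"); // example path, an assumption
        System.out.println("localdir set:  " + resolveRocksDbLocalDir(conf));
    }
}
```

On Yarn, as noted above, the default for "io.tmp.dirs" comes from LOCAL_DIRS
rather than from java.io.tmpdir, so setting "state.backend.rocksdb.localdir"
explicitly is the simplest way to know where to look.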

On 22/06/2020 15:22, Sudan S wrote:
> Hi,
>
> I have enabled RocksDB (state store) with S3 (external checkpoints) on EMR.
>
> I am using RocksDB as the state store with ValueState, and checkpoints are
> stored in S3.
>
> I am able to see the checkpoints in S3, and the functionality with respect
> to the state store is working fine.
>
> However, I am trying to dissect the RocksDB folder structure, and I am not
> able to find the files on EMR. I am checking the /mnt/yarn folder.
>
> Can you please help me find where Flink persists the store on EMR when
> RocksDB is used as the state backend?

