Blink Planner fails to generate RowtimeAttribute for custom TableSource

2020-05-11 Thread Yuval Itzchakov
Hi,
While migrating from Flink 1.9 -> 1.10 and from the old planner to Blink,
I've run into an issue where the Blink Planner doesn't take into account
the RowtimeAttribute defined on a custom table source. I've opened an
issue: https://issues.apache.org/jira/browse/FLINK-17600 and was wondering
if anyone else ran into this?

-- 
Best Regards,
Yuval Itzchakov.


Testing jobs locally against secure Hadoop cluster

2020-05-11 Thread Őrhidi Mátyás
Dear Community,

I'm having trouble testing jobs against a secure Hadoop cluster. Is that
possible? The mini cluster doesn't seem to load any security modules.

Thanks,
Matyas
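For reference, the usual way to point Flink at Kerberos credentials is via
flink-conf.yaml; whether the MiniCluster actually picks these settings up is
exactly the open question here, and the keytab path and principal below are
placeholders:

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/user.keytab
security.kerberos.login.principal: user@EXAMPLE.COM
security.kerberos.login.contexts: Client,KafkaClient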


Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

2020-05-11 Thread luisfaamaral
Thanks Gordon and Seth for the reply.

So.. the main project contains the below flink dependencies...



And the state processor project contains the following:
1.9.0



At first sight, all the libraries in both projects appear to match the Flink
1.9.0 libraries.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

2020-05-11 Thread Tzu-Li (Gordon) Tai
In that case, the most likely cause would be
https://issues.apache.org/jira/browse/FLINK-16313, which is included in
Flink 1.10.1 (to be released).

The release candidate process for Flink 1.10.1 is currently ongoing; would it be
possible for you to try that out and see if the error still occurs?



Flink REST API side effect?

2020-05-11 Thread Tomasz Dudziak
Hi,

I found an interesting behaviour of the REST API in my automated system tests,
where I use the API to get the status of a purposefully failing job.

If you query job details immediately after job submission, subsequent queries
will return the status as RUNNING for a moment until Flink's restart strategies
are exhausted. If instead you wait a brief moment before the first query of
the job status, it is reported in a more fine-grained fashion, e.g.
RESTARTING or FAILING.

Has anyone else noticed that behaviour?

Thanks,
Tomasz

Tomasz Dudziak | Marshall Wace LLP, George House, 131 Sloane Street, London, 
SW1X 9AT | E-mail: t.dudz...@mwam.com | Tel: +44 207 
024 7061




Re: Flink REST API side effect?

2020-05-11 Thread Chesnay Schepler
This is expected, the backing data structure is cached for a while so we 
never hammer the JobManager with requests.


IIRC this is controlled via "web.refresh-interval", with the default 
being 3 seconds.
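For reference, a minimal flink-conf.yaml sketch of that setting (value in
milliseconds, 3000 being the assumed default):

web.refresh-interval: 3000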



RE: Flink REST API side effect?

2020-05-11 Thread Tomasz Dudziak
Thanks for the reply. So do I understand correctly that whenever you query the
job status it gets cached for a configurable amount of time, and subsequent
queries within that time period will not show any change?



Re: Flink REST API side effect?

2020-05-11 Thread Chesnay Schepler

yes, that is correct.


Flink Memory analyze on AWS EMR

2020-05-11 Thread Jacky D
Hi all,

I'm encountering a memory issue with my Flink job on AWS EMR (current Flink
version 1.6.2). I would like to find the root cause, so I'm trying JITWatch
on my local standalone cluster, but I cannot use it on EMR. After I add the
following config to flink-conf.yaml:

env.java.opts: "-XX:+UnlockDiagnosticVMOptions -XX:+TraceClassLoading
-XX:+LogCompilation -XX:LogFile=${FLINK_LOG_PREFIX}.jit -XX:+PrintAssembly"

I got this error:

2020-05-07 16:24:53,368 ERROR
org.apache.flink.yarn.cli.FlinkYarnSessionCli - Error while
running the Flink Yarn session.
java.lang.reflect.UndeclaredThrowableException
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1862)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:813)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
Couldn't deploy Yarn session cluster
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:429)
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:610)
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:813)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
... 2 more
Caused by:
org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
The YARN application unexpectedly switched to state FAILED during
deployment.

How can I fix this issue to enable JITWatch, or which tool would be a proper
way to analyze a Flink memory dump on EMR?

Thanks
Jacky Du


Preserve record orders after WINDOW function

2020-05-11 Thread Jiahui Jiang
Hello! I'm writing a SQL query with an OVER window function ordered by
processing time.

I'm wondering about ordering, since the timestamp only has millisecond granularity.

For a query using an over window sorted on a processing time column, for example,
```
SELECT col1,
 max(col2) OVER (PARTITION BY col1 ORDER BY _processing_time_column)
FROM table
```

If 2 records have the same processing timestamp (they arrived at the operator at
the exact same millisecond), is the order guaranteed to be preserved after the
window function? If not, what would be the recommended workaround to keep the
order?

Thank you!
Jiahui


Not able to implement a use case

2020-05-11 Thread Jaswin Shah
Hi,
I want to implement the below use case in my application:
I am doing an interval join between two data streams and then, in a process
function, catching the discrepant results of the join. The join is done on the
key orderId. Now I want to identify all the messages in both datastreams which
are not joined. That is, for a message in the left stream, if I do not find any
message in the right stream over the defined interval, that message should be
caught, and the same for the right stream: if there are messages which do not
have corresponding messages in the left stream, catch them. I need help with
how I can achieve this use case. I know this can be done with an outer join,
but interval joins and tumbling event-time window joins only support inner
joins as far as I know. I do not want to use the Table/SQL API here; I want to
work with the DataStream API only.

Currently I am using the following, which works for 90% of the cases, but the
10% of cases where a large delay can happen and messages in the left or right
stream are missing are not handled by this implementation:

/**
 * Join cart and pg streams on mid and orderId, and the interval specified.
 *
 * @param leftStream
 * @param rightStream
 * @return
 */
public SingleOutputStreamOperator intervalJoinCartAndPGStreams(
        DataStream leftStream, DataStream rightStream, ParameterTool parameter) {
    // Discrepant results are sent to Kafka from CartPGProcessFunction.
    return leftStream
        .keyBy(new CartJoinColumnsSelector())
        .intervalJoin(rightStream.keyBy(new PGJoinColumnsSelector()))
        .between(Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_LOWERBOUND))),
                 Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_UPPERBOUND))))
        .process(new CartPGProcessFunction());
}


Secondly, I am unable to find streaming support for writing the datastreams I am
reading from Kafka out to Hive, which I want to batch process with Flink.

Please help me with resolving these use cases.

Thanks,
Jaswin




Re: MongoSink

2020-05-11 Thread Khachatryan Roman
Hi Aissa,

What is the BSONWritable you pass from map to sink?
I guess it's not serializable, which causes Flink to fall back to Kryo, which fails.

Regards,
Roman
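For illustration, one way around this is to convert the BSONWritable into a
plainly serializable value (e.g. a Map) in the map function and build the
document inside the sink itself. A minimal sketch using the synchronous MongoDB
Java driver; the class, connection string, database and collection names are
made up:

import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

// Hypothetical sink: the upstream map() emits a plain Map instead of BSONWritable,
// so nothing non-serializable travels between operators.
public class MongoDocumentSink extends RichSinkFunction<Map<String, Object>> {

    private transient MongoClient client;
    private transient MongoCollection<Document> collection;

    @Override
    public void open(Configuration parameters) {
        // The non-serializable client is created on the task manager,
        // not shipped with the job graph.
        client = MongoClients.create("mongodb://localhost:27017");
        collection = client.getDatabase("mydb").getCollection("mycollection");
    }

    @Override
    public void invoke(Map<String, Object> value, Context context) {
        // org.bson.Document has a Map-based constructor.
        collection.insertOne(new Document(value));
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}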


On Sun, May 10, 2020 at 10:42 PM Aissa Elaffani 
wrote:

> Hello Guys,
> I am trying to sink my data to MongoDB, But i got some errors. I am
> sharing with you my MongoDB implemetation, and the errors that occurred. I
> hope someone can figure it out.
> Thank you for your time, I really appreciate it.
> Aissa
>


Re: Broadcast state vs data enrichment

2020-05-11 Thread Khachatryan Roman
Hi Manas,

The approaches you described look the same:
> each operator only stores what it needs.
> each downstream operator will "strip off" the config parameter that it
needs.

Can you please explain the difference?

Regards,
Roman


On Mon, May 11, 2020 at 8:07 AM Manas Kale  wrote:

> Hi,
> I have a single broadcast message that contains configuration data
> consumed by different operators. For eg:
> config = {
> "config1" : 1,
> "config2" : 2,
> "config3" : 3
> }
>
> Operator 1 will consume config1 only, operator 2 will consume config2 only
> etc.
>
>
>- Right now in my implementation the config message gets broadcast
>over operators 1,2,3 and each operator only stores what it needs.
>
>
>- A different approach would be to broadcast the config message to a
>single root operator. This will then enrich event data flowing through it
>with config1,config2 and config3 and each downstream operator will "strip
>off" the config parameter that it needs.
>
>
> *I was wondering which approach would be the best to go with performance
> wise. *I don't really have the time to implement both and compare, so
> perhaps someone here already knows if one approach is better or both
> provide similar performance.
>
> FWIW, the config stream is very sporadic compared to the event stream.
>
> Thank you,
> Manas Kale
>
>
>
>


Re: Blink Planner fails to generate RowtimeAttribute for custom TableSource

2020-05-11 Thread Khachatryan Roman
Hi Yuval,

Thanks for reporting this issue. I'm pulling in Timo and Jark who are
working on the SQL component. They might be able to help you with your
problem.

Regards,
Roman




Re: Testing jobs locally against secure Hadoop cluster

2020-05-11 Thread Khachatryan Roman
Hi Őrhidi,

Can you please provide some details about the errors you get?

Regards,
Roman




Need suggestion on Flink-Kafka stream processing design

2020-05-11 Thread hemant singh
Hi,

I have different events from a device which constitute different metrics
for the same device. Each of these events is produced by the device at
intervals of a few milliseconds to a minute.

Event1(Device1) -> Stream1 -> Metric 1
Event2 (Device1) -> Stream2 -> Metric 2 ...
..
...
Event100(Device1) -> Stream100 -> Metric100

The number of events can go up to a few hundred for each data protocol, and we
have around 4-5 data protocols. Metrics from different streams make up a record;
for example, for device 1 -

Device1 -> Metric1, Metric2, Metric15 forms a single record for the
device. Currently, in the development phase, I am using an interval join to
achieve this, that is, to create a record with the latest data from the
different streams (events).

Based on the data protocol I have 4-5 topics. Currently the data for a single
event is being pushed to a partition of the Kafka topic (producer key ->
event_type + data_protocol). So essentially one topic is made up of many
streams, and I am filtering on the key to define the streams.

My question is: is this the correct way to stream the data? I had thought of
maintaining a different topic per event, however in that case the number of
topics could go to a few thousand, which becomes a little challenging to
maintain, and I am not sure Kafka handles that well.

I know there are traditional ways to do this, like pushing it to a
time-series DB and then joining the data for different metrics, but that is
something which will never scale; also, this processing should be as close to
real time as possible.

Are there better ways to handle this use case, or am I on the correct path?

Thanks,
Hemant


Re: Flink Memory analyze on AWS EMR

2020-05-11 Thread Khachatryan Roman
Hi Jacky,

Did you try it without  -XX:LogFile=${FLINK_LOG_PREFIX}.jit ?
Probably, Flink can't write to this location.

Also, you can try other tools described at
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/application_profiling.html

Regards,
Roman
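As one illustrative option from that page, Java Flight Recorder can be enabled
purely through env.java.opts in flink-conf.yaml; treat the exact flags as a
sketch, since they vary by JDK vendor and version:

env.java.opts: "-XX:+UnlockCommercialFeatures -XX:+UnlockDiagnosticVMOptions -XX:+FlightRecorder -XX:+DebugNonSafepoints -XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=/tmp/dump.jfr"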




Fwd: Flink Memory analyze on AWS EMR

2020-05-11 Thread Jacky D
-- Forwarded message -
From: Jacky D
Date: Mon, May 11, 2020, 3:12 PM
Subject: Re: Flink Memory analyze on AWS EMR
To: Khachatryan Roman


Hi, Roman

Thanks for the quick response. I tried without the LogFile option but it failed
with the same error. I'm currently using Flink 1.6
(https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/application_profiling.html),
so I can only use JITWatch or JMC. I guess those tools are only available on a
standalone cluster, as the document mentions "Each standalone JobManager,
TaskManager, HistoryServer, and ZooKeeper daemon redirects stdout and stderr to
a file with a .out filename suffix and writes internal logging to a file
with a .log suffix. Java options configured by the user in env.java.opts"?

Thanks
Jacky


Re: Flink Memory analyze on AWS EMR

2020-05-11 Thread Robert Metzger
Hey Jacky,

The error says "The YARN application unexpectedly switched to state FAILED
during deployment.".
Have you tried retrieving the YARN application logs?
Does the YARN UI / resource manager logs reveal anything on the reason for
the deployment to fail?

Best,
Robert




Re: Flink Memory analyze on AWS EMR

2020-05-11 Thread Jacky D
Hi Robert,

Yes, I tried to retrieve more log info from the YARN UI; the full logs are
shown below. This happens when I try to create a Flink YARN session on
EMR with the JITWatch configuration set up.

2020-05-11 19:06:09,552 ERROR
org.apache.flink.yarn.cli.FlinkYarnSessionCli - Error while
running the Flink Yarn session.
java.lang.reflect.UndeclaredThrowableException
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1862)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:813)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
Couldn't deploy Yarn session cluster
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:429)
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:610)
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:813)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
... 2 more
Caused by:
org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
The YARN application unexpectedly switched to state FAILED during
deployment.
Diagnostics from YARN: Application application_1584459865196_0165 failed 1
times (global limit =2; local limit is =1) due to AM Container for
appattempt_1584459865196_0165_01 exited with  exitCode: 1
Failing this attempt.Diagnostics: Exception from container-launch.
Container id: container_1584459865196_0165_01_01
Exit code: 1
Exception message: Usage: java [-options] class [args...]
   (to execute a class)
   or  java [-options] -jar jarfile [args...]
   (to execute a jar file)
where options include:
-d32   use a 32-bit data model if available
-d64   use a 64-bit data model if available
-server   to select the "server" VM
  The default VM is server,
  because you are running on a server-class machine.


-cp 
-classpath 
  A : separated list of directories, JAR archives,
  and ZIP archives to search for class files.
-D=
  set a system property
-verbose:[class|gc|jni]
  enable verbose output
-version  print product version and exit
-version:
  Warning: this feature is deprecated and will be removed
  in a future release.
  require the specified version to run
-showversion  print product version and continue
-jre-restrict-search | -no-jre-restrict-search
  Warning: this feature is deprecated and will be removed
  in a future release.
  include/exclude user private JREs in the version search
-? -help  print this help message
-Xprint help on non-standard options
-ea[:...|:]
-enableassertions[:...|:]
  enable assertions with specified granularity
-da[:...|:]
-disableassertions[:...|:]
  disable assertions with specified granularity
-esa | -enablesystemassertions
  enable system assertions
-dsa | -disablesystemassertions
  disable system assertions
-agentlib:[=]
  load native agent library , e.g. -agentlib:hprof
  see also, -agentlib:jdwp=help and -agentlib:hprof=help
-agentpath:[=]
  load native agent library by full pathname
-javaagent:[=]
  load Java programming language agent, see
java.lang.instrument
-splash:
  show splash screen with specified image
See http://www.oracle.com/technetwork/java/javase/documentation/index.html
for more details.

Thanks
Jacky


Re: Flink Memory analyze on AWS EMR

2020-05-11 Thread Robert Metzger
Thanks a lot for posting the full output.

It seems that Flink is passing an invalid list of arguments to the JVM.
Can you
- set the root log level in conf/log4j-yarn-session.properties to DEBUG
- then launch the YARN session
- share the log file of the yarn session on the mailing list?

I'm particularly interested in the line printed here, as it shows the JVM
invocation.
https://github.com/apache/flink/blob/release-1.6/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L1630



Re: Not able to implement a use case

2020-05-11 Thread Khachatryan Roman
Hi Jaswin,

Currently, the DataStream API doesn't support outer joins.
As a workaround, you can use the coGroup function [1].

Hive is also not supported by the DataStream API, though it is supported by the
Table API [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/CoGroupFunction.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html

Regards,
Roman
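For illustration, a rough sketch of the coGroup-based workaround: within each
window, a side with no matches shows up as an empty Iterable, so unmatched
messages can be tagged or routed separately. The message types below are
placeholders, the key selectors are the ones from the original snippet, and the
5-minute tumbling window is an arbitrary choice:

// Sketch only: CartMessage and PGMessage stand in for the real element types.
DataStream<String> discrepancies = leftStream
    .coGroup(rightStream)
    .where(new CartJoinColumnsSelector())
    .equalTo(new PGJoinColumnsSelector())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .apply(new CoGroupFunction<CartMessage, PGMessage, String>() {
        @Override
        public void coGroup(Iterable<CartMessage> carts, Iterable<PGMessage> pgs, Collector<String> out) {
            if (!pgs.iterator().hasNext()) {
                // Left-only: cart messages with no matching PG message in this window.
                carts.forEach(c -> out.collect("UNMATCHED_CART: " + c));
            } else if (!carts.iterator().hasNext()) {
                // Right-only: PG messages with no matching cart message in this window.
                pgs.forEach(p -> out.collect("UNMATCHED_PG: " + p));
            }
            // Matched groups would be handled here, as in the existing CartPGProcessFunction.
        }
    });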




Re: Not able to implement a use case

2020-05-11 Thread Jaswin Shah
If I go with the Table API, can I write the streams to Hive, or is it only for
batch processing as of now?




Re: Need suggestion on Flink-Kafka stream processing design

2020-05-11 Thread Khachatryan Roman
Hi Hemant,

As I understand, each protocol has a distinct set of event types (where
event type == metrics type); and a distinct set of devices. Is this correct?

> Based on data protocol I have 4-5 topics. Currently the data for a single
event is being pushed to a partition of the kafka topic(producer key ->
event_type + data_protocol).
Here you are talking about the source (to Flink job), right?

Can you also share how are you going to consume these data?


Regards,
Roman




ProducerRecord with Kafka Sink for 1.8.0

2020-05-11 Thread Nick Bendtner
Hi guys,
I use version 1.8.0 of flink-connector-kafka. Do you have any
recommendations on how to produce a ProducerRecord from a Kafka
sink? I'm looking to add support for Kafka headers, hence thinking about
ProducerRecord. Any thoughts are highly appreciated.

Best,
Nick.
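For what it's worth, newer versions of the universal Kafka connector (Flink
1.9+, so only if upgrading is an option -- worth verifying against the exact
connector version) ship a KafkaSerializationSchema whose serialize() returns a
full ProducerRecord, which makes attaching headers straightforward. A minimal
sketch; the topic name, header and String element type are made up:

import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: attach a Kafka header to every outgoing record.
public class EventSerializationSchema implements KafkaSerializationSchema<String> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>("my-topic", element.getBytes(StandardCharsets.UTF_8));
        record.headers().add("source", "flink".getBytes(StandardCharsets.UTF_8));
        return record;
    }
}

// Wired into the sink roughly like:
// new FlinkKafkaProducer<>("my-topic", new EventSerializationSchema(), producerProps,
//         FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);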


Re: Need suggestion on Flink-Kafka stream processing design

2020-05-11 Thread hemant singh
Hello Roman,

PFB my response -

As I understand, each protocol has a distinct set of event types (where
event type == metrics type); and a distinct set of devices. Is this correct?
Yes, correct: distinct events and devices. Each device emits these events.

> Based on data protocol I have 4-5 topics. Currently the data for a single
event is being pushed to a partition of the kafka topic(producer key ->
event_type + data_protocol).
Here you are talking about the source (to Flink job), right?
Yes, you are right.

Can you also share how are you going to consume these data?
By consume do you mean the downstream system?
If yes, then this data will be written to a DB; some metrics go to a
TSDB (InfluxDB) as well.

Thanks,
Hemant



Re: Flink Memory analyze on AWS EMR

2020-05-11 Thread Jacky D
Hi Robert,

Thanks so much for the quick reply. I changed the log level to DEBUG and
attached the log file.

Thanks
Jacky


Re: Preserve record orders after WINDOW function

2020-05-11 Thread Jark Wu
Hi Jiahui,

Yes, if they arrive at the same millisecond, they are preserved in the
arriving order.

Best,
Jark





Re: Flink Memory analyze on AWS EMR

2020-05-11 Thread Xintong Song
Hi Jacky,

Could you search for "Application Master start command:" in the debug log
and post the result and a few lines before & after that? This is not
included in the clip of the attached log file.

Thank you~

Xintong Song




Re: Blink Planner fails to generate RowtimeAttribute for custom TableSource

2020-05-11 Thread Jark Wu
Thanks for the investigation. Yes, I think this is a bug, and it is going to
be fixed in FLINK-16160.

Best,
Jark

On Tue, 12 May 2020 at 02:28, Khachatryan Roman 
wrote:

> Hi Yuval,
>
> Thanks for reporting this issue. I'm pulling in Timo and Jark who are
> working on the SQL component. They might be able to help you with your
> problem.
>
> Regards,
> Roman
>
>
> On Mon, May 11, 2020 at 9:10 AM Yuval Itzchakov  wrote:
>
>> Hi,
>> While migrating from Flink 1.9 -> 1.10 and from the old planner to Blink,
>> I've ran into an issue where the Blink Planner doesn't take into account
>> the RowtimeAttribute defined on a custom table source. I've opened an
>> issue: https://issues.apache.org/jira/browse/FLINK-17600 and was
>> wondering if anyone else ran into this?
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>


Re: Preserve record orders after WINDOW function

2020-05-11 Thread Jiahui Jiang
Thank you for confirming!

Just want to make sure my understanding of the internal implementation is 
correct:

When applying an over window ordered by processing time in SQL, the 
datastream plan it translates into doesn't actually contain any order-by 
logic. It just processes all the elements that fall into this window 
sequentially, in the order they appear in the stream. Is that correct?
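
To make sure I'm picturing it right, here is a minimal sketch of what I 
understand the equivalent per-key logic to be (this is not the planner's 
generated code, and the class and field names are made up by me):

```
// Rough equivalent of: max(col2) OVER (PARTITION BY col1 ORDER BY
// _processing_time_column). Elements are handled one by one, per key, in
// arrival order - there is no sorting step anywhere.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class RunningMaxSketch
        extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> currentMax;

    @Override
    public void open(Configuration parameters) {
        currentMax = getRuntimeContext().getState(
                new ValueStateDescriptor<>("currentMax", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> record, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        Long max = currentMax.value();
        long newMax = (max == null) ? record.f1 : Math.max(max, record.f1);
        currentMax.update(newMax);
        // Results are emitted in the order the records were processed, so ties
        // in the processing timestamp simply keep their arrival order.
        out.collect(Tuple2.of(record.f0, newMax));
    }
}
```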

Thanks again! 😊

From: Jark Wu 
Sent: Monday, May 11, 2020 8:52 PM
To: Jiahui Jiang 
Cc: user@flink.apache.org 
Subject: Re: Preserve record orders after WINDOW function

Hi Jiahui,

Yes, if they arrive in the same millisecond, they are preserved in their 
arrival order.

Best,
Jark



On Mon, 11 May 2020 at 23:17, Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
Hello! I'm writing a SQL query with a OVER window function ordered by 
processing time.

I'm wondering, since the processing timestamp only has millisecond granularity:

For a query using an over window sorted on the processing time column, for example:
```
SELECT col1,
 max(col2) OVER (PARTITION BY col1, ORDER BY _processing_time_column)
FROM table
```

If 2 records have the same processing timestamp (they arrived at the operator 
in the exact same millisecond), is the order guaranteed to be preserved after 
the window function? If not, what would be the recommended workaround to keep 
the order?

Thank you!
Jiahui


Re: Broadcast state vs data enrichment

2020-05-11 Thread Manas Kale
Sure. Apologies for not making this clear enough.

> each operator only stores what it needs.
Let's imagine this setup:

                     BROADCAST STREAM
config-stream -------+-------------+-------------+
                     |             |             |
event-stream --> operator1 --> operator2 --> operator3


In this scenario, all 3 operators will be BroadcastProcessFunctions. Each
of them will receive the whole config message in its
processBroadcastElement method, but each one will only store what it needs
in its own state store. So even though operator1 will receive
 config = {
"config1" : 1,
"config2" : 2,
"config3" : 3
}
it will only store config1.
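
Roughly, I imagine each of the three operators looking something like the
sketch below (the Event and Config types and the getConfig1() accessor are
hypothetical placeholders, not my actual code):

```
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Approach 1: every operator is connected to the broadcast config-stream,
// but keeps only the single entry it needs in broadcast state.
public class Operator1 extends BroadcastProcessFunction<Event, Config, Event> {

    static final MapStateDescriptor<String, Integer> CONFIG1_DESCRIPTOR =
            new MapStateDescriptor<>("config1", String.class, Integer.class);

    @Override
    public void processBroadcastElement(Config config, Context ctx,
                                        Collector<Event> out) throws Exception {
        // The whole config message arrives here, but only config1 is stored.
        ctx.getBroadcastState(CONFIG1_DESCRIPTOR).put("config1", config.getConfig1());
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx,
                               Collector<Event> out) throws Exception {
        Integer config1 = ctx.getBroadcastState(CONFIG1_DESCRIPTOR).get("config1");
        // ... operator1's logic using config1 ...
        out.collect(event);
    }
}
```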

> each downstream operator will "strip off" the config parameter that it
needs.

                 BROADCAST STREAM
config-stream -------+
                     |
event-stream --> enricher --> operator1 --> operator2 --> operator3

In this case, the enricher operator will store the whole config message.
When an event message arrives, this operator will append config1, config2
and config3 to it. Operator 1 will extract and use config1, and output a
message that has config1 stripped off.
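
The enrichment variant would look roughly like this (again only a sketch;
EnrichedEvent and its accessors are made-up placeholders):

```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Approach 2: only the enricher sees the broadcast stream; it stores the
// whole config message and attaches it to every event.
public class Enricher extends BroadcastProcessFunction<Event, Config, EnrichedEvent> {

    static final MapStateDescriptor<String, Config> CONFIG_DESCRIPTOR =
            new MapStateDescriptor<>("config", String.class, Config.class);

    @Override
    public void processBroadcastElement(Config config, Context ctx,
                                        Collector<EnrichedEvent> out) throws Exception {
        ctx.getBroadcastState(CONFIG_DESCRIPTOR).put("config", config);
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx,
                               Collector<EnrichedEvent> out) throws Exception {
        Config config = ctx.getBroadcastState(CONFIG_DESCRIPTOR).get("config");
        // Append config1, config2 and config3 to the event.
        out.collect(new EnrichedEvent(event, config));
    }
}

// Downstream, operator1 is an ordinary function that uses config1 and strips
// it off before forwarding the record.
class Operator1Map implements MapFunction<EnrichedEvent, EnrichedEvent> {
    @Override
    public EnrichedEvent map(EnrichedEvent in) {
        int config1 = in.getConfig1();
        // ... use config1, then drop it from the forwarded record ...
        return in.withoutConfig1();
    }
}
```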

I hope that helps!

Perhaps I am being too pedantic, but I would like to know whether these two
methods differ meaningfully in performance and, if so, which one would be
preferred.




On Mon, May 11, 2020 at 11:46 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Manas,
>
> The approaches you described looks the same:
> > each operator only stores what it needs.
> > each downstream operator will "strip off" the config parameter that it
> needs.
>
> Can you please explain the difference?
>
> Regards,
> Roman
>
>
> On Mon, May 11, 2020 at 8:07 AM Manas Kale  wrote:
>
>> Hi,
>> I have a single broadcast message that contains configuration data
>> consumed by different operators. For eg:
>> config = {
>> "config1" : 1,
>> "config2" : 2,
>> "config3" : 3
>> }
>>
>> Operator 1 will consume config1 only, operator 2 will consume config2
>> only etc.
>>
>>
>>- Right now in my implementation the config message gets broadcast
>>over operators 1,2,3 and each operator only stores what it needs.
>>
>>
>>- A different approach would be to broadcast the config message to a
>>single root operator. This will then enrich event data flowing through it
>>with config1,config2 and config3 and each downstream operator will "strip
>>off" the config parameter that it needs.
>>
>>
>> *I was wondering which approach would be the best performance-wise.* I
>> don't really have the time to implement both and compare, so perhaps
>> someone here already knows if one approach is better or whether both
>> provide similar performance.
>>
>> FWIW, the config stream is very sporadic compared to the event stream.
>>
>> Thank you,
>> Manas Kale
>>
>>
>>
>>


Re: Preserve record orders after WINDOW function

2020-05-11 Thread Jark Wu
Yes, that's right.

On Tue, 12 May 2020 at 10:55, Jiahui Jiang  wrote:

> Thank you for confirming!
>
> Just want to make sure my understanding of the internal implementation is
> correct:
>
> When applying an over window ordered by processing time in SQL, the
> datastream plan it translates into doesn't actually contain any order-by
> logic. It just processes all the elements that fall into this window
> sequentially, in the order they appear in the stream. Is that correct?
>
> Thanks again! 😊
> --
> *From:* Jark Wu 
> *Sent:* Monday, May 11, 2020 8:52 PM
> *To:* Jiahui Jiang 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Preserve record orders after WINDOW function
>
> Hi Jiahui,
>
> Yes, if they arrive in the same millisecond, they are preserved in their
> arrival order.
>
> Best,
> Jark
>
>
>
> On Mon, 11 May 2020 at 23:17, Jiahui Jiang 
> wrote:
>
> Hello! I'm writing a SQL query with a OVER window function ordered by
> processing time.
>
> I'm wondering, since the processing timestamp only has millisecond
> granularity:
>
> For a query using an over window sorted on the processing time column, for
> example:
> ```
> SELECT col1,
>  max(col2) OVER (PARTITION BY col1, ORDER BY _processing_time_column)
> FROM table
> ```
>
> If 2 records have the same processing timestamp (they arrived at the
> operator in the exact same millisecond), is the order guaranteed to be
> preserved after the window function? If not, what would be the recommended
> workaround to keep the order?
>
> Thank you!
> Jiahui
>
>