A little doubt about the usage of checkpointLock

2018-08-14 Thread aitozi
Hi, community

I see the checkpointLock is used in StreamTask to ensure that we don't have
concurrent method calls that void consistent checkpoints.

As I understand it, the lock is used in data consumption, state interaction and the
timerService. But I wonder: if an application doesn't enable checkpointing, does it
still have to deal with the checkpointLock? Is it necessary?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Managed Keyed state update

2018-08-14 Thread Fabian Hueske
Hi,

It is recommended to always call update().

Modifying state by mutating objects only works because the heap-based
backends do not serialize or copy records, in order to avoid additional costs.
Hence, this is rather a side effect than a provided API. As soon as you
change the state backend, state modifications might be lost if you do not
call update().
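
For illustration, a minimal sketch of that pattern as a Java function applied on a
keyed stream (the class, field and key names are made up; only the MapState type
matches this thread):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Reads a MapState entry, mutates it, and explicitly writes it back so the
// change is persisted on any state backend (heap or RocksDB).
public class UpdateStateExample extends RichFlatMapFunction<String, String> {

    private transient MapState<String, Tuple2<String, String>> state;

    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<String, Tuple2<String, String>> descriptor =
            new MapStateDescriptor<String, Tuple2<String, String>>(
                "mapping", Types.STRING, Types.TUPLE(Types.STRING, Types.STRING));
        state = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        Tuple2<String, String> v = state.get("k0");
        if (v == null) {
            v = new Tuple2<>("a", "b");
        }
        v.f0 = "U";          // mutating the object alone is only visible on heap backends
        state.put("k0", v);  // always write the modified value back explicitly
        out.collect(value);
    }
}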

Best, Fabian

2018-08-14 2:07 GMT+02:00 Alexey Trenikhun :

> Clear. Thank you
>
> Get Outlook for iOS 
>
> --
> *From:* Renjie Liu 
> *Sent:* Monday, August 13, 2018 4:33 PM
> *To:* Alexey Trenikhun
> *Cc:* user@flink.apache.org
> *Subject:* Re: Managed Keyed state update
>
> Hi, Alexey:
> It depends on the state backend you use. If you use heap memory backend,
> then you don't need to do put again.
> However, if you use rocksdb state backend, then you need to do the put
> again so that it will be saved by the checkpoint.
>
> On Tue, Aug 14, 2018 at 4:58 AM Alexey Trenikhun  wrote:
>
>> Let’s say I have Managed Keyed state - MapState>
>> x, I initialize for state for “k0” - x.put(“k0”, new Tuple2<>(“a”, “b”));
>> Later  I retried state Tuple2 v = x.get(“k0”); and change
>> value: v.f0=“U”;, does it make state ‘dirty’? In other words, do I need to
>> call x.put(“k0”, v) again or change will be saved by checkpoint anyway
>> because value was changed?
>>
>> Alexey
>>
> --
> Liu, Renjie
> Software Engineer, MVAD
>


Re: Limit on number of files to read for Dataset

2018-08-14 Thread Fabian Hueske
Hi,

Flink InputFormats generate their InputSplits sequentially on the
JobManager.
These splits are stored in the heap of the JM process and handed out to
SourceTasks when they request them lazily.
Split assignment is done by a InputSplitAssigner, that can be customized.
FileInputFormats typically use a LocatableInputSplitAssigner which tries to
assign splits based on locality.

I see three potential problems:
1) InputSplit generation might take a long while. The JM is blocked until
splits are generated.
2) All InputSplits need to be stored on the JM heap. You might need to
assign more memory to the JM process.
3) Split assignment might take a while depending on the complexity of the
InputSplitAssigner. You can implement a custom assigner to make this more
efficient (from an assignment point of view).
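
Regarding 3), a deliberately simple sketch of what a custom assigner can look like
(assuming the single-method InputSplitAssigner interface of Flink 1.5/1.6; it ignores
locality entirely and hands splits out in FIFO order, trading placement quality for
constant-time assignment):

import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

// Hands out splits in arrival order; returning null signals that no splits are left.
public class FifoInputSplitAssigner implements InputSplitAssigner {

    private final Queue<InputSplit> remaining = new ArrayDeque<>();

    public FifoInputSplitAssigner(InputSplit[] splits) {
        for (InputSplit split : splits) {
            remaining.add(split);
        }
    }

    @Override
    public synchronized InputSplit getNextInputSplit(String host, int taskId) {
        return remaining.poll();
    }
}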

Best, Fabian

2018-08-14 8:19 GMT+02:00 Jörn Franke :

> It causes more overhead (processes etc) which might make it slower.
> Furthermore if you have them stored on HDFS then the bottleneck is the
> namenode which will have to answer millions of requests.
> The latter point will change in future Hadoop versions with
> http://ozone.hadoop.apache.org/
>
> On 13. Aug 2018, at 21:01, Darshan Singh  wrote:
>
> Hi Guys,
>
> Is there a limit on number of files flink dataset can read? My question is
> will there be any sort of issues if I have say millions of files to read to
> create single dataset.
>
> Thanks
>
>


Re: flink telemetry/metrics

2018-08-14 Thread Chesnay Schepler
How often is the warning logged? The default reporting interval is 10
seconds; if a report is interrupted, it can take a while for metrics to
show up.


Could this also be caused by the MAX_CREATES_PER_MINUTE setting in 
carbon.conf being set too low?


On 13.08.2018 21:31, John O wrote:


I have tried two reporter types (Graphite, JMX)

Graphite

metrics.reporters: grph

metrics.reporter.grph.class: 
org.apache.flink.metrics.graphite.GraphiteReporter


metrics.reporter.grph.host: 

metrics.reporter.grph.port: 2003

metrics.reporter.grph.protocol: TCP

What I see on graphite are incomplete metrics. Some taskIdx (same 
process) will show different sets of metrics. For example,


-Sink->test

-0: currentInputWatermark, numRecordsIn

-1: numRecordsOutPerSecond

-2: numRecordsIn, numRecordsOut

-4: currentInputWatermark

I would expect seeing

-Sink->test

-0: currentInputWatermark, numRecordsIn, numRecordsOut, 
numRecordsOutPerSecond, numRecordsInPerSecond…


-1: currentInputWatermark, numRecordsIn, numRecordsOut, 
numRecordsOutPerSecond, numRecordsInPerSecond…


-2: currentInputWatermark, numRecordsIn, numRecordsOut, 
numRecordsOutPerSecond, numRecordsInPerSecond…


-4: currentInputWatermark, numRecordsIn, numRecordsOut, 
numRecordsOutPerSecond, numRecordsInPerSecond…




JMX

metrics.reporters: jmx

metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter

metrics.reporter.jmx.port: 8789

I used visualvm with mbean explorer to view the exposed mbeans.

In this setup, I do see the expected metrics, but the organization
makes them difficult to find.


I also get the following WARNing

2018-08-09 18:36:55,943 WARN 
org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while 
reporting metrics


java.util.ConcurrentModificationException

at 
java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)


at 
java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)


at 
java.util.AbstractCollection.addAll(AbstractCollection.java:343)


at java.util.HashSet.(HashSet.java:120)

at 
org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65)


at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298)


at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:906)


at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)


at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)


at 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:37)


at 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:27)


at 
org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper.getValue(FlinkGaugeWrapper.java:36)


at 
com.codahale.metrics.graphite.GraphiteReporter.reportGauge(GraphiteReporter.java:279)


at 
com.codahale.metrics.graphite.GraphiteReporter.report(GraphiteReporter.java:169)


at 
org.apache.flink.dropwizard.ScheduledDropwizardReporter.report(ScheduledDropwizardReporter.java:231)


at 
org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:427)


at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)


at 
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)


at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)


at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)


at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)


at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)


at java.lang.Thread.run(Thread.java:748)

I am using flink 1.5.1

J.

*From:*Chesnay Schepler 
*Sent:* Friday, August 10, 2018 2:08 AM
*To:* John O ; user@flink.apache.org
*Subject:* Re: flink telemetry/metrics

What is wrong with the metrics that are shown in graphite?

Can you provide us with the metrics section of your flink-conf.yaml?

Are there any metric-related warnings in the TaskManager logs?

On 09.08.2018 01:38, John O wrote:

I’m working on getting a flink job into production. As part of the
production requirement, I need telemetry/metrics insight into my
flink job. I have followed instructions in

https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html

- Added the flink graphite jar to taskmanager/jobmanager lib folder
- configured flink-conf.yaml to enable graphite reporter

- Added a simple counter in my flink code

When I submit the job, I can see my counter show
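
For reference, a counter along the lines described above is typically registered like
this (a minimal sketch; the operator and metric names are illustrative):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Counts processed records; the counter appears under the operator's metric scope
// in whichever reporter (Graphite, JMX, ...) is configured.
public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter counter;

    @Override
    public void open(Configuration config) {
        counter = getRuntimeContext().getMetricGroup().counter("myCounter");
    }

    @Override
    public String map(String value) {
        counter.inc();
        return value;
    }
}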

Re: Flink REST api for cancel with savepoint on yarn

2018-08-14 Thread Gary Yao
Hi Vipul,

We are aware of YARN-2031. There are some ideas how to workaround it, which
are tracked here:

https://issues.apache.org/jira/browse/FLINK-9478

At the moment you have the following options:

1. Find out the master's address from ZooKeeper [1] and issue the HTTP
request against the master directly.
2. Use the Flink CLI. The CLI will lookup the master's address.

Best,
Gary

[1]
https://github.com/apache/flink/blob/9614310ef207dd29dfee76cd878825534f41ff00/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java#L162
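
For option 1, once the leader's REST address is known, cancel-with-savepoint can be
triggered with plain JDK HTTP; a rough sketch (host, port, job id, and savepoint
directory are placeholders, and the request body fields assume the 1.5/1.6 REST API):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class CancelWithSavepoint {
    public static void main(String[] args) throws Exception {
        String jobId = "00000000000000000000000000000000";
        URL url = new URL("http://jobmanager-host:8081/jobs/" + jobId + "/savepoints");

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);

        // "cancel-job": true turns the savepoint into a cancel-with-savepoint
        String body = "{\"target-directory\": \"s3://bucket/savepoints\", \"cancel-job\": true}";
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }

        // The response contains a trigger/request id that can be polled via
        // /jobs/:jobid/savepoints/:triggerid to find the resulting savepoint path.
        try (Scanner scanner = new Scanner(conn.getInputStream(), StandardCharsets.UTF_8.name())) {
            System.out.println(scanner.useDelimiter("\\A").next());
        }
    }
}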

On Tue, Aug 14, 2018 at 8:43 AM, vipul singh  wrote:

> Hello,
>
> I have a question about flink 1.5/1.6 REST endpoints. I was trying to see
> how the rest endpoints have changed wrt to cancelling with savepoint; it
> seems like now to cancel with savepoint one need to use POST api /
> jobs/:jobid/savepoints
> 
>
> While trying to run these queries on yarn, it seems like yarn currently
> doesn't support these POST calls:
> https://issues.apache.org/jira/browse/YARN-2031
>
> On digging further and looking at this email
> ,
> it seems like there is a /jobs/:jobid/yarn-cancel api which does
> something similar, but that doesn't support cancel-with-savepoint currently.
> Are there any plans to add this functionality to this API in the future? Is
> there any workaround for this for now?
>
> Thanks,
> Vipul
>


Re: Standalone cluster instability

2018-08-14 Thread Piotr Nowojski
Hi,

Good that we are more or less on track with this problem :) But the problem
here is not that the heap size is too small, but that your kernel is running out of
memory and starts killing processes. Either:

1. some other process is using the available memory, or
2. increase the memory allocation on your machine/virtual machine/container/cgroup, or
3. decrease the heap size of Flink’s JVM or its non-heap size (decrease the network
memory buffer pool). Of course, for any given job/state
size/configuration/cluster size there is some minimal reasonable memory size
that you have to assign to Flink, otherwise you will have poor performance
and/or constant garbage collections and/or you will start getting OOM errors
from the JVM (don’t confuse those with the OS/kernel's OOM errors - those two are on a
different level).

Piotrek

> On 14 Aug 2018, at 07:36, Shailesh Jain  wrote:
> 
> Hi Piotrek,
> 
> Thanks for your reply. I checked through the syslogs for that time, and I see 
> this:
> 
> Aug  8 13:20:52 smoketest kernel: [1786160.856662] Out of memory: Kill 
> process 2305 (java) score 468 or sacrifice child
> Aug  8 13:20:52 smoketest kernel: [1786160.859091] Killed process 2305 (java) 
> total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB
> 
> As you pointed out, kernel killed the task manager process.
> 
> If I had already set the max heap size for the JVM (to 3GB in this case), and 
> the memory usage stats showed 2329MB being used 90 seconds earlier, it seems 
> a bit unlikely for operators to consume 700 MB heap space in that short time, 
> because our events ingestion rate is not that high (close to 10 events per 
> minute).
> 
> 2018-08-08 13:19:23,341 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Memory usage 
> stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
> 
> Is it possible to log individual operator's memory consumption? This would 
> help in narrowing down on the root cause. There were around 50 operators 
> running (~8 kafka source/sink, ~8 Window operators, and the rest CEP 
> operators).
> 
> Thanks,
> Shailesh
> 
> On Fri, Aug 10, 2018 at 4:48 PM, Piotr Nowojski  > wrote:
> Hi,
> 
> Please post full TaskManager logs, including stderr and stdout. (Have you 
> checked the stderr/stdout for some messages?)
> 
> I could think of couple reasons:
> 1. process segfault
> 2. process killed by OS
> 3. OS failure
> 
> 1. Should be visible by some message in stderr/stdout file and can be caused 
> by for example JVM, RocksDB or some other native library/code bug. 
> 2. Is your system maybe running out of memory? Kernel might kill process if 
> that’s happening. You can also check system (linux?) logs for errors that 
> correlate in time. Where are those logs depend on your OS. 
> 3. This might be tricky, but I have seen kernel failures that prevented any 
> messages from being logged for example. Besides this TaskManager failure is 
> your machine operating normally without any other problems/crashes/restarts?
> 
> Piotrek
> 
>> On 10 Aug 2018, at 06:59, Shailesh Jain > > wrote:
>> 
>> Hi,
>> 
>> I hit a similar issue yesterday, the task manager died suspiciously, no 
>> error logs in the task manager logs, but I see the following exceptions in 
>> the job manager logs:
>> 
>> 2018-08-05 18:03:28,322 ERROR akka.remote.Remoting   
>>- Association to [akka.tcp://flink@localhost:34483 <>] with 
>> UID [328996232] irrecoverably failed. Quarantining address.
>> java.util.concurrent.TimeoutException: Remote system has been silent for too 
>> long. (more than 48.0 hours)
>> at 
>> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at 
>> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at 
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at 
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at 
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at 
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 
>> but almost 3 days later it hit this:
>> 
>> 2018-08-08 13:22:00,061 INFO  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Internal 
>> state machine job (1057c13d169dae609466210174e2cc8b) switched from state 
>> RUNNING to FAILING.
>> java.lang.Exception: TaskManager was lost/killed: 
>> 5ee5de1112776c404541743b63ae0fe

Re: A little doubt about the usage of checkpointLock

2018-08-14 Thread Andrey Zagrebin
Hi,

there are at least 3 main players which use the lock to sync state access 
between each other:
thread processing records in user code, checkpointing thread and processing 
timers callback thread.

I would still recommend following the contract and using the lock where required
(e.g. in a custom source), because whether it is needed or not eventually depends a
lot on internal implementation details which might change. If there is no contention,
it will be optimised by the JVM and should not degrade performance too much.

Except for custom sources, as documented, application code is usually not required
to deal with the lock in the Flink API at all, e.g. in user processing functions.
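
For completeness, the documented pattern for a custom source looks roughly like this
(a minimal sketch, not code from this thread):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits a monotonically increasing counter; the emission and the offset update happen
// under the checkpoint lock so a checkpoint can never observe a half-updated state.
public class CounterSource implements SourceFunction<Long> {

    private volatile boolean running = true;
    private long counter = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter);
                counter++;
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}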

Cheers,
Andrey

> On 14 Aug 2018, at 09:19, aitozi  wrote:
> 
> Hi, community
> 
> I see the checkpointLock is used in StreamTask to ensure that we don't have
> concurrent method calls that void consistent checkpoints.
> 
> As i known, it is used in the data consume , state interactive and the
> timerService, But I am doubt that, if an application don't enable the
> checkpoint, it still have to deal with the checkpointLock. Is it necessary ? 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



how to assign issue to someone

2018-08-14 Thread Guibo Pan
Hello, I am a new user of the Flink Jira. I reported an issue and would like
to fix it; however, I found I could not assign it to myself, or to anyone.
Could someone tell me how to do this?
Thanks.


Re: how to assign issue to someone

2018-08-14 Thread Fabian Hueske
Hi,

I've given you Contributor permissions for Jira and assigned the issue to
you.
You can now also assign other issues to yourself.

Looking forward to your contribution.

Best, Fabian

2018-08-14 19:45 GMT+02:00 Guibo Pan :

> Hello, I am a new user for flink jira. I reported an issue and would like
> to fix it, however I found I could not assign it to myself, or anyone.
> Is there anyone to tell me how to do this?
> Thanks.
>


Re: Limit on number of files to read for Dataset

2018-08-14 Thread Darshan Singh
Thanks all for your responses. I am now much more clearer on this.

Thanks

On Tue, Aug 14, 2018 at 9:46 AM, Fabian Hueske  wrote:

> Hi,
>
> Flink InputFormats generate their InputSplits sequentially on the
> JobManager.
> These splits are stored in the heap of the JM process and handed out to
> SourceTasks when they request them lazily.
> Split assignment is done by a InputSplitAssigner, that can be customized.
> FileInputFormats typically use a LocatableInputSplitAssigner which tries to
> assign splits based on locality.
>
> I see three potential problems:
> 1) InputSplit generation might take a long while. The JM is blocked until
> splits are generated.
> 2) All InputSplits need to be stored on the JM heap. You might need to
> assign more memory to the JM process.
> 3) Split assignment might take a while depending on the complexity of the
> InputSplitAssigner. You can implement a custom assigner to make this more
> efficient (from an assignment point of view).
>
> Best, Fabian
>
> 2018-08-14 8:19 GMT+02:00 Jörn Franke :
>
>> It causes more overhead (processes etc) which might make it slower.
>> Furthermore if you have them stored on HDFS then the bottleneck is the
>> namenode which will have to answer millions of requests.
>> The latter point will change in future Hadoop versions with
>> http://ozone.hadoop.apache.org/
>>
>> On 13. Aug 2018, at 21:01, Darshan Singh  wrote:
>>
>> Hi Guys,
>>
>> Is there a limit on number of files flink dataset can read? My question
>> is will there be any sort of issues if I have say millions of files to read
>> to create single dataset.
>>
>> Thanks
>>
>>
>


Re: Limit on number of files to read for Dataset

2018-08-14 Thread Darshan Singh
Thanks for the details. I got it working. I have around one directory for
each month and I am running on 12-15 months of data, so I created a dataset
from each month and did a union.

However, when I run it I get an HTTP timeout issue. I am reading more than
120K files in total across all the months.

I am using S3 and EMR to do this, and the Flink version is 1.4.2. When I run on
6 months of data this works fine.

Below is part of error

Caused by: java.io.IOException: Error opening the Input Split s3://.gz
[0,-1]: Unable to execute HTTP request: Timeout waiting for connection from
pool
at
org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
at
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
at
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:48)
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by:
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable
to execute HTTP request: Timeout waiting for connection from pool
at
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
at
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
at
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
at
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
at
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
at
com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:22)
at
com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:9)
at
com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:80)
at
com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
at
com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObject(AmazonS3LiteClient.java:99)
at
com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:452)
at
com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:439)
at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
at com.sun.proxy.$Proxy28.retrievePair(Unknown Source)
at
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1213)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
at
com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:166)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
at
org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:865)
Caused by:
com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException:
Timeout waiting for connection from pool
at
com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
at
com.amazon.ws.emr.hadoop.fs.shaded.or

Re: 1.5.1

2018-08-14 Thread Gary Yao
Hi Juho,

It seems in your case the JobMaster did not receive a heartbeat from the
TaskManager in time [1]. Heartbeat requests and answers are sent over the
RPC
framework, and RPCs of one component (e.g., TaskManager, JobMaster, etc.)
are
dispatched by a single thread. Therefore, the reasons for heartbeats
timeouts
include:

1. The RPC threads of the TM or JM are blocked. In this case heartbeat
requests or answers cannot be dispatched.
2. The scheduled task for sending the heartbeat requests [2] died.
3. The network is flaky.

If you are confident that the network is not the culprit, I would suggest to
set the logging level to DEBUG, and look for periodic log messages (JM and
TM
logs) that are related to heartbeating. If the periodic log messages are
overdue, it is a hint that the main thread of the RPC endpoint is blocked
somewhere.

Best,
Gary

[1]
https://github.com/apache/flink/blob/release-1.5.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1611
[2]
https://github.com/apache/flink/blob/913b0413882939c30da4ad4df0cabc84dfe69ea0/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java#L64

On Mon, Aug 13, 2018 at 9:52 AM, Juho Autio  wrote:

> I also have jobs failing on a daily basis with the error "Heartbeat of
> TaskManager with id  timed out". I'm using Flink 1.5.2.
>
> Could anyone suggest how to debug possible causes?
>
> I already set these in flink-conf.yaml, but I'm still getting failures:
> heartbeat.interval: 1
> heartbeat.timeout: 10
>
> Thanks.
>
> On Sun, Jul 22, 2018 at 2:20 PM Vishal Santoshi 
> wrote:
>
>> According to the UI it seems that "
>>
>> org.apache.flink.util.FlinkException: The assigned slot 
>> 208af709ef7be2d2dfc028ba3bbf4600_10 was removed.
>>
>> " was the cause of a pipe restart.
>>
>> As to the TM, it is an artifact of the new job allocation regime which
>> will exhaust all slots on a TM rather than distributing them equitably.
>> TMs are selectively under more stress than in a pure RR distribution, I
>> think. We may have to lower the slots on each TM to define a good upper
>> bound. You are correct that 50s is a pretty generous value.
>>
>> On Sun, Jul 22, 2018 at 6:55 AM, Gary Yao  wrote:
>>
>>> Hi,
>>>
>>> The first exception should be only logged on info level. It's expected
>>> to see
>>> this exception when a TaskManager unregisters from the ResourceManager.
>>>
>>> Heartbeats can be configured via heartbeat.interval and heartbeat.timeout
>>> [1].
>>> The default timeout is 50s, which should be a generous value. It is
>>> probably a
>>> good idea to find out why the heartbeats cannot be answered by the TM.
>>>
>>> Best,
>>> Gary
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-
>>> release-1.5/ops/config.html#heartbeat-manager
>>>
>>>
>>> On Sun, Jul 22, 2018 at 1:36 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 2 issues we are seeing on 1.5.1 on a streaming pipe line

 org.apache.flink.util.FlinkException: The assigned slot 
 208af709ef7be2d2dfc028ba3bbf4600_10 was removed.


 and

 java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 
 208af709ef7be2d2dfc028ba3bbf4600 timed out.


 Not sure about the first but how do we increase the heartbeat interval
 of a TM

 Thanks much

 Vishal

>>>
>>>
>>
>


ClassCastException when writing to a Kafka stream with Flink + Python

2018-08-14 Thread Joe Malt
Hi,

I'm trying to write to a Kafka stream in a Flink job using the new Python
streaming API.

My program looks like this:

def main(factory):

props = Properties()
props.setProperty("bootstrap.servers",configs['kafkaBroker'])

consumer = FlinkKafkaConsumer010([configs['kafkaReadTopic']],
SimpleStringSchema(), props)
producer = FlinkKafkaProducer010(configs['kafkaWriteTopic'],
SimpleStringSchema(), props)

env = factory.get_execution_environment()

stream = env.add_java_source(consumer)

stream.output() # this works (prints to a .out file)
stream.add_sink(producer) # producing to this causes the exception

env.execute()

I'm getting a ClassCastException when trying to output to the
FlinkKafkaProducer:

java.lang.ClassCastException: org.python.core.PyUnicode cannot be cast
to java.lang.String
at 
org.apache.flink.api.common.serialization.SimpleStringSchema.serialize(SimpleStringSchema.java:36)
at 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue(KeyedSerializationSchemaWrapper.java:46)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:355)
at 
org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:48)
at 
org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:37)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)


It seems that the Python string isn't getting converted to a
java.lang.String, which should happen automatically in Jython.

I've tried adding a MapFunction that maps each input to String(input)where
String is the constructor for java.lang.String. This made no difference; I
get the same error.

Any ideas?

Thanks,

Joe Malt

Software Engineering Intern
Yelp


watermark does not progress

2018-08-14 Thread John O
I am noticing that watermark does not progress as expected when running locally 
in IDE. It just stays at Long.MIN

I am using EventTime processing and have tried both these time extractors.

* assignAscendingTimestamps ...
* assignTimestampsAndWatermarks(BoundedOutOfOrdernessTimestampExtractor) ...

Also, configured the environment as so
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

If I run the job on a flink cluster, I do see the watermark progress.

Is watermarking not supported in local mode?

Thanks
Jo


Re: ClassCastException when writing to a Kafka stream with Flink + Python

2018-08-14 Thread vino yang
Hi Joe,

ping Chesnay for you, please wait for the reply.

Thanks, vino.

Joe Malt  wrote on Wed, Aug 15, 2018 at 7:16 AM:

> Hi,
>
> I'm trying to write to a Kafka stream in a Flink job using the new Python
> streaming API.
>
> My program looks like this:
>
> def main(factory):
>
> props = Properties()
> props.setProperty("bootstrap.servers",configs['kafkaBroker'])
>
> consumer = FlinkKafkaConsumer010([configs['kafkaReadTopic']], 
> SimpleStringSchema(), props)
> producer = FlinkKafkaProducer010(configs['kafkaWriteTopic'], 
> SimpleStringSchema(), props)
>
> env = factory.get_execution_environment()
>
> stream = env.add_java_source(consumer)
>
> stream.output() # this works (prints to a .out file)
> stream.add_sink(producer) # producing to this causes the exception
>
> env.execute()
>
> I'm getting a ClassCastException when trying to output to the
> FlinkKafkaProducer:
>
> java.lang.ClassCastException: org.python.core.PyUnicode cannot be cast to 
> java.lang.String
>   at 
> org.apache.flink.api.common.serialization.SimpleStringSchema.serialize(SimpleStringSchema.java:36)
>   at 
> org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue(KeyedSerializationSchemaWrapper.java:46)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:355)
>   at 
> org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:48)
>   at 
> org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:37)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>
>
> It seems that the Python string isn't getting converted to a
> java.lang.String, which should happen automatically in Jython.
>
> I've tried adding a MapFunction that maps each input to String(input)where
> String is the constructor for java.lang.String. This made no difference;
> I get the same error.
>
> Any ideas?
>
> Thanks,
>
> Joe Malt
>
> Software Engineering Intern
> Yelp
>


Re: watermark does not progress

2018-08-14 Thread vino yang
Hi John,

In local mode, it should also work.
When you debug, you can set a breakpoint in the getCurrentWatermark method
to see whether you can enter the method and whether the behavior is what you expect.
What is your source? If you post your code, it will be easier to locate the issue.
In addition, for tracking down watermark issues, you can also refer to this email [1].

[1]:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Debugging-watermarks-td7065.html
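
For a quick local experiment, a self-contained sketch like the following (Java API;
the element type, bound, and values are just assumptions) makes it easy to put
breakpoints into extractTimestamp/getCurrentWatermark and to check that the
auto-watermark interval is non-zero, since periodic watermarks are only emitted at
that interval:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkDebugJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(200L); // periodic watermark emission

        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("a", 1_000L), Tuple2.of("b", 2_000L), Tuple2.of("c", 10_000L));

        events
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element) {
                        return element.f1; // event time in epoch milliseconds
                    }
                })
            .print();

        env.execute("watermark-debug");
    }
}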

Thanks, vino.

John O  wrote on Wed, Aug 15, 2018 at 9:44 AM:

> I am noticing that watermark does not progress as expected when running
> locally in IDE. It just stays at Long.MIN
>
>
>
> I am using EventTime processing and have tried both these time extractors.
>
> · assignAscendingTimestamps ...
>
> · 
> assignTimestampsAndWatermarks(BoundedOutOfOrdernessTimestampExtractor)
> ...
>
>
>
> Also, configured the environment as so
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>
>
> If I run the job on a flink cluster, I do see the watermark progress.
>
>
>
> Is watermarking not supported in local mode?
>
>
>
> Thanks
>
> Jo
>


How to submit flink job on yarn by java code

2018-08-14 Thread spoon_lz
My project automatically generates a Flink job jar and then submits it
to the YARN cluster for execution and gets the ApplicationId. I find that after
execution, an error is reported.



Then I searched for the error on Google and found that the reason for the
error was that I did not set the Hadoop environment variables.
But my jar is not submitted through the original ./bin/flink script; it is
submitted via CliFrontend.java. How can I solve this problem?

My code looks like this:





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


docker, error NoResourceAvailableException..

2018-08-14 Thread shyla deshpande
Hello all,

Trying to use docker as a single node flink cluster.

docker run --name flink_local -p 8081:8081 -t flink local

I submitted a job to the cluster using the Web UI. The job failed. I see
this error message in the docker logs:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate all requires slots within timeout of 30 ms. Slots
required: 2, slots allocated: 0

The Web UI, shows 0 taskmanagers and 0 task slots on the Flink dashboard.
How do I start the docker with 2 Task slots?

Appreciate any help.

Thanks


CoFlatMapFunction with more than two input streams

2018-08-14 Thread Averell
Hi,

I have stream_A of type "Dog", which needs to be transformed using data from
stream_C of type "Name_Mapping". As stream_C is a slow one (data is not
being updated frequently), to do the transformation I connect two streams,
do a keyBy, and then use a RichCoFlatMapFunction in which mapping data from
stream_C is saved into a State (flatMap1 generates 1 output, while flatMap2
is just to update State table, not generating any output).

Now I have another stream B of type "Cat", which also needs to be
transformed using data from stream_C. After that transformation,
transformed_B will go through a completely different pipeline from
transformed A. 

I can see two approaches for this:
1. duplicate stream_C and the RichCoFlatMapFunction and apply on stream_B
2. create a new stream D of type "Animal", transform it with C, then split
the result into two streams using split/select using case class pattern
matching.

My question is which option should I choose?
With option 1, I need to maintain at least two State tables, not to mention the
cost of duplicating the stream (I am not sure how expensive this is in terms of
resources) and the requirement of duplicating the CoFlatMapFunction (*).
With option 2, there is additional cost coming from unioning,
splitting/selecting, and type-casting on the final streams.
Is there any better option for me?

Thank you very much for your support.
Regards,
Averell

(*) I am using Scala, and I tried to create a RichCoFlatMapFunction of type
[Animal, Name_Mapping] but it cannot be used for a stream of [Dog,
Name_Mapping] or [Cat, Name_Mapping]. Thus I needed to duplicate the
Function as well.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: CoFlatMapFunction with more than two input streams

2018-08-14 Thread vino yang
Hi Averell,

As far as these two solutions are concerned, I think you can only choose
option 2, because, as you have stated, the current Flink DataStream API does
not support substituting one of the input stream types of a
CoFlatMapFunction. There are some other choices:

1. Split it into two separate jobs. But in comparison, I still think that
option 2 is better.
2. Since you said that stream_C is slow and has few updates, if it is
not very large, you can store it in an RDBMS and then join it with
stream_A and stream_B respectively (using a CoFlatMapFunction as well).

I think you should give priority to your option 2.

Thanks, vino.

Averell  wrote on Wed, Aug 15, 2018 at 1:51 PM:

> Hi,
>
> I have stream_A of type "Dog", which needs to be transformed using data
> from
> stream_C of type "Name_Mapping". As stream_C is a slow one (data is not
> being updated frequently), to do the transformation I connect two streams,
> do a keyBy, and then use a RichCoFlatMapFunction in which mapping data from
> stream_C is saved into a State (flatMap1 generates 1 output, while flatMap2
> is just to update State table, not generating any output).
>
> Now I have another stream B of type "Cat", which also needs to be
> transformed using data from stream_C. After that transformation,
> transformed_B will go through a completely different pipeline from
> transformed A.
>
> I can see two approaches for this:
> 1. duplicate stream_C and the RichCoFlatMapFunction and apply on stream_B
> 2. create a new stream D of type "Animal", transform it with C, then split
> the result into two streams using split/select using case class pattern
> matching.
>
> My question is which option should I choose?
> With option 1, at least I need to maintain two State tables, let alone the
> cost for duplicating stream (I am not sure how expensive this is in term of
> resource), and the requirement on duplicating the CoFlatMapFunction (*).
> With option 2, there's additional cost coming from unioning,
> splitting/selecting, and type-casting at the final streams.
> Is there any better option for me?
>
> Thank you very much for your support.
> Regards,
> Averell
>
> (*) I am using Scala, and I tried to create a RichCoFlatMapFunction of type
> [Animal, Name_Mapping] but it cannot be used for a stream of [Dog,
> Name_Mapping] or [Cat, Name_Mapping]. Thus I needed to duplicate the
> Function as well.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: ClassCastException when writing to a Kafka stream with Flink + Python

2018-08-14 Thread Chesnay Schepler
As seen in the stacktrace, every sink added via add_sink is
wrapped in a PythonSinkFunction which internally converts things to
PyObjects; that's why the mapper had no effect.
Currently we don't differentiate between java/python sinks, contrary to
sources, where we have an explicit StreamExEnv#add_java_source method.


There are 2 ways to approach this issue:
* As alluded to in a previous mail, create a Python wrapper around the
Kafka consumer class.

* Extend the PythonDataStream class with a separate method for Kafka.

Unfortunately I don't think we can solve this in a generic manner (i.e.
add_java_source) since the Java types wouldn't fit at compile time.


On 15.08.2018 04:15, vino yang wrote:

Hi Joe,

ping Chesnay for you, please wait for the reply.

Thanks, vino.

Joe Malt  wrote on Wed, Aug 15, 2018 at 7:16 AM:


Hi,

I'm trying to write to a Kafka stream in a Flink job using the new
Python streaming API.

My program looks like this:

def main(factory):

 props = Properties()
 props.setProperty("bootstrap.servers",configs['kafkaBroker'])

 consumer = FlinkKafkaConsumer010([configs['kafkaReadTopic']], 
SimpleStringSchema(), props)
 producer = FlinkKafkaProducer010(configs['kafkaWriteTopic'], 
SimpleStringSchema(), props)

 env = factory.get_execution_environment()

 stream = env.add_java_source(consumer)

 stream.output()  # this works (prints to a .out file)
 stream.add_sink(producer)  # producing to this causes the exception

 env.execute()

I'm getting a ClassCastException when trying to output to the
FlinkKafkaProducer:

java.lang.ClassCastException: org.python.core.PyUnicode cannot be cast to 
java.lang.String
at 
org.apache.flink.api.common.serialization.SimpleStringSchema.serialize(SimpleStringSchema.java:36)
at 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue(KeyedSerializationSchemaWrapper.java:46)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:355)
at 
org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:48)
at 
org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:37)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)


It seems that the Python string isn't getting converted to a
java.lang.String, which should happen automatically in Jython.

I've tried adding a MapFunction that maps each input to
String(input)where String is the constructor for java.lang.String.
This made no difference; I get the same error.

Any ideas?

Thanks,

Joe Malt

Software Engineering Intern
Yelp





Re: CoFlatMapFunction with more than two input streams

2018-08-14 Thread Xingcan Cui
Hi Averell,

I am also in favor of option 2. Besides, you could use CoProcessFunction 
instead of CoFlatMapFunction and try to wrap elements of stream_A and stream_B 
using the `Either` class.
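
To make that concrete, a rough sketch of the Either wrapping in the Java API, with
String and Integer standing in for Dog and Cat (the connect/enrichment step against
the mapping stream is elided and only indicated by a comment):

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Either;

public class EitherUnionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> dogs = env.fromElements("rex", "fido");
        DataStream<Integer> cats = env.fromElements(1, 2);

        TypeInformation<Either<String, Integer>> eitherType =
            TypeInformation.of(new TypeHint<Either<String, Integer>>() {});

        // wrap both streams into one stream of Either and union them
        DataStream<Either<String, Integer>> animals = dogs
            .map(Either::<String, Integer>Left).returns(eitherType)
            .union(cats.map(Either::<String, Integer>Right).returns(eitherType));

        // ... connect `animals` with the mapping stream here and enrich it in a single
        // CoProcessFunction / RichCoFlatMapFunction, keeping the mapping in keyed state ...

        // split back into the two typed pipelines by Either side
        DataStream<String> dogsBack = animals
            .filter(Either::isLeft)
            .map(e -> e.left()).returns(Types.STRING);
        DataStream<Integer> catsBack = animals
            .filter(Either::isRight)
            .map(e -> e.right()).returns(Types.INT);

        dogsBack.print();
        catsBack.print();
        env.execute("either-union-sketch");
    }
}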

Best,
Xingcan

> On Aug 15, 2018, at 2:24 PM, vino yang  wrote:
> 
> Hi Averell,
> 
> As far as these two solutions are concerned, I think you can only choose 
> option 2, because as you have stated, the current Flink DataStream API does 
> not support the replacement of one of the input stream types of 
> CoFlatMapFunction. Another choice:
> 
> 1. Split it into two separate jobs. But in comparison, I still think that 
> Option 2 is better.
> 2. Since you said that stream_c is slower and has fewer updates, if it is not 
> very large, you can store it in the RDBMS and then join it with stream_a and 
> stream_b respectively (using CoFlatMapFunction as well).
> 
> I think you should give priority to your option 2.
> 
> Thanks, vino.
> 
Averell  wrote on Wed, Aug 15, 2018 at 1:51 PM:
> Hi,
> 
> I have stream_A of type "Dog", which needs to be transformed using data from
> stream_C of type "Name_Mapping". As stream_C is a slow one (data is not
> being updated frequently), to do the transformation I connect two streams,
> do a keyBy, and then use a RichCoFlatMapFunction in which mapping data from
> stream_C is saved into a State (flatMap1 generates 1 output, while flatMap2
> is just to update State table, not generating any output).
> 
> Now I have another stream B of type "Cat", which also needs to be
> transformed using data from stream_C. After that transformation,
> transformed_B will go through a completely different pipeline from
> transformed A. 
> 
> I can see two approaches for this:
> 1. duplicate stream_C and the RichCoFlatMapFunction and apply on stream_B
> 2. create a new stream D of type "Animal", transform it with C, then split
> the result into two streams using split/select using case class pattern
> matching.
> 
> My question is which option should I choose?
> With option 1, at least I need to maintain two State tables, let alone the
> cost for duplicating stream (I am not sure how expensive this is in term of
> resource), and the requirement on duplicating the CoFlatMapFunction (*).
> With option 2, there's additional cost coming from unioning,
> splitting/selecting, and type-casting at the final streams. 
> Is there any better option for me?
> 
> Thank you very much for your support.
> Regards,
> Averell
> 
> (*) I am using Scala, and I tried to create a RichCoFlatMapFunction of type
> [Animal, Name_Mapping] but it cannot be used for a stream of [Dog,
> Name_Mapping] or [Cat, Name_Mapping]. Thus I needed to duplicate the
> Function as well.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
>