Re: Checking actual config values used by TaskManager

2016-04-28 Thread Timur Fayruzov
If you're talking about parameters that were set on JVM startup, then `ps aux | grep flink` on an EMR slave node should do the trick; that'll give you the full command line. On Thu, Apr 28, 2016 at 9:00 PM, Ken Krugler wrote: > Hi all, > > I’m running jobs on EMR via YARN, and wondering how to chec

Re: Command line arguments getting munged with CLI?

2016-04-27 Thread Timur Fayruzov
Hi Ken, I built the parameter parser in my jar to work with '--' instead of '-', and it works fine (on 1.0.0 and on current master). After a cursory look at the parameter parser Flink uses (http://commons.apache.org/proper/commons-cli/), it seems that double vs. single dash could make a difference, so y

Re: "No more bytes left" at deserialization

2016-04-26 Thread Timur Fayruzov
> On Apr 26, 2016, at 6:23pm, Timur Fayruzov > wrote: > > I built master with Scala 2.11 and Hadoop 2.7.1, and now get a different > exception (still serialization-related, though): > > java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup > at

Re: "No more bytes left" at deserialization

2016-04-26 Thread Timur Fayruzov
6 at 9:07 AM, Till Rohrmann wrote: > Then let's keep our fingers crossed that we've found the culprit :-) > > On Tue, Apr 26, 2016 at 6:02 PM, Timur Fayruzov > wrote: > >> Thank you Till. >> >> I will try to run with the new binaries today. As I have mentioned, th

Re: Job hangs

2016-04-26 Thread Timur Fayruzov
any way. Otherwise, I probably would not be actively looking at this problem. Thanks, Timur On Tue, Apr 26, 2016 at 8:11 AM, Ufuk Celebi wrote: > Can you please further provide the execution plan via > > env.getExecutionPlan() > > > > On Tue, Apr 26, 2016 at 4:23 PM, Tim

Re: "No more bytes left" at deserialization

2016-04-26 Thread Timur Fayruzov
tell, there is nothing unusual with your code. It's >>> probably an issue with Flink. >>> >>> I think we have to wait a little longer to hear what others in the >>> community say about this. >>> >>> @Aljoscha, Till, Robert: any ideas w

Re: Job hangs

2016-04-26 Thread Timur Fayruzov
> >> No. >> >> If you run on YARN, the YARN logs are the relevant ones for the >> JobManager and TaskManager. The client log submitting the job should >> be found in /log. >> >> – Ufuk >> >> On Tue, Apr 26, 2016 at 10:06 AM, Timur Fayruzov >

Re: Job hangs

2016-04-26 Thread Timur Fayruzov
well? We can first have a look at the logs, but the stack traces will be helpful if we can't figure out what the issue is. – Ufuk On Tue, Apr 26, 2016 at 9:42 AM, Till Rohrmann wrote: > Could you share the logs with us, Timur? That would be very helpful. > > Cheers, > Till >

Re: convert Json to Tuple

2016-04-25 Thread Timur Fayruzov
Normally, Json4s or Jackson with the Scala plugin work well for JSON-to-Scala data structure conversions. However, I would not expect them to support a special case for tuples, since JSON key-value fields would normally convert to case classes and JSON arrays are converted to, well, arrays. That being said,

Job hangs

2016-04-25 Thread Timur Fayruzov
Hello, Now I'm at the stage where my job seems to hang completely. Source code is attached (it won't compile, but I think it gives a very good idea of what happens). Unfortunately I can't provide the datasets. Most of them are about 100-500MM records, I try to process on EMR cluster with 40 tasks 6GB m

Re: YARN terminating TaskNode

2016-04-25 Thread Timur Fayruzov
y.preallocate: false > > Raising the cutoff ratio should prevent killing of the TaskManagers. > As Robert mentioned, in practice the JVM tends to allocate more than > the maximum specified heap size. You can put the following in your > flink-conf.yaml: > > # slightly raise

Re: "No more bytes left" at deserialization

2016-04-25 Thread Timur Fayruzov
alization is one of Flink's cornerstones and should be well tested, so there is a high chance that I'm doing something wrong, but I can't really find anything unusual in my code. Any suggestions on what to try are very welcome. Thanks, Timur On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov wrote: > Hello

Re: Access to a shared resource within a mapper

2016-04-25 Thread Timur Fayruzov
use these > are shared between all MapFunctions in a TaskManager (JVM). > > 2016-04-22 21:21 GMT+02:00 Timur Fayruzov : > >> Actually, a follow-up question: is the map function single-threaded (within >> one task manager, that is)? If it's not, then lazy initialization won

Re: YARN terminating TaskNode

2016-04-25 Thread Timur Fayruzov
by YARN. > > > > I have to check the code of Flink again, because I would expect the > safety > > boundary to be much larger than 30 mb. > > > > Regards, > > Robert > > > > > > On Fri, Apr 22, 2016 at 9:47 PM, Timur Fayruzov < > timur

Re: "No more bytes left" at deserialization

2016-04-24 Thread Timur Fayruzov
recently changed something related to the ExecutionConfig which has > led to Kryo issues in the past. > > > On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov > wrote: > >> Trying to use ProtobufSerializer -- program consistently fails with the >> following exception:

Re: "No more bytes left" at deserialization

2016-04-24 Thread Timur Fayruzov
r 24, 2016 at 2:26 AM, Timur Fayruzov wrote: > Hello, > > I'm running a Flink program that is failing with the following exception: > > 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend >

"No more bytes left" at deserialization

2016-04-24 Thread Timur Fayruzov
Hello, I'm running a Flink program that is failing with the following exception: 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend - Error while running the command. org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execu

YARN terminating TaskNode

2016-04-22 Thread Timur Fayruzov
Hello, The next issue in the string of things I'm solving is that my application fails with the message 'Connection unexpectedly closed by remote task manager'. The YARN log shows the following: Container [pid=4102,containerID=container_1461341357870_0004_01_15] is running beyond physical memory limit

Re: Access to a shared resource within a mapper

2016-04-22 Thread Timur Fayruzov
the parallel > execution by making it a "lazy" variable in Scala. > > On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov > wrote: > >> Outstanding! Thanks, Aljoscha. >> >> On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek >> wrote: >> >>> Hi,
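The "lazy" variable suggestion in the snippet above can be illustrated with a minimal, Flink-free sketch. `ExpensiveClient` and `Mapper` are hypothetical names introduced only for this illustration; they are not from the thread or from Flink. The sketch shows only that a `lazy val` is created on first use rather than at construction time. Scala synchronizes the initialization of a `lazy val`, but that alone does not make a non-thread-safe client safe to call from several threads.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the expensive client discussed in the thread.
final class ExpensiveClient {
  // Count constructions so the demo can show when the client is created.
  ExpensiveClient.instances.incrementAndGet()
  def transform(s: String): String = s.toUpperCase
}

object ExpensiveClient {
  val instances = new AtomicInteger(0)
}

// A mapper-like class (hypothetical): the client is built on first access,
// and @transient keeps it out of the serialized form of the mapper.
class Mapper extends Serializable {
  @transient private lazy val client = new ExpensiveClient
  def map(in: String): String = client.transform(in)
}

object LazyDemo {
  def main(args: Array[String]): Unit = {
    val m = new Mapper
    // Constructing the mapper does not construct the client.
    println(s"clients after construction: ${ExpensiveClient.instances.get}")
    Seq("a", "b", "c").map(m.map).foreach(println)
    // All three calls reused the single lazily created client.
    println(s"clients after three calls: ${ExpensiveClient.instances.get}")
  }
}
```

In this sketch the client is per mapper instance; placing the `lazy val` in a Scala `object` instead would share one client per JVM, which is only appropriate if the client tolerates concurrent use.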

Re: Access to a shared resource within a mapper

2016-04-22 Thread Timur Fayruzov
def map(input: INT): OUT = { > // use client > } > } > > the open() method is called before any elements are passed to the > function. The counterpart of open() is close(), which is called after all > elements are through or if the job cancels. > > Cheers, > Aljoscha
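For readers who only see the truncated snippet above, the open()/close() lifecycle it describes might look roughly like the following sketch. It assumes the Flink 1.x `RichMapFunction` API, where `open` takes a `Configuration`, and a Flink dependency on the classpath. `Client`, its `transform` and `shutdown` methods, and `ClientMapper` are hypothetical names used for illustration; they are not the code from the original message.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical client; stands in for the non-thread-safe client from the thread.
class Client {
  def transform(s: String): String = s.trim
  def shutdown(): Unit = ()
}

// Hypothetical mapper: one client per parallel instance of the function.
class ClientMapper extends RichMapFunction[String, String] {
  // Not shipped with the serialized function; created on the worker in open().
  @transient private var client: Client = _

  // Called once before any element is passed to map().
  override def open(parameters: Configuration): Unit = {
    client = new Client
  }

  override def map(input: String): String = client.transform(input)

  // Called after all elements have been processed or when the job is canceled.
  override def close(): Unit = {
    if (client != null) client.shutdown()
  }
}
```

With the Scala DataSet API this would presumably be applied as `data.map(new ClientMapper)` after `import org.apache.flink.api.scala._`, so that each parallel instance initializes its own client once instead of once per element.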

Access to a shared resource within a mapper

2016-04-21 Thread Timur Fayruzov
Hello, I'm writing a Scala Flink application. I have a standalone process that exists on every Flink node that I need to call to transform my data. To access this process I first need to initialize a non-thread-safe client. I would like to avoid initializing a client for each element being transform

Re: Integrate Flink with S3 on EMR cluster

2016-04-08 Thread Timur Fayruzov
> > More answers inline below > > > On Thu, Apr 7, 2016 at 11:32 PM, Timur Fayruzov > wrote: > >> The exception does not show up in the console when I run the job, it only >> shows in the logs. I thought it means that it happens either on AM or TM (I >> as

Re: Integrate Flink with S3 on EMR cluster

2016-04-07 Thread Timur Fayruzov
lib folder. > The above approach did not work; could you elaborate on what you meant by 'lib folder'? Thanks, Timur > – Ufuk > > > > > On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov > wrote: > > There's one more filesystem integration failure that

Re: Integrate Flink with S3 on EMR cluster

2016-04-07 Thread Timur Fayruzov
link is good for AWS users. I think the > bootstrap action scripts would be placed in the `flink-contrib` directory. > > > > If you want, one of the people in the Flink PMC will assign FLINK-1337 to > you. > > > > Regards, > > Chiwan Park > > > >> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov > wrote: > >> > >> I had a guide like that. > >> > > >

Re: Using native libraries in Flink EMR jobs

2016-04-07 Thread Timur Fayruzov
t;>> JAVA_LIBRARY_PATH="$JAVA_LIBRARY_PATH:/usr/lib/hadoop-lzo/lib/native >>> mapred-site.xml: key: mapreduce.admin.user.env >>> >>> I tried to add path to dir with my native lib in both places, but still >>> no luck. >>> >>> Thanks, >

Re: Using native libraries in Flink EMR jobs

2016-04-07 Thread Timur Fayruzov
es: hadoop-env.sh: export JAVA_LIBRARY_PATH="$JAVA_LIBRARY_PATH:/usr/lib/hadoop-lzo/lib/native mapred-site.xml: key: mapreduce.admin.user.env I tried to add path to dir with my native lib in both places, but still no luck. Thanks, Timur On Wed, Apr 6, 2016 at 11:21 PM, Timur Fayruzov wrote:

Using native libraries in Flink EMR jobs

2016-04-06 Thread Timur Fayruzov
Hello, I'm not sure whether it's a Hadoop or Flink-specific question, but since I ran into this in the context of Flink I'm asking here. I would be glad if anyone can suggest a more appropriate place. I have a native library that I need to use in my Flink batch job that I run on EMR, and I try to

Re: Integrate Flink with S3 on EMR cluster

2016-04-05 Thread Timur Fayruzov
> > HADOOP_CONF_DIR=/etc/hadoop/conf bin/flink etc. > > Hope this helps! Please report back. :-) > > – Ufuk > > > On Tue, Apr 5, 2016 at 5:47 PM, Timur Fayruzov > wrote: > > Hello Ufuk, > > > > I'm using EMR 4.4.0. > > > > Thanks,

Re: Integrate Flink with S3 on EMR cluster

2016-04-05 Thread Timur Fayruzov
Hello Ufuk, I'm using EMR 4.4.0. Thanks, Timur On Tue, Apr 5, 2016 at 2:18 AM, Ufuk Celebi wrote: > Hey Timur, > > which EMR version are you using? > > – Ufuk > > On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov > wrote: > > Thanks for the answer, Ken.

Re: Integrate Flink with S3 on EMR cluster

2016-04-04 Thread Timur Fayruzov
> What happens if you use s3n:/// for the --input > parameter? > > — Ken > > On Apr 4, 2016, at 2:51pm, Timur Fayruzov > wrote: > > Hello, > > I'm trying to run a Flink WordCount job on an AWS EMR cluster. I succeeded > with a three-step procedure: load data from S

Integrate Flink with S3 on EMR cluster

2016-04-04 Thread Timur Fayruzov
Hello, I'm trying to run a Flink WordCount job on an AWS EMR cluster. I succeeded with a three-step procedure: load data from S3 to cluster's HDFS, run Flink Job, unload outputs from HDFS to S3. However, ideally I'd like to read/write data directly from/to S3. I followed the instructions here: ht

Re: Implicit inference of TypeInformation for join keys

2016-03-31 Thread Timur Fayruzov
ed with the additional implicit parameter. > It's a bit ugly, though ... > > On Wed, 30 Mar 2016 at 18:34 Timur Fayruzov > wrote: > >> Actually, there is an even easier solution (which I saw in your reply to >> my other question): >> ``` >> a.coGro

Re: Why Scala Option is not a valid key?

2016-03-30 Thread Timur Fayruzov
nt` because the > return type of KeySelector is `Int`. `TypeInformation` is not a generic > type. > > Regards, > Chiwan Park > > > On Mar 31, 2016, at 1:09 AM, Timur Fayruzov > wrote: > > > > Thank you for your answers, Chiwan! That would mean that a generic type

Re: Implicit inference of TypeInformation for join keys

2016-03-30 Thread Timur Fayruzov
ce decrease > when you are using KeySelector. > > Regards, > Chiwan Park > > > On Mar 31, 2016, at 12:58 AM, Timur Fayruzov > wrote: > > > > Thank you Chiwan! Yes, I understand that there are workarounds that > don't use function argument (and thus do not

Re: Why Scala Option is not a valid key?

2016-03-30 Thread Timur Fayruzov
``` > > Note that the approach in example (using hashCode()) cannot be applied to > sort task. > > Regards, > Chiwan Park > > > On Mar 30, 2016, at 2:37 AM, Timur Fayruzov > wrote: > > > > There is some more detail to this question that I missed initia

Re: Implicit inference of TypeInformation for join keys

2016-03-30 Thread Timur Fayruzov
rom 0. > .equalTo(0, 1) { > (left, right) => 1 > } > ``` > > I hope this helps. > > [1]: > https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples > > Regards, > Chiwan Park > > >

Implicit inference of TypeInformation for join keys

2016-03-29 Thread Timur Fayruzov
Hello, Another issue I have encountered is incorrect implicit resolution (I'm using Scala 2.11.7). Here's the example (with a workaround): val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d"))) val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m"))) a.coGroup(b) .where(e =>

Re: Why Scala Option is not a valid key?

2016-03-29 Thread Timur Fayruzov
encountering an Option. Is it possible to work around this situation without giving up Options? Inability to use Options in Domain objects could be really frustrating. Thanks, Timur On Tue, Mar 29, 2016 at 10:19 AM, Timur Fayruzov wrote: > Hello, > > I'm evaluating Flink and one thin

Why Scala Option is not a valid key?

2016-03-29 Thread Timur Fayruzov
Hello, I'm evaluating Flink and one thing I noticed is Option[A] can't be used as a key for coGroup (looking specifically here: https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala#L39). I'm not clear about the reason of t