Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Alexander Pivovarov
If I add an additional small box to the cluster, can I configure YARN to select
the small box to run the AM container?


On Mon, Feb 8, 2016 at 10:53 PM, Sean Owen  wrote:

> Typically YARN is there because you're mediating resource requests
> from things besides Spark, so yeah using every bit of the cluster is a
> little bit of a corner case. There's not a good answer if all your
> nodes are the same size.
>
> I think you can let YARN over-commit RAM though, and allocate more
> memory than it actually has. It may be beneficial to let them all
> think they have an extra GB, and let one node running the AM
> technically be overcommitted, a state which won't hurt at all unless
> you're really really tight on memory, in which case something might
> get killed.
>
> On Tue, Feb 9, 2016 at 6:49 AM, Jonathan Kelly 
> wrote:
> > Alex,
> >
> > That's a very good question that I've been trying to answer myself
> recently
> > too. Since you've mentioned before that you're using EMR, I assume you're
> > asking this because you've noticed this behavior on emr-4.3.0.
> >
> > In this release, we made some changes to the maximizeResourceAllocation
> > (which you may or may not be using, but either way this issue is
> present),
> > including the accidental inclusion of somewhat of a bug that makes it not
> > reserve any space for the AM, which ultimately results in one of the
> nodes
> > being utilized only by the AM and not an executor.
> >
> > However, as you point out, the only viable fix seems to be to reserve
> enough
> > memory for the AM on *every single node*, which in some cases might
> actually
> > be worse than wasting a lot of memory on a single node.
> >
> > So yeah, I also don't like either option. Is this just the price you pay
> for
> > running on YARN?
> >
> >
> > ~ Jonathan
> >
> > On Mon, Feb 8, 2016 at 9:03 PM Alexander Pivovarov  >
> > wrote:
> >>
> >> Lets say that yarn has 53GB memory available on each slave
> >>
> >> spark.am container needs 896MB.  (512 + 384)
> >>
> >> I see two options to configure spark:
> >>
> >> 1. configure spark executors to use 52GB and leave 1 GB on each box. So,
> >> some box will also run am container. So, 1GB memory will not be used on
> all
> >> slaves but one.
> >>
> >> 2. configure spark to use all 53GB and add additional 53GB box which
> will
> >> run only am container. So, 52GB on this additional box will do nothing
> >>
> >> I do not like both options. Is there a better way to configure
> yarn/spark?
> >>
> >>
> >> Alex
>


Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Sean Owen
If it's too small to run an executor, I'd think it would be chosen for
the AM as the only way to satisfy the request.
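
For reference, the 896 MB figure quoted elsewhere in this thread is the default
client-mode AM sizing: 512 MB of spark.yarn.am.memory plus the 384 MB minimum
overhead. A minimal sketch of setting it explicitly (the class and jar names are
placeholders):

# Client-mode AM sizing; 512m + 384m overhead = the 896 MB container
# discussed in this thread. Class/jar names below are placeholders.
spark-submit \
  --master yarn \
  --deploy-mode client \
  --conf spark.yarn.am.memory=512m \
  --conf spark.yarn.am.memoryOverhead=384 \
  --class com.example.MyApp myapp.jar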

On Tue, Feb 9, 2016 at 8:35 AM, Alexander Pivovarov
 wrote:
> If I add additional small box to the cluster can I configure yarn to select
> small box to run am container?
>
>
> On Mon, Feb 8, 2016 at 10:53 PM, Sean Owen  wrote:
>>
>> Typically YARN is there because you're mediating resource requests
>> from things besides Spark, so yeah using every bit of the cluster is a
>> little bit of a corner case. There's not a good answer if all your
>> nodes are the same size.
>>
>> I think you can let YARN over-commit RAM though, and allocate more
>> memory than it actually has. It may be beneficial to let them all
>> think they have an extra GB, and let one node running the AM
>> technically be overcommitted, a state which won't hurt at all unless
>> you're really really tight on memory, in which case something might
>> get killed.
>>
>> On Tue, Feb 9, 2016 at 6:49 AM, Jonathan Kelly 
>> wrote:
>> > Alex,
>> >
>> > That's a very good question that I've been trying to answer myself
>> > recently
>> > too. Since you've mentioned before that you're using EMR, I assume
>> > you're
>> > asking this because you've noticed this behavior on emr-4.3.0.
>> >
>> > In this release, we made some changes to the maximizeResourceAllocation
>> > (which you may or may not be using, but either way this issue is
>> > present),
>> > including the accidental inclusion of somewhat of a bug that makes it
>> > not
>> > reserve any space for the AM, which ultimately results in one of the
>> > nodes
>> > being utilized only by the AM and not an executor.
>> >
>> > However, as you point out, the only viable fix seems to be to reserve
>> > enough
>> > memory for the AM on *every single node*, which in some cases might
>> > actually
>> > be worse than wasting a lot of memory on a single node.
>> >
>> > So yeah, I also don't like either option. Is this just the price you pay
>> > for
>> > running on YARN?
>> >
>> >
>> > ~ Jonathan
>> >
>> > On Mon, Feb 8, 2016 at 9:03 PM Alexander Pivovarov
>> > 
>> > wrote:
>> >>
>> >> Lets say that yarn has 53GB memory available on each slave
>> >>
>> >> spark.am container needs 896MB.  (512 + 384)
>> >>
>> >> I see two options to configure spark:
>> >>
>> >> 1. configure spark executors to use 52GB and leave 1 GB on each box.
>> >> So,
>> >> some box will also run am container. So, 1GB memory will not be used on
>> >> all
>> >> slaves but one.
>> >>
>> >> 2. configure spark to use all 53GB and add additional 53GB box which
>> >> will
>> >> run only am container. So, 52GB on this additional box will do nothing
>> >>
>> >> I do not like both options. Is there a better way to configure
>> >> yarn/spark?
>> >>
>> >>
>> >> Alex
>
>

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Long running Spark job on YARN throws "No AMRMToken"

2016-02-09 Thread Steve Loughran

On 9 Feb 2016, at 05:55, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:

+ Spark-Dev

On Tue, Feb 9, 2016 at 10:04 AM, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:
Hi All,

A long running Spark job on YARN throws the below exception after running for
a few days.

yarn.ApplicationMaster: Reporter thread fails 1 time(s) in a row. 
org.apache.hadoop.yarn.exceptions.YarnException: No AMRMToken found for user 
prabhu at org.apache.hadoop.yarn.ipc.RPCUtil.getRemoteException(RPCUtil.java:45)

Do any of the below renew the AMRMToken and solve the issue?

1. yarn-resourcemanager.delegation.token.max-lifetime increase from 7 days

2. Configuring Proxy user:

 <property>
   <name>hadoop.proxyuser.yarn.hosts</name>
   <value>*</value>
 </property>
 <property>
   <name>hadoop.proxyuser.yarn.groups</name>
   <value>*</value>
 </property>


wouldn't do that: security issues


3. Can Spark-1.4.0 handle it with the fix
https://issues.apache.org/jira/browse/SPARK-5342

spark.yarn.credentials.file



I'll say "maybe" there

How to renew the AMRMToken for a long running job on YARN?




AMRM token renewal should be automatic in the AM; YARN sends a message to the AM
(actually an allocate() response with no containers but a new token at the tail
of the message).

I don't see any logging in the Hadoop code there (AMRMClientImpl); filed
YARN-4682 to add a log statement.

if someone other than me were to supply a patch to that JIRA to add a log 
statement *by the end of the day* I'll review it and get it in to Hadoop 2.8

-Steve


Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Steve Loughran

> On 9 Feb 2016, at 06:53, Sean Owen  wrote:
> 
> 
> I think you can let YARN over-commit RAM though, and allocate more
> memory than it actually has. It may be beneficial to let them all
> think they have an extra GB, and let one node running the AM
> technically be overcommitted, a state which won't hurt at all unless
> you're really really tight on memory, in which case something might
> get killed.


from my test VMs

  <property>
    <description>Whether physical memory limits will be enforced for
      containers.</description>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
  </property>

  <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
  </property>


it does mean that a container can swap massively, hurting the performance of 
all containers around it as IO bandwidth gets soaked up —which is why the 
checks are on for shared clusters. If it's dedicated, you can overcommit

Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread praveen S
How about running in client mode, so that the client from which it is run
becomes the driver?

Regards,
Praveen
On 9 Feb 2016 16:59, "Steve Loughran"  wrote:

>
> > On 9 Feb 2016, at 06:53, Sean Owen  wrote:
> >
> >
> > I think you can let YARN over-commit RAM though, and allocate more
> > memory than it actually has. It may be beneficial to let them all
> > think they have an extra GB, and let one node running the AM
> > technically be overcommitted, a state which won't hurt at all unless
> > you're really really tight on memory, in which case something might
> > get killed.
>
>
> from my test VMs
>
>   <property>
>     <description>Whether physical memory limits will be enforced for
>       containers.</description>
>     <name>yarn.nodemanager.pmem-check-enabled</name>
>     <value>false</value>
>   </property>
>
>   <property>
>     <name>yarn.nodemanager.vmem-check-enabled</name>
>     <value>false</value>
>   </property>
>
>
> it does mean that a container can swap massively, hurting the performance
> of all containers around it as IO bandwidth gets soaked up —which is why
> the checks are on for shared clusters. If it's dedicated, you can overcommit


Re: Long running Spark job on YARN throws "No AMRMToken"

2016-02-09 Thread Steve Loughran

On 9 Feb 2016, at 11:26, Steve Loughran <ste...@hortonworks.com> wrote:


On 9 Feb 2016, at 05:55, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:

+ Spark-Dev

On Tue, Feb 9, 2016 at 10:04 AM, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:
Hi All,

A long running Spark job on YARN throws the below exception after running for
a few days.

yarn.ApplicationMaster: Reporter thread fails 1 time(s) in a row. 
org.apache.hadoop.yarn.exceptions.YarnException: No AMRMToken found for user 
prabhu at org.apache.hadoop.yarn.ipc.RPCUtil.getRemoteException(RPCUtil.java:45)

Do any of the below renew the AMRMToken and solve the issue?

1. yarn-resourcemanager.delegation.token.max-lifetime increase from 7 days

2. Configuring Proxy user:

 <property>
   <name>hadoop.proxyuser.yarn.hosts</name>
   <value>*</value>
 </property>
 <property>
   <name>hadoop.proxyuser.yarn.groups</name>
   <value>*</value>
 </property>


wouldn't do that: security issues


3. Can Spark-1.4.0 handle it with the fix
https://issues.apache.org/jira/browse/SPARK-5342

spark.yarn.credentials.file



I'll say "maybe" there

uprated to a no, having looked at the code more


How to renew the AMRMToken for a long running job on YARN?




AMRM token renewal should be automatic in the AM; YARN sends a message to the AM
(actually an allocate() response with no containers but a new token at the tail
of the message).

I don't see any logging in the Hadoop code there (AMRMClientImpl); filed
YARN-4682 to add a log statement.

if someone other than me were to supply a patch to that JIRA to add a log 
statement *by the end of the day* I'll review it and get it in to Hadoop 2.8


like I said: I'll get this in to hadoop-2.8 if someone is timely with the diff



Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Jonathan Kelly
Sean, I'm not sure if that's actually the case, since the AM would be
allocated before the executors are even requested (by the driver through
the AM), right? This must at least be the case with dynamicAllocation
enabled, but I would expect that it's true regardless.

However, Alex, yes, this would be possible on EMR if you use small CORE
instances and larger TASK instances. EMR is configured to run AMs only on
CORE instances, so if you don't need much HDFS space (HDFS is stored only
on CORE instances, not TASK instances), this might be a good option for
you. Note though that you would have to set spark.executor.memory yourself
rather than using maximizeResourceAllocation because
maximizeResourceAllocation currently only considers the size of the CORE
instances when determining spark.{driver,executor}.memory.

~ Jonathan
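
As an illustration of the hand-tuned sizing Jonathan describes, here is a hedged
sketch using the 53 GB figure from this thread. The YARN container for each
executor is spark.executor.memory plus spark.yarn.executor.memoryOverhead (the
larger of 10% and 384 MB by default), so the numbers below leave roughly 1 GB of
headroom per node; the executor count, core count and class/jar name are
placeholders:

# One executor per ~53 GB node: ~47g heap + ~4.7g default overhead gives a
# ~52 GB container, leaving about 1 GB free, enough for the 896 MB
# client-mode AM on whichever node YARN picks.
spark-submit \
  --master yarn \
  --deploy-mode client \
  --num-executors 10 \
  --executor-memory 47g \
  --executor-cores 8 \
  --driver-memory 4g \
  --class com.example.MyApp myapp.jar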

On Tue, Feb 9, 2016 at 12:40 AM Sean Owen  wrote:

> If it's too small to run an executor, I'd think it would be chosen for
> the AM as the only way to satisfy the request.
>
> On Tue, Feb 9, 2016 at 8:35 AM, Alexander Pivovarov
>  wrote:
> > If I add additional small box to the cluster can I configure yarn to
> select
> > small box to run am container?
> >
> >
> > On Mon, Feb 8, 2016 at 10:53 PM, Sean Owen  wrote:
> >>
> >> Typically YARN is there because you're mediating resource requests
> >> from things besides Spark, so yeah using every bit of the cluster is a
> >> little bit of a corner case. There's not a good answer if all your
> >> nodes are the same size.
> >>
> >> I think you can let YARN over-commit RAM though, and allocate more
> >> memory than it actually has. It may be beneficial to let them all
> >> think they have an extra GB, and let one node running the AM
> >> technically be overcommitted, a state which won't hurt at all unless
> >> you're really really tight on memory, in which case something might
> >> get killed.
> >>
> >> On Tue, Feb 9, 2016 at 6:49 AM, Jonathan Kelly 
> >> wrote:
> >> > Alex,
> >> >
> >> > That's a very good question that I've been trying to answer myself
> >> > recently
> >> > too. Since you've mentioned before that you're using EMR, I assume
> >> > you're
> >> > asking this because you've noticed this behavior on emr-4.3.0.
> >> >
> >> > In this release, we made some changes to the
> maximizeResourceAllocation
> >> > (which you may or may not be using, but either way this issue is
> >> > present),
> >> > including the accidental inclusion of somewhat of a bug that makes it
> >> > not
> >> > reserve any space for the AM, which ultimately results in one of the
> >> > nodes
> >> > being utilized only by the AM and not an executor.
> >> >
> >> > However, as you point out, the only viable fix seems to be to reserve
> >> > enough
> >> > memory for the AM on *every single node*, which in some cases might
> >> > actually
> >> > be worse than wasting a lot of memory on a single node.
> >> >
> >> > So yeah, I also don't like either option. Is this just the price you
> pay
> >> > for
> >> > running on YARN?
> >> >
> >> >
> >> > ~ Jonathan
> >> >
> >> > On Mon, Feb 8, 2016 at 9:03 PM Alexander Pivovarov
> >> > 
> >> > wrote:
> >> >>
> >> >> Lets say that yarn has 53GB memory available on each slave
> >> >>
> >> >> spark.am container needs 896MB.  (512 + 384)
> >> >>
> >> >> I see two options to configure spark:
> >> >>
> >> >> 1. configure spark executors to use 52GB and leave 1 GB on each box.
> >> >> So,
> >> >> some box will also run am container. So, 1GB memory will not be used
> on
> >> >> all
> >> >> slaves but one.
> >> >>
> >> >> 2. configure spark to use all 53GB and add additional 53GB box which
> >> >> will
> >> >> run only am container. So, 52GB on this additional box will do
> nothing
> >> >>
> >> >> I do not like both options. Is there a better way to configure
> >> >> yarn/spark?
> >> >>
> >> >>
> >> >> Alex
> >
> >
>


Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Jonathan Kelly
Praveen,

You mean cluster mode, right? That would still in a sense cause one box to
be "wasted", but at least it would be used a bit more to its full
potential, especially if you set spark.driver.memory to higher than its 1g
default. Also, cluster mode is not an option for some applications, such as
the spark-shell, pyspark shell, or Zeppelin.

~ Jonathan
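
A sketch of the cluster-mode alternative Jonathan mentions, with an illustrative
driver size (the class and jar names are placeholders):

# Cluster mode: the driver runs inside the YARN AM container, so raising
# spark.driver.memory above the 1g default makes better use of that node.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 8g \
  --class com.example.MyApp myapp.jar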

On Tue, Feb 9, 2016 at 5:48 AM praveen S  wrote:

> How about running in client mode, so that the client from which it is run
> becomes the driver.
>
> Regards,
> Praveen
> On 9 Feb 2016 16:59, "Steve Loughran"  wrote:
>
>>
>> > On 9 Feb 2016, at 06:53, Sean Owen  wrote:
>> >
>> >
>> > I think you can let YARN over-commit RAM though, and allocate more
>> > memory than it actually has. It may be beneficial to let them all
>> > think they have an extra GB, and let one node running the AM
>> > technically be overcommitted, a state which won't hurt at all unless
>> > you're really really tight on memory, in which case something might
>> > get killed.
>>
>>
>> from my test VMs
>>
>>   <property>
>>     <description>Whether physical memory limits will be enforced for
>>       containers.</description>
>>     <name>yarn.nodemanager.pmem-check-enabled</name>
>>     <value>false</value>
>>   </property>
>>
>>   <property>
>>     <name>yarn.nodemanager.vmem-check-enabled</name>
>>     <value>false</value>
>>   </property>
>>
>>
>> it does mean that a container can swap massively, hurting the performance
>> of all containers around it as IO bandwidth gets soaked up —which is why
>> the checks are on for shared clusters. If it's dedicated, you can overcommit
>
>


Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Alexander Pivovarov
The AM container starts first and YARN selects a random computer to run it.

Is it possible to configure YARN so that it selects a small computer for the AM
container?
On Feb 9, 2016 12:40 AM, "Sean Owen"  wrote:

> If it's too small to run an executor, I'd think it would be chosen for
> the AM as the only way to satisfy the request.
>
> On Tue, Feb 9, 2016 at 8:35 AM, Alexander Pivovarov
>  wrote:
> > If I add additional small box to the cluster can I configure yarn to
> select
> > small box to run am container?
> >
> >
> > On Mon, Feb 8, 2016 at 10:53 PM, Sean Owen  wrote:
> >>
> >> Typically YARN is there because you're mediating resource requests
> >> from things besides Spark, so yeah using every bit of the cluster is a
> >> little bit of a corner case. There's not a good answer if all your
> >> nodes are the same size.
> >>
> >> I think you can let YARN over-commit RAM though, and allocate more
> >> memory than it actually has. It may be beneficial to let them all
> >> think they have an extra GB, and let one node running the AM
> >> technically be overcommitted, a state which won't hurt at all unless
> >> you're really really tight on memory, in which case something might
> >> get killed.
> >>
> >> On Tue, Feb 9, 2016 at 6:49 AM, Jonathan Kelly 
> >> wrote:
> >> > Alex,
> >> >
> >> > That's a very good question that I've been trying to answer myself
> >> > recently
> >> > too. Since you've mentioned before that you're using EMR, I assume
> >> > you're
> >> > asking this because you've noticed this behavior on emr-4.3.0.
> >> >
> >> > In this release, we made some changes to the
> maximizeResourceAllocation
> >> > (which you may or may not be using, but either way this issue is
> >> > present),
> >> > including the accidental inclusion of somewhat of a bug that makes it
> >> > not
> >> > reserve any space for the AM, which ultimately results in one of the
> >> > nodes
> >> > being utilized only by the AM and not an executor.
> >> >
> >> > However, as you point out, the only viable fix seems to be to reserve
> >> > enough
> >> > memory for the AM on *every single node*, which in some cases might
> >> > actually
> >> > be worse than wasting a lot of memory on a single node.
> >> >
> >> > So yeah, I also don't like either option. Is this just the price you
> pay
> >> > for
> >> > running on YARN?
> >> >
> >> >
> >> > ~ Jonathan
> >> >
> >> > On Mon, Feb 8, 2016 at 9:03 PM Alexander Pivovarov
> >> > 
> >> > wrote:
> >> >>
> >> >> Lets say that yarn has 53GB memory available on each slave
> >> >>
> >> >> spark.am container needs 896MB.  (512 + 384)
> >> >>
> >> >> I see two options to configure spark:
> >> >>
> >> >> 1. configure spark executors to use 52GB and leave 1 GB on each box.
> >> >> So,
> >> >> some box will also run am container. So, 1GB memory will not be used
> on
> >> >> all
> >> >> slaves but one.
> >> >>
> >> >> 2. configure spark to use all 53GB and add additional 53GB box which
> >> >> will
> >> >> run only am container. So, 52GB on this additional box will do
> nothing
> >> >>
> >> >> I do not like both options. Is there a better way to configure
> >> >> yarn/spark?
> >> >>
> >> >>
> >> >> Alex
> >
> >
>


Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Marcelo Vanzin
You should be able to use spark.yarn.am.nodeLabelExpression if your
version of YARN supports node labels (and you've added a label to the
node where you want the AM to run).
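
A hedged sketch of what that involves, assuming node labels are enabled on the
cluster (yarn.node-labels.enabled=true plus a configured label store) and that
the submission queue is allowed to use the label; the label and host names are
placeholders:

# Label the small box, then ask Spark to place its AM on that label.
yarn rmadmin -addToClusterNodeLabels "am-only"
yarn rmadmin -replaceLabelsOnNode "small-box-host=am-only"

spark-submit \
  --master yarn \
  --conf spark.yarn.am.nodeLabelExpression=am-only \
  --class com.example.MyApp myapp.jar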

On Tue, Feb 9, 2016 at 9:51 AM, Alexander Pivovarov
 wrote:
> Am container starts first and yarn selects random computer to run it.
>
> Is it possible to configure yarn so that it selects small computer for am
> container.
>
> On Feb 9, 2016 12:40 AM, "Sean Owen"  wrote:
>>
>> If it's too small to run an executor, I'd think it would be chosen for
>> the AM as the only way to satisfy the request.
>>
>> On Tue, Feb 9, 2016 at 8:35 AM, Alexander Pivovarov
>>  wrote:
>> > If I add additional small box to the cluster can I configure yarn to
>> > select
>> > small box to run am container?
>> >
>> >
>> > On Mon, Feb 8, 2016 at 10:53 PM, Sean Owen  wrote:
>> >>
>> >> Typically YARN is there because you're mediating resource requests
>> >> from things besides Spark, so yeah using every bit of the cluster is a
>> >> little bit of a corner case. There's not a good answer if all your
>> >> nodes are the same size.
>> >>
>> >> I think you can let YARN over-commit RAM though, and allocate more
>> >> memory than it actually has. It may be beneficial to let them all
>> >> think they have an extra GB, and let one node running the AM
>> >> technically be overcommitted, a state which won't hurt at all unless
>> >> you're really really tight on memory, in which case something might
>> >> get killed.
>> >>
>> >> On Tue, Feb 9, 2016 at 6:49 AM, Jonathan Kelly 
>> >> wrote:
>> >> > Alex,
>> >> >
>> >> > That's a very good question that I've been trying to answer myself
>> >> > recently
>> >> > too. Since you've mentioned before that you're using EMR, I assume
>> >> > you're
>> >> > asking this because you've noticed this behavior on emr-4.3.0.
>> >> >
>> >> > In this release, we made some changes to the
>> >> > maximizeResourceAllocation
>> >> > (which you may or may not be using, but either way this issue is
>> >> > present),
>> >> > including the accidental inclusion of somewhat of a bug that makes it
>> >> > not
>> >> > reserve any space for the AM, which ultimately results in one of the
>> >> > nodes
>> >> > being utilized only by the AM and not an executor.
>> >> >
>> >> > However, as you point out, the only viable fix seems to be to reserve
>> >> > enough
>> >> > memory for the AM on *every single node*, which in some cases might
>> >> > actually
>> >> > be worse than wasting a lot of memory on a single node.
>> >> >
>> >> > So yeah, I also don't like either option. Is this just the price you
>> >> > pay
>> >> > for
>> >> > running on YARN?
>> >> >
>> >> >
>> >> > ~ Jonathan
>> >> >
>> >> > On Mon, Feb 8, 2016 at 9:03 PM Alexander Pivovarov
>> >> > 
>> >> > wrote:
>> >> >>
>> >> >> Lets say that yarn has 53GB memory available on each slave
>> >> >>
>> >> >> spark.am container needs 896MB.  (512 + 384)
>> >> >>
>> >> >> I see two options to configure spark:
>> >> >>
>> >> >> 1. configure spark executors to use 52GB and leave 1 GB on each box.
>> >> >> So,
>> >> >> some box will also run am container. So, 1GB memory will not be used
>> >> >> on
>> >> >> all
>> >> >> slaves but one.
>> >> >>
>> >> >> 2. configure spark to use all 53GB and add additional 53GB box which
>> >> >> will
>> >> >> run only am container. So, 52GB on this additional box will do
>> >> >> nothing
>> >> >>
>> >> >> I do not like both options. Is there a better way to configure
>> >> >> yarn/spark?
>> >> >>
>> >> >>
>> >> >> Alex
>> >
>> >



-- 
Marcelo

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Alexander Pivovarov
I use hadoop 2.7.1
On Feb 9, 2016 9:54 AM, "Marcelo Vanzin"  wrote:

> You should be able to use spark.yarn.am.nodeLabelExpression if your
> version of YARN supports node labels (and you've added a label to the
> node where you want the AM to run).
>
> On Tue, Feb 9, 2016 at 9:51 AM, Alexander Pivovarov
>  wrote:
> > Am container starts first and yarn selects random computer to run it.
> >
> > Is it possible to configure yarn so that it selects small computer for am
> > container.
> >
> > On Feb 9, 2016 12:40 AM, "Sean Owen"  wrote:
> >>
> >> If it's too small to run an executor, I'd think it would be chosen for
> >> the AM as the only way to satisfy the request.
> >>
> >> On Tue, Feb 9, 2016 at 8:35 AM, Alexander Pivovarov
> >>  wrote:
> >> > If I add additional small box to the cluster can I configure yarn to
> >> > select
> >> > small box to run am container?
> >> >
> >> >
> >> > On Mon, Feb 8, 2016 at 10:53 PM, Sean Owen 
> wrote:
> >> >>
> >> >> Typically YARN is there because you're mediating resource requests
> >> >> from things besides Spark, so yeah using every bit of the cluster is
> a
> >> >> little bit of a corner case. There's not a good answer if all your
> >> >> nodes are the same size.
> >> >>
> >> >> I think you can let YARN over-commit RAM though, and allocate more
> >> >> memory than it actually has. It may be beneficial to let them all
> >> >> think they have an extra GB, and let one node running the AM
> >> >> technically be overcommitted, a state which won't hurt at all unless
> >> >> you're really really tight on memory, in which case something might
> >> >> get killed.
> >> >>
> >> >> On Tue, Feb 9, 2016 at 6:49 AM, Jonathan Kelly <
> jonathaka...@gmail.com>
> >> >> wrote:
> >> >> > Alex,
> >> >> >
> >> >> > That's a very good question that I've been trying to answer myself
> >> >> > recently
> >> >> > too. Since you've mentioned before that you're using EMR, I assume
> >> >> > you're
> >> >> > asking this because you've noticed this behavior on emr-4.3.0.
> >> >> >
> >> >> > In this release, we made some changes to the
> >> >> > maximizeResourceAllocation
> >> >> > (which you may or may not be using, but either way this issue is
> >> >> > present),
> >> >> > including the accidental inclusion of somewhat of a bug that makes
> it
> >> >> > not
> >> >> > reserve any space for the AM, which ultimately results in one of
> the
> >> >> > nodes
> >> >> > being utilized only by the AM and not an executor.
> >> >> >
> >> >> > However, as you point out, the only viable fix seems to be to
> reserve
> >> >> > enough
> >> >> > memory for the AM on *every single node*, which in some cases might
> >> >> > actually
> >> >> > be worse than wasting a lot of memory on a single node.
> >> >> >
> >> >> > So yeah, I also don't like either option. Is this just the price
> you
> >> >> > pay
> >> >> > for
> >> >> > running on YARN?
> >> >> >
> >> >> >
> >> >> > ~ Jonathan
> >> >> >
> >> >> > On Mon, Feb 8, 2016 at 9:03 PM Alexander Pivovarov
> >> >> > 
> >> >> > wrote:
> >> >> >>
> >> >> >> Lets say that yarn has 53GB memory available on each slave
> >> >> >>
> >> >> >> spark.am container needs 896MB.  (512 + 384)
> >> >> >>
> >> >> >> I see two options to configure spark:
> >> >> >>
> >> >> >> 1. configure spark executors to use 52GB and leave 1 GB on each
> box.
> >> >> >> So,
> >> >> >> some box will also run am container. So, 1GB memory will not be
> used
> >> >> >> on
> >> >> >> all
> >> >> >> slaves but one.
> >> >> >>
> >> >> >> 2. configure spark to use all 53GB and add additional 53GB box
> which
> >> >> >> will
> >> >> >> run only am container. So, 52GB on this additional box will do
> >> >> >> nothing
> >> >> >>
> >> >> >> I do not like both options. Is there a better way to configure
> >> >> >> yarn/spark?
> >> >> >>
> >> >> >>
> >> >> >> Alex
> >> >
> >> >
>
>
>
> --
> Marcelo
>


Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Alexander Pivovarov
I decided to do YARN over-commit and add 896
to yarn.nodemanager.resource.memory-mb.
It was 54,272; now I set it to 54,272 + 896 = 55,168.
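
In yarn-site.xml terms, that change is (using the numbers above):

<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <!-- 54272 MB of physical allocation + 896 MB for the AM container -->
  <value>55168</value>
</property>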

Kelly, can I ask you a couple of questions?
1. Is it possible to add a YARN label to particular instance group boxes on
EMR?
2. In addition to maximizeResourceAllocation it would be nice to have an
executorsPerBox setting in EMR.
I have a case where I need to run 2 or 4 executors on r3.2xlarge.

On Tue, Feb 9, 2016 at 9:56 AM, Alexander Pivovarov 
wrote:

> I use hadoop 2.7.1
> On Feb 9, 2016 9:54 AM, "Marcelo Vanzin"  wrote:
>
>> You should be able to use spark.yarn.am.nodeLabelExpression if your
>> version of YARN supports node labels (and you've added a label to the
>> node where you want the AM to run).
>>
>> On Tue, Feb 9, 2016 at 9:51 AM, Alexander Pivovarov
>>  wrote:
>> > Am container starts first and yarn selects random computer to run it.
>> >
>> > Is it possible to configure yarn so that it selects small computer for
>> am
>> > container.
>> >
>> > On Feb 9, 2016 12:40 AM, "Sean Owen"  wrote:
>> >>
>> >> If it's too small to run an executor, I'd think it would be chosen for
>> >> the AM as the only way to satisfy the request.
>> >>
>> >> On Tue, Feb 9, 2016 at 8:35 AM, Alexander Pivovarov
>> >>  wrote:
>> >> > If I add additional small box to the cluster can I configure yarn to
>> >> > select
>> >> > small box to run am container?
>> >> >
>> >> >
>> >> > On Mon, Feb 8, 2016 at 10:53 PM, Sean Owen 
>> wrote:
>> >> >>
>> >> >> Typically YARN is there because you're mediating resource requests
>> >> >> from things besides Spark, so yeah using every bit of the cluster
>> is a
>> >> >> little bit of a corner case. There's not a good answer if all your
>> >> >> nodes are the same size.
>> >> >>
>> >> >> I think you can let YARN over-commit RAM though, and allocate more
>> >> >> memory than it actually has. It may be beneficial to let them all
>> >> >> think they have an extra GB, and let one node running the AM
>> >> >> technically be overcommitted, a state which won't hurt at all unless
>> >> >> you're really really tight on memory, in which case something might
>> >> >> get killed.
>> >> >>
>> >> >> On Tue, Feb 9, 2016 at 6:49 AM, Jonathan Kelly <
>> jonathaka...@gmail.com>
>> >> >> wrote:
>> >> >> > Alex,
>> >> >> >
>> >> >> > That's a very good question that I've been trying to answer myself
>> >> >> > recently
>> >> >> > too. Since you've mentioned before that you're using EMR, I assume
>> >> >> > you're
>> >> >> > asking this because you've noticed this behavior on emr-4.3.0.
>> >> >> >
>> >> >> > In this release, we made some changes to the
>> >> >> > maximizeResourceAllocation
>> >> >> > (which you may or may not be using, but either way this issue is
>> >> >> > present),
>> >> >> > including the accidental inclusion of somewhat of a bug that
>> makes it
>> >> >> > not
>> >> >> > reserve any space for the AM, which ultimately results in one of
>> the
>> >> >> > nodes
>> >> >> > being utilized only by the AM and not an executor.
>> >> >> >
>> >> >> > However, as you point out, the only viable fix seems to be to
>> reserve
>> >> >> > enough
>> >> >> > memory for the AM on *every single node*, which in some cases
>> might
>> >> >> > actually
>> >> >> > be worse than wasting a lot of memory on a single node.
>> >> >> >
>> >> >> > So yeah, I also don't like either option. Is this just the price
>> you
>> >> >> > pay
>> >> >> > for
>> >> >> > running on YARN?
>> >> >> >
>> >> >> >
>> >> >> > ~ Jonathan
>> >> >> >
>> >> >> > On Mon, Feb 8, 2016 at 9:03 PM Alexander Pivovarov
>> >> >> > 
>> >> >> > wrote:
>> >> >> >>
>> >> >> >> Lets say that yarn has 53GB memory available on each slave
>> >> >> >>
>> >> >> >> spark.am container needs 896MB.  (512 + 384)
>> >> >> >>
>> >> >> >> I see two options to configure spark:
>> >> >> >>
>> >> >> >> 1. configure spark executors to use 52GB and leave 1 GB on each
>> box.
>> >> >> >> So,
>> >> >> >> some box will also run am container. So, 1GB memory will not be
>> used
>> >> >> >> on
>> >> >> >> all
>> >> >> >> slaves but one.
>> >> >> >>
>> >> >> >> 2. configure spark to use all 53GB and add additional 53GB box
>> which
>> >> >> >> will
>> >> >> >> run only am container. So, 52GB on this additional box will do
>> >> >> >> nothing
>> >> >> >>
>> >> >> >> I do not like both options. Is there a better way to configure
>> >> >> >> yarn/spark?
>> >> >> >>
>> >> >> >>
>> >> >> >> Alex
>> >> >
>> >> >
>>
>>
>>
>> --
>> Marcelo
>>
>


Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Alexander Pivovarov
I mean Jonathan

On Tue, Feb 9, 2016 at 10:41 AM, Alexander Pivovarov 
wrote:

> I decided to do YARN over-commit and add 896
> to yarn.nodemanager.resource.memory-mb
> it was 54,272
> now I set it to 54,272+896 = 55,168
>
> Kelly, can I ask you couple questions
> 1. it is possible to add yarn label to particular instance group boxes on
> EMR?
> 2. in addition to maximizeResourceAllocation it would be nice if we have
> executorsPerBox setting in EMR.
> I have a case when I need to run 2 or 4 executors on r3.2xlarge
>
> On Tue, Feb 9, 2016 at 9:56 AM, Alexander Pivovarov 
> wrote:
>
>> I use hadoop 2.7.1
>> On Feb 9, 2016 9:54 AM, "Marcelo Vanzin"  wrote:
>>
>>> You should be able to use spark.yarn.am.nodeLabelExpression if your
>>> version of YARN supports node labels (and you've added a label to the
>>> node where you want the AM to run).
>>>
>>> On Tue, Feb 9, 2016 at 9:51 AM, Alexander Pivovarov
>>>  wrote:
>>> > Am container starts first and yarn selects random computer to run it.
>>> >
>>> > Is it possible to configure yarn so that it selects small computer for
>>> am
>>> > container.
>>> >
>>> > On Feb 9, 2016 12:40 AM, "Sean Owen"  wrote:
>>> >>
>>> >> If it's too small to run an executor, I'd think it would be chosen for
>>> >> the AM as the only way to satisfy the request.
>>> >>
>>> >> On Tue, Feb 9, 2016 at 8:35 AM, Alexander Pivovarov
>>> >>  wrote:
>>> >> > If I add additional small box to the cluster can I configure yarn to
>>> >> > select
>>> >> > small box to run am container?
>>> >> >
>>> >> >
>>> >> > On Mon, Feb 8, 2016 at 10:53 PM, Sean Owen 
>>> wrote:
>>> >> >>
>>> >> >> Typically YARN is there because you're mediating resource requests
>>> >> >> from things besides Spark, so yeah using every bit of the cluster
>>> is a
>>> >> >> little bit of a corner case. There's not a good answer if all your
>>> >> >> nodes are the same size.
>>> >> >>
>>> >> >> I think you can let YARN over-commit RAM though, and allocate more
>>> >> >> memory than it actually has. It may be beneficial to let them all
>>> >> >> think they have an extra GB, and let one node running the AM
>>> >> >> technically be overcommitted, a state which won't hurt at all
>>> unless
>>> >> >> you're really really tight on memory, in which case something might
>>> >> >> get killed.
>>> >> >>
>>> >> >> On Tue, Feb 9, 2016 at 6:49 AM, Jonathan Kelly <
>>> jonathaka...@gmail.com>
>>> >> >> wrote:
>>> >> >> > Alex,
>>> >> >> >
>>> >> >> > That's a very good question that I've been trying to answer
>>> myself
>>> >> >> > recently
>>> >> >> > too. Since you've mentioned before that you're using EMR, I
>>> assume
>>> >> >> > you're
>>> >> >> > asking this because you've noticed this behavior on emr-4.3.0.
>>> >> >> >
>>> >> >> > In this release, we made some changes to the
>>> >> >> > maximizeResourceAllocation
>>> >> >> > (which you may or may not be using, but either way this issue is
>>> >> >> > present),
>>> >> >> > including the accidental inclusion of somewhat of a bug that
>>> makes it
>>> >> >> > not
>>> >> >> > reserve any space for the AM, which ultimately results in one of
>>> the
>>> >> >> > nodes
>>> >> >> > being utilized only by the AM and not an executor.
>>> >> >> >
>>> >> >> > However, as you point out, the only viable fix seems to be to
>>> reserve
>>> >> >> > enough
>>> >> >> > memory for the AM on *every single node*, which in some cases
>>> might
>>> >> >> > actually
>>> >> >> > be worse than wasting a lot of memory on a single node.
>>> >> >> >
>>> >> >> > So yeah, I also don't like either option. Is this just the price
>>> you
>>> >> >> > pay
>>> >> >> > for
>>> >> >> > running on YARN?
>>> >> >> >
>>> >> >> >
>>> >> >> > ~ Jonathan
>>> >> >> >
>>> >> >> > On Mon, Feb 8, 2016 at 9:03 PM Alexander Pivovarov
>>> >> >> > 
>>> >> >> > wrote:
>>> >> >> >>
>>> >> >> >> Lets say that yarn has 53GB memory available on each slave
>>> >> >> >>
>>> >> >> >> spark.am container needs 896MB.  (512 + 384)
>>> >> >> >>
>>> >> >> >> I see two options to configure spark:
>>> >> >> >>
>>> >> >> >> 1. configure spark executors to use 52GB and leave 1 GB on each
>>> box.
>>> >> >> >> So,
>>> >> >> >> some box will also run am container. So, 1GB memory will not be
>>> used
>>> >> >> >> on
>>> >> >> >> all
>>> >> >> >> slaves but one.
>>> >> >> >>
>>> >> >> >> 2. configure spark to use all 53GB and add additional 53GB box
>>> which
>>> >> >> >> will
>>> >> >> >> run only am container. So, 52GB on this additional box will do
>>> >> >> >> nothing
>>> >> >> >>
>>> >> >> >> I do not like both options. Is there a better way to configure
>>> >> >> >> yarn/spark?
>>> >> >> >>
>>> >> >> >>
>>> >> >> >> Alex
>>> >> >
>>> >> >
>>>
>>>
>>>
>>> --
>>> Marcelo
>>>
>>
>


RE: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Diwakar Dhanuskodi


Are you using YARN to run Spark jobs only? Are you configuring Spark
properties in the spark-submit parameters? If so, did you try
--num-executors x*53 (where x is the number of nodes),
--executor-memory 1g and --driver-memory 1g?

You might see YARN allocating resources to all executors except one, because
the driver draws that memory. The trade-off is that if there is not much data
to process, many executors may run empty, wasting resources. In that case you
might need to settle for dynamic allocation enabled.



Sent from Samsung Mobile.

Original message
From: Alexander Pivovarov
Date: 09/02/2016 10:33 (GMT+05:30)
To: dev@spark.apache.org
Subject: spark on yarn wastes one box (or 1 GB on each box) for am container
Lets say that yarn has 53GB memory available on each slave

spark.am container needs 896MB.  (512 + 384)

I see two options to configure spark:

1. configure spark executors to use 52GB and leave 1 GB on each box. So, some 
box will also run am container. So, 1GB memory will not be used on all slaves 
but one.

2. configure spark to use all 53GB and add additional 53GB box which will run 
only am container. So, 52GB on this additional box will do nothing

I do not like both options. Is there a better way to configure yarn/spark?


Alex

Re: Preserving partitioning with dataframe select

2016-02-09 Thread Michael Armbrust
RDD level partitioning information is not used to decide when to shuffle
for queries planned using Catalyst (since we have better information about
distribution from the query plan itself).  Instead you should be looking at
the logic in EnsureRequirements.

We don't yet reason about equivalence classes for attributes when deciding
if a given partitioning is valid, but #10844 is a start at building that
infrastructure.


Re: Long running Spark job on YARN throws "No AMRMToken"

2016-02-09 Thread Hari Shreedharan
The credentials file approach (using keytab for spark apps) will only
update HDFS tokens. YARN's AMRM tokens should be taken care of by YARN
internally.

Steve - correct me if I am wrong here: If the AMRM tokens are disappearing
it might be a YARN bug (does the AMRM token have a 7 day limit as well? I
thought that was only for HDFS).


Thanks,
Hari
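
For reference, the keytab approach Hari refers to (SPARK-5342, available for
YARN from Spark 1.4 on) is configured roughly as below; it lets Spark refresh
HDFS delegation tokens itself but, as noted above, does nothing for the AMRM
token. The principal, keytab path and application names are placeholders:

# Keytab login so long-running apps can re-obtain HDFS delegation tokens.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.yarn.principal=prabhu@EXAMPLE.COM \
  --conf spark.yarn.keytab=/etc/security/keytabs/prabhu.keytab \
  --class com.example.MyApp myapp.jar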

On Tue, Feb 9, 2016 at 8:44 AM, Steve Loughran 
wrote:

>
> On 9 Feb 2016, at 11:26, Steve Loughran  wrote:
>
>
> On 9 Feb 2016, at 05:55, Prabhu Joseph  wrote:
>
> + Spark-Dev
>
> On Tue, Feb 9, 2016 at 10:04 AM, Prabhu Joseph  > wrote:
>
>> Hi All,
>>
>> A long running Spark job on YARN throws below exception after running
>> for few days.
>>
>> yarn.ApplicationMaster: Reporter thread fails 1 time(s) in a row.
>> org.apache.hadoop.yarn.exceptions.YarnException: *No AMRMToken found* for
>> user prabhu at org.apache.hadoop.yarn.ipc.RPC
>> Util.getRemoteException(RPCUtil.java:45)
>>
>> Do any of the below renew the AMRMToken and solve the issue
>>
>> 1. yarn-resourcemanager.delegation.token.max-lifetime increase from 7 days
>>
>> 2. Configuring Proxy user:
>>
>>  <property>
>>    <name>hadoop.proxyuser.yarn.hosts</name>
>>    <value>*</value>
>>  </property>
>>  <property>
>>    <name>hadoop.proxyuser.yarn.groups</name>
>>    <value>*</value>
>>  </property>
>>
>
> wouldnt do that: security issues
>
>
>> 3. Can Spark-1.4.0 handle with fix
>> https://issues.apache.org/jira/browse/SPARK-5342
>>
>> spark.yarn.credentials.file
>>
>>
>>
> I'll say "maybe" there
>
>
> uprated to a no, having looked at the code more
>
>
> How to renew the AMRMToken for a long running job on YARN?
>>
>>
>>
>
> AMRM token renewal should be automatic in AM; Yarn sends a message to the
> AM (actually an allocate() response with no containers but a new token at
> the tail of the message.
>
> i don't see any logging in the Hadoopp code there (AMRMClientImpl); filed
> YARN-4682 to add a log statement
>
> if someone other than me were to supply a patch to that JIRA to add a log
> statement *by the end of the day* I'll review it and get it in to Hadoop 2.8
>
>
> like I said: I'll get this in to hadoop-2.8 if someone is timely with the
> diff
>
>


Error aliasing an array column.

2016-02-09 Thread rakeshchalasani
Hi All:

I am getting an "UnsupportedOperationException" when trying to alias an
array column. The issue seems to be at "CreateArray" expression -> dataType,
which checks for nullability of its children, while aliasing is creating a
PrettyAttribute that does not implement nullability.  

Below is an example to reproduce it.



this throws the following exception:




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Error-aliasing-an-array-column-tp16288.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Jonathan Kelly
Interesting, I was not aware of spark.yarn.am.nodeLabelExpression.

We do use YARN labels on EMR; each node is automatically labeled with its
type (MASTER, CORE, or TASK). And we do
set yarn.app.mapreduce.am.labels=CORE in yarn-site.xml, but we do not set
spark.yarn.am.nodeLabelExpression.

Does Spark somehow not actually honor this? It seems weird that Spark would
have its own similar-sounding property (spark.yarn.am.nodeLabelExpression).
If spark.yarn.am.nodeLabelExpression is used
and yarn.app.mapreduce.am.labels ignored, I could be wrong about Spark AMs
only running on CORE instances in EMR.

I'm guessing though that spark.yarn.am.nodeLabelExpression would simply
override yarn.app.mapreduce.am.labels, so yarn.app.mapreduce.am.labels
would be treated as a default when it is set and
spark.yarn.am.nodeLabelExpression is not. Is that correct?

In short, Alex, you should not need to set any of the label-related
properties yourself if you do what I suggested regarding using small CORE
instances and large TASK instances. But if you want to do something
different, it would also be possible to add a TASK instance group with
small nodes and configured with some new label. Then you could set
spark.yarn.am.nodeLabelExpression to that label.

Thanks, Marcelo, for pointing out spark.yarn.am.nodeLabelExpression!

~ Jonathan
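
In configuration terms Jonathan's last suggestion is a single property; the
label name is a placeholder for whatever the small TASK group gets tagged with:

# spark-defaults.conf (or pass as --conf on spark-submit)
spark.yarn.am.nodeLabelExpression  SMALL_AM_LABEL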

On Tue, Feb 9, 2016 at 9:54 AM Marcelo Vanzin  wrote:

> You should be able to use spark.yarn.am.nodeLabelExpression if your
> version of YARN supports node labels (and you've added a label to the
> node where you want the AM to run).
>
> On Tue, Feb 9, 2016 at 9:51 AM, Alexander Pivovarov
>  wrote:
> > Am container starts first and yarn selects random computer to run it.
> >
> > Is it possible to configure yarn so that it selects small computer for am
> > container.
> >
> > On Feb 9, 2016 12:40 AM, "Sean Owen"  wrote:
> >>
> >> If it's too small to run an executor, I'd think it would be chosen for
> >> the AM as the only way to satisfy the request.
> >>
> >> On Tue, Feb 9, 2016 at 8:35 AM, Alexander Pivovarov
> >>  wrote:
> >> > If I add additional small box to the cluster can I configure yarn to
> >> > select
> >> > small box to run am container?
> >> >
> >> >
> >> > On Mon, Feb 8, 2016 at 10:53 PM, Sean Owen 
> wrote:
> >> >>
> >> >> Typically YARN is there because you're mediating resource requests
> >> >> from things besides Spark, so yeah using every bit of the cluster is
> a
> >> >> little bit of a corner case. There's not a good answer if all your
> >> >> nodes are the same size.
> >> >>
> >> >> I think you can let YARN over-commit RAM though, and allocate more
> >> >> memory than it actually has. It may be beneficial to let them all
> >> >> think they have an extra GB, and let one node running the AM
> >> >> technically be overcommitted, a state which won't hurt at all unless
> >> >> you're really really tight on memory, in which case something might
> >> >> get killed.
> >> >>
> >> >> On Tue, Feb 9, 2016 at 6:49 AM, Jonathan Kelly <
> jonathaka...@gmail.com>
> >> >> wrote:
> >> >> > Alex,
> >> >> >
> >> >> > That's a very good question that I've been trying to answer myself
> >> >> > recently
> >> >> > too. Since you've mentioned before that you're using EMR, I assume
> >> >> > you're
> >> >> > asking this because you've noticed this behavior on emr-4.3.0.
> >> >> >
> >> >> > In this release, we made some changes to the
> >> >> > maximizeResourceAllocation
> >> >> > (which you may or may not be using, but either way this issue is
> >> >> > present),
> >> >> > including the accidental inclusion of somewhat of a bug that makes
> it
> >> >> > not
> >> >> > reserve any space for the AM, which ultimately results in one of
> the
> >> >> > nodes
> >> >> > being utilized only by the AM and not an executor.
> >> >> >
> >> >> > However, as you point out, the only viable fix seems to be to
> reserve
> >> >> > enough
> >> >> > memory for the AM on *every single node*, which in some cases might
> >> >> > actually
> >> >> > be worse than wasting a lot of memory on a single node.
> >> >> >
> >> >> > So yeah, I also don't like either option. Is this just the price
> you
> >> >> > pay
> >> >> > for
> >> >> > running on YARN?
> >> >> >
> >> >> >
> >> >> > ~ Jonathan
> >> >> >
> >> >> > On Mon, Feb 8, 2016 at 9:03 PM Alexander Pivovarov
> >> >> > 
> >> >> > wrote:
> >> >> >>
> >> >> >> Lets say that yarn has 53GB memory available on each slave
> >> >> >>
> >> >> >> spark.am container needs 896MB.  (512 + 384)
> >> >> >>
> >> >> >> I see two options to configure spark:
> >> >> >>
> >> >> >> 1. configure spark executors to use 52GB and leave 1 GB on each
> box.
> >> >> >> So,
> >> >> >> some box will also run am container. So, 1GB memory will not be
> used
> >> >> >> on
> >> >> >> all
> >> >> >> slaves but one.
> >> >> >>
> >> >> >> 2. configure spark to use all 53GB and add additional 53GB box
> which
> >> >> >> will
> >> >> >> run only am container. So, 52GB on this additional box will do
> >> >> >> nothi

Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Alexander Pivovarov
Thanks Jonathan

Actually I'd like to use maximizeResourceAllocation.

Ideally I would add a new instance group having a single small box
labelled as AM.
I'm not sure "aws emr create-cluster" supports setting custom LABELS; the
only settings available are:

InstanceCount=1,BidPrice=0.5,Name=sparkAM,InstanceGroupType=TASK,InstanceType=m3.xlarge


How can I specify yarn label AM for that box?



On Tue, Feb 9, 2016 at 12:16 PM, Jonathan Kelly 
wrote:

> Interesting, I was not aware of spark.yarn.am.nodeLabelExpression.
>
> We do use YARN labels on EMR; each node is automatically labeled with its
> type (MASTER, CORE, or TASK). And we do
> set yarn.app.mapreduce.am.labels=CORE in yarn-site.xml, but we do not set
> spark.yarn.am.nodeLabelExpression.
>
> Does Spark somehow not actually honor this? It seems weird that Spark
> would have its own similar-sounding property
> (spark.yarn.am.nodeLabelExpression). If spark.yarn.am.nodeLabelExpression
> is used and yarn.app.mapreduce.am.labels ignored, I could be wrong about
> Spark AMs only running on CORE instances in EMR.
>
> I'm guessing though that spark.yarn.am.nodeLabelExpression would simply
> override yarn.app.mapreduce.am.labels, so yarn.app.mapreduce.am.labels
> would be treated as a default when it is set and
> spark.yarn.am.nodeLabelExpression is not. Is that correct?
>
> In short, Alex, you should not need to set any of the label-related
> properties yourself if you do what I suggested regarding using small CORE
> instances and large TASK instances. But if you want to do something
> different, it would also be possible to add a TASK instance group with
> small nodes and configured with some new label. Then you could set
> spark.yarn.am.nodeLabelExpression to that label.
>
> Thanks, Marcelo, for pointing out spark.yarn.am.nodeLabelExpression!
>
> ~ Jonathan
>
> On Tue, Feb 9, 2016 at 9:54 AM Marcelo Vanzin  wrote:
>
>> You should be able to use spark.yarn.am.nodeLabelExpression if your
>> version of YARN supports node labels (and you've added a label to the
>> node where you want the AM to run).
>>
>> On Tue, Feb 9, 2016 at 9:51 AM, Alexander Pivovarov
>>  wrote:
>> > Am container starts first and yarn selects random computer to run it.
>> >
>> > Is it possible to configure yarn so that it selects small computer for
>> am
>> > container.
>> >
>> > On Feb 9, 2016 12:40 AM, "Sean Owen"  wrote:
>> >>
>> >> If it's too small to run an executor, I'd think it would be chosen for
>> >> the AM as the only way to satisfy the request.
>> >>
>> >> On Tue, Feb 9, 2016 at 8:35 AM, Alexander Pivovarov
>> >>  wrote:
>> >> > If I add additional small box to the cluster can I configure yarn to
>> >> > select
>> >> > small box to run am container?
>> >> >
>> >> >
>> >> > On Mon, Feb 8, 2016 at 10:53 PM, Sean Owen 
>> wrote:
>> >> >>
>> >> >> Typically YARN is there because you're mediating resource requests
>> >> >> from things besides Spark, so yeah using every bit of the cluster
>> is a
>> >> >> little bit of a corner case. There's not a good answer if all your
>> >> >> nodes are the same size.
>> >> >>
>> >> >> I think you can let YARN over-commit RAM though, and allocate more
>> >> >> memory than it actually has. It may be beneficial to let them all
>> >> >> think they have an extra GB, and let one node running the AM
>> >> >> technically be overcommitted, a state which won't hurt at all unless
>> >> >> you're really really tight on memory, in which case something might
>> >> >> get killed.
>> >> >>
>> >> >> On Tue, Feb 9, 2016 at 6:49 AM, Jonathan Kelly <
>> jonathaka...@gmail.com>
>> >> >> wrote:
>> >> >> > Alex,
>> >> >> >
>> >> >> > That's a very good question that I've been trying to answer myself
>> >> >> > recently
>> >> >> > too. Since you've mentioned before that you're using EMR, I assume
>> >> >> > you're
>> >> >> > asking this because you've noticed this behavior on emr-4.3.0.
>> >> >> >
>> >> >> > In this release, we made some changes to the
>> >> >> > maximizeResourceAllocation
>> >> >> > (which you may or may not be using, but either way this issue is
>> >> >> > present),
>> >> >> > including the accidental inclusion of somewhat of a bug that
>> makes it
>> >> >> > not
>> >> >> > reserve any space for the AM, which ultimately results in one of
>> the
>> >> >> > nodes
>> >> >> > being utilized only by the AM and not an executor.
>> >> >> >
>> >> >> > However, as you point out, the only viable fix seems to be to
>> reserve
>> >> >> > enough
>> >> >> > memory for the AM on *every single node*, which in some cases
>> might
>> >> >> > actually
>> >> >> > be worse than wasting a lot of memory on a single node.
>> >> >> >
>> >> >> > So yeah, I also don't like either option. Is this just the price
>> you
>> >> >> > pay
>> >> >> > for
>> >> >> > running on YARN?
>> >> >> >
>> >> >> >
>> >> >> > ~ Jonathan
>> >> >> >
>> >> >> > On Mon, Feb 8, 2016 at 9:03 PM Alexander Pivovarov
>> >> >> > 
>> >> >> > wrote:
>> >> >> >>
>> >> >> >> Lets say that yarn has 53GB

Re: Error aliasing an array column.

2016-02-09 Thread Ted Yu
Do you mind pastebin'ning code snippet and exception one more time - I
couldn't see them in your original email.

Which Spark release are you using ?

On Tue, Feb 9, 2016 at 11:55 AM, rakeshchalasani 
wrote:

> Hi All:
>
> I am getting an "UnsupportedOperationException" when trying to alias an
> array column. The issue seems to be at "CreateArray" expression ->
> dataType,
> which checks for nullability of its children, while aliasing is creating a
> PrettyAttribute that does not implement nullability.
>
> Below is an example to reproduce it.
>
>
>
> this throws the following exception:
>
>
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Error-aliasing-an-array-column-tp16288.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>
>


Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Marcelo Vanzin
On Tue, Feb 9, 2016 at 12:16 PM, Jonathan Kelly  wrote:
> And we do set yarn.app.mapreduce.am.labels=CORE

That sounds very mapreduce-specific, so I doubt Spark (or anything
non-MR) would honor it.

-- 
Marcelo

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Error aliasing an array column.

2016-02-09 Thread Rakesh Chalasani
Sorry, didn't realize the mail didn't show the code. Using Spark release
1.6.0

Below is an example to reproduce it.

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sparkContext)
import sqlContext.implicits._
import org.apache.spark.sql.functions

case class Test(a:Int, b:Int)
val data = sparkContext.parallelize(Array.range(0, 10).map(x => Test(x,
x+1)))
val df = data.toDF()
val arrayCol = functions.array(df("a"), df("b")).as("arrayCol")

this throws the following exception:
java.lang.UnsupportedOperationException
at
org.apache.spark.sql.catalyst.expressions.PrettyAttribute.nullable(namedExpressions.scala:289)
at
org.apache.spark.sql.catalyst.expressions.CreateArray$$anonfun$dataType$3.apply(complexTypeCreator.scala:40)
at
org.apache.spark.sql.catalyst.expressions.CreateArray$$anonfun$dataType$3.apply(complexTypeCreator.scala:40)
at
scala.collection.IndexedSeqOptimized$$anonfun$exists$1.apply(IndexedSeqOptimized.scala:40)
at
scala.collection.IndexedSeqOptimized$$anonfun$exists$1.apply(IndexedSeqOptimized.scala:40)
at
scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
at
scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
at
scala.collection.GenSeqLike$class.prefixLength(GenSeqLike.scala:92)
at scala.collection.AbstractSeq.prefixLength(Seq.scala:40)
at
scala.collection.IndexedSeqOptimized$class.exists(IndexedSeqOptimized.scala:40)
at
scala.collection.mutable.ArrayBuffer.exists(ArrayBuffer.scala:47)
at
org.apache.spark.sql.catalyst.expressions.CreateArray.dataType(complexTypeCreator.scala:40)
at
org.apache.spark.sql.catalyst.expressions.Alias.dataType(namedExpressions.scala:136)
at
org.apache.spark.sql.catalyst.expressions.NamedExpression$class.typeSuffix(namedExpressions.scala:84)
at
org.apache.spark.sql.catalyst.expressions.Alias.typeSuffix(namedExpressions.scala:120)
at
org.apache.spark.sql.catalyst.expressions.Alias.toString(namedExpressions.scala:155)
at
org.apache.spark.sql.catalyst.expressions.Expression.prettyString(Expression.scala:207)
at org.apache.spark.sql.Column.toString(Column.scala:138)
at java.lang.String.valueOf(String.java:2994)
at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:331)
at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
at .(:20)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)

On Tue, Feb 9, 2016 at 4:23 PM Ted Yu  wrote:

> Do you mind pastebin'ning code snippet and exception one more time - I
> couldn't see them in your original email.
>
> Which Spark release are you using ?
>
> On Tue, Feb 9, 2016 at 11:55 AM, rakeshchalasani 
> wrote:
>
>> Hi All:
>>
>> I am getting an "UnsupportedOperationException" when trying to alias an
>> array column. The issue seems to be at "CreateArray" expression ->
>> dataType,
>> which checks for nullability of its children, while aliasing is creating a
>> PrettyAttribute that does not implement nullability.
>>
>> Below is an example to reproduce it.
>>
>>
>>
>> this throws the following exception:
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-developers-list.1001551.n3.nabble.com/Error-aliasing-an-array-column-tp16288.html
>> Sent from the Apache Spark Developers List mailing list archive at
>> Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> For additional commands, e-mail: dev-h...@spark.apache.org
>>
>>
>


Re: Error aliasing an array column.

2016-02-09 Thread Ted Yu
How about changing the last line to:

scala> val df2 = df.select(functions.array(df("a"),
df("b")).alias("arrayCol"))
df2: org.apache.spark.sql.DataFrame = [arrayCol: array<int>]

scala> df2.show()
++
|arrayCol|
++
|  [0, 1]|
|  [1, 2]|
|  [2, 3]|
|  [3, 4]|
|  [4, 5]|
|  [5, 6]|
|  [6, 7]|
|  [7, 8]|
|  [8, 9]|
| [9, 10]|
++

FYI

On Tue, Feb 9, 2016 at 1:38 PM, Rakesh Chalasani 
wrote:

> Sorry, didn't realize the mail didn't show the code. Using Spark release
> 1.6.0
>
> Below is an example to reproduce it.
>
> import org.apache.spark.sql.SQLContext
> val sqlContext = new SQLContext(sparkContext)
> import sqlContext.implicits._
> import org.apache.spark.sql.functions
>
> case class Test(a:Int, b:Int)
> val data = sparkContext.parallelize(Array.range(0, 10).map(x => Test(x,
> x+1)))
> val df = data.toDF()
> val arrayCol = functions.array(df("a"), df("b")).as("arrayCol")
>
> this throws the following exception:
> ava.lang.UnsupportedOperationException
> at
> org.apache.spark.sql.catalyst.expressions.PrettyAttribute.nullable(namedExpressions.scala:289)
> at
> org.apache.spark.sql.catalyst.expressions.CreateArray$$anonfun$dataType$3.apply(complexTypeCreator.scala:40)
> at
> org.apache.spark.sql.catalyst.expressions.CreateArray$$anonfun$dataType$3.apply(complexTypeCreator.scala:40)
> at
> scala.collection.IndexedSeqOptimized$$anonfun$exists$1.apply(IndexedSeqOptimized.scala:40)
> at
> scala.collection.IndexedSeqOptimized$$anonfun$exists$1.apply(IndexedSeqOptimized.scala:40)
> at
> scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
> at
> scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
> at
> scala.collection.GenSeqLike$class.prefixLength(GenSeqLike.scala:92)
> at scala.collection.AbstractSeq.prefixLength(Seq.scala:40)
> at
> scala.collection.IndexedSeqOptimized$class.exists(IndexedSeqOptimized.scala:40)
> at
> scala.collection.mutable.ArrayBuffer.exists(ArrayBuffer.scala:47)
> at
> org.apache.spark.sql.catalyst.expressions.CreateArray.dataType(complexTypeCreator.scala:40)
> at
> org.apache.spark.sql.catalyst.expressions.Alias.dataType(namedExpressions.scala:136)
> at
> org.apache.spark.sql.catalyst.expressions.NamedExpression$class.typeSuffix(namedExpressions.scala:84)
> at
> org.apache.spark.sql.catalyst.expressions.Alias.typeSuffix(namedExpressions.scala:120)
> at
> org.apache.spark.sql.catalyst.expressions.Alias.toString(namedExpressions.scala:155)
> at
> org.apache.spark.sql.catalyst.expressions.Expression.prettyString(Expression.scala:207)
> at org.apache.spark.sql.Column.toString(Column.scala:138)
> at java.lang.String.valueOf(String.java:2994)
> at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:331)
> at
> scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
> at .(:20)
> at .()
> at $print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
> at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
> at
> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>
> On Tue, Feb 9, 2016 at 4:23 PM Ted Yu  wrote:
>
>> Do you mind pastebin'ning code snippet and exception one more time - I
>> couldn't see them in your original email.
>>
>> Which Spark release are you using ?
>>
>> On Tue, Feb 9, 2016 at 11:55 AM, rakeshchalasani 
>> wrote:
>>
>>> Hi All:
>>>
>>> I am getting an "UnsupportedOperationException" when trying to alias an
>>> array column. The issue seems to be at "CreateArray" expression ->
>>> dataType,
>>> which checks for nullability of its children, while aliasing is creating
>>> a
>>> PrettyAttribute that does not implement nullability.
>>>
>>> Below is an example to reproduce it.
>>>
>>>
>>>
>>> this throws the following exception:
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-developers-list.1001551.n3.nabble.com/Error-aliasing-an-array-column-tp16288.html
>>> Sent from the Apache Spark Developers List mailing list archive at
>>> Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: dev-h...@spark.apache.org
>>>
>>>
>>


Re: Error aliasing an array column.

2016-02-09 Thread Rakesh Chalasani
Do you mean using "alias" instead of "as"? Unfortunately, that didn't help

> val arrayCol = functions.array(df("a"), df("b")).alias("arrayCol")

still throws the error.

Surprisingly, doing the same thing inside a select works,
> df.select(functions.array(df("a"), df("b")).as("arrayCol")).show()

++
|arrayCol|
++
|  [0, 1]|
|  [1, 2]|
|  [2, 3]|
|  [3, 4]|
|  [4, 5]|
|  [5, 6]|
|  [6, 7]|
|  [7, 8]|
|  [8, 9]|
| [9, 10]|
++



On Tue, Feb 9, 2016 at 4:52 PM Ted Yu  wrote:

> How about changing the last line to:
>
> scala> val df2 = df.select(functions.array(df("a"),
> df("b")).alias("arrayCol"))
> df2: org.apache.spark.sql.DataFrame = [arrayCol: array]
>
> scala> df2.show()
> ++
> |arrayCol|
> ++
> |  [0, 1]|
> |  [1, 2]|
> |  [2, 3]|
> |  [3, 4]|
> |  [4, 5]|
> |  [5, 6]|
> |  [6, 7]|
> |  [7, 8]|
> |  [8, 9]|
> | [9, 10]|
> ++
>
> FYI
>
> On Tue, Feb 9, 2016 at 1:38 PM, Rakesh Chalasani 
> wrote:
>
>> Sorry, didn't realize the mail didn't show the code. Using Spark release
>> 1.6.0
>>
>> Below is an example to reproduce it.
>>
>> import org.apache.spark.sql.SQLContext
>> val sqlContext = new SQLContext(sparkContext)
>> import sqlContext.implicits._
>> import org.apache.spark.sql.functions
>>
>> case class Test(a:Int, b:Int)
>> val data = sparkContext.parallelize(Array.range(0, 10).map(x => Test(x,
>> x+1)))
>> val df = data.toDF()
>> val arrayCol = functions.array(df("a"), df("b")).as("arrayCol")
>>
>> this throws the following exception:
>> ava.lang.UnsupportedOperationException
>> at
>> org.apache.spark.sql.catalyst.expressions.PrettyAttribute.nullable(namedExpressions.scala:289)
>> at
>> org.apache.spark.sql.catalyst.expressions.CreateArray$$anonfun$dataType$3.apply(complexTypeCreator.scala:40)
>> at
>> org.apache.spark.sql.catalyst.expressions.CreateArray$$anonfun$dataType$3.apply(complexTypeCreator.scala:40)
>> at
>> scala.collection.IndexedSeqOptimized$$anonfun$exists$1.apply(IndexedSeqOptimized.scala:40)
>> at
>> scala.collection.IndexedSeqOptimized$$anonfun$exists$1.apply(IndexedSeqOptimized.scala:40)
>> at
>> scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
>> at
>> scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
>> at
>> scala.collection.GenSeqLike$class.prefixLength(GenSeqLike.scala:92)
>> at scala.collection.AbstractSeq.prefixLength(Seq.scala:40)
>> at
>> scala.collection.IndexedSeqOptimized$class.exists(IndexedSeqOptimized.scala:40)
>> at
>> scala.collection.mutable.ArrayBuffer.exists(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.sql.catalyst.expressions.CreateArray.dataType(complexTypeCreator.scala:40)
>> at
>> org.apache.spark.sql.catalyst.expressions.Alias.dataType(namedExpressions.scala:136)
>> at
>> org.apache.spark.sql.catalyst.expressions.NamedExpression$class.typeSuffix(namedExpressions.scala:84)
>> at
>> org.apache.spark.sql.catalyst.expressions.Alias.typeSuffix(namedExpressions.scala:120)
>> at
>> org.apache.spark.sql.catalyst.expressions.Alias.toString(namedExpressions.scala:155)
>> at
>> org.apache.spark.sql.catalyst.expressions.Expression.prettyString(Expression.scala:207)
>> at org.apache.spark.sql.Column.toString(Column.scala:138)
>> at java.lang.String.valueOf(String.java:2994)
>> at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:331)
>> at
>> scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
>> at .(:20)
>> at .()
>> at $print()
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>> at
>> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
>> at
>> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>> at
>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>> at
>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>>
>> On Tue, Feb 9, 2016 at 4:23 PM Ted Yu  wrote:
>>
>>> Do you mind pastebin'ning code snippet and exception one more time - I
>>> couldn't see them in your original email.
>>>
>>> Which Spark release are you using ?
>>>
>>> On Tue, Feb 9, 2016 at 11:55 AM, rakeshchalasani 
>>> wrote:
>>>
 Hi All:

 I am getting an "UnsupportedOperationException" when trying to alias an
 array column. The issue seems to be at "CreateArray" expression ->
 dataType,
 which checks for nullability of its children, while aliasing is
 creating a
 PrettyAttribute that does not implement nullability.

Re: Error aliasing an array column.

2016-02-09 Thread Ted Yu
What's your plan for using the arrayCol?
It would be part of some query, right?

On Tue, Feb 9, 2016 at 2:27 PM, Rakesh Chalasani 
wrote:

> Do you mean using "alias" instead of "as"? Unfortunately, that didn't help
>
> > val arrayCol = functions.array(df("a"), df("b")).alias("arrayCol")
>
> still throws the error.
>
> Surprisingly, doing the same thing inside a select works,
> > df.select(functions.array(df("a"), df("b")).as("arrayCol")).show()
>
> ++
> |arrayCol|
> ++
> |  [0, 1]|
> |  [1, 2]|
> |  [2, 3]|
> |  [3, 4]|
> |  [4, 5]|
> |  [5, 6]|
> |  [6, 7]|
> |  [7, 8]|
> |  [8, 9]|
> | [9, 10]|
> ++
>
>
>
> On Tue, Feb 9, 2016 at 4:52 PM Ted Yu  wrote:
>
>> How about changing the last line to:
>>
>> scala> val df2 = df.select(functions.array(df("a"),
>> df("b")).alias("arrayCol"))
>> df2: org.apache.spark.sql.DataFrame = [arrayCol: array]
>>
>> scala> df2.show()
>> ++
>> |arrayCol|
>> ++
>> |  [0, 1]|
>> |  [1, 2]|
>> |  [2, 3]|
>> |  [3, 4]|
>> |  [4, 5]|
>> |  [5, 6]|
>> |  [6, 7]|
>> |  [7, 8]|
>> |  [8, 9]|
>> | [9, 10]|
>> ++
>>
>> FYI
>>
>> On Tue, Feb 9, 2016 at 1:38 PM, Rakesh Chalasani 
>> wrote:
>>
>>> Sorry, didn't realize the mail didn't show the code. Using Spark release
>>> 1.6.0
>>>
>>> Below is an example to reproduce it.
>>>
>>> import org.apache.spark.sql.SQLContext
>>> val sqlContext = new SQLContext(sparkContext)
>>> import sqlContext.implicits._
>>> import org.apache.spark.sql.functions
>>>
>>> case class Test(a:Int, b:Int)
>>> val data = sparkContext.parallelize(Array.range(0, 10).map(x => Test(x,
>>> x+1)))
>>> val df = data.toDF()
>>> val arrayCol = functions.array(df("a"), df("b")).as("arrayCol")
>>>
>>> this throws the following exception:
>>> ava.lang.UnsupportedOperationException
>>> at
>>> org.apache.spark.sql.catalyst.expressions.PrettyAttribute.nullable(namedExpressions.scala:289)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.CreateArray$$anonfun$dataType$3.apply(complexTypeCreator.scala:40)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.CreateArray$$anonfun$dataType$3.apply(complexTypeCreator.scala:40)
>>> at
>>> scala.collection.IndexedSeqOptimized$$anonfun$exists$1.apply(IndexedSeqOptimized.scala:40)
>>> at
>>> scala.collection.IndexedSeqOptimized$$anonfun$exists$1.apply(IndexedSeqOptimized.scala:40)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
>>> at
>>> scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
>>> at
>>> scala.collection.GenSeqLike$class.prefixLength(GenSeqLike.scala:92)
>>> at scala.collection.AbstractSeq.prefixLength(Seq.scala:40)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.exists(IndexedSeqOptimized.scala:40)
>>> at
>>> scala.collection.mutable.ArrayBuffer.exists(ArrayBuffer.scala:47)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.CreateArray.dataType(complexTypeCreator.scala:40)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.Alias.dataType(namedExpressions.scala:136)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.NamedExpression$class.typeSuffix(namedExpressions.scala:84)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.Alias.typeSuffix(namedExpressions.scala:120)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.Alias.toString(namedExpressions.scala:155)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.Expression.prettyString(Expression.scala:207)
>>> at org.apache.spark.sql.Column.toString(Column.scala:138)
>>> at java.lang.String.valueOf(String.java:2994)
>>> at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:331)
>>> at
>>> scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
>>> at .(:20)
>>> at .()
>>> at $print()
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>> at
>>> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>>> at
>>> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
>>> at
>>> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>>> at
>>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>>> at
>>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>>>
>>> On Tue, Feb 9, 2016 at 4:23 PM Ted Yu  wrote:
>>>
 Do you mind pastebin'ning code snippet and exception one more time - I
 couldn't see them in your original email.

 Which Spark release are you using ?

 On Tue, Feb 9, 2016 at 11:55 AM, rakeshchalasani 

Re: Error aliasing an array column.

2016-02-09 Thread Michael Armbrust
That looks like a bug in toString for columns.  Can you open a JIRA?

On Tue, Feb 9, 2016 at 1:38 PM, Rakesh Chalasani 
wrote:

> Sorry, didn't realize the mail didn't show the code. Using Spark release
> 1.6.0
>
> Below is an example to reproduce it.
>
> import org.apache.spark.sql.SQLContext
> val sqlContext = new SQLContext(sparkContext)
> import sqlContext.implicits._
> import org.apache.spark.sql.functions
>
> case class Test(a:Int, b:Int)
> val data = sparkContext.parallelize(Array.range(0, 10).map(x => Test(x,
> x+1)))
> val df = data.toDF()
> val arrayCol = functions.array(df("a"), df("b")).as("arrayCol")
>
> this throws the following exception:
> ava.lang.UnsupportedOperationException
> at
> org.apache.spark.sql.catalyst.expressions.PrettyAttribute.nullable(namedExpressions.scala:289)
> at
> org.apache.spark.sql.catalyst.expressions.CreateArray$$anonfun$dataType$3.apply(complexTypeCreator.scala:40)
> at
> org.apache.spark.sql.catalyst.expressions.CreateArray$$anonfun$dataType$3.apply(complexTypeCreator.scala:40)
> at
> scala.collection.IndexedSeqOptimized$$anonfun$exists$1.apply(IndexedSeqOptimized.scala:40)
> at
> scala.collection.IndexedSeqOptimized$$anonfun$exists$1.apply(IndexedSeqOptimized.scala:40)
> at
> scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
> at
> scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
> at
> scala.collection.GenSeqLike$class.prefixLength(GenSeqLike.scala:92)
> at scala.collection.AbstractSeq.prefixLength(Seq.scala:40)
> at
> scala.collection.IndexedSeqOptimized$class.exists(IndexedSeqOptimized.scala:40)
> at
> scala.collection.mutable.ArrayBuffer.exists(ArrayBuffer.scala:47)
> at
> org.apache.spark.sql.catalyst.expressions.CreateArray.dataType(complexTypeCreator.scala:40)
> at
> org.apache.spark.sql.catalyst.expressions.Alias.dataType(namedExpressions.scala:136)
> at
> org.apache.spark.sql.catalyst.expressions.NamedExpression$class.typeSuffix(namedExpressions.scala:84)
> at
> org.apache.spark.sql.catalyst.expressions.Alias.typeSuffix(namedExpressions.scala:120)
> at
> org.apache.spark.sql.catalyst.expressions.Alias.toString(namedExpressions.scala:155)
> at
> org.apache.spark.sql.catalyst.expressions.Expression.prettyString(Expression.scala:207)
> at org.apache.spark.sql.Column.toString(Column.scala:138)
> at java.lang.String.valueOf(String.java:2994)
> at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:331)
> at
> scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
> at .(:20)
> at .()
> at $print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
> at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
> at
> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>
> On Tue, Feb 9, 2016 at 4:23 PM Ted Yu  wrote:
>
>> Do you mind pastebin'ning code snippet and exception one more time - I
>> couldn't see them in your original email.
>>
>> Which Spark release are you using ?
>>
>> On Tue, Feb 9, 2016 at 11:55 AM, rakeshchalasani 
>> wrote:
>>
>>> Hi All:
>>>
>>> I am getting an "UnsupportedOperationException" when trying to alias an
>>> array column. The issue seems to be at "CreateArray" expression ->
>>> dataType,
>>> which checks for nullability of its children, while aliasing is creating
>>> a
>>> PrettyAttribute that does not implement nullability.
>>>
>>> Below is an example to reproduce it.
>>>
>>>
>>>
>>> this throws the following exception:
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-developers-list.1001551.n3.nabble.com/Error-aliasing-an-array-column-tp16288.html
>>> Sent from the Apache Spark Developers List mailing list archive at
>>> Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: dev-h...@spark.apache.org
>>>
>>>
>>


Re: Error aliasing an array column.

2016-02-09 Thread Rakesh Chalasani
We are trying to dynamically create the query, with columns coming from
different places. We can overcome this with a few more lines of code, but it
would be nice for us to pass the `alias` along (given that we can do so for
all the rest of the frame operations).
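
As a rough sketch of that "few more lines of code" workaround (the helper name
and signature below are ours, purely for illustration, not the code we actually
run): building and selecting the aliased array column inside a method means the
REPL never calls toString on the standalone Column, so the pretty-printing bug
is never hit:

import org.apache.spark.sql.{Column, DataFrame, functions}

// Hypothetical helper: append an array column (built from dynamically chosen
// input columns) under the given alias, selecting it immediately so the
// intermediate Column value is never echoed by the REPL.
def withArrayCol(df: DataFrame, inputCols: Seq[String], name: String): DataFrame = {
  val arrayCol: Column = functions.array(inputCols.map(c => df(c)): _*).as(name)
  df.select((df.columns.map(c => df(c)) :+ arrayCol): _*)
}

// e.g. withArrayCol(df, Seq("a", "b"), "arrayCol").show()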

Created JIRA here https://issues.apache.org/jira/browse/SPARK-13253

Thanks for the help.


On Tue, Feb 9, 2016 at 5:29 PM Ted Yu  wrote:

> What's your plan of using the arrayCol ?
> It would be part of some query, right ?
>
> On Tue, Feb 9, 2016 at 2:27 PM, Rakesh Chalasani 
> wrote:
>
>> Do you mean using "alias" instead of "as"? Unfortunately, that didn't help
>>
>> > val arrayCol = functions.array(df("a"), df("b")).alias("arrayCol")
>>
>> still throws the error.
>>
>> Surprisingly, doing the same thing inside a select works,
>> > df.select(functions.array(df("a"), df("b")).as("arrayCol")).show()
>>
>> ++
>> |arrayCol|
>> ++
>> |  [0, 1]|
>> |  [1, 2]|
>> |  [2, 3]|
>> |  [3, 4]|
>> |  [4, 5]|
>> |  [5, 6]|
>> |  [6, 7]|
>> |  [7, 8]|
>> |  [8, 9]|
>> | [9, 10]|
>> ++
>>
>>
>>
>> On Tue, Feb 9, 2016 at 4:52 PM Ted Yu  wrote:
>>
>>> How about changing the last line to:
>>>
>>> scala> val df2 = df.select(functions.array(df("a"),
>>> df("b")).alias("arrayCol"))
>>> df2: org.apache.spark.sql.DataFrame = [arrayCol: array]
>>>
>>> scala> df2.show()
>>> ++
>>> |arrayCol|
>>> ++
>>> |  [0, 1]|
>>> |  [1, 2]|
>>> |  [2, 3]|
>>> |  [3, 4]|
>>> |  [4, 5]|
>>> |  [5, 6]|
>>> |  [6, 7]|
>>> |  [7, 8]|
>>> |  [8, 9]|
>>> | [9, 10]|
>>> ++
>>>
>>> FYI
>>>
>>> On Tue, Feb 9, 2016 at 1:38 PM, Rakesh Chalasani 
>>> wrote:
>>>
 Sorry, didn't realize the mail didn't show the code. Using Spark
 release 1.6.0

 Below is an example to reproduce it.

 import org.apache.spark.sql.SQLContext
 val sqlContext = new SQLContext(sparkContext)
 import sqlContext.implicits._
 import org.apache.spark.sql.functions

 case class Test(a:Int, b:Int)
 val data = sparkContext.parallelize(Array.range(0, 10).map(x => Test(x,
 x+1)))
 val df = data.toDF()
 val arrayCol = functions.array(df("a"), df("b")).as("arrayCol")

 this throws the following exception:
 ava.lang.UnsupportedOperationException
 at
 org.apache.spark.sql.catalyst.expressions.PrettyAttribute.nullable(namedExpressions.scala:289)
 at
 org.apache.spark.sql.catalyst.expressions.CreateArray$$anonfun$dataType$3.apply(complexTypeCreator.scala:40)
 at
 org.apache.spark.sql.catalyst.expressions.CreateArray$$anonfun$dataType$3.apply(complexTypeCreator.scala:40)
 at
 scala.collection.IndexedSeqOptimized$$anonfun$exists$1.apply(IndexedSeqOptimized.scala:40)
 at
 scala.collection.IndexedSeqOptimized$$anonfun$exists$1.apply(IndexedSeqOptimized.scala:40)
 at
 scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
 at
 scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
 at
 scala.collection.GenSeqLike$class.prefixLength(GenSeqLike.scala:92)
 at scala.collection.AbstractSeq.prefixLength(Seq.scala:40)
 at
 scala.collection.IndexedSeqOptimized$class.exists(IndexedSeqOptimized.scala:40)
 at
 scala.collection.mutable.ArrayBuffer.exists(ArrayBuffer.scala:47)
 at
 org.apache.spark.sql.catalyst.expressions.CreateArray.dataType(complexTypeCreator.scala:40)
 at
 org.apache.spark.sql.catalyst.expressions.Alias.dataType(namedExpressions.scala:136)
 at
 org.apache.spark.sql.catalyst.expressions.NamedExpression$class.typeSuffix(namedExpressions.scala:84)
 at
 org.apache.spark.sql.catalyst.expressions.Alias.typeSuffix(namedExpressions.scala:120)
 at
 org.apache.spark.sql.catalyst.expressions.Alias.toString(namedExpressions.scala:155)
 at
 org.apache.spark.sql.catalyst.expressions.Expression.prettyString(Expression.scala:207)
 at org.apache.spark.sql.Column.toString(Column.scala:138)
 at java.lang.String.valueOf(String.java:2994)
 at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:331)
 at
 scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
 at .(:20)
 at .()
 at $print()
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)
 at
 org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
 at
 org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
 

Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Jonathan Kelly
Oh, sheesh, how silly of me. I copied and pasted that setting name without
even noticing the "mapreduce" in it. Yes, I guess that would mean that
Spark AMs are probably running even on TASK instances currently, which is
OK but not consistent with what we do for MapReduce. I'll make sure we
set spark.yarn.am.nodeLabelExpression appropriately in the next EMR release.
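
(For anyone following along: a minimal sketch of setting that property from the
application side; the "SPARKAM" label is just an assumed example of how the
small nodes might be labelled, not something EMR sets today.)

import org.apache.spark.SparkConf

// Sketch only: "SPARKAM" is a hypothetical node label that the YARN nodes
// would have to carry for this expression to match anything.
val conf = new SparkConf()
  .setAppName("am-on-labelled-node-example")
  .set("spark.yarn.am.nodeLabelExpression", "SPARKAM")

// or equivalently on the command line:
//   spark-submit --conf spark.yarn.am.nodeLabelExpression=SPARKAM ...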

~ Jonathan

On Tue, Feb 9, 2016 at 1:30 PM Marcelo Vanzin  wrote:

> On Tue, Feb 9, 2016 at 12:16 PM, Jonathan Kelly 
> wrote:
> > And we do set yarn.app.mapreduce.am.labels=CORE
>
> That sounds very mapreduce-specific, so I doubt Spark (or anything
> non-MR) would honor it.
>
> --
> Marcelo
>


Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Alexander Pivovarov
Can you add an ability to set custom YARN labels instead of, or in addition to, the default ones?
On Feb 9, 2016 3:28 PM, "Jonathan Kelly"  wrote:

> Oh, sheesh, how silly of me. I copied and pasted that setting name without
> even noticing the "mapreduce" in it. Yes, I guess that would mean that
> Spark AMs are probably running even on TASK instances currently, which is
> OK but not consistent with what we do for MapReduce. I'll make sure we
> set spark.yarn.am.nodeLabelExpression appropriately in the next EMR release.
>
> ~ Jonathan
>
> On Tue, Feb 9, 2016 at 1:30 PM Marcelo Vanzin  wrote:
>
>> On Tue, Feb 9, 2016 at 12:16 PM, Jonathan Kelly 
>> wrote:
>> > And we do set yarn.app.mapreduce.am.labels=CORE
>>
>> That sounds very mapreduce-specific, so I doubt Spark (or anything
>> non-MR) would honor it.
>>
>> --
>> Marcelo
>>
>


Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Jonathan Kelly
You can set custom per-instance-group configurations (e.g.,
[{"classification":"yarn-site","properties":{"yarn.nodemanager.labels":"SPARKAM"}}])
using the Configurations parameter of
http://docs.aws.amazon.com/ElasticMapReduce/latest/API/API_InstanceGroupConfig.html.
Unfortunately, it's not currently possible to specify per-instance-group
configurations via the CLI though, only cluster-wide configurations.

~ Jonathan

On Tue, Feb 9, 2016 at 12:36 PM Alexander Pivovarov 
wrote:

> Thanks Jonathan
>
> Actually I'd like to use maximizeResourceAllocation.
>
> Ideally for me would be to add new instance group having single small box
> labelled as AM
> I'm not sure "aws emr create-cluster" supports setting custom LABELS , the
> only settings awailable are:
>
> InstanceCount=1,BidPrice=0.5,Name=sparkAM,InstanceGroupType=TASK,InstanceType=m3.xlarge
>
>
> How can I specify yarn label AM for that box?
>
>
>
> On Tue, Feb 9, 2016 at 12:16 PM, Jonathan Kelly 
> wrote:
>
>> Interesting, I was not aware of spark.yarn.am.nodeLabelExpression.
>>
>> We do use YARN labels on EMR; each node is automatically labeled with its
>> type (MASTER, CORE, or TASK). And we do
>> set yarn.app.mapreduce.am.labels=CORE in yarn-site.xml, but we do not set
>> spark.yarn.am.nodeLabelExpression.
>>
>> Does Spark somehow not actually honor this? It seems weird that Spark
>> would have its own similar-sounding property
>> (spark.yarn.am.nodeLabelExpression). If spark.yarn.am.nodeLabelExpression
>> is used and yarn.app.mapreduce.am.labels ignored, I could be wrong about
>> Spark AMs only running on CORE instances in EMR.
>>
>> I'm guessing though that spark.yarn.am.nodeLabelExpression would simply
>> override yarn.app.mapreduce.am.labels, so yarn.app.mapreduce.am.labels
>> would be treated as a default when it is set and
>> spark.yarn.am.nodeLabelExpression is not. Is that correct?
>>
>> In short, Alex, you should not need to set any of the label-related
>> properties yourself if you do what I suggested regarding using small CORE
>> instances and large TASK instances. But if you want to do something
>> different, it would also be possible to add a TASK instance group with
>> small nodes and configured with some new label. Then you could set
>> spark.yarn.am.nodeLabelExpression to that label.
>>
>> Thanks, Marcelo, for pointing out spark.yarn.am.nodeLabelExpression!
>>
>> ~ Jonathan
>>
>> On Tue, Feb 9, 2016 at 9:54 AM Marcelo Vanzin 
>> wrote:
>>
>>> You should be able to use spark.yarn.am.nodeLabelExpression if your
>>> version of YARN supports node labels (and you've added a label to the
>>> node where you want the AM to run).
>>>
>>> On Tue, Feb 9, 2016 at 9:51 AM, Alexander Pivovarov
>>>  wrote:
>>> > Am container starts first and yarn selects random computer to run it.
>>> >
>>> > Is it possible to configure yarn so that it selects small computer for
>>> am
>>> > container.
>>> >
>>> > On Feb 9, 2016 12:40 AM, "Sean Owen"  wrote:
>>> >>
>>> >> If it's too small to run an executor, I'd think it would be chosen for
>>> >> the AM as the only way to satisfy the request.
>>> >>
>>> >> On Tue, Feb 9, 2016 at 8:35 AM, Alexander Pivovarov
>>> >>  wrote:
>>> >> > If I add additional small box to the cluster can I configure yarn to
>>> >> > select
>>> >> > small box to run am container?
>>> >> >
>>> >> >
>>> >> > On Mon, Feb 8, 2016 at 10:53 PM, Sean Owen 
>>> wrote:
>>> >> >>
>>> >> >> Typically YARN is there because you're mediating resource requests
>>> >> >> from things besides Spark, so yeah using every bit of the cluster
>>> is a
>>> >> >> little bit of a corner case. There's not a good answer if all your
>>> >> >> nodes are the same size.
>>> >> >>
>>> >> >> I think you can let YARN over-commit RAM though, and allocate more
>>> >> >> memory than it actually has. It may be beneficial to let them all
>>> >> >> think they have an extra GB, and let one node running the AM
>>> >> >> technically be overcommitted, a state which won't hurt at all
>>> unless
>>> >> >> you're really really tight on memory, in which case something might
>>> >> >> get killed.
>>> >> >>
>>> >> >> On Tue, Feb 9, 2016 at 6:49 AM, Jonathan Kelly <
>>> jonathaka...@gmail.com>
>>> >> >> wrote:
>>> >> >> > Alex,
>>> >> >> >
>>> >> >> > That's a very good question that I've been trying to answer
>>> myself
>>> >> >> > recently
>>> >> >> > too. Since you've mentioned before that you're using EMR, I
>>> assume
>>> >> >> > you're
>>> >> >> > asking this because you've noticed this behavior on emr-4.3.0.
>>> >> >> >
>>> >> >> > In this release, we made some changes to the
>>> >> >> > maximizeResourceAllocation
>>> >> >> > (which you may or may not be using, but either way this issue is
>>> >> >> > present),
>>> >> >> > including the accidental inclusion of somewhat of a bug that
>>> makes it
>>> >> >> > not
>>> >> >> > reserve any space for the AM, which ultimately results in one of
>>> the
>>> >> >> > nodes
>>> >> >> > being utilized only by the AM and not an executor.
>>> >> >> >
>>>

Re: spark on yarn wastes one box (or 1 GB on each box) for am container

2016-02-09 Thread Alexander Pivovarov
Great! Thank you!

On Tue, Feb 9, 2016 at 4:02 PM, Jonathan Kelly 
wrote:

> You can set custom per-instance-group configurations (e.g.,
> ["classification":"yarn-site",properties:{"yarn.nodemanager.labels":"SPARKAM"}])
> using the Configurations parameter of
> http://docs.aws.amazon.com/ElasticMapReduce/latest/API/API_InstanceGroupConfig.html.
> Unfortunately, it's not currently possible to specify per-instance-group
> configurations via the CLI though, only cluster wide configurations.
>
> ~ Jonathan
>
>
> On Tue, Feb 9, 2016 at 12:36 PM Alexander Pivovarov 
> wrote:
>
>> Thanks Jonathan
>>
>> Actually I'd like to use maximizeResourceAllocation.
>>
>> Ideally for me would be to add new instance group having single small box
>> labelled as AM
>> I'm not sure "aws emr create-cluster" supports setting custom LABELS ,
>> the only settings awailable are:
>>
>> InstanceCount=1,BidPrice=0.5,Name=sparkAM,InstanceGroupType=TASK,InstanceType=m3.xlarge
>>
>>
>> How can I specify yarn label AM for that box?
>>
>>
>>
>> On Tue, Feb 9, 2016 at 12:16 PM, Jonathan Kelly 
>> wrote:
>>
>>> Interesting, I was not aware of spark.yarn.am.nodeLabelExpression.
>>>
>>> We do use YARN labels on EMR; each node is automatically labeled with
>>> its type (MASTER, CORE, or TASK). And we do
>>> set yarn.app.mapreduce.am.labels=CORE in yarn-site.xml, but we do not set
>>> spark.yarn.am.nodeLabelExpression.
>>>
>>> Does Spark somehow not actually honor this? It seems weird that Spark
>>> would have its own similar-sounding property
>>> (spark.yarn.am.nodeLabelExpression). If spark.yarn.am.nodeLabelExpression
>>> is used and yarn.app.mapreduce.am.labels ignored, I could be wrong about
>>> Spark AMs only running on CORE instances in EMR.
>>>
>>> I'm guessing though that spark.yarn.am.nodeLabelExpression would simply
>>> override yarn.app.mapreduce.am.labels, so yarn.app.mapreduce.am.labels
>>> would be treated as a default when it is set and
>>> spark.yarn.am.nodeLabelExpression is not. Is that correct?
>>>
>>> In short, Alex, you should not need to set any of the label-related
>>> properties yourself if you do what I suggested regarding using small CORE
>>> instances and large TASK instances. But if you want to do something
>>> different, it would also be possible to add a TASK instance group with
>>> small nodes and configured with some new label. Then you could set
>>> spark.yarn.am.nodeLabelExpression to that label.
>>>
>>> Thanks, Marcelo, for pointing out spark.yarn.am.nodeLabelExpression!
>>>
>>> ~ Jonathan
>>>
>>> On Tue, Feb 9, 2016 at 9:54 AM Marcelo Vanzin 
>>> wrote:
>>>
 You should be able to use spark.yarn.am.nodeLabelExpression if your
 version of YARN supports node labels (and you've added a label to the
 node where you want the AM to run).

 On Tue, Feb 9, 2016 at 9:51 AM, Alexander Pivovarov
  wrote:
 > Am container starts first and yarn selects random computer to run it.
 >
 > Is it possible to configure yarn so that it selects small computer
 for am
 > container.
 >
 > On Feb 9, 2016 12:40 AM, "Sean Owen"  wrote:
 >>
 >> If it's too small to run an executor, I'd think it would be chosen
 for
 >> the AM as the only way to satisfy the request.
 >>
 >> On Tue, Feb 9, 2016 at 8:35 AM, Alexander Pivovarov
 >>  wrote:
 >> > If I add additional small box to the cluster can I configure yarn
 to
 >> > select
 >> > small box to run am container?
 >> >
 >> >
 >> > On Mon, Feb 8, 2016 at 10:53 PM, Sean Owen 
 wrote:
 >> >>
 >> >> Typically YARN is there because you're mediating resource requests
 >> >> from things besides Spark, so yeah using every bit of the cluster
 is a
 >> >> little bit of a corner case. There's not a good answer if all your
 >> >> nodes are the same size.
 >> >>
 >> >> I think you can let YARN over-commit RAM though, and allocate more
 >> >> memory than it actually has. It may be beneficial to let them all
 >> >> think they have an extra GB, and let one node running the AM
 >> >> technically be overcommitted, a state which won't hurt at all
 unless
 >> >> you're really really tight on memory, in which case something
 might
 >> >> get killed.
 >> >>
 >> >> On Tue, Feb 9, 2016 at 6:49 AM, Jonathan Kelly <
 jonathaka...@gmail.com>
 >> >> wrote:
 >> >> > Alex,
 >> >> >
 >> >> > That's a very good question that I've been trying to answer
 myself
 >> >> > recently
 >> >> > too. Since you've mentioned before that you're using EMR, I
 assume
 >> >> > you're
 >> >> > asking this because you've noticed this behavior on emr-4.3.0.
 >> >> >
 >> >> > In this release, we made some changes to the
 >> >> > maximizeResourceAllocation
 >> >> > (which you may or may not be using, but either way this issue is
 >> >> > present),
 >> >> > including the accidental inclusion of somewhat of a bug that makes it
 >> >> > not reserve any space for the AM, which ultimately results in one of
 >> >> > the nodes being utilized only by the AM and not an executor.

map-side-combine in Spark SQL

2016-02-09 Thread Rishitesh Mishra
Can anybody confirm whether ANY operator in Spark SQL uses map-side combine?
If not, is it safe to assume SortShuffleManager will always use serialized
sorting for queries coming from Spark SQL?


Re: Kmeans++ using 1 core only Was: Slowness in Kmeans calculating fastSquaredDistance

2016-02-09 Thread Li Ming Tsai
Forwarding to the dev list, hoping someone can chime in.


@mengxr?



From: Li Ming Tsai 
Sent: Wednesday, February 10, 2016 12:43 PM
To: u...@spark.apache.org
Subject: Re: Slowness in Kmeans calculating fastSquaredDistance


Hi,


It looks like Kmeans++ is slow (SPARK-3424) in the initialisation phase and is
local to the driver, using 1 core only.


If I use random, the job completed in 1.5mins compared to 1hr+.
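
(For context, a minimal sketch of how the two initialisation modes are
selected, mirroring the KMeans.train(parsedData, 1000, 1) call shown further
down; the variable names are ours.)

import org.apache.spark.mllib.clustering.KMeans

// Default k-means|| ("kmeans++"-style) initialisation -- the slow path observed here.
val modelParallel = new KMeans()
  .setK(1000)
  .setMaxIterations(1)
  .setInitializationMode(KMeans.K_MEANS_PARALLEL)
  .run(parsedData)

// Random initialisation -- the run that finished in ~1.5 minutes.
val modelRandom = new KMeans()
  .setK(1000)
  .setMaxIterations(1)
  .setInitializationMode(KMeans.RANDOM)
  .run(parsedData)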


Should I move this to the dev list?


Regards,

Liming



From: Li Ming Tsai 
Sent: Sunday, February 7, 2016 10:03 AM
To: u...@spark.apache.org
Subject: Re: Slowness in Kmeans calculating fastSquaredDistance


Hi,


I did more investigation and found out that BLAS.scala is calling the pure-Java 
reference implementation (f2jBLAS) for level 1 routines.


I even patched it to use nativeBLAS.ddot, but it had no material impact.


https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala#L126


private def dot(x: DenseVector, y: DenseVector): Double = {
  val n = x.size
  f2jBLAS.ddot(n, x.values, 1, y.values, 1)
}


Maybe Xiangrui can comment on this?




From: Li Ming Tsai 
Sent: Friday, February 5, 2016 10:56 AM
To: u...@spark.apache.org
Subject: Slowness in Kmeans calculating fastSquaredDistance


Hi,


I'm using INTEL MKL on Spark 1.6.0 which I built myself with the -Pnetlib-lgpl 
flag.


I am using spark local[4] mode and I run it like this:
# export LD_LIBRARY_PATH=/opt/intel/lib/intel64:/opt/intel/mkl/lib/intel64
# bin/spark-shell ...

I have also added the following to /opt/intel/mkl/lib/intel64:
lrwxrwxrwx 1 root root 12 Feb  1 09:18 libblas.so -> libmkl_rt.so
lrwxrwxrwx 1 root root 12 Feb  1 09:18 libblas.so.3 -> libmkl_rt.so
lrwxrwxrwx 1 root root 12 Feb  1 09:18 liblapack.so -> libmkl_rt.so
lrwxrwxrwx 1 root root 12 Feb  1 09:18 liblapack.so.3 -> libmkl_rt.so


I believe (???) that I'm using Intel MKL because the warnings went away:

16/02/01 07:49:38 WARN BLAS: Failed to load implementation from: 
com.github.fommil.netlib.NativeSystemBLAS

16/02/01 07:49:38 WARN BLAS: Failed to load implementation from: 
com.github.fommil.netlib.NativeRefBLAS
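
(A quick way to confirm which BLAS backend netlib-java actually loaded, rather
than inferring it from the absence of the warnings; just a sketch to run in the
spark-shell, not part of the original job.)

import com.github.fommil.netlib.BLAS

// Prints com.github.fommil.netlib.NativeSystemBLAS when a system BLAS such as
// MKL has been picked up, and com.github.fommil.netlib.F2jBLAS for the pure
// Java fallback.
println(BLAS.getInstance().getClass.getName)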

After collectAsMap, there is no progress but I can observe that only 1 CPU is 
being utilised with the following stack trace:

"ForkJoinPool-3-worker-7" #130 daemon prio=5 os_prio=0 tid=0x7fbf30ab6000 
nid=0xbdc runnable [0x7fbf12205000]

   java.lang.Thread.State: RUNNABLE

at com.github.fommil.netlib.F2jBLAS.ddot(F2jBLAS.java:71)

at org.apache.spark.mllib.linalg.BLAS$.dot(BLAS.scala:128)

at org.apache.spark.mllib.linalg.BLAS$.dot(BLAS.scala:111)

at 
org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:349)

at 
org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:587)

at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:561)

at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:555)


These last few steps take more than half of the total time for a 1Mx100 dataset.


The code is just:

val clusters = KMeans.train(parsedData, 1000, 1)


Shouldn't it be utilising all the cores for the dot product? Is this a 
misconfiguration?


Thanks!