Re: is it possible to create one KafkaDirectStream (Dstream) per topic?

2018-05-21 Thread Alonso Isidoro Roman
Check this thread

.

El lun., 21 may. 2018 a las 0:25, kant kodali ()
escribió:

> Hi All,
>
> I have 5 Kafka topics and I am wondering if is even possible to create one
> KafkaDirectStream (Dstream) per topic within the same JVM i.e using only
> one sparkcontext?
>
> Thanks!
>


-- 
Alonso Isidoro Roman
[image: https://]about.me/alonso.isidoro.roman



Executors slow down when running on the same node

2018-05-21 Thread Javier Pareja
Hello,

I have a Spark Streaming job reading data from kafka, processing it and
inserting it into Cassandra. The job is running on a cluster with 3
machines. I use mesos to submit the job with 3 executors using 1 core each.
The problem is that when all executors are running on the same node, the
insertion stage onto Cassandra becomes x5 times slower. When the executors
run one on each machine, the insertion runs as expected.

I am using Datastax cassandra driver for the insertion of the stream.

For now all that I can do is to kill the submission and try again. Because
I can't predicts how Mesos assigns resources, I might have to submit it
several times until it works.
Does anyone know what could be wrong?  Any idea of what can I look into?
Network, Max Host Connections, shared VM...?

Javier Pareja


Adding jars

2018-05-21 Thread Malveeka Bhandari
Hi.

Can I add jars to the spark executor classpath in a running context?
Basically if I have a running spark session, if I edit the spark.jars in
the middle of the code, will it pick up the changes?

If not, is there any way to add new dependent jars to a running spark
context ?

We’re using Livy to keep the session up.

Thanks.


Re: [Spark2.1] SparkStreaming to Cassandra performance problem

2018-05-21 Thread Alonso Isidoro Roman
The main language they developed spark with is scala, so all the new
features go first to scala, java and finally python. I'm not surprised by
the results, we've seen it on Stratio since the first versions of spark. At
the beginning of development, some of our engineers make the prototype with
python, but when it comes down to it, if it goes into production, it has to
be rewritten in scala or java, usually scala.



El lun., 21 may. 2018 a las 4:34, Saulo Sobreiro ()
escribió:

> Hi Javier,
>
> Thank you a lot for the feedback.
> Indeed the CPU is a huge limitation. I got a lot of trouble trying to run
> this use case in yarn-client mode. I managed to run this in standalone
> (local master) mode only.
>
> I do not have the hardware available to run this setup in a cluster yet,
> so I decided to dig a little bit more in the implementation to see what
> could I improve. I just finished evaluating some results.
> If you find something wrong or odd please let me know.
>
> Following your suggestion to use "saveToCassandra" directly I decided to
> try Scala. Everything was implemented in the most similar way possible and
> I got surprised by the results. The scala implementation is much faster.
>
> My current implementation is slightly different from the Python code
> shared some emails ago but to compare the languages influence in the most
> comparable way I used the following snippets:
>
> # Scala implementation --
>
> val kstream = KafkaUtils.createDirectStream[String, String](
>  ssc,
>  LocationStrategies.PreferConsistent,
>  ConsumerStrategies.Subscribe[String, String](topic,
> kafkaParams))
> kstream
>.map( x => parse(x.value) )
>.saveToCassandra("hdpkns", "batch_measurement")
>
> # Python implementation 
> # Adapted from the previously shared code. However instead of calculating
> the metrics, it is just parsing the messages.
>
> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
 {"metadata.broker.list": brokers})

>>>
 kafkaStream \
 .transform(parse) \
 .foreachRDD(casssave)


> For the same streaming input the scala app took an average of ~1.5 seconds
> to handle each event. For the python implementation, the app took an
> average of ~80 seconds to handle each event (and after a lot of pickle
> concurrency access issues).
>
> Note that I considered the time as the difference between the event
> generation (before being published to Kafka) and the moment just before the
> saveToCassandra.
>
> The problem in the python implementation seems to be due to the delay
> introduced by the foreachRDD(casssave) call, which only runs 
> rdd.saveToCassandra(
> "test_hdpkns", "measurement" ).
>
>
> Honestly I was not expecting such a difference between these 2 codes...
> Can you understand why is this happening ?
>
>
>
> Again, Thank you very much for your help,
>
> Best Regards
>
>
> Sharing my current Scala code below
> # Scala Snippet =
> val sparkConf = new SparkConf(). // ...
> val ssc = new StreamingContext(sparkConf, Seconds(1))
> val sc = ssc.sparkContext
> //...
> val kstream = KafkaUtils.createDirectStream[String, String](
>  ssc,
>  LocationStrategies.PreferConsistent,
>  ConsumerStrategies.Subscribe[String, String](topic,
> kafkaParams))
> //...
> // handle Kafka messages in a parallel fashion
> val ckstream = kstream.map( x => parse(x.value) ).cache()
> ckstream
>   .foreachRDD( rdd => {
> rdd.foreach(metrics)
>   } )
> ckstream
>   .saveToCassandra("hdpkns", "microbatch_raw_measurement")
> #=
>
> On 30/04/2018 14:57:50, Javier Pareja  wrote:
> Hi Saulo,
>
> If the CPU is close to 100% then you are hitting the limit. I don't think
> that moving to Scala will make a difference. Both Spark and Cassandra are
> CPU hungry, your setup is small in terms of CPUs. Try running Spark on
> another (physical) machine so that the 2 cores are dedicated to Cassandra.
>
> Kind Regards
> Javier
>
>
>
> On Mon, 30 Apr 2018, 14:24 Saulo Sobreiro, 
> wrote:
>
>> Hi Javier,
>>
>> I will try to implement this in scala then. As far as I can see in the
>> documentation there is no SaveToCassandra in the python interface unless
>> you are working with dataframes and the kafkaStream instance does not
>> provide methods to convert an RDD into DF.
>>
>> Regarding my table, it is very simple (see below). Can I change something
>> to make it write faster?
>> CREATE TABLE test_hdpkns.measurement (
>>   mid bigint,
>>   tt timestamp,
>>   in_tt timestamp,
>>   out_tt timestamp,
>>   sensor_id int,
>>   measure double,
>>   PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
>> ) with compact storage;
>>
>> The system CPU while the demo is running is almost always at 100% for
>> both cores.
>>
>>
>> Thank you.
>>
>> Best Regards,
>>
>> On 29/04

testing frameworks

2018-05-21 Thread Steve Pruitt
Hi,

Can anyone recommend testing frameworks suitable for Spark jobs.  Something 
that can be integrated into a CI tool would be great.

Thanks.



help in copying data from one azure subscription to another azure subscription

2018-05-21 Thread amit kumar singh
HI Team,

We are trying to move data between one azure subscription to another azure
subscription is there a faster way to do through spark

i am using distcp and its taking for ever

thanks
rohit


Re: How to skip nonexistent file when read files with spark?

2018-05-21 Thread Thakrar, Jayesh
Probably you can do some preprocessing/checking of the paths before you attempt 
to read it via Spark.
Whether it is local or hdfs filesystem, you can try to check for existence and 
other details by using the "FileSystem.globStatus" method from the Hadoop API.

From: JF Chen 
Date: Sunday, May 20, 2018 at 10:30 PM
To: user 
Subject: How to skip nonexistent file when read files with spark?

Hi Everyone
I meet a tricky problem recently. I am trying to read some file paths generated 
by other method. The file paths are represented by wild card in list, like [ 
'/data/*/12', '/data/*/13']
But in practice, if the wildcard cannot match any existed path, it will throw 
an exception:"pyspark.sql.utils.AnalysisException: 'Path does not exist: ...'", 
and the program stops after that.
Actually I want spark can just ignore and skip these nonexistent  file path, 
and continues to run. I have tried python HDFSCli api to check the existence of 
path , but hdfs cli cannot support wildcard.

Any good idea to solve my problem? Thanks~

Regard,
Junfeng Chen


Re: [Spark2.1] SparkStreaming to Cassandra performance problem

2018-05-21 Thread Russell Spitzer
The answer is most likely that when you use Cross Java - Python code you
incur a penalty for every objects that you transform from a Java object
into a Python object (and then back again to a Python object) when data is
being passed in and out of your functions. A way around this would probably
be to have used the Dataframe API if possible, which would have compiled
the interactions in Java and skipped python-java serialization. Using Scala
from the start thought is a great idea. I would also probably remove the
cache from your stream since that probably is only hurting (adding an
additional serialization which is only used once.)

On Mon, May 21, 2018 at 5:01 AM Alonso Isidoro Roman 
wrote:

> The main language they developed spark with is scala, so all the new
> features go first to scala, java and finally python. I'm not surprised by
> the results, we've seen it on Stratio since the first versions of spark. At
> the beginning of development, some of our engineers make the prototype with
> python, but when it comes down to it, if it goes into production, it has to
> be rewritten in scala or java, usually scala.
>
>
>
> El lun., 21 may. 2018 a las 4:34, Saulo Sobreiro (<
> saulo.sobre...@outlook.pt>) escribió:
>
>> Hi Javier,
>>
>> Thank you a lot for the feedback.
>> Indeed the CPU is a huge limitation. I got a lot of trouble trying to run
>> this use case in yarn-client mode. I managed to run this in standalone
>> (local master) mode only.
>>
>> I do not have the hardware available to run this setup in a cluster yet,
>> so I decided to dig a little bit more in the implementation to see what
>> could I improve. I just finished evaluating some results.
>> If you find something wrong or odd please let me know.
>>
>> Following your suggestion to use "saveToCassandra" directly I decided to
>> try Scala. Everything was implemented in the most similar way possible and
>> I got surprised by the results. The scala implementation is much faster.
>>
>> My current implementation is slightly different from the Python code
>> shared some emails ago but to compare the languages influence in the most
>> comparable way I used the following snippets:
>>
>> # Scala implementation --
>>
>> val kstream = KafkaUtils.createDirectStream[String, String](
>>  ssc,
>>  LocationStrategies.PreferConsistent,
>>  ConsumerStrategies.Subscribe[String, String](topic,
>> kafkaParams))
>> kstream
>>.map( x => parse(x.value) )
>>.saveToCassandra("hdpkns", "batch_measurement")
>>
>> # Python implementation 
>> # Adapted from the previously shared code. However instead of
>> calculating the metrics, it is just parsing the messages.
>>
>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
> {"metadata.broker.list": brokers})
>

> kafkaStream \
> .transform(parse) \
> .foreachRDD(casssave)
>
>
>> For the same streaming input the scala app took an average of ~1.5
>> seconds to handle each event. For the python implementation, the app took
>> an average of ~80 seconds to handle each event (and after a lot of pickle
>> concurrency access issues).
>>
>> Note that I considered the time as the difference between the event
>> generation (before being published to Kafka) and the moment just before the
>> saveToCassandra.
>>
>> The problem in the python implementation seems to be due to the delay
>> introduced by the foreachRDD(casssave) call, which only runs 
>> rdd.saveToCassandra(
>> "test_hdpkns", "measurement" ).
>>
>>
>> Honestly I was not expecting such a difference between these 2 codes...
>> Can you understand why is this happening ?
>>
>>
>>
>> Again, Thank you very much for your help,
>>
>> Best Regards
>>
>>
>> Sharing my current Scala code below
>> # Scala Snippet =
>> val sparkConf = new SparkConf(). // ...
>> val ssc = new StreamingContext(sparkConf, Seconds(1))
>> val sc = ssc.sparkContext
>> //...
>> val kstream = KafkaUtils.createDirectStream[String, String](
>>  ssc,
>>  LocationStrategies.PreferConsistent,
>>  ConsumerStrategies.Subscribe[String, String](topic,
>> kafkaParams))
>> //...
>> // handle Kafka messages in a parallel fashion
>> val ckstream = kstream.map( x => parse(x.value) ).cache()
>> ckstream
>>   .foreachRDD( rdd => {
>> rdd.foreach(metrics)
>>   } )
>> ckstream
>>   .saveToCassandra("hdpkns", "microbatch_raw_measurement")
>> #=
>>
>> On 30/04/2018 14:57:50, Javier Pareja  wrote:
>> Hi Saulo,
>>
>> If the CPU is close to 100% then you are hitting the limit. I don't think
>> that moving to Scala will make a difference. Both Spark and Cassandra are
>> CPU hungry, your setup is small in terms of CPUs. Try running Spark on
>> another (physical) machine so that the 2 cores are dedicated to Cassandra.
>>
>> Kind Regards
>> Ja

Re: testing frameworks

2018-05-21 Thread Holden Karau
So I’m biased as the author of spark-testing-base but I think it’s pretty
ok. Are you looking for unit or integration or something else?

On Mon, May 21, 2018 at 5:24 AM Steve Pruitt  wrote:

> Hi,
>
>
>
> Can anyone recommend testing frameworks suitable for Spark jobs.
> Something that can be integrated into a CI tool would be great.
>
>
>
> Thanks.
>
>
>
-- 
Twitter: https://twitter.com/holdenkarau


Spark horizontal scaling is not supported in which cluster mode? Ask

2018-05-21 Thread unk1102
Hi I came by one Spark question which was about which spark cluster manager
does not support horizontal scalability? Answer options were Mesos, Yarn,
Standalone and local mode. I believe all cluster managers are horizontal
scalable please correct if I am wrong. And I think answer is local mode. Is
it true? Please guide. Thanks in advance.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark horizontal scaling is not supported in which cluster mode? Ask

2018-05-21 Thread Mark Hamstra
Horizontal scaling is scaling across multiple, distributed computers (or at
least OS instances). Local mode is, therefore, by definition not
horizontally scalable since it just uses a configurable number of local
threads. If the question actually asked "which cluster manager...?", then I
have a small issue with it. Local mode isn't really a cluster manager,
since there is no cluster to manage. What it is is one of Spark's
scheduling modes.

On Mon, May 21, 2018 at 9:29 AM unk1102  wrote:

> Hi I came by one Spark question which was about which spark cluster manager
> does not support horizontal scalability? Answer options were Mesos, Yarn,
> Standalone and local mode. I believe all cluster managers are horizontal
> scalable please correct if I am wrong. And I think answer is local mode. Is
> it true? Please guide. Thanks in advance.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark Worker Re-register to Master

2018-05-21 Thread sushil.chaudhary
All, 

We have a problem with Spark Worker. The worker goes down whenever we are
not able to get the spark master up and running before starting the worker. 
Of course- it does try to ReregisterWithMaster  upto 16 attemps : 

1. First 6 attempts it make in interval of appx 10 seconds 
2. Next 10 attempts it make in interval of appox 60 seconds.

But if Master is late, it fails and go down. Can anyone please suggest the
configuration to change the retry behavior.  Is there way, to have more
number of retry with a configurable wait interval so that it can wait for
master for longer period of time. I am unable to find the props which
control these behavior.

Thanks in advance.
Sushil



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Worker Re-register to Master

2018-05-21 Thread sushil.chaudhary
All, 

We have a problem with Spark Worker. The worker goes down whenever we are
not able to get the spark master up and running before starting the worker. 
Of course- it does try to ReregisterWithMaster  upto 16 attemps : 

1. First 6 attempts it make in interval of appx 10 seconds 
2. Next 10 attempts it make in interval of appox 60 seconds.

But if Master is late, it fails and go down. Can anyone please suggest the
configuration to change the retry behavior.  Is there way, to have more
number of retry with a configurable wait interval so that it can wait for
master for longer period of time. I am unable to find the props which
control these behavior.

Thanks in advance.
Sushil



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to skip nonexistent file when read files with spark?

2018-05-21 Thread JF Chen
Thanks, Thakrar,

I have tried to check the existence of path before read it, but HDFSCli
python package seems not support wildcard.  "FileSystem.globStatus" is a
java api while I am using python via livy Do you know any python api
implementing the same function?


Regard,
Junfeng Chen

On Mon, May 21, 2018 at 9:01 PM, Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> Probably you can do some preprocessing/checking of the paths before you
> attempt to read it via Spark.
>
> Whether it is local or hdfs filesystem, you can try to check for existence
> and other details by using the "FileSystem.globStatus" method from the
> Hadoop API.
>
>
>
> *From: *JF Chen 
> *Date: *Sunday, May 20, 2018 at 10:30 PM
> *To: *user 
> *Subject: *How to skip nonexistent file when read files with spark?
>
>
>
> Hi Everyone
>
> I meet a tricky problem recently. I am trying to read some file paths
> generated by other method. The file paths are represented by wild card in
> list, like [ '/data/*/12', '/data/*/13']
>
> But in practice, if the wildcard cannot match any existed path, it will
> throw an exception:"pyspark.sql.utils.AnalysisException: 'Path does not
> exist: ...'", and the program stops after that.
>
> Actually I want spark can just ignore and skip these nonexistent  file
> path, and continues to run. I have tried python HDFSCli api to check the
> existence of path , but hdfs cli cannot support wildcard.
>
>
>
> Any good idea to solve my problem? Thanks~
>
>
>
> Regard,
> Junfeng Chen
>


Re: How to skip nonexistent file when read files with spark?

2018-05-21 Thread ayan guha
A relatively naive solution will be:

0. Create a dummy blank dataframe
1. Loop through the list of paths.
2. Try to create the dataframe from the path. If success then union it
cumulatively.
3. If error, just ignore it or handle as you wish.

At the end of the loop, just use the unioned df. This should not have any
additional performance overhead as declaring dataframes and union is not
expensive, unless you call any action within the loop.

Best
Ayan

On Tue, 22 May 2018 at 11:27 am, JF Chen  wrote:

> Thanks, Thakrar,
>
> I have tried to check the existence of path before read it, but HDFSCli
> python package seems not support wildcard.  "FileSystem.globStatus" is a
> java api while I am using python via livy Do you know any python api
> implementing the same function?
>
>
> Regard,
> Junfeng Chen
>
> On Mon, May 21, 2018 at 9:01 PM, Thakrar, Jayesh <
> jthak...@conversantmedia.com> wrote:
>
>> Probably you can do some preprocessing/checking of the paths before you
>> attempt to read it via Spark.
>>
>> Whether it is local or hdfs filesystem, you can try to check for
>> existence and other details by using the "FileSystem.globStatus" method
>> from the Hadoop API.
>>
>>
>>
>> *From: *JF Chen 
>> *Date: *Sunday, May 20, 2018 at 10:30 PM
>> *To: *user 
>> *Subject: *How to skip nonexistent file when read files with spark?
>>
>>
>>
>> Hi Everyone
>>
>> I meet a tricky problem recently. I am trying to read some file paths
>> generated by other method. The file paths are represented by wild card in
>> list, like [ '/data/*/12', '/data/*/13']
>>
>> But in practice, if the wildcard cannot match any existed path, it will
>> throw an exception:"pyspark.sql.utils.AnalysisException: 'Path does not
>> exist: ...'", and the program stops after that.
>>
>> Actually I want spark can just ignore and skip these nonexistent  file
>> path, and continues to run. I have tried python HDFSCli api to check the
>> existence of path , but hdfs cli cannot support wildcard.
>>
>>
>>
>> Any good idea to solve my problem? Thanks~
>>
>>
>>
>> Regard,
>> Junfeng Chen
>>
>
> --
Best Regards,
Ayan Guha


Re: How to skip nonexistent file when read files with spark?

2018-05-21 Thread JF Chen
Thanks Thakrar~


Regard,
Junfeng Chen

On Tue, May 22, 2018 at 11:22 AM, Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> Junfeng,
>
>
>
> I would suggest preprocessing/validating the paths in plain Python (and
> not Spark) before you try to fetch data.
>
> I am not familiar with Python Hadoop libraries, but see if this helps -
> http://crs4.github.io/pydoop/tutorial/hdfs_api.html
>
>
>
> Best,
>
> Jayesh
>
>
>
> *From: *JF Chen 
> *Date: *Monday, May 21, 2018 at 10:20 PM
> *To: *ayan guha 
> *Cc: *"Thakrar, Jayesh" , user <
> user@spark.apache.org>
> *Subject: *Re: How to skip nonexistent file when read files with spark?
>
>
>
> Thanks ayan,
>
>
>
> Also I have tried this method, the most tricky thing is that dataframe
> union method must be based on same structure schema, while on my files, the
> schema is variable.
>
>
>
>
> Regard,
> Junfeng Chen
>
>
>
> On Tue, May 22, 2018 at 10:33 AM, ayan guha  wrote:
>
> A relatively naive solution will be:
>
>
>
> 0. Create a dummy blank dataframe
>
> 1. Loop through the list of paths.
>
> 2. Try to create the dataframe from the path. If success then union it
> cumulatively.
>
> 3. If error, just ignore it or handle as you wish.
>
>
>
> At the end of the loop, just use the unioned df. This should not have any
> additional performance overhead as declaring dataframes and union is not
> expensive, unless you call any action within the loop.
>
>
>
> Best
>
> Ayan
>
>
>
> On Tue, 22 May 2018 at 11:27 am, JF Chen  wrote:
>
> Thanks, Thakrar,
>
>
>
> I have tried to check the existence of path before read it, but HDFSCli
> python package seems not support wildcard.  "FileSystem.globStatus" is a
> java api while I am using python via livy Do you know any python api
> implementing the same function?
>
>
>
>
> Regard,
> Junfeng Chen
>
>
>
> On Mon, May 21, 2018 at 9:01 PM, Thakrar, Jayesh <
> jthak...@conversantmedia.com> wrote:
>
> Probably you can do some preprocessing/checking of the paths before you
> attempt to read it via Spark.
>
> Whether it is local or hdfs filesystem, you can try to check for existence
> and other details by using the "FileSystem.globStatus" method from the
> Hadoop API.
>
>
>
> *From: *JF Chen 
> *Date: *Sunday, May 20, 2018 at 10:30 PM
> *To: *user 
> *Subject: *How to skip nonexistent file when read files with spark?
>
>
>
> Hi Everyone
>
> I meet a tricky problem recently. I am trying to read some file paths
> generated by other method. The file paths are represented by wild card in
> list, like [ '/data/*/12', '/data/*/13']
>
> But in practice, if the wildcard cannot match any existed path, it will
> throw an exception:"pyspark.sql.utils.AnalysisException: 'Path does not
> exist: ...'", and the program stops after that.
>
> Actually I want spark can just ignore and skip these nonexistent  file
> path, and continues to run. I have tried python HDFSCli api to check the
> existence of path , but hdfs cli cannot support wildcard.
>
>
>
> Any good idea to solve my problem? Thanks~
>
>
>
> Regard,
> Junfeng Chen
>
>
>
> --
>
> Best Regards,
> Ayan Guha
>
>
>


Re: How to skip nonexistent file when read files with spark?

2018-05-21 Thread Thakrar, Jayesh
Junfeng,

I would suggest preprocessing/validating the paths in plain Python (and not 
Spark) before you try to fetch data.
I am not familiar with Python Hadoop libraries, but see if this helps - 
http://crs4.github.io/pydoop/tutorial/hdfs_api.html

Best,
Jayesh

From: JF Chen 
Date: Monday, May 21, 2018 at 10:20 PM
To: ayan guha 
Cc: "Thakrar, Jayesh" , user 

Subject: Re: How to skip nonexistent file when read files with spark?

Thanks ayan,

Also I have tried this method, the most tricky thing is that dataframe union 
method must be based on same structure schema, while on my files, the schema is 
variable.


Regard,
Junfeng Chen

On Tue, May 22, 2018 at 10:33 AM, ayan guha 
mailto:guha.a...@gmail.com>> wrote:
A relatively naive solution will be:

0. Create a dummy blank dataframe
1. Loop through the list of paths.
2. Try to create the dataframe from the path. If success then union it 
cumulatively.
3. If error, just ignore it or handle as you wish.

At the end of the loop, just use the unioned df. This should not have any 
additional performance overhead as declaring dataframes and union is not 
expensive, unless you call any action within the loop.

Best
Ayan

On Tue, 22 May 2018 at 11:27 am, JF Chen 
mailto:darou...@gmail.com>> wrote:
Thanks, Thakrar,

I have tried to check the existence of path before read it, but HDFSCli python 
package seems not support wildcard.  "FileSystem.globStatus" is a java api 
while I am using python via livy Do you know any python api implementing 
the same function?


Regard,
Junfeng Chen

On Mon, May 21, 2018 at 9:01 PM, Thakrar, Jayesh 
mailto:jthak...@conversantmedia.com>> wrote:
Probably you can do some preprocessing/checking of the paths before you attempt 
to read it via Spark.
Whether it is local or hdfs filesystem, you can try to check for existence and 
other details by using the "FileSystem.globStatus" method from the Hadoop API.

From: JF Chen mailto:darou...@gmail.com>>
Date: Sunday, May 20, 2018 at 10:30 PM
To: user mailto:user@spark.apache.org>>
Subject: How to skip nonexistent file when read files with spark?

Hi Everyone
I meet a tricky problem recently. I am trying to read some file paths generated 
by other method. The file paths are represented by wild card in list, like [ 
'/data/*/12', '/data/*/13']
But in practice, if the wildcard cannot match any existed path, it will throw 
an exception:"pyspark.sql.utils.AnalysisException: 'Path does not exist: ...'", 
and the program stops after that.
Actually I want spark can just ignore and skip these nonexistent  file path, 
and continues to run. I have tried python HDFSCli api to check the existence of 
path , but hdfs cli cannot support wildcard.

Any good idea to solve my problem? Thanks~

Regard,
Junfeng Chen

--
Best Regards,
Ayan Guha



Re: How to skip nonexistent file when read files with spark?

2018-05-21 Thread JF Chen
Thanks ayan,

Also I have tried this method, the most tricky thing is that dataframe
union method must be based on same structure schema, while on my files, the
schema is variable.


Regard,
Junfeng Chen

On Tue, May 22, 2018 at 10:33 AM, ayan guha  wrote:

> A relatively naive solution will be:
>
> 0. Create a dummy blank dataframe
> 1. Loop through the list of paths.
> 2. Try to create the dataframe from the path. If success then union it
> cumulatively.
> 3. If error, just ignore it or handle as you wish.
>
> At the end of the loop, just use the unioned df. This should not have any
> additional performance overhead as declaring dataframes and union is not
> expensive, unless you call any action within the loop.
>
> Best
> Ayan
>
> On Tue, 22 May 2018 at 11:27 am, JF Chen  wrote:
>
>> Thanks, Thakrar,
>>
>> I have tried to check the existence of path before read it, but HDFSCli
>> python package seems not support wildcard.  "FileSystem.globStatus" is a
>> java api while I am using python via livy Do you know any python api
>> implementing the same function?
>>
>>
>> Regard,
>> Junfeng Chen
>>
>> On Mon, May 21, 2018 at 9:01 PM, Thakrar, Jayesh <
>> jthak...@conversantmedia.com> wrote:
>>
>>> Probably you can do some preprocessing/checking of the paths before you
>>> attempt to read it via Spark.
>>>
>>> Whether it is local or hdfs filesystem, you can try to check for
>>> existence and other details by using the "FileSystem.globStatus" method
>>> from the Hadoop API.
>>>
>>>
>>>
>>> *From: *JF Chen 
>>> *Date: *Sunday, May 20, 2018 at 10:30 PM
>>> *To: *user 
>>> *Subject: *How to skip nonexistent file when read files with spark?
>>>
>>>
>>>
>>> Hi Everyone
>>>
>>> I meet a tricky problem recently. I am trying to read some file paths
>>> generated by other method. The file paths are represented by wild card in
>>> list, like [ '/data/*/12', '/data/*/13']
>>>
>>> But in practice, if the wildcard cannot match any existed path, it will
>>> throw an exception:"pyspark.sql.utils.AnalysisException: 'Path does not
>>> exist: ...'", and the program stops after that.
>>>
>>> Actually I want spark can just ignore and skip these nonexistent  file
>>> path, and continues to run. I have tried python HDFSCli api to check the
>>> existence of path , but hdfs cli cannot support wildcard.
>>>
>>>
>>>
>>> Any good idea to solve my problem? Thanks~
>>>
>>>
>>>
>>> Regard,
>>> Junfeng Chen
>>>
>>
>> --
> Best Regards,
> Ayan Guha
>