Structured Streaming Source error

2017-01-31 Thread Sam Elamin
Hi Folks


I am getting a weird error when trying to write a BigQuery Structured
Streaming source


Error:
java.lang.AbstractMethodError:
com.samelamin.spark.bigquery.streaming.BigQuerySource.commit(Lorg/apache/spark/sql/execution/streaming/Offset;)V
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:359)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:358)


FYI, if you are interested in Spark and BigQuery, feel free to give my
connector a go! I'm still trying to get Structured Streaming working as a source,
hence this email, but you can already use it as a sink!


Regards
Sam


Re: Structured Streaming Source error

2017-01-31 Thread Sam Elamin
Ha, Ryan, you're everywhere, JIRA and the mailing list alike. I thought
multitasking was a myth!

Thanks for your help. It was indeed different Spark versions at compile time and runtime!
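
A minimal build.sbt sketch of the fix: compile against the same Spark version the
cluster runs and mark Spark as "provided" so the runtime's own jars are used (the
version number and artifact list here are assumptions):

// build.sbt (sketch)
val sparkVersion = "2.1.0"  // assumed to match the cluster's Spark version

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
)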

Regards
Sam

On Tue, Jan 31, 2017 at 9:48 PM, Shixiong(Ryan) Zhu  wrote:

> You used one Spark version to compile your code but another, newer version
> to run it. As the Source APIs are not stable, Spark doesn't guarantee that
> they are binary compatible.
>
> On Tue, Jan 31, 2017 at 1:39 PM, Sam Elamin 
> wrote:
>
>> Hi Folks
>>
>>
>> I am getting a weird error when trying to write a BigQuery Structured
>> Streaming source
>>
>>
>> Error:
>> java.lang.AbstractMethodError: com.samelamin.spark.bigquery.s
>> treaming.BigQuerySource.commit(Lorg/apache/spark/sql/executi
>> on/streaming/Offset;)V
>> at org.apache.spark.sql.execution.streaming.StreamExecution$$
>> anonfun$org$apache$spark$sql$execution$streaming$
>> StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$
>> 5.apply(StreamExecution.scala:359)
>> at org.apache.spark.sql.execution.streaming.StreamExecution$$
>> anonfun$org$apache$spark$sql$execution$streaming$
>> StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$
>> 5.apply(StreamExecution.scala:358)
>>
>>
>> FYI if you are interested in Spark and BigQuery feel free to give my
>> connector a go! Still trying to get structured streaming to it as a source
>> hence this email. but you can use it as a sink!
>>
>>
>> Regards
>> Sam
>>
>>
>>
>>
>


Structured Streaming Schema Issue

2017-02-01 Thread Sam Elamin
Hi All

I am writing a BigQuery connector here
<http://github.com/samelamin/spark-bigquery> and I am getting a strange
error with schemas being overwritten when a dataframe is passed over to the
Sink.


For example, the source returns this StructType:
WARN streaming.BigQuerySource:
StructType(StructField(customerid,LongType,true),

and the sink is receiving this StructType:
WARN streaming.BigQuerySink:
StructType(StructField(customerid,StringType,true)


Any idea why this might be happening?
I don't have schema inference on:

spark.conf.set("spark.sql.streaming.schemaInference", "false")

I know it's off by default, but I set it just to be sure.

I'm completely lost as to what could be causing this.

Regards
Sam


Re: Structured Streaming Schema Issue

2017-02-01 Thread Sam Elamin
Thanks for the quick response TD!

I've been trying to identify where exactly this transformation happens.

The readStream returns a dataframe with the correct schema.

The minute I call writeStream, by the time I get to the addBatch method,
the dataframe there has an incorrect schema.

So I'm skeptical about the issue being prior to the readStream, since the
output dataframe has the correct schema.


Am I missing something completely obvious?

Regards
Sam

On Wed, Feb 1, 2017 at 11:29 PM, Tathagata Das 
wrote:

> You should make sure that the schema of the streaming Dataset returned by
> `readStream` and the schema of the DataFrame returned by the source's
> getBatch match.
>
> On Wed, Feb 1, 2017 at 3:25 PM, Sam Elamin 
> wrote:
>
>> Hi All
>>
>> I am writing a bigquery connector here
>> <http://github.com/samelamin/spark-bigquery> and I am getting a strange
>> error with schemas being overwritten when a dataframe is passed over to the
>> Sink
>>
>>
>> for example the source returns this StructType
>> WARN streaming.BigQuerySource: StructType(StructField(custome
>> rid,LongType,true),
>>
>> and the sink is recieving this StructType
>> WARN streaming.BigQuerySink: StructType(StructField(custome
>> rid,StringType,true)
>>
>>
>> Any idea why this might be happening?
>> I dont have infering schema on
>>
>> spark.conf.set("spark.sql.streaming.schemaInference", "false")
>>
>> I know its off by default but I set it just to be sure
>>
>> So completely lost to what could be causing this
>>
>> Regards
>> Sam
>>
>
>


Re: Structured Streaming Schema Issue

2017-02-01 Thread Sam Elamin
Yeah, sorry, I'm still working on it; it's on a branch you can find here
<https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala>.
Ignore the logging messages, I was trying to work out how the APIs work, and
unfortunately, because I have to shade the dependency, I can't debug it in an
IDE (that I know of!).

So I can see the correct schema here
<https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala#L64>
and also when the df is returned (after .load()).

But when that same df has writeStream applied to it, the addBatch dataframe
has a new schema. It's similar to the old schema, but some ints have been
turned to strings.



On Wed, Feb 1, 2017 at 11:40 PM, Tathagata Das 
wrote:

> I am assuming that you have written your own BigQuerySource (I don't see
> that code in the link you posted). In that source, you must have
> implemented getBatch, which uses offsets to return the DataFrame holding the
> data of a batch. Can you double check whether the DataFrame returned by
> getBatch has the expected schema?
>
> On Wed, Feb 1, 2017 at 3:33 PM, Sam Elamin 
> wrote:
>
>> Thanks for the quick response TD!
>>
>> Ive been trying to identify where exactly this transformation happens
>>
>> The readStream returns a dataframe with the correct schema
>>
>> The minute I call writeStream, by the time I get to the addBatch method,
>> the dataframe there has an incorrect Schema
>>
>> So Im skeptical about the issue being prior to the readStream since the
>> output dataframe has the correct Schema
>>
>>
>> Am I missing something completely obvious?
>>
>> Regards
>> Sam
>>
>> On Wed, Feb 1, 2017 at 11:29 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> You should make sure that schema of the streaming Dataset returned by
>>> `readStream`, and the schema of the DataFrame returned by the sources
>>> getBatch.
>>>
>>> On Wed, Feb 1, 2017 at 3:25 PM, Sam Elamin 
>>> wrote:
>>>
>>>> Hi All
>>>>
>>>> I am writing a bigquery connector here
>>>> <http://github.com/samelamin/spark-bigquery> and I am getting a
>>>> strange error with schemas being overwritten when a dataframe is passed
>>>> over to the Sink
>>>>
>>>>
>>>> for example the source returns this StructType
>>>> WARN streaming.BigQuerySource: StructType(StructField(custome
>>>> rid,LongType,true),
>>>>
>>>> and the sink is recieving this StructType
>>>> WARN streaming.BigQuerySink: StructType(StructField(custome
>>>> rid,StringType,true)
>>>>
>>>>
>>>> Any idea why this might be happening?
>>>> I dont have infering schema on
>>>>
>>>> spark.conf.set("spark.sql.streaming.schemaInference", "false")
>>>>
>>>> I know its off by default but I set it just to be sure
>>>>
>>>> So completely lost to what could be causing this
>>>>
>>>> Regards
>>>> Sam
>>>>
>>>
>>>
>>
>


Re: Structured Streaming Schema Issue

2017-02-01 Thread Sam Elamin
There isn't a query per se; I'm writing the entire dataframe from the output
of the read stream. Once I get that working I plan to test the
query aspect.


I'll do a bit more digging. Thank you very much for your help. Structured
Streaming is very exciting and I really am enjoying writing a connector for
it!

Regards
Sam
On Thu, 2 Feb 2017 at 00:02, Tathagata Das 
wrote:

> What is the query you are applying writeStream on? Essentially, can you print
> the whole query?
>
> Also, you can do StreamingQuery.explain() to see in full detail how the
> logical plan changes to the physical plan for a batch of data. That might
> help. Try doing that with some other sink to make sure the source works
> correctly, and then try using your sink.
>
> If you want further debugging, then you will have to dig into the
> StreamExecution class in Spark, and debug stuff there.
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L523
>
> On Wed, Feb 1, 2017 at 3:49 PM, Sam Elamin 
> wrote:
>
> Yeah sorry Im still working on it, its on a branch you can find here
> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala>,
> ignore the logging messages I was trying to workout how the APIs work and
> unfortunately because I have to shade the dependency I cant debug it in an
> IDE (that I know of! )
>
> So I can see the correct schema here
> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala#L64>
>  and
> also when the df is returned(After .load() )
>
> But when that same df has writeStream applied to it, the addBatch
> dataframe has a new schema. Its similar to the old schema but some ints
> have been turned to strings.
>
>
>
> On Wed, Feb 1, 2017 at 11:40 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
> I am assuming that you have written your own BigQuerySource (i dont see
> that code in the link you posted). In that source, you must have
> implemented getBatch which uses offsets to return the Dataframe having the
> data of a batch. Can you double check when this DataFrame returned by
> getBatch, has the expected schema?
>
> On Wed, Feb 1, 2017 at 3:33 PM, Sam Elamin 
> wrote:
>
> Thanks for the quick response TD!
>
> Ive been trying to identify where exactly this transformation happens
>
> The readStream returns a dataframe with the correct schema
>
> The minute I call writeStream, by the time I get to the addBatch method,
> the dataframe there has an incorrect Schema
>
> So Im skeptical about the issue being prior to the readStream since the
> output dataframe has the correct Schema
>
>
> Am I missing something completely obvious?
>
> Regards
> Sam
>
> On Wed, Feb 1, 2017 at 11:29 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
> You should make sure that schema of the streaming Dataset returned by
> `readStream`, and the schema of the DataFrame returned by the sources
> getBatch.
>
> On Wed, Feb 1, 2017 at 3:25 PM, Sam Elamin 
> wrote:
>
> Hi All
>
> I am writing a bigquery connector here
> <http://github.com/samelamin/spark-bigquery> and I am getting a strange
> error with schemas being overwritten when a dataframe is passed over to the
> Sink
>
>
> for example the source returns this StructType
> WARN streaming.BigQuerySource:
> StructType(StructField(customerid,LongType,true),
>
> and the sink is recieving this StructType
> WARN streaming.BigQuerySink:
> StructType(StructField(customerid,StringType,true)
>
>
> Any idea why this might be happening?
> I dont have infering schema on
>
> spark.conf.set("spark.sql.streaming.schemaInference", "false")
>
> I know its off by default but I set it just to be sure
>
> So completely lost to what could be causing this
>
> Regards
> Sam
>
>
>
>
>
>
>
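
For reference, a rough sketch of the debugging TD suggests above, printing the schema
and the query plans while the stream runs (the console sink and checkpoint path are
placeholders, not part of the connector):

df.printSchema()  // schema of the streaming DataFrame returned by readStream

val query = df.writeStream
  .format("console")                                     // any simple sink, just for debugging
  .option("checkpointLocation", "/tmp/debug-checkpoint")
  .start()

// After at least one batch has completed, this prints how the logical plan
// was turned into a physical plan for the last batch.
query.explain(extended = true)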


Re: Structured Streaming Schema Issue

2017-02-02 Thread Sam Elamin
city,StringType,true), StructField(email,StringType,true),
StructField(id,StringType,true), StructField(name,StringType,true),
StructField(phonenumber,StringType,true),
StructField(postcode,StringType,true),
StructField(previousjeordercount,StringType,true), StructField(
previousrestuarantordercount,StringType,true),
StructField(timezone,StringType,true)),true),
StructField(id,StringType,true), StructField(islocked,BooleanType,true),
StructField(legacyid,StringType,true), StructField(order,StructType(
StructField(duedate,StringType,true),
StructField(duedatewithutcoffset,StringType,true),
StructField(initialduedate,StringType,true), StructField(
initialduedatewithutcoffset,StringType,true),
StructField(notetorestaurant,StringType,true),
StructField(orderable,BooleanType,true),
StructField(placeddate,StringType,true),
StructField(promptasap,BooleanType,true),
StructField(servicetype,StringType,true)),true),
StructField(paymentinfo,StructType(StructField(drivertipvalue,LongType,true),
StructField(orderid,StringType,true), StructField(paiddate,StringType,true),
StructField(paymentlines,ArrayType(StructType(StructField(cardfee,DoubleType,true),
StructField(cardtype,StringType,true),
StructField(paymenttransactionref,StringType,true),
StructField(pspname,StringType,true), StructField(type,StringType,true),
StructField(value,DoubleType,true)),true),true),
StructField(total,DoubleType,true),
StructField(totalcomplementary,LongType,true)),true),
StructField(restaurantinfo,StructType(StructField(addresslines,ArrayType(StringType,true),true),
StructField(city,StringType,true), StructField(dispatchmethod,StringType,true),
StructField(id,StringType,true), StructField(latitude,DoubleType,true),
StructField(longitude,DoubleType,true), StructField(name,StringType,true),
StructField(offline,BooleanType,true),
StructField(phonenumber,StringType,true),
StructField(postcode,StringType,true), StructField(seoname,StringType,true),
StructField(tempoffline,BooleanType,true)),true)),true),
StructField(orderid,StringType,true),
StructField(orderresolutionstatus,StringType,true),
StructField(raisingcomponent,StringType,true),
StructField(restaurantid,StringType,true),
StructField(tenant,StringType,true), StructField(timestamp,StringType,true))


On Thu, Feb 2, 2017 at 12:04 AM, Sam Elamin  wrote:

> There isn't a query per se.im writing the entire dataframe from the
> output of the read stream. Once I got that working I was planning to test
> the query aspect
>
>
> I'll do a bit more digging. Thank you very much for your help. Structued
> streaming is very exciting and I really am enjoying writing a connector for
> it!
>
> Regards
> Sam
> On Thu, 2 Feb 2017 at 00:02, Tathagata Das 
> wrote:
>
>> What is the query you are apply writeStream on? Essentially can you print
>> the whole query.
>>
>> Also, you can do StreamingQuery.explain() to see in full details how the
>> logical plan changes to physical plan, for a batch of data. that might
>> help. try doing that with some other sink to make sure the source works
>> correctly, and then try using your sink.
>>
>> If you want further debugging, then you will have to dig into the
>> StreamingExecution class in Spark, and debug stuff there.
>> https://github.com/apache/spark/blob/master/sql/core/src/
>> main/scala/org/apache/spark/sql/execution/streaming/Stream
>> Execution.scala#L523
>>
>> On Wed, Feb 1, 2017 at 3:49 PM, Sam Elamin 
>> wrote:
>>
>> Yeah sorry Im still working on it, its on a branch you can find here
>> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala>,
>> ignore the logging messages I was trying to workout how the APIs work and
>> unfortunately because I have to shade the dependency I cant debug it in an
>> IDE (that I know of! )
>>
>> So I can see the correct schema here
>> <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala#L64>
>>  and
>> also when the df is returned(After .load() )
>>
>> But when that same df has writeStream applied to it, the addBatch
>> dataframe has a new schema. Its similar to the old schema but some ints
>> have been turned to strings.
>>
>>
>>
>> On Wed, Feb 1, 2017 at 11:40 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>> I am assuming that you have written your own BigQuerySource (i dont see
>> that code in the link you posted). In that source, you must have
>> implemented getBatch which uses offsets to return the Dataframe having the
>> data of a batch. Can you double check when this DataFrame returned by
>> getBatch, has the expected schema?

Re: Structured Streaming Schema Issue

2017-02-03 Thread Sam Elamin
Hey TD,


I figured out what was happening.

My source would return the correct schema, but the schema on the returned df
was actually different. I'm loading JSON data from cloud storage and that
gets inferred instead of set.

So basically the schema I return from the source provider wasn't actually
being used.
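
A minimal sketch of the fix described above, passing the source's declared schema into
the JSON read inside getBatch so it is not re-inferred (the path and the surrounding
Source class members are assumptions):

override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
  // Files that belong to this batch; how they are located is connector-specific.
  val batchFilesPath = "gs://some-bucket/batch/*.json"  // assumed location
  // Use the schema this Source advertises instead of letting read.json infer one.
  sqlContext.read.schema(schema).json(batchFilesPath)
}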

Thanks again for your help. Out of interest, is there a way of debugging
this in an IDE? What do you recommend? I've been adding far too many
debug statements just to understand what's happening!

Regards
Sam
On Fri, 3 Feb 2017 at 13:19, Tathagata Das 
wrote:

> Hard to say what is going on without really understanding how you are
> creating the DF in Source.getBatch(). Yes, there may be another inference
> in getBatch, where you have to be careful that you don't infer a schema that
> is different from the schema inferred earlier (when creating the streaming DF
> using readStream...load()). How about printing the schema of the batch DF before
> returning from getBatch? At least that would narrow down the possible
> location of the problem - in getBatch, OR after getBatch returns + before
> addBatch is called, OR after addBatch is called.
>
>
>
> On Thu, Feb 2, 2017 at 7:30 AM, Sam Elamin 
> wrote:
>
> Hi All
>
> I've done a bit more digging into where exactly this happens. It seems like
> the schema is inferred again after the data leaves the source and then comes
> into the sink.
>
> Below is a stack trace; the schema at the BigQuerySource has a LongType
> for customer id, but then at the sink the data received has an incorrect
> schema.
>
> Where exactly is the data stored in between these steps? Shouldn't the sink
> call the schema method from the Source?
>
> If it helps, I want to clarify that I am not passing in the schema when I
> initialise the readStream, but I am overriding the sourceSchema in my
> class that extends StreamSourceProvider. But even when that method is
> called the schema is correct.
>
> P.S. Ignore the "production" bucket, that's just a storage bucket I am
> using, I'm not actually using structured streaming in production just yet :)
>
>
> 17/02/02 14:57:22 WARN streaming.BigQuerySource:
> StructType(StructField(customerid,LongType,true),
> StructField(id,StringType,true), StructField(legacyorderid,LongType,true),
> StructField(notifiycustomer,BooleanType,true),
> StructField(ordercontainer,StructType(StructField(applicationinfo,StructType(StructField(applicationname,StringType,true),
> StructField(applicationversion,StringType,true),
> StructField(clientip,StringType,true),
> StructField(jefeature,StringType,true),
> StructField(useragent,StringType,true)),true),
> StructField(basketinfo,StructType(StructField(basketid,StringType,true),
> StructField(deliverycharge,FloatType,true),
> StructField(discount,FloatType,true),
> StructField(discounts,StringType,true),
> StructField(items,StructType(StructField(combinedprice,FloatType,true),
> StructField(description,StringType,true),
> StructField(discounts,StringType,true),
> StructField(mealparts,StringType,true),
> StructField(menucardnumber,StringType,true),
> StructField(multibuydiscounts,StringType,true),
> StructField(name,StringType,true),
> StructField(optionalaccessories,StringType,true),
> StructField(productid,LongType,true),
> StructField(producttypeid,LongType,true),
> StructField(requiredaccessories,StructType(StructField(groupid,LongType,true),
> StructField(name,StringType,true),
> StructField(requiredaccessoryid,LongType,true),
> StructField(unitprice,FloatType,true)),true),
> StructField(synonym,StringType,true),
> StructField(unitprice,FloatType,true)),true),
> StructField(menuid,LongType,true),
> StructField(multibuydiscount,FloatType,true),
> StructField(subtotal,FloatType,true), StructField(tospend,FloatType,true),
> StructField(total,FloatType,true)),true),
> StructField(customerinfo,StructType(StructField(address,StringType,true),
> StructField(city,StringType,true), StructField(email,StringType,true),
> StructField(id,StringType,true), StructField(name,StringType,true),
> StructField(phonenumber,StringType,true),
> StructField(postcode,StringType,true),
> StructField(previousjeordercount,LongType,true),
> StructField(previousrestuarantordercount,LongType,true),
> StructField(timezone,StringType,true)),true),
> StructField(id,StringType,true), StructField(islocked,BooleanType,true),
> StructField(legacyid,LongType,true),
> StructField(order,StructType(StructField(duedate,StringType,true),
> StructField(duedatewithutcoffset,StringType,true),
> StructField(initialduedate,StringType,true),
> StructField(initialduedatewithutcoffset,StringType,true),
> StructField(notetorestaurant,StringType,true),
> StructField(orderable,BooleanType,true),
> StructF

specifying schema on dataframe

2017-02-04 Thread Sam Elamin
Hi All

I would like to specify a schema when reading from JSON, but when trying
to map a number to a Double it fails; I tried FloatType and IntType with no
joy!


When inferring the schema, customer id is set to String, and I would like to
cast it as Double,

so df1 is corrupted while df2 shows the data.


Also, FYI, I need this to be generic as I would like to apply it to any JSON;
I specified the below schema as an example of the issue I am facing.

import org.apache.spark.sql.types.{BinaryType, StringType,
StructField, DoubleType,FloatType, StructType, LongType,DecimalType}
val testSchema = StructType(Array(StructField("customerid",DoubleType)))
val df1 = 
spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}""")))
val df2 = spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
df1.show(1)
df2.show(1)


Any help would be appreciated. I am sure I am missing something obvious, but
for the life of me I can't tell what it is!


Kind Regards
Sam


Re: specifying schema on dataframe

2017-02-04 Thread Sam Elamin
Hi Dirceu

Thanks, you're right! That did work.


But now I'm facing an even bigger problem: since I don't have access to change
the underlying data, I just want to apply a schema over something that was
written via sparkContext.newAPIHadoopRDD.

Basically I am reading in an RDD[JsonObject] and would like to convert it
into a dataframe to which I pass the schema.

What's the best way to do this?

I doubt removing all the quotes in the JSON is the best solution, is it?
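
One hedged sketch of an alternative to stripping the quotes: keep the inferred string
schema (or build the dataframe from the RDD's JSON strings) and cast the column
afterwards (the RDD name and the JsonObject-to-String conversion are assumptions):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType

// RDD[JsonObject] -> RDD[String] -> DataFrame, then cast the quoted number.
val jsonStrings = jsonObjectRdd.map(_.toString)   // assumes toString yields the JSON text
val raw = spark.read.json(jsonStrings)            // customerid comes back as a string
val typed = raw.withColumn("customerid", col("customerid").cast(DoubleType))
typed.printSchema()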

Regards
Sam

On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> Hi Sam
> Remove the " from the number and it will work
>
> On 4 Feb 2017 at 11:46 AM, "Sam Elamin" 
> wrote:
>
>> Hi All
>>
>> I would like to specify a schema when reading from a json but when trying
>> to map a number to a Double it fails, I tried FloatType and IntType with no
>> joy!
>>
>>
>> When inferring the schema customer id is set to String, and I would like
>> to cast it as Double
>>
>> so df1 is corrupted while df2 shows
>>
>>
>> Also FYI I need this to be generic as I would like to apply it to any
>> json, I specified the below schema as an example of the issue I am facing
>>
>> import org.apache.spark.sql.types.{BinaryType, StringType, StructField, 
>> DoubleType,FloatType, StructType, LongType,DecimalType}
>> val testSchema = StructType(Array(StructField("customerid",DoubleType)))
>> val df1 = 
>> spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>> val df2 = 
>> spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>> df1.show(1)
>> df2.show(1)
>>
>>
>> Any help would be appreciated, I am sure I am missing something obvious
>> but for the life of me I cant tell what it is!
>>
>>
>> Kind Regards
>> Sam
>>
>


Structured Streaming. Dropping Duplicates

2017-02-07 Thread Sam Elamin
Hi All

When trying to read a stream off S3, if I try to drop duplicates I get the
following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append
output mode not supported when there are streaming aggregations on
streaming DataFrames/DataSets;;


What's strange is that if I use the batch "spark.read.json", it works.

Can I assume you can't drop duplicates in structured streaming?

Regards
Sam


Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Sam Elamin
Thanks Michael!



On Tue, Feb 7, 2017 at 4:49 PM, Michael Armbrust 
wrote:

> Here a JIRA: https://issues.apache.org/jira/browse/SPARK-19497
>
> We should add this soon.
>
> On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin 
> wrote:
>
>> Hi All
>>
>> When trying to read a stream off S3 and I try and drop duplicates I get
>> the following error:
>>
>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>> Append output mode not supported when there are streaming aggregations on
>> streaming DataFrames/DataSets;;
>>
>>
>> Whats strange if I use the batch "spark.read.json", it works
>>
>> Can I assume you cant drop duplicates in structured streaming
>>
>> Regards
>> Sam
>>
>
>
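
For anyone following SPARK-19497: once streaming deduplication landed (Spark 2.2+),
dropping duplicates on a stream looks roughly like this (the column names are
assumptions):

// Deduplicate by key; the watermark bounds how much state Spark keeps around.
val deduped = streamingDf
  .withWatermark("timestamp", "10 minutes")   // assumed event-time column
  .dropDuplicates("orderid", "timestamp")     // assumed key columns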


Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Sam Elamin
On another note, when it comes to checkpointing on structured streaming:

I noticed that if I have a stream running off S3 and I kill the process, the
next time the process starts running it duplicates the last record
inserted. Is that normal?




So say I have streaming enabled on one folder "test" which only has two
files, "update1" and "update 2", then I kill the Spark job using Ctrl+C.
When I rerun the stream it picks up "update 2" again.

Is this normal? Isn't Ctrl+C a failure?

I would expect checkpointing to know that update 2 was already processed.

Regards
Sam

On Tue, Feb 7, 2017 at 4:58 PM, Sam Elamin  wrote:

> Thanks Micheal!
>
>
>
> On Tue, Feb 7, 2017 at 4:49 PM, Michael Armbrust 
> wrote:
>
>> Here a JIRA: https://issues.apache.org/jira/browse/SPARK-19497
>>
>> We should add this soon.
>>
>> On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin 
>> wrote:
>>
>>> Hi All
>>>
>>> When trying to read a stream off S3 and I try and drop duplicates I get
>>> the following error:
>>>
>>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>>> Append output mode not supported when there are streaming aggregations on
>>> streaming DataFrames/DataSets;;
>>>
>>>
>>> Whats strange if I use the batch "spark.read.json", it works
>>>
>>> Can I assume you cant drop duplicates in structured streaming
>>>
>>> Regards
>>> Sam
>>>
>>
>>
>


Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Sam Elamin
Hmm, OK, I understand that, but the job is running for a good few minutes before
I kill it, so there should not be any jobs left, because I can see in the log
that it's now polling for new changes and the latest offset is the right one.

After I kill it and relaunch it, it picks up that same file?


Sorry if I misunderstood you.

On Tue, Feb 7, 2017 at 5:20 PM, Michael Armbrust 
wrote:

> It is always possible that there will be extra jobs from failed batches.
> However, for the file sink, only one set of files will make it into
> _spark_metadata directory log.  This is how we get atomic commits even when
> there are files in more than one directory.  When reading the files with
> Spark, we'll detect this directory and use it instead of listStatus to find
> the list of valid files.
>
> On Tue, Feb 7, 2017 at 9:05 AM, Sam Elamin 
> wrote:
>
>> On another note, when it comes to checkpointing on structured streaming
>>
>> I noticed if I have  a stream running off s3 and I kill the process. The
>> next time the process starts running it dulplicates the last record
>> inserted. is that normal?
>>
>>
>>
>>
>> So say I have streaming enabled on one folder "test" which only has two
>> files "update1" and "update 2", then I kill the spark job using Ctrl+C.
>> When I rerun the stream it picks up "update 2" again
>>
>> Is this normal? isnt ctrl+c a failure?
>>
>> I would expect checkpointing to know that update 2 was already processed
>>
>> Regards
>> Sam
>>
>> On Tue, Feb 7, 2017 at 4:58 PM, Sam Elamin 
>> wrote:
>>
>>> Thanks Micheal!
>>>
>>>
>>>
>>> On Tue, Feb 7, 2017 at 4:49 PM, Michael Armbrust >> > wrote:
>>>
>>>> Here a JIRA: https://issues.apache.org/jira/browse/SPARK-19497
>>>>
>>>> We should add this soon.
>>>>
>>>> On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin 
>>>> wrote:
>>>>
>>>>> Hi All
>>>>>
>>>>> When trying to read a stream off S3 and I try and drop duplicates I
>>>>> get the following error:
>>>>>
>>>>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>>>>> Append output mode not supported when there are streaming aggregations on
>>>>> streaming DataFrames/DataSets;;
>>>>>
>>>>>
>>>>> Whats strange if I use the batch "spark.read.json", it works
>>>>>
>>>>> Can I assume you cant drop duplicates in structured streaming
>>>>>
>>>>> Regards
>>>>> Sam
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Sam Elamin
Ah, I see, OK, so probably it's the retry that's causing it.

So when you say I'll have to take this into account, how do I best do that?
My sink will have to know what that extra file was. And I was under the
impression Spark would automagically know this because of the checkpoint
directory set when you created the writeStream.

If that's not the case then how would I go about ensuring no duplicates?


Thanks again for the awesome support!

Regards
Sam
On Tue, 7 Feb 2017 at 18:05, Michael Armbrust 
wrote:

> Sorry, I think I was a little unclear.  There are two things at play here.
>
>  - Exactly-once semantics with file output: spark writes out extra
> metadata on which files are valid to ensure that failures don't cause us to
> "double count" any of the input.  Spark 2.0+ detects this info
> automatically when you use dataframe reader (spark.read...). There may be
> extra files, but they will be ignored. If you are consuming the output with
> another system you'll have to take this into account.
>  - Retries: right now we always retry the last batch when restarting.
> This is safe/correct because of the above, but we could also optimize this
> away by tracking more information about batch progress.
>
> On Tue, Feb 7, 2017 at 12:25 PM, Sam Elamin 
> wrote:
>
> Hmm ok I understand that but the job is running for a good few mins before
> I kill it so there should not be any jobs left because I can see in the log
> that its now polling for new changes, the latest offset is the right one
>
> After I kill it and relaunch it picks up that same file?
>
>
> Sorry if I misunderstood you
>
> On Tue, Feb 7, 2017 at 5:20 PM, Michael Armbrust 
> wrote:
>
> It is always possible that there will be extra jobs from failed batches.
> However, for the file sink, only one set of files will make it into
> _spark_metadata directory log.  This is how we get atomic commits even when
> there are files in more than one directory.  When reading the files with
> Spark, we'll detect this directory and use it instead of listStatus to find
> the list of valid files.
>
> On Tue, Feb 7, 2017 at 9:05 AM, Sam Elamin 
> wrote:
>
> On another note, when it comes to checkpointing on structured streaming
>
> I noticed if I have  a stream running off s3 and I kill the process. The
> next time the process starts running it dulplicates the last record
> inserted. is that normal?
>
>
>
>
> So say I have streaming enabled on one folder "test" which only has two
> files "update1" and "update 2", then I kill the spark job using Ctrl+C.
> When I rerun the stream it picks up "update 2" again
>
> Is this normal? isnt ctrl+c a failure?
>
> I would expect checkpointing to know that update 2 was already processed
>
> Regards
> Sam
>
> On Tue, Feb 7, 2017 at 4:58 PM, Sam Elamin 
> wrote:
>
> Thanks Micheal!
>
>
>
> On Tue, Feb 7, 2017 at 4:49 PM, Michael Armbrust 
> wrote:
>
> Here a JIRA: https://issues.apache.org/jira/browse/SPARK-19497
>
> We should add this soon.
>
> On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin 
> wrote:
>
> Hi All
>
> When trying to read a stream off S3 and I try and drop duplicates I get
> the following error:
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Append
> output mode not supported when there are streaming aggregations on
> streaming DataFrames/DataSets;;
>
>
> Whats strange if I use the batch "spark.read.json", it works
>
> Can I assume you cant drop duplicates in structured streaming
>
> Regards
> Sam
>
>
>
>
>
>
>
>


Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Sam Elamin
Hi Michael

If that's the case, for the below example where should I be reading these
JSON log files first? I'm assuming somewhere between df and query?


val df = spark
  .readStream
  .option("tableReferenceSource", tableName)
  .load()

setUpGoogle(spark.sqlContext)

val query = df
  .writeStream
  .option("tableReferenceSink",tableName2)
  .option("checkpointLocation","checkpoint")
  .start()


On Tue, Feb 7, 2017 at 7:24 PM, Michael Armbrust 
wrote:

> Read the JSON log of files that is in `/your/path/_spark_metadata` and
> only read files that are present in that log (ignore anything else).
>
> On Tue, Feb 7, 2017 at 1:16 PM, Sam Elamin 
> wrote:
>
>> Ah I see ok so probably it's the retry that's causing it
>>
>> So when you say I'll have to take this into account, how do I best do
>> that? My sink will have to know what was that extra file. And i was under
>> the impression spark would automagically know this because of the
>> checkpoint directory set when you created the writestream
>>
>> If that's not the case then how would I go about ensuring no duplicates?
>>
>>
>> Thanks again for the awesome support!
>>
>> Regards
>> Sam
>> On Tue, 7 Feb 2017 at 18:05, Michael Armbrust 
>> wrote:
>>
>>> Sorry, I think I was a little unclear.  There are two things at play
>>> here.
>>>
>>>  - Exactly-once semantics with file output: spark writes out extra
>>> metadata on which files are valid to ensure that failures don't cause us to
>>> "double count" any of the input.  Spark 2.0+ detects this info
>>> automatically when you use dataframe reader (spark.read...). There may be
>>> extra files, but they will be ignored. If you are consuming the output with
>>> another system you'll have to take this into account.
>>>  - Retries: right now we always retry the last batch when restarting.
>>> This is safe/correct because of the above, but we could also optimize this
>>> away by tracking more information about batch progress.
>>>
>>> On Tue, Feb 7, 2017 at 12:25 PM, Sam Elamin 
>>> wrote:
>>>
>>> Hmm ok I understand that but the job is running for a good few mins
>>> before I kill it so there should not be any jobs left because I can see in
>>> the log that its now polling for new changes, the latest offset is the
>>> right one
>>>
>>> After I kill it and relaunch it picks up that same file?
>>>
>>>
>>> Sorry if I misunderstood you
>>>
>>> On Tue, Feb 7, 2017 at 5:20 PM, Michael Armbrust >> > wrote:
>>>
>>> It is always possible that there will be extra jobs from failed batches.
>>> However, for the file sink, only one set of files will make it into
>>> _spark_metadata directory log.  This is how we get atomic commits even when
>>> there are files in more than one directory.  When reading the files with
>>> Spark, we'll detect this directory and use it instead of listStatus to find
>>> the list of valid files.
>>>
>>> On Tue, Feb 7, 2017 at 9:05 AM, Sam Elamin 
>>> wrote:
>>>
>>> On another note, when it comes to checkpointing on structured streaming
>>>
>>> I noticed if I have  a stream running off s3 and I kill the process. The
>>> next time the process starts running it dulplicates the last record
>>> inserted. is that normal?
>>>
>>>
>>>
>>>
>>> So say I have streaming enabled on one folder "test" which only has two
>>> files "update1" and "update 2", then I kill the spark job using Ctrl+C.
>>> When I rerun the stream it picks up "update 2" again
>>>
>>> Is this normal? isnt ctrl+c a failure?
>>>
>>> I would expect checkpointing to know that update 2 was already processed
>>>
>>> Regards
>>> Sam
>>>
>>> On Tue, Feb 7, 2017 at 4:58 PM, Sam Elamin 
>>> wrote:
>>>
>>> Thanks Micheal!
>>>
>>>
>>>
>>> On Tue, Feb 7, 2017 at 4:49 PM, Michael Armbrust >> > wrote:
>>>
>>> Here a JIRA: https://issues.apache.org/jira/browse/SPARK-19497
>>>
>>> We should add this soon.
>>>
>>> On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin 
>>> wrote:
>>>
>>> Hi All
>>>
>>> When trying to read a stream off S3 and I try and drop duplicates I get
>>> the following error:
>>>
>>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>>> Append output mode not supported when there are streaming aggregations on
>>> streaming DataFrames/DataSets;;
>>>
>>>
>>> Whats strange if I use the batch "spark.read.json", it works
>>>
>>> Can I assume you cant drop duplicates in structured streaming
>>>
>>> Regards
>>> Sam
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>
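
A rough sketch of what Michael describes above: list only the files recorded in the
file sink's _spark_metadata log before reading them. The on-disk layout of that log
(line-delimited JSON entries with a "path" field, after a version header) is an
internal Spark detail and is assumed here:

import spark.implicits._
import org.apache.spark.sql.functions.get_json_object

val committedFiles = spark.read
  .text("/your/path/_spark_metadata")
  .select(get_json_object($"value", "$.path").as("path"))
  .na.drop()      // drops the version header line, which has no JSON "path" field
  .as[String]
  .collect()
  .toSet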


Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Sam Elamin
Sorry, those are methods I wrote, so you can ignore them :)

So just adding a path parameter tells Spark that's where the update log is?

Do I check for the unique id there and identify which batches were written and
which weren't?

Are there any examples of this out there? There aren't many connectors in
the wild which I can reimplement, are there?
Should I look at how the file sink is set up and follow that pattern?


Regards
Sam

On Tue, Feb 7, 2017 at 8:40 PM, Michael Armbrust 
wrote:

> The JSON log is only used by the file sink (which it doesn't seem like you
> are using).  Though, I'm not sure exactly what is going on inside of
> setupGoogle or how tableReferenceSource is used.
>
> Typically you would run df.writeStream.option("path", "/my/path")... and
> then the transaction log would go into /my/path/_spark_metadata.
>
> There is no requirement that a sink uses this kind of update log.  This
> is just how we get better transactional semantics than HDFS is providing.
> If your sink supports transactions natively you should just use those
> instead.  We pass a unique ID to the sink method addBatch so that you can
> make sure you don't commit the same transaction more than once.
>
> On Tue, Feb 7, 2017 at 3:29 PM, Sam Elamin 
> wrote:
>
>> Hi Micheal
>>
>> If thats the case for the below example, where should i be reading these
>> json log files first? im assuming sometime between df and query?
>>
>>
>> val df = spark
>> .readStream
>> .option("tableReferenceSource",tableName)
>> .load()
>> setUpGoogle(spark.sqlContext)
>>
>> val query = df
>>   .writeStream
>>   .option("tableReferenceSink",tableName2)
>>   .option("checkpointLocation","checkpoint")
>>   .start()
>>
>>
>> On Tue, Feb 7, 2017 at 7:24 PM, Michael Armbrust 
>> wrote:
>>
>>> Read the JSON log of files that is in `/your/path/_spark_metadata` and
>>> only read files that are present in that log (ignore anything else).
>>>
>>> On Tue, Feb 7, 2017 at 1:16 PM, Sam Elamin 
>>> wrote:
>>>
>>>> Ah I see ok so probably it's the retry that's causing it
>>>>
>>>> So when you say I'll have to take this into account, how do I best do
>>>> that? My sink will have to know what was that extra file. And i was under
>>>> the impression spark would automagically know this because of the
>>>> checkpoint directory set when you created the writestream
>>>>
>>>> If that's not the case then how would I go about ensuring no duplicates?
>>>>
>>>>
>>>> Thanks again for the awesome support!
>>>>
>>>> Regards
>>>> Sam
>>>> On Tue, 7 Feb 2017 at 18:05, Michael Armbrust 
>>>> wrote:
>>>>
>>>>> Sorry, I think I was a little unclear.  There are two things at play
>>>>> here.
>>>>>
>>>>>  - Exactly-once semantics with file output: spark writes out extra
>>>>> metadata on which files are valid to ensure that failures don't cause us 
>>>>> to
>>>>> "double count" any of the input.  Spark 2.0+ detects this info
>>>>> automatically when you use dataframe reader (spark.read...). There may be
>>>>> extra files, but they will be ignored. If you are consuming the output 
>>>>> with
>>>>> another system you'll have to take this into account.
>>>>>  - Retries: right now we always retry the last batch when restarting.
>>>>> This is safe/correct because of the above, but we could also optimize this
>>>>> away by tracking more information about batch progress.
>>>>>
>>>>> On Tue, Feb 7, 2017 at 12:25 PM, Sam Elamin 
>>>>> wrote:
>>>>>
>>>>> Hmm ok I understand that but the job is running for a good few mins
>>>>> before I kill it so there should not be any jobs left because I can see in
>>>>> the log that its now polling for new changes, the latest offset is the
>>>>> right one
>>>>>
>>>>> After I kill it and relaunch it picks up that same file?
>>>>>
>>>>>
>>>>> Sorry if I misunderstood you
>>>>>
>>>>> On Tue, Feb 7, 2017 at 5:20 PM, Michael Armbrust <
>>>>> mich...@databricks.com> wrote:
>>>>>
>>>>> It is always possible that there will be extra

Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Sam Elamin
Ignore me, a bit more digging and I was able to find the file sink source
<https://github.com/apache/spark/blob/1ae4652b7e1f77a984b8459c778cb06c814192c5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala>

Following that pattern worked a treat!

Thanks again Michael :)
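
Following the same pattern, the unique batch id Michael mentions is what lets a custom
sink skip retried batches; a minimal sketch (in a real sink the last committed id would
be stored in the target system, not in memory):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

class MyIdempotentSink extends Sink {
  @volatile private var lastCommittedBatchId: Long = -1L

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId > lastCommittedBatchId) {
      // ... write `data` to the external system here ...
      lastCommittedBatchId = batchId
    }
    // else: this batch was already committed before a restart, so skip the retry
  }
}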

On Tue, Feb 7, 2017 at 8:44 PM, Sam Elamin  wrote:

> Sorry those are methods I wrote so you can ignore them :)
>
> so just adding a path parameter tells spark thats where the update log is?
>
> Do I check for the unique id there and identify which batch was written
> and which weren't
>
> Are there any examples of this out there? there aren't much connectors in
> the wild which I can reimplement is there
> Should I look at how the file sink is set up and follow that pattern?
>
>
> Regards
> Sam
>
> On Tue, Feb 7, 2017 at 8:40 PM, Michael Armbrust 
> wrote:
>
>> The JSON log is only used by the file sink (which it doesn't seem like
>> you are using).  Though, I'm not sure exactly what is going on inside of
>> setupGoogle or how tableReferenceSource is used.
>>
>> Typically you would run df.writeStream.option("path", "/my/path")... and
>> then the transaction log would go into /my/path/_spark_metadata.
>>
>> There is not requirement that a sink uses this kind of a update log.
>> This is just how we get better transactional semantics than HDFS is
>> providing.  If your sink supports transactions natively you should just use
>> those instead.  We pass a unique ID to the sink method addBatch so that you
>> can make sure you don't commit the same transaction more than once.
>>
>> On Tue, Feb 7, 2017 at 3:29 PM, Sam Elamin 
>> wrote:
>>
>>> Hi Micheal
>>>
>>> If thats the case for the below example, where should i be reading these
>>> json log files first? im assuming sometime between df and query?
>>>
>>>
>>> val df = spark
>>> .readStream
>>> .option("tableReferenceSource",tableName)
>>> .load()
>>> setUpGoogle(spark.sqlContext)
>>>
>>> val query = df
>>>   .writeStream
>>>   .option("tableReferenceSink",tableName2)
>>>   .option("checkpointLocation","checkpoint")
>>>   .start()
>>>
>>>
>>> On Tue, Feb 7, 2017 at 7:24 PM, Michael Armbrust >> > wrote:
>>>
>>>> Read the JSON log of files that is in `/your/path/_spark_metadata` and
>>>> only read files that are present in that log (ignore anything else).
>>>>
>>>> On Tue, Feb 7, 2017 at 1:16 PM, Sam Elamin 
>>>> wrote:
>>>>
>>>>> Ah I see ok so probably it's the retry that's causing it
>>>>>
>>>>> So when you say I'll have to take this into account, how do I best do
>>>>> that? My sink will have to know what was that extra file. And i was under
>>>>> the impression spark would automagically know this because of the
>>>>> checkpoint directory set when you created the writestream
>>>>>
>>>>> If that's not the case then how would I go about ensuring no
>>>>> duplicates?
>>>>>
>>>>>
>>>>> Thanks again for the awesome support!
>>>>>
>>>>> Regards
>>>>> Sam
>>>>> On Tue, 7 Feb 2017 at 18:05, Michael Armbrust 
>>>>> wrote:
>>>>>
>>>>>> Sorry, I think I was a little unclear.  There are two things at play
>>>>>> here.
>>>>>>
>>>>>>  - Exactly-once semantics with file output: spark writes out extra
>>>>>> metadata on which files are valid to ensure that failures don't cause us 
>>>>>> to
>>>>>> "double count" any of the input.  Spark 2.0+ detects this info
>>>>>> automatically when you use dataframe reader (spark.read...). There may be
>>>>>> extra files, but they will be ignored. If you are consuming the output 
>>>>>> with
>>>>>> another system you'll have to take this into account.
>>>>>>  - Retries: right now we always retry the last batch when
>>>>>> restarting.  This is safe/correct because of the above, but we could also
>>>>>> optimize this away by tracking more information about batch progress.
>>>>>>
>>>>>> On Tue, Feb 7, 2017 at 12:25 PM, Sam Elamin 

Structured Streaming. S3 To Google BigQuery

2017-02-08 Thread Sam Elamin
Hi All

Thank you all for the amazing support! I have written a BigQuery connector
for structured streaming that you can find here


I just tweeted
about it and would really appreciate it if you retweeted when you get a
chance.

The more people who know about it and use it, the more feedback I can get to
make the connector better!

Of course, PRs and feedback are always welcome :)

Thanks again!

Regards
Sam


[Newbie] spark conf

2017-02-10 Thread Sam Elamin
Hi All,


Really newbie question here folks: I have properties like my AWS access and
secret keys in Hadoop's core-site.xml, among other properties, but
that's the only reason I have Hadoop installed, which seems a bit of
overkill.

Is there an equivalent of core-site.xml for Spark so I don't have to
reference HADOOP_CONF_DIR in my spark-env.sh?

I know I can export env variables for the AWS credentials, but what about other
properties that my application might want to use?

Regards
Sam


Re: [Newbie] spark conf

2017-02-10 Thread Sam Elamin
Yeah, I thought of that, but the file made it seem that it's for environment-
specific rather than application-specific configuration.

I'm more interested in the best practices: would you recommend using the
default conf file for this and uploading it to wherever the application will
be running (remote clusters etc.)?


Regards
Sam

On Fri, Feb 10, 2017 at 9:36 PM, Reynold Xin  wrote:

> You can put them in spark's own conf/spark-defaults.conf file
>
> On Fri, Feb 10, 2017 at 10:35 PM, Sam Elamin 
> wrote:
>
>> Hi All,
>>
>>
>> really newbie question here folks, i have properties like my aws access
>> and secret keys in the core-site.xml in hadoop among other properties, but
>> thats the only reason I have hadoop installed which seems a bit of an
>> overkill.
>>
>> Is there an equivalent of core-site.xml for spark so I dont have to
>> reference the HADOOP_CONF_DIR in my spark env.sh?
>>
>> I know I can export env variables for the AWS credentials but other
>> properties that my application might want to use?
>>
>> Regards
>> Sam
>>
>>
>>
>>
>


Re: [Newbie] spark conf

2017-02-10 Thread Sam Elamin
Yup, that worked.

Thanks for the clarification!
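
For reference, the spark.hadoop.* prefix also lets these Hadoop properties live in
conf/spark-defaults.conf rather than core-site.xml; a sketch assuming the s3a
filesystem (the values are placeholders):

# conf/spark-defaults.conf
spark.hadoop.fs.s3a.access.key   YOUR_ACCESS_KEY
spark.hadoop.fs.s3a.secret.key   YOUR_SECRET_KEY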

On Fri, Feb 10, 2017 at 9:42 PM, Marcelo Vanzin  wrote:

> If you place core-site.xml in $SPARK_HOME/conf, I'm pretty sure Spark
> will pick it up. (Sounds like you're not running YARN, which would
> require HADOOP_CONF_DIR.)
>
> Also this is more of a user@ question.
>
> On Fri, Feb 10, 2017 at 1:35 PM, Sam Elamin 
> wrote:
> > Hi All,
> >
> >
> > really newbie question here folks, i have properties like my aws access
> and
> > secret keys in the core-site.xml in hadoop among other properties, but
> thats
> > the only reason I have hadoop installed which seems a bit of an overkill.
> >
> > Is there an equivalent of core-site.xml for spark so I dont have to
> > reference the HADOOP_CONF_DIR in my spark env.sh?
> >
> > I know I can export env variables for the AWS credentials but other
> > properties that my application might want to use?
> >
> > Regards
> > Sam
> >
> >
> >
>
>
>
> --
> Marcelo
>


Re: welcoming Takuya Ueshin as a new Apache Spark committer

2017-02-13 Thread Sam Elamin
Congrats Takuya-san! Clearly well deserved! Well done :)

On Mon, Feb 13, 2017 at 9:02 PM, Maciej Szymkiewicz 
wrote:

> Congratulations!
>
>
> On 02/13/2017 08:16 PM, Reynold Xin wrote:
> > Hi all,
> >
> > Takuya-san has recently been elected an Apache Spark committer. He's
> > been active in the SQL area and writes very small, surgical patches
> > that are high quality. Please join me in congratulating Takuya-san!
> >
>
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Structured Streaming Spark Summit Demo - Databricks people

2017-02-15 Thread Sam Elamin
Hey folks

This one is mainly aimed at the Databricks folks. I have been trying to
replicate the CloudTrail demo
Michael did at Spark Summit. The code for it can be found here


My question is how did you get the results to be displayed and updated
continuously in real time?

I am also using Databricks to duplicate it, but I noticed the code link
mentions

 "If you count the number of rows in the table, you should find the value
increasing over time. Run the following every few minutes."
This leads me to believe that the version of Databricks that Michael was
using for the demo is still not released, or at least the functionality to
display those changes in real time isn't.

Is this the case, or am I completely wrong?

Can I display the results of a structured streaming query in real time using
the Databricks "display" function?


Regards
Sam


Re: Structured Streaming Spark Summit Demo - Databricks people

2017-02-15 Thread Sam Elamin
Fair enough, you're absolutely right.

Thanks for pointing me in the right direction.
On Wed, 15 Feb 2017 at 20:13, Nicholas Chammas 
wrote:

> I don't think this is the right place for questions about Databricks. I'm
> pretty sure they have their own website with a forum for questions about
> their product.
>
> Maybe this? https://forums.databricks.com/
>
> On Wed, Feb 15, 2017 at 2:34 PM Sam Elamin 
> wrote:
>
> Hey folks
>
> This one is mainly aimed at the databricks folks, I have been trying to
> replicate the cloudtrail demo
> <https://www.youtube.com/watch?v=IJmFTXvUZgY> Micheal did at Spark
> Summit. The code for it can be found here
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/8599738367597028/2070341989008532/3601578643761083/latest.html>
>
> My question is how did you get the results to be displayed and updated
> continusly in real time
>
> I am also using databricks to duplicate it but I noticed the code link
> mentions
>
>  "If you count the number of rows in the table, you should find the value
> increasing over time. Run the following every few minutes."
> This leads me to believe that the version of Databricks that Micheal was
> using for the demo is still not released, or at-least the functionality to
> display those changes in real time aren't
>
> Is this the case? or am I completely wrong?
>
> Can I display the results of a structured streaming query in realtime
> using the databricks "display" function?
>
>
> Regards
> Sam
>
>


Re: Spark Improvement Proposals

2017-02-16 Thread Sam Elamin
Hi Folks

I thought id chime in as someone new to the process so feel free to
disregard it if it doesn't make sense.

I definitely agree that we need a new forum to identify or discuss changes,
as JIRA isn't exactly the best place to do that; it's a bug tracker first and
foremost.

For example, I was cycling through the JIRAs to see if there's anything I
can contribute to, and there isn't one overall story or goal. There are bugs
or wish lists, but I am talking about overall high-level goals or wishes.

If the point of the SPIP is to focus the discussions on the problems but
also encourage non-PMC members to be more active in the project, the requirement
that every single idea must have a shepherd might do more harm than good.

As Cody mentioned, any PMC member can veto a change; that's fine for
stopping potentially detrimental changes, but if this process is hoping to
open the flood gates to allow literally anyone to come up with an idea or
at least facilitate conversations on positive changes, then I foresee that
the PMCs will be far too stretched to govern it, since the number of ideas
or discussions will be orders of magnitude greater than the number of
PMCs who can manage it. We will end up needing to get "project managers"
and "scrum masters", and nobody wants that!

Hence it would deter anyone from raising ideas in the first place unless
they are willing to find a shepherd for it. It reminds me of the need to
raise a BOC (Business Operation Case) when I worked in big corporations.
Overall it reduces morale, since by default this isn't necessarily the
average engineer's strong point. There is also the issue that the PMCs might
just be too busy to shepherd all the ideas regardless of merit, so potentially
great additions might die off purely because there just aren't enough PMCs
around to take time out of their already busy schedules and give all the
SPIPs the attention they deserve.

My point is this: allow anyone to raise ideas and facilitate discussion of
proposals in a safe environment.

At the end of the day, PMC members will still be able to guide/veto anything since
they have the experience.

I hope I was able to articulate what I meant. I really am loving working on
Spark, I think the future looks very promising for it, and I very much look
forward to being involved in its evolution.

Kind Regards
Sam




On Thu, Feb 16, 2017 at 8:23 PM, Tim Hunter 
wrote:

> The doc looks good to me.
>
> Ryan, the role of the shepherd is to make sure that someone
> knowledgeable with Spark processes is involved: this person can advise
> on technical and procedural considerations for people outside the
> community. Also, if no one is willing to be a shepherd, the proposed
> idea is probably not going to receive much traction in the first
> place.
>
> Tim
>
> On Thu, Feb 16, 2017 at 9:17 AM, Cody Koeninger 
> wrote:
> > Reynold, thanks, LGTM.
> >
> > Sean, great concerns.  I agree that behavior is largely cultural and
> > writing down a process won't necessarily solve any problems one way or
> > the other.  But one outwardly visible change I'm hoping for out of
> > this a way for people who have a stake in Spark, but can't follow
> > jiras closely, to go to the Spark website, see the list of proposed
> > major changes, contribute discussion on issues that are relevant to
> > their needs, and see a clear direction once a vote has passed.  We
> > don't have that now.
> >
> > Ryan, realistically speaking any PMC member can and will stop any
> > changes they don't like anyway, so might as well be up front about the
> > reality of the situation.
> >
> > On Thu, Feb 16, 2017 at 10:43 AM, Sean Owen  wrote:
> >> The text seems fine to me. Really, this is not describing a
> fundamentally
> >> new process, which is good. We've always had JIRAs, we've always been
> able
> >> to call a VOTE for a big question. This just writes down a sensible set
> of
> >> guidelines for putting those two together when a major change is
> proposed. I
> >> look forward to turning some big JIRAs into a request for a SPIP.
> >>
> >> My only hesitation is that this seems to be perceived by some as a new
> or
> >> different thing, that is supposed to solve some problems that aren't
> >> otherwise solvable. I see mentioned problems like: clear process for
> >> managing work, public communication, more committers, some sort of
> binding
> >> outcome and deadline.
> >>
> >> If SPIP is supposed to be a way to make people design in public and a
> way to
> >> force attention to a particular change, then, this doesn't do that by
> >> itself. Therefore I don't want to let a detailed discussion of SPIP
> detract
> >> from the discussion about doing what SPIP implies. It's just a process
> >> document.
> >>
> >> Still, a fine step IMHO.
> >>
> >> On Thu, Feb 16, 2017 at 4:22 PM Reynold Xin 
> wrote:
> >>>
> >>> Updated. Any feedback from other community members?
> >>>
> >>>
> >>> On Wed, Feb 15, 2017 at 2:53 AM, Cody Koeninger 
> >>> wrote:
> 
>  Thanks for doing that.
> 
>  

Re: Structured Streaming Spark Summit Demo - Databricks people

2017-02-16 Thread Sam Elamin
Thanks Michael, it really was a great demo.

I figured I needed to add a trigger to display the results. But Buraz from
Databricks mentioned here
<https://forums.databricks.com/questions/10925/structured-streaming-in-real-time.html#comment-10929>
that the display of this functionality won't be available until potentially
the next release of Databricks, 2.1-db3.

I'll take your points into account and try to duplicate it.

Apologies if this isn't the forum for the question; I'm happy to take the
question offline, but I genuinely believe the mailing list users might find
it very interesting.

Happy to take the discussion offline though :)



On Thu, Feb 16, 2017 at 8:30 PM, Michael Armbrust 
wrote:

> Thanks for your interest in Apache Spark Structured Streaming!
>
> There is nothing secret in that demo, though I did make some configuration
> changes in order to get the timing right (gotta have some dramatic effect
> :) ).  Also I think the visualizations based on metrics output by the
> StreamingQueryListener
> <https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>
>  are
> still being rolled out, but should be available everywhere soon.
>
> First, I set two options to make sure that files were read one at a time,
> thus allowing us to see incremental results.
>
> spark.readStream
>   .option("maxFilesPerTrigger", "1")
>   .option("latestFirst", "true")
> ...
>
> There is more detail on how these options work in this post
> <https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html>
> .
>
> Regarding continually updating result of a streaming query using
> display(df)for streaming DataFrames (i.e. one created with
> spark.readStream), that has worked in Databrick's since Spark 2.1.  The
> longer form example we published requires you to rerun the count to see it
> change at the end of the notebook because that is not a streaming query.
> Instead it is a batch query over data that has been written out by another
> stream.  I'd like to add the ability to run a streaming query from data
> that has been written out by the FileSink (tracked here SPARK-19633
> <https://issues.apache.org/jira/browse/SPARK-19633>).
>
> In the demo, I started two different streaming queries:
>  - one that reads from json / kafka => writes to parquet
>  - one that reads from json / kafka => writes to memory sink
> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks>
> / pushes latest answer to the js running in a browser using the
> StreamingQueryListener
> <https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html>.
> This is packaged up nicely in display(), but there is nothing stopping
> you from building something similar with vanilla Apache Spark.
>
> Michael
>
>
> On Wed, Feb 15, 2017 at 11:34 AM, Sam Elamin 
> wrote:
>
>> Hey folks
>>
>> This one is mainly aimed at the databricks folks, I have been trying to
>> replicate the cloudtrail demo
>> <https://www.youtube.com/watch?v=IJmFTXvUZgY> Micheal did at Spark
>> Summit. The code for it can be found here
>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/8599738367597028/2070341989008532/3601578643761083/latest.html>
>>
>> My question is how did you get the results to be displayed and updated
>> continusly in real time
>>
>> I am also using databricks to duplicate it but I noticed the code link
>> mentions
>>
>>  "If you count the number of rows in the table, you should find the
>> value increasing over time. Run the following every few minutes."
>> This leads me to believe that the version of Databricks that Micheal was
>> using for the demo is still not released, or at-least the functionality to
>> display those changes in real time aren't
>>
>> Is this the case? or am I completely wrong?
>>
>> Can I display the results of a structured streaming query in realtime
>> using the databricks "display" function?
>>
>>
>> Regards
>> Sam
>>
>
>
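
A minimal sketch of the memory-sink half of what Michael describes, outside of
Databricks' display(): write the aggregation to an in-memory table and poll it with
SQL (the aggregation, query name, and column are assumptions):

val counts = parsedStream.groupBy("action").count()   // assumed streaming aggregation

val query = counts.writeStream
  .format("memory")          // in-memory table, for interactive inspection only
  .queryName("counts")       // the table name to query
  .outputMode("complete")
  .start()

// Re-run this to watch the result update as new batches arrive.
spark.sql("select * from counts").show()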


Re: Support for decimal separator (comma or period) in spark 2.1

2017-02-23 Thread Sam Elamin
Hi Arkadiusz

Not sure if there is a localisation ability, but I'm sure others will correct
me if I'm wrong.


What you could do is write a UDF that replaces the commas with a period,
assuming you know the column in question.
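
A small sketch of that approach using the built-in regexp_replace instead of a custom
UDF (the dataframe and column names are assumptions):

import org.apache.spark.sql.functions.{col, regexp_replace}

// Read the value as a string, swap the decimal comma for a period, then cast.
val fixed = df.withColumn("price",
  regexp_replace(col("price"), ",", ".").cast("double"))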


Regards
Sam
On Thu, 23 Feb 2017 at 12:31, Arkadiusz Bicz 
wrote:

> Hi Team,
>
> I would like to know if it is possible to specify decimal localization for
> DataFrameReader for  csv?
>
> I have csv files from a locale where the decimal separator is a comma, like
> 0,32 instead of the US way like 0.32
>
> Is there a way in the current version of Spark to provide
> localization:
>
> spark.read.option("sep",";").option("header",
> "true").option("inferSchema",
> "true").format("csv").load("nonuslocalized.csv")
>
> If not should I create ticket in jira for this ? I can work on solution if
> not available.
>
> Best Regards,
>
> Arkadiusz Bicz
>
>
>


Updating

2017-04-13 Thread Sam Elamin
Hey all

Who do I need to talk to in order to update the Useful developer tools page?

I want to update the build instructions for IntelliJ as they do not apply
at the moment


Regards
Sam


Re: Updating

2017-04-13 Thread Sam Elamin
perfect, thanks Sean!

On Thu, Apr 13, 2017 at 4:21 PM, Sean Owen  wrote:

> The site source is at https://github.com/apache/spark-website/
>
> On Thu, Apr 13, 2017 at 4:20 PM Sam Elamin 
> wrote:
>
>> Hey all
>>
>> Who do I need to talk to in order to update the Useful developer tools
>> page <http://spark.apache.org/developer-tools.html>?
>>
>> I want to update the build instructions for IntelliJ as they do not apply
>> at the moment
>>
>>
>> Regards
>> Sam
>>
>


Re: [ANNOUNCE] Announcing Apache Spark 2.2.0

2017-07-17 Thread Sam Elamin
Well done!  This is amazing news :) Congrats, and I really can't wait to spread
the Structured Streaming love!

On Mon, Jul 17, 2017 at 5:25 PM, kant kodali  wrote:

> +1
>
> On Tue, Jul 11, 2017 at 3:56 PM, Jean Georges Perrin  wrote:
>
>> Awesome! Congrats! Can't wait!!
>>
>> jg
>>
>>
>> On Jul 11, 2017, at 18:48, Michael Armbrust 
>> wrote:
>>
>> Hi all,
>>
>> Apache Spark 2.2.0 is the third release of the Spark 2.x line. This
>> release removes the experimental tag from Structured Streaming. In
>> addition, this release focuses on usability, stability, and polish,
>> resolving over 1100 tickets.
>>
>> We'd like to thank our contributors and users for their contributions and
>> early feedback to this release. This release would not have been possible
>> without you.
>>
>> To download Spark 2.2.0, head over to the download page:
>> http://spark.apache.org/downloads.html
>>
>> To view the release notes: https://spark.apache.or
>> g/releases/spark-release-2-2-0.html
>>
>> *(note: If you see any issues with the release notes, webpage or
>> published artifacts, please contact me directly off-list) *
>>
>> Michael
>>
>>
>