Spark Structured Streaming - DF shows only one column with a list of byte arrays

2018-12-06 Thread salemi
Hi All, I am trying to read messages from Kafka, deserialize the values using Avro, and then convert the JSON content to a DataFrame. For a Kafka message value like {"a": "1", "b": "1"}, I would like to see a DataFrame like the following: +---+ | a | b |
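
A minimal Scala sketch of this pipeline's usual shape. The broker address and topic name are placeholders, and the Avro step is elided: the cast below assumes the payload is already a UTF-8 JSON string, so an Avro-decoding UDF would go in its place. The point is that from_json yields a single struct column, and select("data.*") flattens it into separate a and b columns.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder.appName("KafkaJsonToColumns").getOrCreate()

val schema = new StructType()
  .add("a", StringType)
  .add("b", StringType)

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker
  .option("subscribe", "mytopic")                      // assumed topic
  .load()
  // Replace this cast with an Avro-decoding UDF if the value is Avro-encoded.
  .select(col("value").cast("string").as("json"))
  .select(from_json(col("json"), schema).as("data"))
  .select("data.*")                                    // columns a and b instead of one struct column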

Re: bulk upsert data batch from Kafka dstream into Postgres db

2017-12-14 Thread salemi
Thank you for your response. In the case of an update, we sometimes need to just update a record, and in other cases we need to update the existing record and insert a new record. The statement you proposed doesn't handle that.

Re: bulk upsert data batch from Kafka dstream into Postgres db

2017-12-14 Thread salemi
Thank you for your response. That approach just loads the data into the DB. I am looking for an approach that allows me to update existing entries in the DB or insert a new entry if it doesn't exist.

bulk upsert data batch from Kafka dstream into Postgres db

2017-12-13 Thread salemi
Hi All, we are consuming messages from Kafka using a Spark DStream. Once the processing is done, we would like to update/insert the data in bulk into the database. I was wondering what the best solution for this might be. Our Postgres database table is not partitioned. Thank you, Ali
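
A minimal sketch of one common approach, assuming the processed stream is a DStream[(String, String)] of (id, payload) pairs; the table name, column names, JDBC URL and credentials are placeholders. Each partition is written in a single batched round trip, and the upsert is done with Postgres 9.5+'s INSERT ... ON CONFLICT DO UPDATE (the Postgres JDBC driver must be on the executor classpath).

import java.sql.DriverManager
import org.apache.spark.streaming.dstream.DStream

def upsertPartitions(dstream: DStream[(String, String)]): Unit = {
  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { rows =>
      // One connection and one batched statement per partition.
      val conn = DriverManager.getConnection(
        "jdbc:postgresql://dbhost:5432/mydb", "user", "password")
      val stmt = conn.prepareStatement(
        "INSERT INTO events (id, payload) VALUES (?, ?) " +
        "ON CONFLICT (id) DO UPDATE SET payload = EXCLUDED.payload")
      try {
        rows.foreach { case (id, payload) =>
          stmt.setString(1, id)
          stmt.setString(2, payload)
          stmt.addBatch()
        }
        stmt.executeBatch()
      } finally {
        stmt.close()
        conn.close()
      }
    }
  }
}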

pyspark.sql.utils.AnalysisException: u'Left outer/semi/anti joins with a streaming DataFrame/Dataset on the right is not supported;

2017-12-11 Thread salemi
Hi All, I am having trouble joining two structured streaming DataFrames. I am getting the following error: pyspark.sql.utils.AnalysisException: u'Left outer/semi/anti joins with a streaming DataFrame/Dataset on the right is not supported; Is there another way to join two streaming DataFrames based
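
For reference, in Spark 2.2 the supported direction is a left outer join with the streaming Dataset on the left and a static Dataset on the right; stream-to-stream outer joins only arrived in Spark 2.3 and require watermarks. A minimal Scala sketch (the PySpark call is analogous), with the join column name "id" as an assumption.

import org.apache.spark.sql.DataFrame

// streamingDF comes from spark.readStream, staticDF from a batch spark.read.
def enrich(streamingDF: DataFrame, staticDF: DataFrame): DataFrame =
  streamingDF.join(staticDF, Seq("id"), "left_outer")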

Re: pyspark + from_json(col("col_name"), schema) returns all null

2017-12-10 Thread salemi
I found the root cause! There was a mismatch between the StructField type and the JSON message. Is there a good write-up / wiki out there that describes how to debug Spark jobs? Thanks

pyspark + from_json(col("col_name"), schema) returns all null

2017-12-09 Thread salemi
Hi All, I am using PySpark and consuming messages from Kafka, and when I .select(from_json(col("col_name"), schema)) the returned values are all null. I looked at the JSON messages and they are valid strings. Any ideas?
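
from_json returns null whenever a document does not match the supplied schema, so the usual culprit is a field name or type mismatch (which, per the follow-up above, was the case here). A small Scala sketch of the behaviour; the message layout {"a": "1", "b": 2} is an assumption.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val spark = SparkSession.builder.appName("FromJsonCheck").master("local[*]").getOrCreate()
import spark.implicits._

// "a" is a JSON string and "b" is a JSON number; declaring "a" as LongType
// instead of StringType would make the parsed struct come back null.
val schema = new StructType()
  .add("a", StringType)
  .add("b", LongType)

val parsed = Seq("""{"a": "1", "b": 2}""").toDF("value")
  .select(from_json(col("value"), schema).as("data"))
  .select("data.*")

parsed.show()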

Structured Streaming + Kafka 0.10 connectors + valueDecoder and messageHandler with Python

2017-12-09 Thread salemi
Hi All, we are currently using direct streams to get the data from a Kafka topic as follows: KafkaUtils.createDirectStream(ssc=self.streaming_context, topics=topics, kafkaParams=kafka_params,

Re: PySpark 2.2.0, Kafka 0.10 DataFrames

2017-11-20 Thread salemi
Yes, we are using --packages: $SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.2.0 --py-files shell.py

PySpark 2.2.0, Kafka 0.10 DataFrames

2017-11-20 Thread salemi
Hi All, we are trying to use the DataFrames approach with Kafka 0.10 and PySpark 2.2.0. We followed the instructions at https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html. We coded something similar to the code below using Python: df = spark \ .read \ .format(
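
A Scala sketch of the streaming read that guide documents (the PySpark call chain has the same shape); the broker and topic names are assumptions. Note that the DataFrame/Dataset Kafka source ships in the spark-sql-kafka-0-10 artifact, which is a different package from the spark-streaming-kafka-0-10 DStream connector.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.appName("KafkaDataFrames").getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")   // assumed broker
  .option("subscribe", "mytopic")                    // assumed topic
  .load()
  .select(col("key").cast("string"), col("value").cast("string"))

// Print each micro-batch to stdout just to verify data is flowing.
val query = df.writeStream
  .format("console")
  .start()

query.awaitTermination()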

rule engine based on spark

2014-10-14 Thread salemi
Hi, is there a rule engine based on Spark? I would like to allow business users to define their rules in a language, and the execution of the rules should be done in Spark. Thanks, Ali

spark streaming - saving DStream into HBase doesn't work

2014-09-04 Thread salemi
Hi, I am using the following code to write data to HBase. I see the jobs are sent off, but I never get anything in my HBase database, and Spark doesn't throw any errors. How can such a problem be debugged? Is the code below correct for writing data to HBase? val conf = HBaseConfiguration.create(
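
A sketch of the usual pattern for this, assuming a DStream of (rowKey, value) string pairs and the HBase 1.x client API; the table name "events" and column family "cf" are placeholders. The important part is that the puts happen inside an output operation (foreachRDD), so Spark actually schedules the work, and that the connection is created per partition on the executors.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.dstream.DStream

def writeToHBase(dstream: DStream[(String, String)]): Unit = {
  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { rows =>
      val conf = HBaseConfiguration.create()          // picks up hbase-site.xml
      val connection = ConnectionFactory.createConnection(conf)
      val table = connection.getTable(TableName.valueOf("events"))
      try {
        rows.foreach { case (rowKey, value) =>
          val put = new Put(Bytes.toBytes(rowKey))
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("v"), Bytes.toBytes(value))
          table.put(put)
        }
      } finally {
        table.close()
        connection.close()
      }
    }
  }
}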

Re: Spark Streaming - how to implement multiple calculations using the same data set

2014-09-02 Thread Alireza Salemi
Tobias, That was what I was planning to do, and my technical lead is of the opinion that we should somehow process a message only once and calculate all the measures for the worker. I was wondering if there is a solution out there for that? Thanks, Ali > Hi, > > On Wed, Sep 3, 2014 at 6:54 A

Spark Streaming - how to implement multiple calculations using the same data set

2014-09-02 Thread salemi
Hi, I am planning to use an incoming DStream and calculate different measures from the same stream. I was able to calculate the individual measures separately, and now I have to merge them, but Spark Streaming doesn't support outer joins yet. handlingTimePerWorker List(workerId, handlingTime) filePr
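
One common way to avoid merging separately computed streams is to compute the measures in a single pass, reducing a tuple of partial aggregates per key. A sketch assuming an event type with workerId and handlingTime fields (the type and field names are placeholders); the tuple can be widened with further measures.

import org.apache.spark.streaming.dstream.DStream

case class WorkerEvent(workerId: String, handlingTime: Long)   // placeholder field names

// Total handling time and event count per worker in one reduceByKey pass;
// the average is then just total / count, with no outer join needed.
def measuresPerWorker(events: DStream[WorkerEvent]): DStream[(String, (Long, Long))] =
  events
    .map(e => (e.workerId, (e.handlingTime, 1L)))
    .reduceByKey { case ((time1, count1), (time2, count2)) =>
      (time1 + time2, count1 + count2)
    }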

spark streaming - realtime reports - storing current state of resources

2014-08-22 Thread salemi
Hi All, I have a set of 1000k Workers of a company with different attributes associated with them. I would like to be able to report on their current state at any time and to update the reports every 5 seconds. Spark Streaming allows you to receive events about the Workers' state changes and process them. Where
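
A sketch of keeping the latest known state per Worker with updateStateByKey, assuming the events arrive as (workerId, state) pairs; the resulting stateful stream can then be reported on every 5-second batch. Checkpointing must be enabled on the StreamingContext for stateful operations.

import org.apache.spark.streaming.dstream.DStream

def latestStatePerWorker(updates: DStream[(String, String)]): DStream[(String, String)] =
  updates.updateStateByKey[String] { (newStates: Seq[String], current: Option[String]) =>
    // Keep the most recent state seen for this worker, or carry the old one forward.
    newStates.lastOption.orElse(current)
  }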

spark streaming - how to prevent an empty DStream from getting written out to HDFS

2014-08-19 Thread salemi
Hi All, I have the following code, and if the DStream is empty, Spark Streaming writes empty files to HDFS. How can I prevent that? val ssc = new StreamingContext(sparkConf, Minutes(1)) val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap) dStream.saveAsNewAPIHadoop
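
A sketch of the usual guard: perform the write inside foreachRDD and skip batches that carry no data (rdd.take(1) works on any Spark version). saveAsTextFile stands in here for whichever Hadoop output format is actually used, and the path prefix is a placeholder.

import org.apache.spark.streaming.dstream.DStream

def saveNonEmptyBatches(dstream: DStream[String], prefix: String): Unit =
  dstream.foreachRDD { (rdd, time) =>
    if (rdd.take(1).nonEmpty) {
      rdd.saveAsTextFile(s"$prefix-${time.milliseconds}")
    }
  }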

Re: spark - Identifying and skipping processed data in hdfs

2014-08-19 Thread salemi
I would like to read those files as they get written, transform the content, and write it out as Parquet files.

Re: spark - reading hdfs files every 5 minutes

2014-08-19 Thread salemi
Thank you, but how do you convert the stream to a Parquet file? Ali
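
A sketch of one way to do it with the DataFrame API (SparkSession shown here): convert each non-empty micro-batch to a DataFrame and write it out as Parquet. The record type and output directory are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

case class Record(id: String, value: String)   // placeholder schema

def saveStreamAsParquet(records: DStream[Record], outputDir: String): Unit =
  records.foreachRDD { (rdd, time) =>
    if (rdd.take(1).nonEmpty) {
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      rdd.toDF().write.parquet(s"$outputDir/batch-${time.milliseconds}")
    }
  }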

spark - Identifying and skipping processed data in hdfs

2014-08-18 Thread salemi
Hi, my data source stores the incoming data to HDFS every 10 seconds. The naming convention is save-<timestamp>.csv (see below): drwxr-xr-x ali supergroup 0 B 0 0 B save-1408396065000.csv drwxr-xr-x ali supergroup 0 B 0 0 B save-140839607.csv drwxr-xr-x ali

spark - reading hdfs files every 5 minutes

2014-08-18 Thread salemi
Hi, my data source stores the incoming data to HDFS every 10 seconds. The naming convention is save-<timestamp>.csv (see below): drwxr-xr-x ali supergroup 0 B 0 0 B save-1408396065000.csv drwxr-xr-x ali supergroup 0 B 0 0 B save-140839607.csv drwxr-xr-x ali supe

Spark Streaming - saving DStream into Hadoop throws exception if checkpoint is enabled.

2014-08-18 Thread salemi
Hi, using the code below to save a DStream into Hadoop throws an exception if checkpointing is enabled; see the really simple example below. If I take out the ssc.checkpoint(...) call, the code works. val ssc = new StreamingContext(sparkConf, Seconds(1)) ssc.checkpoint(hdfsCheckPointUrl)

Re: spark streaming - saving kafka DStream into hadoop throws exception

2014-08-15 Thread salemi
If I reduce the app to the following code, then I don't see the exception. It creates the Hadoop files, but they are empty! The DStream doesn't get written out to the files! def main(args: Array[String]) { try { val properties = getProperties("settings.properties") StreamingExamples

Re: spark streaming - saving kafka DStream into hadoop throws exception

2014-08-15 Thread salemi
Look, this is the whole program; I am not trying to serialize the JobConf. def main(args: Array[String]) { try { val properties = getProperties("settings.properties") StreamingExamples.setStreamingLogLevels() val zkQuorum = properties.get("zookeeper.list").toString() v

spark streaming - saving kafka DStream into hadoop throws exception

2014-08-15 Thread salemi
Hi All, I am just trying to save the Kafka DStream to Hadoop as follows: val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap) dStream.saveAsHadoopFiles(hdfsDataUrl, "data") It throws the following exception. What am I doing wrong? 14/08/15 14:30:09 ERROR OneForOne

Re: spark streaming - lambda architecture

2014-08-14 Thread salemi
> On Thu, Aug 14, 2014 at 2:27 PM, salemi <alireza.salemi@> wrote: >> Hi, >> How would you implement the batch layer of the lambda architecture with >> Spark / Spark Streaming? >> >> Thanks, >> Ali

spark streaming - lambda architecture

2014-08-14 Thread salemi
Hi, how would you implement the batch layer of the lambda architecture with Spark / Spark Streaming? Thanks, Ali

spark streaming : what is the best way to make a driver highly available

2014-08-13 Thread salemi
Hi All, what is the best way to make a Spark Streaming driver highly available? I would like a backup driver to pick up the processing if the primary driver dies. Thanks, Ali

Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration

2014-08-08 Thread salemi
Is it possible to keep the events in memory rather than pushing them out to the file system? Ali

Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration

2014-08-08 Thread salemi
Thank you. I will look into setting up a Hadoop HDFS node.

Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration

2014-08-07 Thread salemi
That is correct. I do ssc.checkpoint("checkpoint"). Why is the checkpoint required?

Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration

2014-08-07 Thread salemi
Hi, thank you for your help. With the new code I am getting the following error in the driver. What is going wrong here? 14/08/07 13:22:28 ERROR JobScheduler: Error running job streaming job 1407450148000 ms.0 org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[4528] at apply at List.sca

Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration

2014-08-06 Thread salemi
Hi, the reason I am looking to do it differently is that the latency and batch processing times are bad, about 40 seconds. I took the times from the Streaming UI. As you suggested, I tried the window as below, and the times are still bad. val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, t

Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration

2014-08-06 Thread salemi
Hi, I have a DStream called eventData that contains a set of Data objects defined as follows: case class Data(startDate: Long, endDate: Long, className: String, id: String, state: String) What would the reducer and inverse reducer functions look like if I would like to add the data for the current
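
A sketch of an invertible windowed aggregate over such a stream: count events per state, with the reduce function adding data that enters the window and the inverse reduce function subtracting data that leaves it. The window and slide durations are assumptions, and the StreamingContext must be checkpointed.

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

case class Data(startDate: Long, endDate: Long, className: String, id: String, state: String)

def countsPerState(eventData: DStream[Data]): DStream[(String, Long)] =
  eventData
    .map(d => (d.state, 1L))
    .reduceByKeyAndWindow(
      (a: Long, b: Long) => a + b,   // reduceFunc: new batches entering the window
      (a: Long, b: Long) => a - b,   // invReduceFunc: old batches leaving the window
      Seconds(60),                   // windowDuration (assumed)
      Seconds(5))                    // slideDuration (assumed)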

Streaming + SQL : How to register a DStream's content as a table and access it

2014-08-04 Thread salemi
Hi, I was wondering if you could give me an example of how to register a DStream's content as a table and access it. Thanks, Ali
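
A sketch of the usual pattern: inside foreachRDD, turn each micro-batch into a DataFrame, register it as a temporary view, and query it with SQL. The case class, view name and query are placeholders; createOrReplaceTempView is the current name of what Spark 1.x called registerTempTable.

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

case class Event(id: String, state: String)   // placeholder schema

def registerAndQuery(events: DStream[Event]): Unit =
  events.foreachRDD { rdd =>
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._
    rdd.toDF().createOrReplaceTempView("events")
    spark.sql("SELECT state, COUNT(*) AS cnt FROM events GROUP BY state").show()
  }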

spark streaming kafka

2014-08-04 Thread salemi
Hi, I have the following driver, and it works when I run it in local[*] mode, but if I execute it on a standalone cluster then I don't get any data from Kafka. Does anybody know why that might be? val sparkConf = new SparkConf().setAppName("KafkaMessageReceiver") val sc = new

Re: Tasks fail when run in a cluster but work fine when submitted in local mode

2014-08-03 Thread salemi
Let me post the solution to this problem: I had to set spark.httpBroadcast.uri to the FQDN of the driver. Ali

Kafka and Spark application stops after polling twice.

2014-08-03 Thread salemi
Hi All, my application works when I use spark-submit with master=local[*]. But if I deploy the application to a standalone cluster with master=spark://master:7077, then the application polls twice from the Kafka topic and then stops working. I don't get any error logs. I can see application c

Tasks fail when run in a cluster but work fine when submitted in local mode

2014-08-01 Thread salemi
Hi All, my application works when I use spark-submit with master=local[*]. But if I deploy the application to a standalone cluster with master=spark://master:7077, the application doesn't work and I get the following exception: 14/08/01 05:18:51 ERROR TaskSchedulerImpl: Lost executor 0 on dev

spark-submit registers the driver twice

2014-07-31 Thread salemi
Hi All, I am using the spark-submit command to submit my jar to a standalone cluster with two executors. When I use spark-submit, it deploys the application twice, and I see two application entries in the master UI. The master logs shown below also indicate that submit tries to deploy the app

store spark streaming dstream in hdfs or cassandra

2014-07-31 Thread salemi
Hi, I was wondering what the best way is to store off DStreams in HDFS or Cassandra. Could somebody provide an example? Thanks, Ali
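
A sketch of two common sinks, assuming a simple event type (the type, path, keyspace, table and column names are placeholders): plain text files on HDFS via saveAsTextFiles, and a Cassandra table via the DataStax spark-cassandra-connector, which is an external package.

import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._    // adds saveToCassandra to DStreams
import org.apache.spark.streaming.dstream.DStream

case class Event(id: String, payload: String)      // placeholder schema

def persist(events: DStream[Event]): Unit = {
  // HDFS: one directory of text files per batch interval.
  events.map(e => s"${e.id},${e.payload}")
    .saveAsTextFiles("hdfs:///data/events/batch")

  // Cassandra: the table must already exist with matching columns.
  events.saveToCassandra("my_keyspace", "events", SomeColumns("id", "payload"))
}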