Hi, I haven't checked my answer (too lazy today), but I think I know what might be going on.
tl;dr Use cache to preserve the initial set of rows loaded from MySQL.

After you append the new rows, you will have twice as many rows as you had previously. Correct? Since newDF references the source table, every time you use it in a structured query, say to write it to a table, the source table gets re-loaded and hence the number of rows changes. What you should do is execute newDF.cache.count right after val newDF = mysqlDF.select... so the data (the rows) stays on the executors and won't get reloaded. A sketch of the fix follows, and a step-by-step trace of what I think happened is below your quoted code.
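I haven't run this, but the fix could look like the following (same jdbcUrl, table and properties as in your code; the two imports are assumed):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)

// Shift the ids as before.
val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))

// Materialize the 4 rows on the executors BEFORE any write touches the
// source table; count is simply an action that forces the cache to fill.
newDF.cache()
newDF.count()

// Both writes now reuse the cached rows instead of re-reading MySQL.
newDF.write.mode(SaveMode.Append).jdbc(jdbcUrl, table, properties)
newDF.write.saveAsTable("cities")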
Hope that helps.

Regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Wed, Aug 29, 2018 at 4:59 PM <ryanda...@gmail.com> wrote:

> Sorry, last mail format was not good.
>
> println("Going to talk to mySql")
>
> // Read the table from MySQL.
> val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
> println("I am back from mySql")
> mysqlDF.show()
>
> // Create a new DataFrame with column 'id' increased to avoid duplicate primary keys.
> val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
> newDF.printSchema()
> newDF.show()
>
> // Insert the records into the MySQL table.
> newDF.write
>   .mode(SaveMode.Append)
>   .jdbc(jdbcUrl, table, properties)
>
> // Write to Hive - this creates a new table.
> newDF.write.saveAsTable("cities")
> newDF.show()
>
> Output - records already existing in MySQL:
>
> Going to talk to mySql
> I am back from mySql
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> |  1|           USA|Palo Alto|
> |  2|Czech Republic|     Brno|
> |  3|           USA|Sunnyvale|
> |  4|          null|     null|
> +---+--------------+---------+
>
> root
>  |-- id: long (nullable = false)
>  |-- country: string (nullable = true)
>  |-- city: string (nullable = true)
>
> newDF.show() before the writes:
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> | 11|           USA|Palo Alto|
> | 12|Czech Republic|     Brno|
> | 13|           USA|Sunnyvale|
> | 14|          null|     null|
> +---+--------------+---------+
>
> newDF.show() after the writes:
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> | 11|           USA|Palo Alto|
> | 12|Czech Republic|     Brno|
> | 13|           USA|Sunnyvale|
> | 14|          null|     null|
> | 24|          null|     null|
> | 23|           USA|Sunnyvale|
> | 22|Czech Republic|     Brno|
> | 21|           USA|Palo Alto|
> +---+--------------+---------+
>
> Thanks,
> Ravi
>
> From: ryanda...@gmail.com <ryanda...@gmail.com>
> Sent: Wednesday, August 29, 2018 8:19 PM
> To: user@spark.apache.org
> Subject: Spark code to write to MySQL and Hive
>
> Hi,
>
> Can anyone help me understand what is happening with my code?
>
> I wrote a Spark application to read from a MySQL table (which already has 4 records) and create a new DF with the ID field increased by 10. Then I wanted to write the new DF to MySQL as well as to Hive.
>
> I am surprised to see an additional set of records in Hive!! I am not able to understand how newDF has records with IDs 21 to 24. I know that a DF is immutable. If so, how come it has 4 records at one point and 8 records at a later point?
>
> Thanks for your time.
> Ravi
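If the re-read theory is right, your run plays out like this (a sketch annotated with the row counts from your output; same names as in your code):

// MySQL table holds ids 1-4 (4 rows).
newDF.show()                         // scans MySQL: 4 rows, ids 11-14

// The append itself re-reads MySQL (ids 1-4) and writes ids 11-14 back,
// so the table now holds ids 1-4 and 11-14 (8 rows).
newDF.write.mode(SaveMode.Append).jdbc(jdbcUrl, table, properties)

// saveAsTable triggers another scan -- 8 rows this time -- and the
// select adds 10 to each id, so Hive receives ids 11-14 and 21-24.
newDF.write.saveAsTable("cities")
newDF.show()                         // scans MySQL again: 8 rows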
--------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org