package com.cg.spark.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext

// This class is used to retrieve the Product Hierarchy Dimension values by joining CAS master data Tables.

/*select
  UPC.PKey as UCPPKey,
  UPC.ID as UPCID,
  UPCDesc.Text1 as UPCDesc,
  MJBP.PKey as MJBPPKey,
  MJBP.ID AS MJBPID,
  MJBPDESC.Text1 as MJBPDesc,
  SPG.PKey as SPGPKey,
  SPG.ID as SPGID,
  SPGDESC.Text1 as SPGDesc,
  FG.PKey as FGPKey,
  FG.ID as FGID,
  FGDESC.Text1 as FGDesc
From
		PrdMain UPC,
		PrdDescription UPCDesc,
		
	  PrdMain MJBP,
	  PrdDescription MJBPDesc, 
	  PrdGroupRel UPCtoMJBP,

    PrdMain SPG,
	  PrdDescription SPGDesc, 
	  PrdGroupRel MJBPtoSPG,
	  
    PrdMain FG,
	  PrdDescription FGDesc, 
	  PrdGroupRel SPGtoFG  	  
 
where
  UPC.pkey = UPCDesc.prdmainpkey and
  MJBP.pkey = MJBPDesc.prdmainpkey and
  UPC.PKey = UPCtoMJBP.PrdMainPKey and
  MJBP.Pkey = UPCtoMJBP.GroupPrdMainPKey AND
  
  SPG.pkey = SPGDesc.prdmainpkey and
  MJBP.PKey = MJBPtoSPG.PrdMainPKey and
  SPG.Pkey = MJBPtoSPG.GroupPrdMainPKey AND
  
  FG.pkey = FGDesc.prdmainpkey and
  SPG.PKey = SPGtoFG.PrdMainPKey and
  FG.Pkey = SPGtoFG.GroupPrdMainPKey AND
  
  
  --upc.pkey = '00100000003msmvi' AND
  UPC.Status<>'d' AND
	UPC.Client='010' AND
	--UPC.OrgLevel='UPC' AND
	UPC.OrgLevel='Product' AND
	UPCDesc.language = 'en' AND
	
 	
  MJBP.Status<>'d' AND
	MJBP.Client='010' AND
	--MJBP.OrgLevel='Major Brand/Pack' AND
	MJBP.OrgLevel='Brand' AND
	MJBPDesc.language = 'en' AND
	
  UPCtoMJBP.Status<>'d' AND
	UPCtoMJBP.Client='010' AND
	UPCtoMJBP.SalesOrg='0003' AND
	UPCtoMJBP.RelationType='Hierarchy' AND
	UPCtoMJBP.StructureType='Sales' AND
	
  SPG.Status<>'d' AND
	SPG.Client='010' AND
	--MJBP.OrgLevel='Form' AND
	SPG.OrgLevel='SubCategory' AND
	SPGDesc.language = 'en' AND
	
  MJBPtoSPG.Status<>'d' AND
	MJBPtoSPG.Client='010' AND
	MJBPtoSPG.SalesOrg='0003' AND
	MJBPtoSPG.RelationType='Hierarchy' AND
	MJBPtoSPG.StructureType='Sales' AND

  FG.Status<>'d' AND
	FG.Client='010' AND
	--FG.OrgLevel='Form Group' AND
	FG.OrgLevel='Category' AND
	FGDesc.language = 'en'*/

object ProductHierarchy_Dimension {
    def main(args: Array[String]) {

    //  ***************** Setting the SparkContext and its configurations.  *****************
        val conf = new SparkConf().setMaster("local").setAppName("ProductHierarchy_Dimension")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
      
    /* ***************** Creating DataFrame by loading CSV/DSV files with Schema & along with the columns and data required by filtering conditions and load it to Cache. 
        As this DataFrame is the base data for other dataframe instances, so avoiding the IO operation by loading it into cache ***************** */
        
        val df_PrdMain = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true") // Use first line of all files as header
        .option("inferSchema", "true") // Automatically infer data types
        .option("delimiter", ",")
        .load("C:\\Users\\bahari\\Desktop\\CASTablesExport\\ML-PrdMain.csv")
        .select("PKey", "Id", "Status", "Client", "OrgLevel")
        .filter("Status <> 'd'").filter("Client = '010'")
        .cache()
        
        print("df_PrdMain count is: "+df_PrdMain.count()+"\n")
        
   // *****************  Multiple instances based on the filter conditions.  *****************
        
        val df_PrdMain_UPC = df_PrdMain.filter("OrgLevel='Product'")
        print("df_PrdMain_UPC count is: "+df_PrdMain_UPC.count()+"\n")  // 0
        val df_PrdMain_MJBP = df_PrdMain.filter("OrgLevel='Brand'")
        print("df_PrdMain_MJBP count is: "+df_PrdMain_MJBP.count()+"\n")  // 1
        val df_PrdMain_SPG = df_PrdMain.filter("OrgLevel='SubCategory'")
        print("df_PrdMain_SPG count is: "+df_PrdMain_SPG.count()+"\n")  // 0
        val df_PrdMain_FG = df_PrdMain.filter("OrgLevel='Category'")
        print("df_PrdMain_FG count is: "+df_PrdMain_FG.count()+"\n")  // 0
   
        
    /* ***************** Creating DataFrame by loading CSV/DSV files with Schema & along with the columns and data required by filtering conditions and load it to Cache. 
        As this DataFrame is the base data for other dataframe instances, so avoiding the IO operation by loading it into cache ***************** */
        
        val df_PrdDescription = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true") // Use first line of all files as header
        .option("inferSchema", "true") // Automatically infer data types
        .option("delimiter", ",")
        .load("C:\\Users\\bahari\\Desktop\\CASTablesExport\\ML-PrdDescription.csv")
        .select("Text1", "PrdMainPKey", "Language")
        .filter("Language = 'en'")
        .cache()
        
         print("df_PrdDescription count is: "+df_PrdDescription.count()+"\n")
        
   // *****************  Multiple instances based on the filter conditions.  *****************
        
        val df_PrdDescription_UPC = df_PrdDescription
        val df_PrdDescription_MJBP = df_PrdDescription
        val df_PrdDescription_SPG = df_PrdDescription
        val df_PrdDescription_FG = df_PrdDescription
        
 
    /* ***************** Creating DataFrame by loading CSV/DSV files with Schema & along with the columns and data required by filtering conditions and load it to Cache. 
        As this DataFrame is the base data for other dataframe instances, so avoiding the IO operation by loading it into cache ***************** */
        
        val df_PrdGroupRel = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true") // Use first line of all files as header
        .option("inferSchema", "true") // Automatically infer data types
        .option("delimiter", ",")
        .load("C:\\Users\\bahari\\Desktop\\CASTablesExport\\ML-PrdGroupRel.csv")
        .select("PrdMainPKey", "GroupPrdMainPKey", "Status", "Client", "SalesOrg", "RelationType", "StructureType")
        .filter("Status <> 'd'").filter("Client= '010'").filter("SalesOrg='0003'").filter("RelationType='Hierarchy'").filter("StructureType='Sales'")
        .cache()
        
        print("df_PrdGroupRel count is: "+df_PrdGroupRel.count()+"\n")

   // *****************  Multiple instances based on the filter conditions.  *****************
        
        val df_PrdGroupRel_UPCtoMJBP = df_PrdGroupRel
        val df_PrdGroupRel_MJBPtoSPG = df_PrdGroupRel
        val df_PrdGroupRel_SPGtoFG = df_PrdGroupRel
    
        
        /*df_PrdMain.printSchema()
        df_PrdMain.take(10).foreach(println)*/
        
        val before = System.currentTimeMillis()/60000
        
        
   // *****************  Register TempTables.  *****************   
        
        df_PrdMain_UPC.registerTempTable("UPC")
        df_PrdMain_MJBP.registerTempTable("MJBP")
        df_PrdMain_SPG.registerTempTable("SPG")
        df_PrdMain_FG.registerTempTable("FG")     
        
        df_PrdDescription_UPC.registerTempTable("UPCDesc")
        df_PrdDescription_MJBP.registerTempTable("MJBPDesc")
        df_PrdDescription_SPG.registerTempTable("SPGDesc")
        df_PrdDescription_FG.registerTempTable("FGDesc")  
        
        df_PrdGroupRel_UPCtoMJBP.registerTempTable("UPCtoMJBP")
        df_PrdGroupRel_MJBPtoSPG.registerTempTable("MJBPtoSPG")
        df_PrdGroupRel_SPGtoFG.registerTempTable("SPGtoFG")
                
        
   // *****************  Join conditions.  *****************
        
/*        val joinedDF = df_PrdMain_UPC.join(df_PrdDescription_UPC, df_PrdMain_UPC("PKey") === df_PrdDescription_UPC("PrdMainPKey"))
                      .join(df_PrdGroupRel_UPCtoMJBP, df_PrdMain_UPC("PKey") === df_PrdGroupRel_UPCtoMJBP("PrdMainPKey"))
                      .join(df_PrdMain_MJBP, df_PrdMain_MJBP("PKey") === df_PrdDescription_MJBP("PrdMainPKey"))
                      .join(df_PrdMain_MJBP, df_PrdMain_MJBP("PKey") === df_PrdGroupRel_UPCtoMJBP("GroupPrdMainPKey"))
                      .join(df_PrdMain_MJBP, df_PrdMain_MJBP("PKey") === df_PrdGroupRel_MJBPtoSPG("PrdMainPKey"))
                      .join(df_PrdMain_SPG, df_PrdMain_SPG("PKey") === df_PrdDescription_SPG("PrdMainPKey"))
                      .join(df_PrdMain_SPG, df_PrdMain_SPG("PKey") === df_PrdGroupRel_MJBPtoSPG("GroupPrdMainPKey"))
                      .join(df_PrdMain_SPG, df_PrdMain_SPG("PKey") === df_PrdGroupRel_SPGtoFG("PrdMainPKey"))
                      .join(df_PrdMain_FG, df_PrdMain_FG("PKey") === df_PrdDescription_FG("PrdMainPKey"))
                      .join(df_PrdMain_FG, df_PrdMain_FG("PKey") === df_PrdGroupRel_SPGtoFG("GroupPrdMainPKey"))*/
        
       /* ----- val joinedDF = df_PrdMain_UPC.join(df_PrdDescription_UPC, df_PrdMain_UPC("PKey") === df_PrdDescription_UPC("PrdMainPKey"))
                      .join(df_PrdGroupRel_UPCtoMJBP, df_PrdMain_UPC("PKey") === df_PrdGroupRel_UPCtoMJBP("PrdMainPKey"))                      
                      .join(df_PrdMain_MJBP, df_PrdMain_MJBP("PKey") === df_PrdGroupRel_UPCtoMJBP("GroupPrdMainPKey"))
                      .join(df_PrdDescription_MJBP, df_PrdMain_MJBP("PKey") === df_PrdDescription_MJBP("PrdMainPKey"))                      
                      .join(df_PrdGroupRel_MJBPtoSPG, df_PrdMain_MJBP("PKey") === df_PrdGroupRel_MJBPtoSPG("PrdMainPKey"))                      
                      .join(df_PrdMain_SPG, df_PrdMain_SPG("PKey") === df_PrdGroupRel_MJBPtoSPG("GroupPrdMainPKey"))                      
                      .join(df_PrdDescription_SPG, df_PrdMain_SPG("PKey") === df_PrdDescription_SPG("PrdMainPKey"))
                      .join(df_PrdGroupRel_SPGtoFG, df_PrdMain_SPG("PKey") === df_PrdGroupRel_SPGtoFG("PrdMainPKey"))
                      .join(df_PrdMain_FG, df_PrdMain_FG("PKey") === df_PrdGroupRel_SPGtoFG("GroupPrdMainPKey"))
                      .join(df_PrdDescription_FG, df_PrdMain_FG("PKey") === df_PrdDescription_FG("PrdMainPKey"))*/

        
/*        val PrdMain_UPCJoinedDF = df_PrdMain_UPC.join(df_PrdDescription_UPC, df_PrdMain_UPC("PKey") === df_PrdDescription_UPC("PrdMainPKey"))
                      .join(df_PrdGroupRel_UPCtoMJBP, df_PrdMain_UPC("PKey") === df_PrdGroupRel_UPCtoMJBP("PrdMainPKey"))
                      
                   
       
        val PrdMain_MJBPJoinedDF = df_PrdMain_MJBP.join(df_PrdDescription_MJBP, df_PrdMain_MJBP("PKey") === df_PrdDescription_MJBP("PrdMainPKey"))
                      .join(df_PrdGroupRel_UPCtoMJBP, df_PrdMain_MJBP("PKey") === df_PrdGroupRel_UPCtoMJBP("GroupPrdMainPKey"))
                      .join(df_PrdGroupRel_MJBPtoSPG, df_PrdMain_MJBP("PKey") === df_PrdGroupRel_MJBPtoSPG("PrdMainPKey"))
                      
                     
                      

        val PrdMain_SPGJoinedDF = df_PrdMain_SPG.join(df_PrdDescription_SPG, df_PrdMain_SPG("PKey") === df_PrdDescription_SPG("PrdMainPKey"))
                      .join(df_PrdGroupRel_MJBPtoSPG, df_PrdMain_SPG("PKey") === df_PrdGroupRel_MJBPtoSPG("GroupPrdMainPKey"))
                      .join(df_PrdGroupRel_SPGtoFG, df_PrdMain_SPG("PKey") === df_PrdGroupRel_SPGtoFG("PrdMainPKey"))
                      
                  
                      
        val PrdMain_FGJoinedDF = df_PrdMain_FG.join(df_PrdDescription_FG, df_PrdMain_FG("PKey") === df_PrdDescription_FG("PrdMainPKey"))
                      .join(df_PrdGroupRel_SPGtoFG, df_PrdMain_FG("PKey") === df_PrdGroupRel_SPGtoFG("GroupPrdMainPKey"))
                      
        val UPCJoinedDFCount =  PrdMain_UPCJoinedDF.count()   
        val MJBPJoinedDFCount =  PrdMain_MJBPJoinedDF.count() 
        val SPGJoinedDFCount =  PrdMain_SPGJoinedDF.count() 
        val FGJoinedDFCount =  PrdMain_FGJoinedDF.count()   
        
        println("UPCJoinedDFCount: "+UPCJoinedDFCount)
        println("MJBPJoinedDFCount: "+MJBPJoinedDFCount)
        println("SPGJoinedDFCount: "+SPGJoinedDFCount)
        println("FGJoinedDFCount: "+FGJoinedDFCount)*/
                      
/*        print("PrdMain_UPCJoinedDF count is: "+PrdMain_UPCJoinedDF.count()+"\n") 
        print("PrdMain_MJBPJoinedDF count is: "+PrdMain_MJBPJoinedDF.count()+"\n")         
        print("PrdMain_SPGJoinedDF count is: "+PrdMain_SPGJoinedDF.count()+"\n")                          
        print("PrdMain_FGJoinedDF count is: "+PrdMain_FGJoinedDF.count()+"\n")*/
        
         /* val joinedDF = df_PrdMain_UPC.join(df_PrdDescription_UPC)
                      .join(df_PrdGroupRel_UPCtoMJBP)
                      .join(df_PrdMain_MJBP)
                      .join(df_PrdDescription_MJBP)
                      .join(df_PrdGroupRel_MJBPtoSPG)
                      .join(df_PrdMain_SPG)
                      .join(df_PrdDescription_SPG)
                      .join(df_PrdGroupRel_SPGtoFG)
                      .join(df_PrdMain_FG)
                      .join(df_PrdDescription_FG)*/
        
        val joinedDF = sqlContext.sql("select UPC.PKey as UCPPKey, UPC.Id as UPCID, UPCDesc.Text1 as UPCDesc,  MJBP.PKey as MJBPPKey,  MJBP.Id AS MJBPID,  MJBPDesc.Text1 as MJBPDesc,  SPG.PKey as SPGPKey,  SPG.Id as SPGID,  SPGDesc.Text1 as SPGDesc,  FG.PKey as FGPKey,  FG.Id as FGID,  FGDesc.Text1 as FGDesc From						UPC,						UPCDesc,	   MJBP,	   MJBPDesc,						 UPCtoMJBP,     SPG,	   SPGDesc,	   MJBPtoSPG,	       FG,	   FGDesc,	   SPGtoFG where							UPC.PKey = UPCDesc.PrdMainPKey and							MJBP.PKey = MJBPDesc.PrdMainPKey and							UPC.PKey = UPCtoMJBP.PrdMainPKey and							MJBP.PKey = UPCtoMJBP.GroupPrdMainPKey AND  							SPG.PKey = SPGDesc.PrdMainPKey and							MJBP.PKey = MJBPtoSPG.PrdMainPKey and							SPG.PKey = MJBPtoSPG.GroupPrdMainPKey AND							FG.PKey = FGDesc.PrdMainPKey and							SPG.PKey = SPGtoFG.PrdMainPKey and							FG.PKey = SPGtoFG.GroupPrdMainPKey AND							  UPC.Status<>'d' AND								UPC.Client='010' AND	UPC.OrgLevel='Product' AND								UPCDesc.Language = 'en' AND							  MJBP.Status<>'d' AND								MJBP.Client='010' AND	MJBP.OrgLevel='Brand' AND								MJBPDesc.Language = 'en' AND							  UPCtoMJBP.Status<>'d' AND								UPCtoMJBP.Client='010' AND								UPCtoMJBP.SalesOrg='0003' AND								UPCtoMJBP.RelationType='Hierarchy' AND								UPCtoMJBP.StructureType='Sales' AND							  SPG.Status<>'d' AND									SPG.Client='010' AND	SPG.OrgLevel='SubCategory' AND								SPGDesc.Language = 'en' AND								  MJBPtoSPG.Status<>'d' AND									MJBPtoSPG.Client='010' AND									MJBPtoSPG.SalesOrg='0003' AND									MJBPtoSPG.RelationType='Hierarchy' AND									MJBPtoSPG.StructureType='Sales' AND								  FG.Status<>'d' AND									FG.Client='010' AND	FG.OrgLevel='Category' AND									FGDesc.Language = 'en'")//.foreach(println)         

/*                val count = sqlContext.sql("select count(*) From						UPC,						UPCDesc,	   MJBP,	   MJBPDesc,						 UPCtoMJBP,     SPG,	   SPGDesc,	   MJBPtoSPG,	       FG,	   FGDesc,	   SPGtoFG where							UPC.PKey = UPCDesc.PrdMainPKey and							MJBP.PKey = MJBPDesc.PrdMainPKey and							UPC.PKey = UPCtoMJBP.PrdMainPKey and							MJBP.PKey = UPCtoMJBP.GroupPrdMainPKey AND  							SPG.PKey = SPGDesc.PrdMainPKey and							MJBP.PKey = MJBPtoSPG.PrdMainPKey and							SPG.PKey = MJBPtoSPG.GroupPrdMainPKey AND							FG.PKey = FGDesc.PrdMainPKey and							SPG.PKey = SPGtoFG.PrdMainPKey and							FG.PKey = SPGtoFG.GroupPrdMainPKey AND							  UPC.Status<>'d' AND								UPC.Client='010' AND	UPC.OrgLevel='Product' AND								UPCDesc.Language = 'en' AND							  MJBP.Status<>'d' AND								MJBP.Client='010' AND	MJBP.OrgLevel='Brand' AND								MJBPDesc.Language = 'en' AND							  UPCtoMJBP.Status<>'d' AND								UPCtoMJBP.Client='010' AND								UPCtoMJBP.SalesOrg='0003' AND								UPCtoMJBP.RelationType='Hierarchy' AND								UPCtoMJBP.StructureType='Sales' AND							  SPG.Status<>'d' AND									SPG.Client='010' AND	SPG.OrgLevel='SubCategory' AND								SPGDesc.Language = 'en' AND								  MJBPtoSPG.Status<>'d' AND									MJBPtoSPG.Client='010' AND									MJBPtoSPG.SalesOrg='0003' AND									MJBPtoSPG.RelationType='Hierarchy' AND									MJBPtoSPG.StructureType='Sales' AND								  FG.Status<>'d' AND									FG.Client='010' AND	FG.OrgLevel='Category' AND									FGDesc.Language = 'en'")
        count.map(t => "Record Count: " + t(0)).collect().foreach(println) // 14 records
    */   
                      
                    //print("JoinedDF count is: "+joinedDF.count()+"\n")
                      
   // *****************  Select Columns.  *****************
                      
/*        joinedDF.select(df_PrdMain_UPC("PKey").alias("UCPPKey"),
                        df_PrdMain_UPC("Id").alias("UPCID"),
                        df_PrdDescription_UPC("Text1").alias("UPCDesc"),
                        df_PrdMain_MJBP("PKey").alias("MJBPPKey"),
                        df_PrdMain_MJBP("Id").alias("MJBPID"),
                        df_PrdDescription_MJBP("Text1").alias("MJBPDesc"),
                        df_PrdMain_SPG("PKey").alias("SPGPKey"),
                        df_PrdMain_SPG("Id").alias("SPGID"),
                        df_PrdDescription_SPG("Text1").alias("SPGDesc"),
                        df_PrdMain_FG("PKey").alias("FGPKey"),
                        df_PrdMain_FG("Id").alias("FGID"),
                        df_PrdDescription_FG("Text1").alias("FGDesc")           
                       ).foreach(println)*/
                       
    // ************** Total Record COUNT in the Resultset DataFrame. ***************                       
        print("JoinedDF count is: "+joinedDF.count()+"\n")
        
    // ************** Schema of the Resultset DataFrame. ***************              
        joinedDF.printSchema()
        
    // ************** Total Records in the Resultset DataFrame. ***************              
        joinedDF.collect().foreach(println)
        
        print("Total Execution Time for Product Hierarchy is: ")
        print((System.currentTimeMillis()/60000)-before+" min(s)")
        
     }
}