Rockie Yang created ZEPPELIN-1568:
-------------------------------------

             Summary: print %angular does not work in asynchronous 
                 Key: ZEPPELIN-1568
                 URL: https://issues.apache.org/jira/browse/ZEPPELIN-1568
             Project: Zeppelin
          Issue Type: Bug
          Components: zeppelin-interpreter
    Affects Versions: 0.6.2, 0.6.1
         Environment: Docker image based on openjdk:8
            Reporter: Rockie Yang


I am trying to add Spark Structured Streaming support to Zeppelin with 
Highcharts. https://github.com/knockdata/spark-highcharts/issues/11

Output printed with %angular from inside the streaming sink (i.e. asynchronously) does not render properly. Example:

    /** One sample record: the stockpile count of `country` in `year`. */
    case class NuclearStockpile(country: String, stockpile: Int, year: Int)
    
    /** Sink provider used to reproduce the issue: the sink it creates ignores
      * the batch contents and only prints an `%angular` snippet per batch.
      */
    class CustomSinkProvider extends StreamSinkProvider {
      override def createSink(
          sqlContext: SQLContext,
          parameters: Map[String, String],
          partitionColumns: Seq[String],
          outputMode: OutputMode): Sink = {
        // None of the arguments are used; every batch triggers the same println.
        val printingSink: Sink = new Sink {
          override def addBatch(batchId: Long, data: DataFrame): Unit =
            println("%angular <h3>angular asynchronous</h3>")
        }
        printingSink
      }
    }
    
    // Expose the notebook's sqlContext as an implicit value.
    // NOTE(review): presumably needed by MemoryStream's factory below — confirm
    // against the Spark version in use.
    implicit val sqlCtx = sqlContext
    val input = MemoryStream[NuclearStockpile]
    
    // 67 yearly values paired with the years 1940..2006 (inclusive), tagged "USA".
    val USA = Seq(0, 0, 0, 0, 0, 6, 11, 32, 110, 235, 369, 640,
    1005, 1436, 2063, 3057, 4618, 6444, 9822, 15468, 20434, 24126,
    27387, 29459, 31056, 31982, 32040, 31233, 29224, 27342, 26662,
    26956, 27912, 28999, 28965, 27826, 25579, 25722, 24826, 24605,
    24304, 23464, 23708, 24099, 24357, 24237, 24401, 24344, 23586,
    22380, 21004, 17287, 14747, 13076, 12555, 12144, 11009, 10950,
    10871, 10824, 10577, 10527, 10475, 10421, 10358, 10295, 10104).
    zip(1940 to 2006).map(p => NuclearStockpile("USA", p._1, p._2))
    
    // Same shape as USA: 67 yearly values for 1940..2006, tagged "USSR/Russia".
    val USSR = Seq(0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    5, 25, 50, 120, 150, 200, 426, 660, 869, 1060, 1605, 2471, 3322,
    4238, 5221, 6129, 7089, 8339, 9399, 10538, 11643, 13092, 14478,
    15915, 17385, 19055, 21205, 23044, 25393, 27935, 30062, 32049,
    33952, 35804, 37431, 39197, 45000, 43000, 41000, 39000, 37000,
    35000, 33000, 31000, 29000, 27000, 25000, 24000, 23000, 22000,
    21000, 20000, 19000, 18000, 18000, 17000, 16000).
    zip(1940 to 2006).map(p => NuclearStockpile("USSR/Russia", p._1, p._2))
    
    // First round of input: the first 30 records of each series.
    input.addData(USA.take(30))
    input.addData(USSR.take(30))
    
    // Start a streaming query that writes to CustomSinkProvider, selected by
    // its canonical class name.
    val query = input.toDF.writeStream
    .format(classOf[CustomSinkProvider].getCanonicalName)
    .start()

    // Drain the first round, then feed the remaining records and drain again.
    // NOTE(review): processAllAvailable presumably blocks until pending input
    // has been handed to the sink — confirm in the Spark API docs.
    query.processAllAvailable()
    input.addData(USA.drop(30))
    input.addData(USSR.drop(30))
    query.processAllAvailable()





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to