This may be a good addition. I suggest you read our guidelines on
contributing code to Spark.

https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-PreparingtoContributeCodeChanges

It's a long document, but it should have everything you need to figure out how
to contribute your changes. I hope to see your changes in a GitHub PR soon!

TD

On Mon, Nov 7, 2016 at 5:30 PM, Chan Chor Pang <chin...@indetail.co.jp>
wrote:

> hi everyone
>
> it seems that there are not many people interested in creating an API for
> Streaming.
> Nevertheless, I still really want the API for monitoring,
> so I tried to see if I could implement it on my own.
>
> after some test,
> i believe i can achieve the goal by
> 1. implement a package(org.apache.spark.streaming.status.api.v1) that
> serve the same purpose as org.apache.spark.status.api.v1
> 2. register the api path through StreamingTab
> and 3. retrieve the streaming information through
> StreamingJobProgressListener
>
> What concerns me most now is whether my implementation will be able to be
> merged into the mainline.
>
> I'm new to open source projects, so could anyone please shed some light on
> how I should/could proceed to make my implementation mergeable into
> the mainline?
>
>
> here is my test code base on v1.6.0
> ###################################
> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status
> /api/v1/JacksonMessageWriter.scala b/streaming/src/main/scala/org
> /apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala
> new file mode 100644
> index 0000000..690e2d8
> --- /dev/null
> +++ b/streaming/src/main/scala/org/apache/spark/streaming/status
> /api/v1/JacksonMessageWriter.scala
> @@ -0,0 +1,68 @@
> +package org.apache.spark.streaming.status.api.v1
> +
> +import java.io.OutputStream
> +import java.lang.annotation.Annotation
> +import java.lang.reflect.Type
> +import java.text.SimpleDateFormat
> +import java.util.{Calendar, SimpleTimeZone}
> +import javax.ws.rs.Produces
> +import javax.ws.rs.core.{MediaType, MultivaluedMap}
> +import javax.ws.rs.ext.{MessageBodyWriter, Provider}
> +
> +import com.fasterxml.jackson.annotation.JsonInclude
> +import com.fasterxml.jackson.databind.{ObjectMapper,
> SerializationFeature}
> +
> +@Provider
> +@Produces(Array(MediaType.APPLICATION_JSON))
> +private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
> +
> +  val mapper = new ObjectMapper() {
> +    override def writeValueAsString(t: Any): String = {
> +      super.writeValueAsString(t)
> +    }
> +  }
> + mapper.registerModule(com.fasterxml.jackson.module.scala.
> DefaultScalaModule)
> +  mapper.enable(SerializationFeature.INDENT_OUTPUT)
> +  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
> +  mapper.setDateFormat(JacksonMessageWriter.makeISODateFormat)
> +
> +  override def isWriteable(
> +      aClass: Class[_],
> +      `type`: Type,
> +      annotations: Array[Annotation],
> +      mediaType: MediaType): Boolean = {
> +      true
> +  }
> +
> +  override def writeTo(
> +      t: Object,
> +      aClass: Class[_],
> +      `type`: Type,
> +      annotations: Array[Annotation],
> +      mediaType: MediaType,
> +      multivaluedMap: MultivaluedMap[String, AnyRef],
> +      outputStream: OutputStream): Unit = {
> +    t match {
> +      //case ErrorWrapper(err) => outputStream.write(err.getByte
> s("utf-8"))
> +      case _ => mapper.writeValue(outputStream, t)
> +    }
> +  }
> +
> +  override def getSize(
> +      t: Object,
> +      aClass: Class[_],
> +      `type`: Type,
> +      annotations: Array[Annotation],
> +      mediaType: MediaType): Long = {
> +    -1L
> +  }
> +}
> +
> +private[spark] object JacksonMessageWriter {
> +  def makeISODateFormat: SimpleDateFormat = {
> +    val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'")
> +    val cal = Calendar.getInstance(new SimpleTimeZone(0, "GMT"))
> +    iso8601.setCalendar(cal)
> +    iso8601
> +  }
> +}
> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status
> /api/v1/StreamingApiRootResource.scala b/streaming/src/main/scala/org
> /apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala
> new file mode 100644
> index 0000000..f4e43dd
> --- /dev/null
> +++ b/streaming/src/main/scala/org/apache/spark/streaming/status
> /api/v1/StreamingApiRootResource.scala
> @@ -0,0 +1,74 @@
> +package org.apache.spark.streaming.status.api.v1
> +
> +import org.apache.spark.status.api.v1.UIRoot
> +import org.eclipse.jetty.server.handler.ContextHandler
> +import org.eclipse.jetty.servlet.ServletContextHandler
> +import org.eclipse.jetty.servlet.ServletHolder
> +
> +import com.sun.jersey.spi.container.servlet.ServletContainer
> +
> +import javax.servlet.ServletContext
> +import javax.ws.rs.Path
> +import javax.ws.rs.Produces
> +import javax.ws.rs.core.Context
> +import org.apache.spark.streaming.ui.StreamingJobProgressListener
> +
> +
> +@Path("/v1")
> +private[v1] class StreamingApiRootResource extends
> UIRootFromServletContext{
> +
> +  @Path("streaminginfo")
> +  def getStreamingInfo(): StreamingInfoResource = {
> +    new StreamingInfoResource(uiRoot,listener)
> +  }
> +
> +}
> +
> +private[spark] object StreamingApiRootResource {
> +
> +  def getServletHandler(uiRoot: UIRoot, 
> listener:StreamingJobProgressListener):
> ServletContextHandler = {
> +
> +    val jerseyContext = new ServletContextHandler(ServletC
> ontextHandler.NO_SESSIONS)
> +    jerseyContext.setContextPath("/streamingapi")
> +    val holder: ServletHolder = new ServletHolder(classOf[ServletC
> ontainer])
> + holder.setInitParameter("com.sun.jersey.config.property.reso
> urceConfigClass",
> +      "com.sun.jersey.api.core.PackagesResourceConfig")
> + holder.setInitParameter("com.sun.jersey.config.property.packages",
> +      "org.apache.spark.streaming.status.api.v1")
> + //holder.setInitParameter(ResourceConfig.PROPERTY_CONTAINER_
> REQUEST_FILTERS,
> +    //  classOf[SecurityFilter].getCanonicalName)
> +    UIRootFromServletContext.setUiRoot(jerseyContext, uiRoot)
> +    UIRootFromServletContext.setListener(jerseyContext, listener)
> +    jerseyContext.addServlet(holder, "/*")
> +    jerseyContext
> +  }
> +}
> +
> +private[v1] object UIRootFromServletContext {
> +
> +  private val attribute = getClass.getCanonicalName
> +
> +  def setListener(contextHandler:ContextHandler, listener:
> StreamingJobProgressListener):Unit={
> +   contextHandler.setAttribute(attribute+"_listener", listener)
> +  }
> +
> +  def getListener(context:ServletContext):StreamingJobProgressListener={
> + context.getAttribute(attribute+"_listener").asInstanceOf[Str
> eamingJobProgressListener]
> +  }
> +
> +  def setUiRoot(contextHandler: ContextHandler, uiRoot: UIRoot): Unit = {
> +    contextHandler.setAttribute(attribute, uiRoot)
> +  }
> +
> +  def getUiRoot(context: ServletContext): UIRoot = {
> +    context.getAttribute(attribute).asInstanceOf[UIRoot]
> +  }
> +}
> +
> +private[v1] trait UIRootFromServletContext {
> +  @Context
> +  var servletContext: ServletContext = _
> +
> +  def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext)
> +  def listener: StreamingJobProgressListener =
> UIRootFromServletContext.getListener(servletContext)
> +}
> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status
> /api/v1/StreamingInfoResource.scala b/streaming/src/main/scala/org
> /apache/spark/streaming/status/api/v1/StreamingInfoResource.scala
> new file mode 100644
> index 0000000..d5fc11b
> --- /dev/null
> +++ b/streaming/src/main/scala/org/apache/spark/streaming/status
> /api/v1/StreamingInfoResource.scala
> @@ -0,0 +1,22 @@
> +package org.apache.spark.streaming.status.api.v1
> +
> +import org.apache.spark.status.api.v1.SimpleDateParam
> +import org.apache.spark.status.api.v1.UIRoot
> +
> +import javax.ws.rs.GET
> +import javax.ws.rs.Produces
> +import javax.ws.rs.core.MediaType
> +import org.apache.spark.streaming.StreamingContext
> +import org.apache.spark.streaming.ui.StreamingJobProgressListener
> +
> +@Produces(Array(MediaType.APPLICATION_JSON))
> +private[v1] class StreamingInfoResource(uiRoot: UIRoot, listener:
> StreamingJobProgressListener){
> +
> +  @GET
> +  def streamingInfo()
> +  :Iterator[StreamingInfo]={
> +    var v = listener.numTotalCompletedBatches
> +    Iterator(new StreamingInfo("testname",v))
> +
> +  }
> +}
> \ No newline at end of file
> diff --git 
> a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala
> b/streaming/src/main/scala/org/apache/spark/streaming/status
> /api/v1/api.scala
> new file mode 100644
> index 0000000..958dd41
> --- /dev/null
> +++ b/streaming/src/main/scala/org/apache/spark/streaming/status
> /api/v1/api.scala
> @@ -0,0 +1,6 @@
> +package org.apache.spark.streaming.status.api.v1
> +
> +class StreamingInfo private[streaming](
> +    val name:String,
> +    val completedBatchCount:Long)
> +
> \ No newline at end of file
> diff --git 
> a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
> b/streaming/src/main/scala/org/apache/spark/streaming/ui/Str
> eamingTab.scala
> index bc53f2a..877abf4 100644
> --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/Str
> eamingTab.scala
> +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/Str
> eamingTab.scala
> @@ -22,6 +22,7 @@ import org.apache.spark.streaming.StreamingContext
>  import org.apache.spark.ui.{SparkUI, SparkUITab}
>
>  import StreamingTab._
> +import org.apache.spark.streaming.status.api.v1.StreamingApiRootResource
>
>  /**
>   * Spark Web UI tab that shows statistics of a streaming job.
> @@ -39,6 +40,9 @@ private[spark] class StreamingTab(val ssc:
> StreamingContext)
>    ssc.sc.addSparkListener(listener)
>    attachPage(new StreamingPage(this))
>    attachPage(new BatchPage(this))
> +
> +  //register streaming api
> + parent.attachHandler(StreamingApiRootResource.getServletHand
> ler(parent,listener));
>
>    def attach() {
>      getSparkUI(ssc).attachTab(this)
>
>
> On 9/14/16 10:13 AM, Chan Chor Pang wrote:
>
>> Hi everyone,
>>
>> Trying to monitoring our streaming application using Spark REST interface
>> only to found that there is no such thing for Streaming.
>>
>> I wonder if anyone already working on this or I should just start
>> implementing my own one?
>>
>
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

Reply via email to