If you're talking about limiting the number of messages per batch to try to keep from exceeding the batch time, see http://spark.apache.org/docs/latest/configuration.html and look for backpressure and maxRatePerPartition.
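For example, something along these lines on the SparkConf should do it. This is just an untested sketch; the per-partition rate of 1000 records per second is a placeholder you'd tune for your batch interval and load:

    import org.apache.spark.SparkConf;

    // Same SparkConf as in your code, with backpressure enabled and a hard
    // cap on how many records are read from each Kafka partition per second.
    SparkConf sparkConf = new SparkConf()
            .setAppName("Spark")
            .setMaster("local")
            // let Spark scale the ingestion rate based on batch scheduling delays
            .set("spark.streaming.backpressure.enabled", "true")
            // upper bound (records/sec per partition) for the direct Kafka stream;
            // 1000 is a placeholder, not a recommendation
            .set("spark.streaming.kafka.maxRatePerPartition", "1000");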
But if you're only seeing zeros after your job runs for a minute, it sounds like something else is wrong.

On Tue, Jul 5, 2016 at 10:02 AM, rss rss <rssde...@gmail.com> wrote:
> Hello,
>
> I'm trying to organize processing of messages from Kafka, and there is a
> typical case where the number of messages in Kafka's queue is more than the
> Spark app is able to process. But I need a hard time limit in order to
> produce a result for at least part of the data.
>
> Code example:
>
>     SparkConf sparkConf = new SparkConf()
>             .setAppName("Spark")
>             .setMaster("local");
>
>     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>             Milliseconds.apply(5000));
>
>     jssc.checkpoint("/tmp/spark_checkpoint");
>
>     Set<String> topicMap = new HashSet<>(Arrays.asList(topicList.split(",")));
>
>     Map<String, String> kafkaParams = new HashMap<String, String>() {
>         {
>             put("metadata.broker.list", bootstrapServers);
>             put("auto.offset.reset", "smallest");
>         }
>     };
>
>     JavaPairInputDStream<String, String> messages =
>             KafkaUtils.createDirectStream(jssc,
>                     String.class,
>                     String.class,
>                     StringDecoder.class,
>                     StringDecoder.class,
>                     kafkaParams,
>                     topicMap);
>
>     messages.countByWindow(Seconds.apply(10), Milliseconds.apply(5000))
>             .map(x -> { System.out.println(x); return x; })
>             .dstream().saveAsTextFiles("/tmp/spark", "spark-streaming");
>
> I need to see the result of the window operation every 10 seconds (this is
> only the simplest example). But with my test data of ~10M messages, the first
> result appears only after a minute, and after that I see only zeros. Is there
> a way to limit the processing time to guarantee a response within a specified
> time, like Apache Flink's triggers?
>
> Thanks.