> Yes, and I sent you the results. It is appropriate only when the parameters of the input data stream are known.
No, as far as I can tell from your posts in this thread and your linked project, you only tested with auto.offset.reset smallest and a large backlog. That's not what I advised you to do. Don't draw inaccurate conclusions about Spark DStreams from that test. The reason you need to specify maxRatePerPartition is that you're starting with a large backlog and thus a large first batch. If you were testing an ongoing stream with auto.offset.reset largest, backpressure alone should be sufficient.

On Wed, Jul 6, 2016 at 12:23 PM, rss rss <rssde...@gmail.com> wrote:
>> If you aren't processing messages as fast as you receive them, you're going to run out of kafka retention regardless of whether you're using Spark or Flink. Again, physics. It's just a question of what compromises you choose.
>
> Yes, I wrote about that. But in the case of Flink you get output strictly after the specified time. If it is impossible to process 1000 messages per second but possible to process 500, then Flink emits output for the 500. If only one message was processed, Flink emits output for that one message, but still after 1 second. Spark, by contrast, processes all 1000, but takes much longer than 1 second in this case.
>
>> ... that's what backpressure and maxRatePerPartition are for. As long as those are set reasonably, you'll have a reasonably fixed output interval. Have you actually tested this in the way I suggested?
>
> Yes, and I sent you the results. It is appropriate only when the parameters of the input data stream are known. I'm not trying to estimate the bounds of Spark's usage in general; my point is that Spark has these limitations. The biggest problem is when the calculation time depends on the input data. That is, if you want a stable period of output data generation, you have to use a computational system with free resources held in hot reserve.
>
> In any case thanks, now I understand how to use Spark.
>
> PS: I will continue working with Spark, but to minimize the email stream I plan to unsubscribe from this mailing list.
>
> 2016-07-06 18:55 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>
>> If you aren't processing messages as fast as you receive them, you're going to run out of kafka retention regardless of whether you're using Spark or Flink. Again, physics. It's just a question of what compromises you choose.
>>
>> If by "growing of a processing window time of Spark" you mean a processing time that exceeds batch time... that's what backpressure and maxRatePerPartition are for. As long as those are set reasonably, you'll have a reasonably fixed output interval. Have you actually tested this in the way I suggested?
>>
>> On Wed, Jul 6, 2016 at 11:38 AM, rss rss <rssde...@gmail.com> wrote:
>> > Ok, thanks. But this is not really a full solution. If the processing time grows, the window time grows too. That is, with Spark I have two sources of growing latency. The first is the delay in processing messages from the Kafka queue due to the physical limitations of the computer system. The second is the growth of Spark's processing window time. With Flink there is only the first source of delay, and the output intervals stay fixed. It really looks like a limitation of Spark. If the data flow is stable, all is well. If there are load peaks beyond the capacity of the computational system, or the calculation time depends on the data, Spark is not able to provide periodically stable output.
>> > Sometimes this is acceptable and sometimes it is not.
>> >
>> > 2016-07-06 18:11 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>> >>
>> >> Then double the upper limit you have set until the processing time approaches the batch time.
>> >>
>> >> On Wed, Jul 6, 2016 at 11:06 AM, rss rss <rssde...@gmail.com> wrote:
>> >> > Ok, with:
>> >> >
>> >> >     .set("spark.streaming.backpressure.enabled","true")
>> >> >     .set("spark.streaming.receiver.maxRate", "10000")
>> >> >     .set("spark.streaming.kafka.maxRatePerPartition", "10000")
>> >> >
>> >> > I have something like:
>> >> >
>> >> > ***************************************************************************
>> >> > Processing time: 5626
>> >> > Expected time: 10000
>> >> > Processed messages: 100000
>> >> > Message example: {"message": 950002, "uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
>> >> > Recovered json: {"message":950002,"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
>> >> >
>> >> > So yes, it works, but the throughput is much lower than without the limits, because this is an absolute upper limit. And the processing time is only half of what is available.
>> >> >
>> >> > Regarding Spark 2.0 structured streaming, I will look at it later. Right now I don't know how to strictly measure the throughput and latency of that high-level API. My aim now is to compare streaming processors.
>> >> >
>> >> > 2016-07-06 17:41 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>> >> >>
>> >> >> The configuration you set is spark.streaming.receiver.maxRate. The direct stream is not a receiver. As I said in my first message in this thread, and as the pages at http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers and http://spark.apache.org/docs/latest/configuration.html#spark-streaming also say, use maxRatePerPartition for the direct stream.
>> >> >>
>> >> >> Bottom line, if you have more information than your system can process in X amount of time, then after X amount of time you can either give the wrong answer, or take longer to process. Flink can't violate the laws of physics. If the tradeoffs that Flink makes are better for your use case than the tradeoffs that DStreams make, you may be better off using Flink (or testing out spark 2.0 structured streaming, although there's no kafka integration available for that yet).
>> >> >>
>> >> >> On Wed, Jul 6, 2016 at 10:25 AM, rss rss <rssde...@gmail.com> wrote:
>> >> >> > Ok, thanks. I tried to set a minimal max rate to begin with. However, in general I don't know the initial throughput.
>> >> >> > BTW, it would be very useful to explain this in https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
>> >> >> >
>> >> >> > And indeed, with
>> >> >> >
>> >> >> >     .set("spark.streaming.backpressure.enabled","true")
>> >> >> >     .set("spark.streaming.receiver.maxRate", "10000")
>> >> >> >
>> >> >> > I have the same problem:
>> >> >> >
>> >> >> > ***************************************************************************
>> >> >> > Processing time: 36994
>> >> >> > Expected time: 10000
>> >> >> > Processed messages: 3015830
>> >> >> > Message example: {"message": 1, "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >> >> > Recovered json: {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >> >> >
>> >> >> > Regarding auto.offset.reset smallest: it is set that way for the test, because I want to get the same messages on each run. But in any case I expect to read all new messages from the queue.
>> >> >> >
>> >> >> > Regarding backpressure detection: what should happen when the processing time greatly exceeds what the input rate allows? Right now I see a growing processing time instead of a stable 10 seconds, and a decreasing number of processed messages. Where is the limit of the backpressure algorithm?
>> >> >> >
>> >> >> > Regarding Flink: I don't know how Flink's core works, but you can check for yourself that Flink strictly terminates the processing of messages by time. A deviation of the time window from 10 seconds to several minutes is impossible.
>> >> >> >
>> >> >> > PS: I prepared this example to make it easy to observe the problem and to fix it if it is a bug. To me it is obvious. May I ask you to stay close to this simple source code? Otherwise I have to conclude that this is a technical limitation of Spark when working with unstable data flows.
>> >> >> >
>> >> >> > Cheers
>> >> >> >
>> >> >> > 2016-07-06 16:40 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>> >> >> >>
>> >> >> >> The direct stream determines batch sizes on the driver, in advance of processing. If you haven't specified a maximum batch size, how would you suggest the backpressure code determine how to limit the first batch? It has no data on throughput until at least one batch is completed.
>> >> >> >>
>> >> >> >> Again, this is why I'm saying to test by producing messages into kafka at a rate comparable to production, not by loading a ton of messages in and starting from auto.offset.reset smallest.
>> >> >> >>
>> >> >> >> Even if you're unwilling to take that advice for some reason, and unwilling to empirically determine a reasonable maximum partition size, you should be able to estimate an upper bound such that the first batch does not encompass your entire kafka retention. Backpressure will kick in once it has some information to work with.
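To make the upper-bound estimate described just above concrete, a minimal sketch in Java might look like the following. The backlog size, partition count, and drain target are illustrative assumptions, not values taken from this thread; only the two configuration keys are the ones discussed here.

    import org.apache.spark.SparkConf;

    // Rough estimate of an upper bound for maxRatePerPartition so that the first
    // batch cannot cover the entire retained backlog. All numbers are assumptions.
    public class RateBoundEstimate {
        public static void main(String[] args) {
            long retainedPerPartition = 10_000_000L / 4; // assumed ~10M retained messages over 4 partitions
            long batchIntervalSeconds = 10;              // the batch interval used in this thread
            long batchesToDrainBacklog = 100;            // spread the initial backlog over many batches

            long maxRatePerPartition =
                    retainedPerPartition / (batchesToDrainBacklog * batchIntervalSeconds);

            SparkConf conf = new SparkConf()
                    .set("spark.streaming.backpressure.enabled", "true")
                    .set("spark.streaming.kafka.maxRatePerPartition",
                         String.valueOf(maxRatePerPartition));
        }
    }

With these assumed numbers the cap works out to 2500 records per partition per second; once the first batches complete, backpressure has throughput data and adjusts the rate below that ceiling on its own.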
>> >> >> >> On Wed, Jul 6, 2016 at 8:45 AM, rss rss <rssde...@gmail.com> wrote:
>> >> >> >> > Hello,
>> >> >> >> >
>> >> >> >> > Thanks. I tried .set("spark.streaming.backpressure.enabled","true"), but the result is negative. Therefore I have prepared a small test: https://github.com/rssdev10/spark-kafka-streaming
>> >> >> >> >
>> >> >> >> > How to run:
>> >> >> >> >
>> >> >> >> >     git clone https://github.com/rssdev10/spark-kafka-streaming.git
>> >> >> >> >     cd spark-kafka-streaming
>> >> >> >> >
>> >> >> >> >     # downloads kafka and zookeeper
>> >> >> >> >     ./gradlew setup
>> >> >> >> >
>> >> >> >> >     # runs zookeeper and kafka, and starts message generation
>> >> >> >> >     ./gradlew test_data_prepare
>> >> >> >> >
>> >> >> >> > And in another console just run:
>> >> >> >> >
>> >> >> >> >     ./gradlew test_spark
>> >> >> >> >
>> >> >> >> > It is easy to interrupt data generation with CTRL-C and to continue it with the same command: ./gradlew test_data_prepare
>> >> >> >> >
>> >> >> >> > To stop everything: ./gradlew stop_all
>> >> >> >> >
>> >> >> >> > The Spark test should produce output every 10 seconds, like:
>> >> >> >> >
>> >> >> >> > ***************************************************************************
>> >> >> >> > Processing time: 33477
>> >> >> >> > Expected time: 10000
>> >> >> >> > Processed messages: 2897866
>> >> >> >> > Message example: {"message": 1, "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >> >> >> > Recovered json: {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >> >> >> >
>> >> >> >> > Here "message" is the number of the first message in the window, and the time values are in milliseconds.
>> >> >> >> >
>> >> >> >> > Brief results:
>> >> >> >> >
>> >> >> >> > Spark always reads all messages from Kafka after the first connection, independently of the dstream or window size time. It looks like a bug.
>> >> >> >> > When the processing speed of the Spark app is close to the speed of data generation, all is ok.
>> >> >> >> > I added delayFactor in https://github.com/rssdev10/spark-kafka-streaming/blob/master/src/main/java/SparkStreamingConsumer.java to emulate slow processing, and the streaming process degrades. With delayFactor=0 it looks like a stable process.
>> >> >> >> >
>> >> >> >> > Cheers
>> >> >> >> >
>> >> >> >> > 2016-07-05 17:51 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>> >> >> >> >>
>> >> >> >> >> Test by producing messages into kafka at a rate comparable to what you expect in production.
>> >> >> >> >>
>> >> >> >> >> Test with backpressure turned on; it doesn't require you to specify a fixed limit on the number of messages and will do its best to maintain batch timing. Or you could empirically determine a reasonable fixed limit.
>> >> >> >> >> Setting up a kafka topic with way more static messages in it than your system can handle in one batch, and then starting a stream from the beginning of it without turning on backpressure or limiting the number of messages, isn't a reasonable way to test steady-state performance. Flink can't magically give you a correct answer under those circumstances either.
>> >> >> >> >>
>> >> >> >> >> On Tue, Jul 5, 2016 at 10:41 AM, rss rss <rssde...@gmail.com> wrote:
>> >> >> >> >> > Hi, thanks.
>> >> >> >> >> >
>> >> >> >> >> > I know about the possibility of limiting the number of messages. But the problem is that I don't know how many messages the system is able to process; it depends on the data. The example is very simple: I need a strict response after a specified time, something like soft real time. In Flink I am able to set a strict processing time like this:
>> >> >> >> >> >
>> >> >> >> >> >     KeyedStream<Event, Integer> keyed =
>> >> >> >> >> >             eventStream.keyBy(event -> event.userId.getBytes()[0] % partNum);
>> >> >> >> >> >     WindowedStream<Event, Integer, TimeWindow> uniqUsersWin =
>> >> >> >> >> >             keyed.timeWindow(Time.seconds(10));
>> >> >> >> >> >     DataStream<Aggregator> uniqUsers = uniqUsersWin
>> >> >> >> >> >             .trigger(ProcessingTimeTrigger.create())
>> >> >> >> >> >             .fold(new Aggregator(), new FoldFunction<Event, Aggregator>() {
>> >> >> >> >> >                 @Override
>> >> >> >> >> >                 public Aggregator fold(Aggregator accumulator, Event value) throws Exception {
>> >> >> >> >> >                     accumulator.add(value.userId);
>> >> >> >> >> >                     return accumulator;
>> >> >> >> >> >                 }
>> >> >> >> >> >             });
>> >> >> >> >> >
>> >> >> >> >> >     uniqUsers.print();
>> >> >> >> >> >
>> >> >> >> >> > And I can see results every 10 seconds independently of the input data stream. Is something like this possible in Spark?
>> >> >> >> >> >
>> >> >> >> >> > Regarding the zeros in my example: the reason is that I prepared the message queue in Kafka before the tests. If I add some messages afterwards, I am able to see the new messages. But in any case I need the first response after 10 seconds, not minutes or hours later.
>> >> >> >> >> >
>> >> >> >> >> > Thanks.
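The test Cody keeps recommending in this thread (producing into Kafka at a production-like rate instead of reading a preloaded backlog with auto.offset.reset smallest) can be approximated with a small steady-rate producer. The sketch below is illustrative only: the broker address, topic name, rate, and pacing are assumptions, and the JSON layout simply mirrors the examples quoted above.

    import java.util.Properties;
    import java.util.UUID;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Steady-rate test producer: emits roughly messagesPerSecond JSON messages
    // per second instead of preloading a large backlog. Values are assumptions.
    public class SteadyRateProducer {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            long messagesPerSecond = 1000; // assumed production-like rate

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                long sent = 0;
                while (true) {
                    long windowStart = System.currentTimeMillis();
                    for (long i = 0; i < messagesPerSecond; i++) {
                        String json = String.format("{\"message\": %d, \"uid\":\"%s\"}", sent++, UUID.randomUUID());
                        producer.send(new ProducerRecord<>("test-topic", json));
                    }
                    long elapsed = System.currentTimeMillis() - windowStart;
                    if (elapsed < 1000) {
                        Thread.sleep(1000 - elapsed); // crude pacing: one burst per second
                    }
                }
            }
        }
    }

Against a stream fed this way, with auto.offset.reset largest, backpressure has a realistic ongoing rate to adapt to, which is the scenario the advice above is about.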
>> >> >> >> >> > 2016-07-05 17:12 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>> >> >> >> >> >>
>> >> >> >> >> >> If you're talking about limiting the number of messages per batch to try to keep from exceeding the batch time, see http://spark.apache.org/docs/latest/configuration.html and look for backpressure and maxRatePerPartition.
>> >> >> >> >> >>
>> >> >> >> >> >> But if you're only seeing zeros after your job runs for a minute, it sounds like something else is wrong.
>> >> >> >> >> >>
>> >> >> >> >> >> On Tue, Jul 5, 2016 at 10:02 AM, rss rss <rssde...@gmail.com> wrote:
>> >> >> >> >> >> > Hello,
>> >> >> >> >> >> >
>> >> >> >> >> >> > I'm trying to organize the processing of messages from Kafka, and there is a typical case where the number of messages in Kafka's queue is more than the Spark app is able to process. But I need a strong time limit, to prepare a result for at least a part of the data.
>> >> >> >> >> >> >
>> >> >> >> >> >> > Code example:
>> >> >> >> >> >> >
>> >> >> >> >> >> >     SparkConf sparkConf = new SparkConf()
>> >> >> >> >> >> >             .setAppName("Spark")
>> >> >> >> >> >> >             .setMaster("local");
>> >> >> >> >> >> >
>> >> >> >> >> >> >     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Milliseconds.apply(5000));
>> >> >> >> >> >> >
>> >> >> >> >> >> >     jssc.checkpoint("/tmp/spark_checkpoint");
>> >> >> >> >> >> >
>> >> >> >> >> >> >     Set<String> topicMap = new HashSet<>(Arrays.asList(topicList.split(",")));
>> >> >> >> >> >> >     Map<String, String> kafkaParams = new HashMap<String, String>() {
>> >> >> >> >> >> >         {
>> >> >> >> >> >> >             put("metadata.broker.list", bootstrapServers);
>> >> >> >> >> >> >             put("auto.offset.reset", "smallest");
>> >> >> >> >> >> >         }
>> >> >> >> >> >> >     };
>> >> >> >> >> >> >
>> >> >> >> >> >> >     JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc,
>> >> >> >> >> >> >             String.class,
>> >> >> >> >> >> >             String.class,
>> >> >> >> >> >> >             StringDecoder.class,
>> >> >> >> >> >> >             StringDecoder.class,
>> >> >> >> >> >> >             kafkaParams,
>> >> >> >> >> >> >             topicMap);
>> >> >> >> >> >> >
>> >> >> >> >> >> >     messages.countByWindow(Seconds.apply(10), Milliseconds.apply(5000))
>> >> >> >> >> >> >             .map(x -> { System.out.println(x); return x; })
>> >> >> >> >> >> >             .dstream().saveAsTextFiles("/tmp/spark", "spark-streaming");
>> >> >> >> >> >> >
>> >> >> >> >> >> > I need to see the result of the window operation every 10 seconds (this is only the simplest example).
>> >> >> >> >> >> > But with my test data of ~10M messages I get the first result a minute later, and after that I see only zeros. Is there a way to limit the processing time so as to guarantee a response within a specified time, like Apache Flink's triggers?
>> >> >> >> >> >> >
>> >> >> >> >> >> > Thanks.
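Pulling the thread's recommendations together, a minimal sketch of the setup Cody describes (direct stream, backpressure enabled, an explicit per-partition cap for the first batch, and auto.offset.reset largest for steady-state testing) might look like the following. It uses the same 0.8 direct-stream API as the code above; the broker address, topic name, and the 10000 cap (the value tried earlier in the thread) are placeholders to tune, not prescriptions.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import kafka.serializer.StringDecoder;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    // Sketch of a direct stream with backpressure plus a per-partition rate cap,
    // reading only new messages. Broker, topic, and cap value are placeholders.
    public class DirectStreamWithBackpressure {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf()
                    .setAppName("Spark")
                    .setMaster("local[2]")
                    .set("spark.streaming.backpressure.enabled", "true")
                    // absolute upper bound, per partition, per second; tune empirically
                    .set("spark.streaming.kafka.maxRatePerPartition", "10000");

            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
            jssc.checkpoint("/tmp/spark_checkpoint");

            Map<String, String> kafkaParams = new HashMap<>();
            kafkaParams.put("metadata.broker.list", "localhost:9092");
            kafkaParams.put("auto.offset.reset", "largest"); // ongoing stream, not the backlog

            Set<String> topics = new HashSet<>(Collections.singletonList("test-topic"));

            JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                    jssc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, kafkaParams, topics);

            messages.count().print(); // one count per 10-second batch

            jssc.start();
            jssc.awaitTermination();
        }
    }

With this configuration each batch is bounded by the per-partition cap until backpressure has observed completed batches, after which the rate is adjusted automatically toward the 10-second batch interval.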