> Yes, and I sent you the results. It is suitable only when the parameters
> of the input data stream are known.

No, as far as I can tell from your posts in this thread and your
linked project, you only tested with auto.offset.reset smallest and a
large backlog.  That's not what I advised you to do.  Don't draw
inaccurate conclusions about Spark DStreams from that test.  The
reason you need to specify maxRatePerPartition is because you're
starting with a large backlog and thus a large first batch.  If you
were testing an ongoing stream with auto.offset.reset largest,
backpressure alone should be sufficient.
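
For reference, here is a rough sketch of the arithmetic behind
maxRatePerPartition, which is a per-second, per-partition limit. It is not
taken from the linked project. It assumes a single Kafka partition and a
10 second batch, which is consistent with the 100000 messages reported for
a rate of 10000 but is not stated in the thread. The class and method names
are made up for illustration, and the linear-scaling estimate is only a
starting point for tuning, not something Spark computes.

```java
public class BatchCap {
    /** Upper bound on the records the direct stream puts into one batch. */
    static long maxMessagesPerBatch(long maxRatePerPartition, int partitions, long batchSeconds) {
        return maxRatePerPartition * partitions * batchSeconds;
    }

    /**
     * Rough per-partition rate at which processing time would fill the batch
     * interval, ASSUMING processing time scales linearly with record count.
     */
    static long scaledRate(long currentRate, long processingMs, long batchMs) {
        return currentRate * batchMs / processingMs;
    }

    public static void main(String[] args) {
        // Figures from the thread: rate 10000, processing 5626 ms, batch 10000 ms.
        System.out.println(maxMessagesPerBatch(10000, 1, 10)); // prints 100000
        System.out.println(scaledRate(10000, 5626, 10000));    // prints 17774
    }
}
```

Under these assumptions, the reported numbers suggest the limit could be
raised to somewhat under twice its current value before processing time
approaches the batch time.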



On Wed, Jul 6, 2016 at 12:23 PM, rss rss <rssde...@gmail.com> wrote:
>> If you aren't processing messages as fast as you receive them, you're
>> going to run out of kafka retention regardless of whether you're using
>> Spark or Flink.  Again, physics.  It's just a question of what
>> compromises you choose.
>
>
> Yes, I wrote about that. But with Flink you get output strictly after the
> specified time. If it is impossible to process 1000 messages in 1 second
> but possible to process 500, Flink produces output for 500. If only 1
> message was processed, Flink produces output for that one message, but
> still after 1 second. Spark, in the same case, processes all 1000 but takes
> much longer than 1 second.
>
>>  that's what backpressure
>> and maxRatePerPartition are for.  As long as those are set reasonably,
>> you'll have a reasonably fixed output interval.  Have you actually
>> tested this in the way I suggested?
>
>
> Yes, and I sent you the results. It is suitable only when the parameters
> of the input data stream are known. I'm not able to estimate the bounds of
> Spark's usage in general, and that is not my point. My point is that Spark
> has these limitations. The biggest problem is when calculation time depends
> on the input data. That is, if you want a stable period of output
> generation, you have to use a computational system with free resources in
> hot reserve.
>
>  In any case thanks, now I understand how to use Spark.
>
> PS: I will continue working with Spark, but to reduce the volume of email
> I plan to unsubscribe from this mailing list.
>
> 2016-07-06 18:55 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>
>> If you aren't processing messages as fast as you receive them, you're
>> going to run out of kafka retention regardless of whether you're using
>> Spark or Flink.  Again, physics.  It's just a question of what
>> compromises you choose.
>>
>> If by "growing of a processing window time of Spark" you mean a
>> processing time that exceeds batch time... that's what backpressure
>> and maxRatePerPartition are for.  As long as those are set reasonably,
>> you'll have a reasonably fixed output interval.  Have you actually
>> tested this in the way I suggested?
>>
>> On Wed, Jul 6, 2016 at 11:38 AM, rss rss <rssde...@gmail.com> wrote:
>> > OK, thanks. But this is not really a complete solution. If processing
>> > time grows, the window time grows too. That is, with Spark I have two
>> > sources of growing latency. The first is the delay in processing messages
>> > in the Kafka queue due to the physical limits of the computer system. The
>> > second is growing of a processing window time of Spark. With Flink there
>> > is only the first source of delay, and the time intervals of the output
>> > are fixed. This really looks like a limitation of Spark. That is, if the
>> > dataflow is stable, all is OK. If there are load peaks beyond the capacity
>> > of the computational system, or the calculation time depends on the data,
>> > Spark is not able to provide periodically stable output. Sometimes this
>> > is acceptable and sometimes it is not.
>> >
>> > 2016-07-06 18:11 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>> >>
>> >> Then double the upper limit you have set until the processing time
>> >> approaches the batch time.
>> >>
>> >> On Wed, Jul 6, 2016 at 11:06 AM, rss rss <rssde...@gmail.com> wrote:
>> >> > Ok, with:
>> >> >
>> >> > .set("spark.streaming.backpressure.enabled","true")
>> >> > .set("spark.streaming.receiver.maxRate", "10000")
>> >> > .set("spark.streaming.kafka.maxRatePerPartition", "10000")
>> >> >
>> >> > I have something like
>> >> >
>> >> >
>> >> >
>> >> > ***************************************************************************
>> >> > Processing time: 5626
>> >> > Expected time: 10000
>> >> > Processed messages: 100000
>> >> > Message example: {"message": 950002, "uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
>> >> > Recovered json: {"message":950002,"uid":"81e2d447-69f2-4ce6-a13d-50a1a8b569a0"}
>> >> >
>> >> > That is, yes, it works, but throughput is much lower than without
>> >> > limits because this is an absolute upper limit, and the processing
>> >> > time is half of what is available.
>> >> >
>> >> > Regarding Spark 2.0 structured streaming, I will look at it later. For
>> >> > now I don't know how to strictly measure the throughput and latency of
>> >> > this high-level API. My aim now is to compare streaming processors.
>> >> >
>> >> >
>> >> > 2016-07-06 17:41 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>> >> >>
>> >> >> The configuration you set is spark.streaming.receiver.maxRate.  The
>> >> >> direct stream is not a receiver.  As I said in my first message in
>> >> >> this thread, and as the pages at
>> >> >>
>> >> >>
>> >> >>
>> >> >> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>> >> >> and
>> >> >>
>> >> >> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
>> >> >> also say, use maxRatePerPartition for the direct stream.
>> >> >>
>> >> >> Bottom line, if you have more information than your system can process
>> >> >> in X amount of time, after X amount of time you can either give the
>> >> >> wrong answer, or take longer to process.  Flink can't violate the laws
>> >> >> of physics.  If the tradeoffs that Flink makes are better for your use
>> >> >> case than the tradeoffs that DStreams make, you may be better off
>> >> >> using Flink (or testing out spark 2.0 structured streaming, although
>> >> >> there's no kafka integration available for that yet)
>> >> >>
>> >> >> On Wed, Jul 6, 2016 at 10:25 AM, rss rss <rssde...@gmail.com> wrote:
>> >> >> > OK, thanks. I tried setting a minimal max rate to begin with. However,
>> >> >> > in general I don't know the initial throughput. BTW, it would be very
>> >> >> > useful to explain this in
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
>> >> >> >
>> >> >> > And really with
>> >> >> >
>> >> >> > .set("spark.streaming.backpressure.enabled","true")
>> >> >> > .set("spark.streaming.receiver.maxRate", "10000")
>> >> >> >
>> >> >> > I have same problem:
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > ***************************************************************************
>> >> >> > Processing time: 36994
>> >> >> > Expected time: 10000
>> >> >> > Processed messages: 3015830
>> >> >> > Message example: {"message": 1, "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >> >> > Recovered json: {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >> >> >
>> >> >> >
>> >> >> > Regarding auto.offset.reset smallest: for now it is set because this
>> >> >> > is a test and I want to get the same messages on each run. But in any
>> >> >> > case I expect to read all new messages from the queue.
>> >> >> >
>> >> >> > Regarding backpressure detection: what should be done when processing
>> >> >> > is much slower than the input rate? Now I see the processing time
>> >> >> > growing instead of a stable 10 seconds, and the number of processed
>> >> >> > messages decreasing. Where is the limit of the backpressure algorithm?
>> >> >> >
>> >> >> > Regarding Flink: I don't know how the core of Flink works, but you can
>> >> >> > check for yourself that Flink strictly terminates the processing of
>> >> >> > messages by time. A deviation of the time window from 10 seconds to
>> >> >> > several minutes is impossible.
>> >> >> >
>> >> >> > PS: I prepared this example to make it easy to observe the problem
>> >> >> > and fix it if it is a bug. For me it is obvious. May I ask you to stay
>> >> >> > close to this simple source code? Otherwise I have to conclude that
>> >> >> > this is a technical limitation of Spark when working with unstable
>> >> >> > data flows.
>> >> >> >
>> >> >> > Cheers
>> >> >> >
>> >> >> > 2016-07-06 16:40 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>> >> >> >>
>> >> >> >> The direct stream determines batch sizes on the driver, in advance of
>> >> >> >> processing.  If you haven't specified a maximum batch size, how would
>> >> >> >> you suggest the backpressure code determine how to limit the first
>> >> >> >> batch?  It has no data on throughput until at least one batch is
>> >> >> >> completed.
>> >> >> >>
>> >> >> >> Again, this is why I'm saying test by producing messages into kafka at
>> >> >> >> a rate comparable to production, not loading a ton of messages in and
>> >> >> >> starting from auto.offset.reset smallest.
>> >> >> >>
>> >> >> >> Even if you're unwilling to take that advice for some reason, and
>> >> >> >> unwilling to empirically determine a reasonable maximum partition
>> >> >> >> size, you should be able to estimate an upper bound such that the
>> >> >> >> first batch does not encompass your entire kafka retention.
>> >> >> >> Backpressure will kick in once it has some information to work with.
>> >> >> >>
>> >> >> >> On Wed, Jul 6, 2016 at 8:45 AM, rss rss <rssde...@gmail.com> wrote:
>> >> >> >> > Hello,
>> >> >> >> >
>> >> >> >> >   thanks, I tried
>> >> >> >> > .set("spark.streaming.backpressure.enabled","true")
>> >> >> >> > but the result is negative. So I have prepared a small test:
>> >> >> >> > https://github.com/rssdev10/spark-kafka-streaming
>> >> >> >> >
>> >> >> >> >   How to run:
>> >> >> >> >   git clone https://github.com/rssdev10/spark-kafka-streaming.git
>> >> >> >> >   cd spark-kafka-streaming
>> >> >> >> >
>> >> >> >> >   # downloads kafka and zookeeper
>> >> >> >> >   ./gradlew setup
>> >> >> >> >
>> >> >> >> >   # run zookeeper, kafka, and run messages generation
>> >> >> >> >   ./gradlew test_data_prepare
>> >> >> >> >
>> >> >> >> > And in other console just run:
>> >> >> >> >    ./gradlew test_spark
>> >> >> >> >
>> >> >> >> > Data generation is easy to interrupt with CTRL-C and to resume
>> >> >> >> > with the same command:
>> >> >> >> >   ./gradlew test_data_prepare
>> >> >> >> >
>> >> >> >> > To stop all run:
>> >> >> >> >   ./gradlew stop_all
>> >> >> >> >
>> >> >> >> > The Spark test should print a report every 10 seconds, like:
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > ***************************************************************************
>> >> >> >> > Processing time: 33477
>> >> >> >> > Expected time: 10000
>> >> >> >> > Processed messages: 2897866
>> >> >> >> > Message example: {"message": 1, "uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >> >> >> > Recovered json: {"message":1,"uid":"dde09b16-248b-4a2b-8936-109c72eb64cc"}
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > "message" is the number of the first message in the window. Time
>> >> >> >> > values are in milliseconds.
>> >> >> >> >
>> >> >> >> > Brief results:
>> >> >> >> >
>> >> >> >> > - Spark always reads all messages from Kafka after the first
>> >> >> >> >   connection, regardless of the dstream or window size. It looks
>> >> >> >> >   like a bug.
>> >> >> >> > - When the processing speed of the Spark app is close to the speed
>> >> >> >> >   of data generation, all is OK.
>> >> >> >> > - I added delayFactor in
>> >> >> >> >   https://github.com/rssdev10/spark-kafka-streaming/blob/master/src/main/java/SparkStreamingConsumer.java
>> >> >> >> >   to emulate slow processing, and the streaming process degrades.
>> >> >> >> >   With delayFactor=0 it looks like a stable process.
>> >> >> >> >
>> >> >> >> > Cheers
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > 2016-07-05 17:51 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>> >> >> >> >>
>> >> >> >> >> Test by producing messages into kafka at a rate comparable to what you
>> >> >> >> >> expect in production.
>> >> >> >> >>
>> >> >> >> >> Test with backpressure turned on, it doesn't require you to specify a
>> >> >> >> >> fixed limit on number of messages and will do its best to maintain
>> >> >> >> >> batch timing.  Or you could empirically determine a reasonable fixed
>> >> >> >> >> limit.
>> >> >> >> >>
>> >> >> >> >> Setting up a kafka topic with way more static messages in it than your
>> >> >> >> >> system can handle in one batch, and then starting a stream from the
>> >> >> >> >> beginning of it without turning on backpressure or limiting the number
>> >> >> >> >> of messages... isn't a reasonable way to test steady state
>> >> >> >> >> performance.  Flink can't magically give you a correct answer under
>> >> >> >> >> those circumstances either.
>> >> >> >> >>
>> >> >> >> >> On Tue, Jul 5, 2016 at 10:41 AM, rss rss <rssde...@gmail.com> wrote:
>> >> >> >> >> > Hi, thanks.
>> >> >> >> >> >
>> >> >> >> >> >    I know it is possible to limit the number of messages. But the
>> >> >> >> >> > problem is that I don't know how many messages the system is able
>> >> >> >> >> > to process. It depends on the data. The example is very simple. I
>> >> >> >> >> > need a strict response after a specified time, something like soft
>> >> >> >> >> > real time. With Flink I am able to set up a strict processing time
>> >> >> >> >> > like this:
>> >> >> >> >> > KeyedStream<Event, Integer> keyed =
>> >> >> >> >> >         eventStream.keyBy(event -> event.userId.getBytes()[0] % partNum);
>> >> >> >> >> > WindowedStream<Event, Integer, TimeWindow> uniqUsersWin =
>> >> >> >> >> >         keyed.timeWindow(Time.seconds(10));
>> >> >> >> >> > DataStream<Aggregator> uniqUsers =
>> >> >> >> >> >         uniqUsersWin.trigger(ProcessingTimeTrigger.create())
>> >> >> >> >> >         .fold(new Aggregator(), new FoldFunction<Event, Aggregator>() {
>> >> >> >> >> >             @Override
>> >> >> >> >> >             public Aggregator fold(Aggregator accumulator, Event value)
>> >> >> >> >> >                     throws Exception {
>> >> >> >> >> >                 accumulator.add(value.userId);
>> >> >> >> >> >                 return accumulator;
>> >> >> >> >> >             }
>> >> >> >> >> >         });
>> >> >> >> >> >
>> >> >> >> >> > uniqUsers.print();
>> >> >> >> >> >
>> >> >> >> >> > And I can see results every 10 seconds regardless of the input
>> >> >> >> >> > data stream. Is something like this possible in Spark?
>> >> >> >> >> >
>> >> >> >> >> > Regarding the zeros in my example: the reason is that I prepared
>> >> >> >> >> > the message queue in Kafka in advance for the tests. If I add some
>> >> >> >> >> > messages afterwards, I am able to see the new messages. But in any
>> >> >> >> >> > case I need the first response after 10 seconds, not minutes or
>> >> >> >> >> > hours later.
>> >> >> >> >> >
>> >> >> >> >> > Thanks.
>> >> >> >> >> >
>> >> >> >> >> >
>> >> >> >> >> >
>> >> >> >> >> > 2016-07-05 17:12 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>> >> >> >> >> >>
>> >> >> >> >> >> If you're talking about limiting the number of messages per batch to
>> >> >> >> >> >> try and keep from exceeding batch time, see
>> >> >> >> >> >>
>> >> >> >> >> >> http://spark.apache.org/docs/latest/configuration.html
>> >> >> >> >> >>
>> >> >> >> >> >> look for backpressure and maxRatePerPartition
>> >> >> >> >> >>
>> >> >> >> >> >>
>> >> >> >> >> >> But if you're only seeing zeros after your job runs for a minute, it
>> >> >> >> >> >> sounds like something else is wrong.
>> >> >> >> >> >>
>> >> >> >> >> >>
>> >> >> >> >> >> On Tue, Jul 5, 2016 at 10:02 AM, rss rss <rssde...@gmail.com> wrote:
>> >> >> >> >> >> > Hello,
>> >> >> >> >> >> >
>> >> >> >> >> >> >   I'm trying to organize the processing of messages from Kafka.
>> >> >> >> >> >> > A typical case is when the number of messages in Kafka's queue
>> >> >> >> >> >> > is more than the Spark app is able to process. But I need a
>> >> >> >> >> >> > strict time limit to prepare a result for at least a part of
>> >> >> >> >> >> > the data.
>> >> >> >> >> >> >
>> >> >> >> >> >> > Code example:
>> >> >> >> >> >> >
>> >> >> >> >> >> >         SparkConf sparkConf = new SparkConf()
>> >> >> >> >> >> >                 .setAppName("Spark")
>> >> >> >> >> >> >                 .setMaster("local");
>> >> >> >> >> >> >
>> >> >> >> >> >> >         JavaStreamingContext jssc =
>> >> >> >> >> >> >                 new JavaStreamingContext(sparkConf, Milliseconds.apply(5000));
>> >> >> >> >> >> >
>> >> >> >> >> >> >         jssc.checkpoint("/tmp/spark_checkpoint");
>> >> >> >> >> >> >
>> >> >> >> >> >> >         Set<String> topicMap = new HashSet<>(Arrays.asList(topicList.split(",")));
>> >> >> >> >> >> >         Map<String, String> kafkaParams = new HashMap<String, String>() {
>> >> >> >> >> >> >             {
>> >> >> >> >> >> >                 put("metadata.broker.list", bootstrapServers);
>> >> >> >> >> >> >                 put("auto.offset.reset", "smallest");
>> >> >> >> >> >> >             }
>> >> >> >> >> >> >         };
>> >> >> >> >> >> >
>> >> >> >> >> >> >         JavaPairInputDStream<String, String> messages =
>> >> >> >> >> >> >                 KafkaUtils.createDirectStream(jssc,
>> >> >> >> >> >> >                         String.class,
>> >> >> >> >> >> >                         String.class,
>> >> >> >> >> >> >                         StringDecoder.class,
>> >> >> >> >> >> >                         StringDecoder.class,
>> >> >> >> >> >> >                         kafkaParams,
>> >> >> >> >> >> >                         topicMap);
>> >> >> >> >> >> >
>> >> >> >> >> >> >         messages.countByWindow(Seconds.apply(10), Milliseconds.apply(5000))
>> >> >> >> >> >> >                 .map(x -> {System.out.println(x); return x;})
>> >> >> >> >> >> >                 .dstream().saveAsTextFiles("/tmp/spark", "spark-streaming");
>> >> >> >> >> >> >
>> >> >> >> >> >> >
>> >> >> >> >> >> >   I need to see the result of the window operation every 10
>> >> >> >> >> >> > seconds (this is only the simplest example). But with my test
>> >> >> >> >> >> > data of ~10M messages I get the first result after a minute,
>> >> >> >> >> >> > and after that I see only zeros. Is there a way to limit
>> >> >> >> >> >> > processing time to guarantee a response within a specified
>> >> >> >> >> >> > time, like Apache Flink's triggers?
>> >> >> >> >> >> >
>> >> >> >> >> >> > Thanks.
>> >> >> >> >> >
>> >> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >
>> >> >> >
>> >> >
>> >> >
>> >
>> >
>
>
