Hi Akshay,

Thanks very much for the reply!
1) The topics have 12 partitions (both input and output).

2-3) I read that "trigger" is used for micro-batching, but if you would like the
stream to truly process as a "stream" as quickly as possible, is it best to leave
this out? In any case, I am using a small maxOffsetsPerTrigger (varying from 20-200
just to test this argument).

Could you please link me to documentation or an example of the progress status
display you mention? I see a horizontal status bar on my Spark UI for the running
job, but it doesn't appear to give me any specific metrics beyond the UI display.
Where exactly would I see the start/end offset values per batch? Is that in the
Spark logs? Any example (documentation, sample log) would be very helpful to point
me in the right direction. I think debugging the offsets per batch is exactly what
I need right now.
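Is something like the below what you are referring to? This is only a sketch based
on my reading of the structured streaming Java API (the StreamingQueryListener and
lastProgress() calls are my assumption of where the per-batch "startOffset" and
"endOffset" values would surface; I have not verified it):

import org.apache.spark.sql.streaming.StreamingQueryListener;

// Sketch: print every micro-batch's progress; for a kafka source this should
// include the per topic-partition start/end offsets of the batch.
spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(StreamingQueryListener.QueryStartedEvent event) { }

    @Override
    public void onQueryProgress(StreamingQueryListener.QueryProgressEvent event) {
        // The same progress status that gets written periodically to the driver logs.
        System.out.println(event.progress().prettyJson());
    }

    @Override
    public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent event) { }
});

// Or, after start(), poll the most recent batch from the running query:
// StreamingQueryProgress latest = query.lastProgress();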
Thanks,
Austin

On Wed, May 8, 2019 at 9:59 AM Akshay Bhardwaj <akshay.bhardwaj1...@gmail.com> wrote:

> Hi Austin,
>
> A few questions:
>
>    1. What is the partition count of the kafka topics used for input and
>    output data?
>    2. In the write stream, I would recommend using "trigger" with a
>    defined interval, if you prefer a micro-batching strategy,
>    3. along with defining "maxOffsetsPerTrigger" in the kafka readStream
>    options, which lets you choose the number of messages you want per
>    trigger. (Helps in maintaining the expected threshold of
>    executors/memory for the cluster.)
>
> For the repeated log messages, look in your logs for the streaming query
> progress that is published. This progress status displays a lot of metrics
> that should be your first diagnosis for identifying issues.
> The progress status for a kafka stream displays the "startOffset" and
> "endOffset" values per batch. These are listed per topic-partition, giving
> the start to end offsets for each trigger batch of the streaming query.
>
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Tue, May 7, 2019 at 8:02 PM Austin Weaver <aus...@flyrlabs.com> wrote:
>
>> Hey Spark Experts,
>>
>> After listening to some of you, and the presentations at Spark Summit in
>> SF, I am transitioning from d-streams to structured streaming, however I
>> am seeing some weird results.
>>
>> My use case is as follows: I am reading in a stream from a kafka topic,
>> transforming a message, and writing the transformed message to another
>> kafka topic.
>>
>> While running my stream, I can see the transformed messages on the
>> output topic, so I know the basic structure of my stream seems to be
>> running as intended.
>>
>> Inside my transformation, I am logging the total transform time as well
>> as the raw message being transformed. (Java by the way)
>>
>> The 2 weird things I am seeing:
>>
>> 1) I am seeing that the consumer lag for this particular consumer group
>> on the input topic is increasing. This does not make sense to me -
>> looking at the transform time from the logs, it should easily be able to
>> handle the incoming feed. To give an example, the transform times are
>> < 10 ms per record and the sample of data does not contain > 100 messages
>> per second. The stream should be reducing consumer lag as it runs
>> (especially considering multiple workers and partitions).
>>
>> 2) I am seeing the same log transformation messages over and over on the
>> dataproc spark cluster logs. For example, I am currently looking at my
>> logs and the last 20+ log messages are the exact same.
>>
>> I thought 2 may be due to offsets not being handled correctly, but I am
>> seeing a reasonable range of transformed messages on the target topic,
>> and I'm using the built-in checkpointing for spark to handle the offsets
>> for me.
>>
>> In terms of 1, why would I be seeing the same log messages over and
>> over? It doesn't make sense to me - wouldn't the message only be
>> transformed once and its offset committed?
>>
>> If anything stands out as incorrect, or something you've seen, please
>> let me know - this is rather new to me and my code seems to be following
>> the same approach as other examples I see across the net.
>>
>> Here's a redacted snippet of my stream:
>>
>> spark.readStream().format("kafka")
>>     .option("kafka.bootstrap.servers", "XXXXX")
>>     .option("kafka.partition.assignment.strategy", RoundRobinAssignor.class.getName())
>>     .option("subscribe", "XXXX")
>>     .option("startingOffsets", "earliest")
>>     .load()
>>     .select("value")
>>     .as(Encoders.STRING())
>>     .map((MapFunction<String, String>) value -> transform(value), Encoders.STRING())
>>     .writeStream()
>>     .format("kafka")
>>     .option("kafka.bootstrap.servers", "XXXXX")
>>     .option("topic", "XXXXX")
>>     .outputMode("append")
>>     .option("checkpointLocation", "/checkpoints/testCheckpoint")
>>     .start()
>>     .awaitTermination();
>>
>> Thanks!
>> Austin
>>
>

--
Austin Weaver
Software Engineer
FLYR, Inc.
www.flyrlabs.com
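For reference, a minimal sketch of the quoted stream with the two suggested options
added. The 10 second trigger interval and the maxOffsetsPerTrigger cap of 200 are
illustrative values only, the "XXXXX" placeholders are kept from the redacted
original, the partition-assignment option is trimmed, and this is not a tested
configuration:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.streaming.Trigger;

spark.readStream().format("kafka")
    .option("kafka.bootstrap.servers", "XXXXX")
    .option("subscribe", "XXXXX")
    .option("startingOffsets", "earliest")
    // Cap how many offsets each micro-batch reads (spread across partitions).
    .option("maxOffsetsPerTrigger", 200)
    .load()
    .select("value")
    .as(Encoders.STRING())
    .map((MapFunction<String, String>) value -> transform(value), Encoders.STRING())
    .writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "XXXXX")
    .option("topic", "XXXXX")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/testCheckpoint")
    // Fire micro-batches on a fixed interval instead of as fast as possible.
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .start()
    .awaitTermination();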