I’ve been thinking about this quite a bit today, and about what an implementation on the Spark side would look like.
After some deliberation, I concluded that we should instead have an `onQueryTriggerStart` method that is published every time a micro-batch is triggered. This should, of course, be disabled by default and have a minimum-interval configuration that users can set. That minimum-interval config should have a sane default that will not overwhelm the ListenerBus.

We can follow the lead of `onQueryProgress` and `onQueryIdle` and keep the publishing logic inside `ProgressReporter.ProgressContext` <https://github.com/apache/spark/blob/75d80c7795ca71d24229010ab04ae740473126aa/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L130>, more specifically inside the `startTrigger` method <https://github.com/apache/spark/blob/75d80c7795ca71d24229010ab04ae740473126aa/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L211>.

I decided not to pursue modifying `onQueryIdle`, since extending it to my use case would complicate it, and what I’m aiming for is exposing that a query is actively triggering.

I’ve started an initial implementation on a fork here <https://github.com/JevonCowell/spark/pull/1> and created a Jira ticket here <https://issues.apache.org/jira/browse/SPARK-51731>, since I’m not sure the mailing list is the proper place to discuss this.

Regards,
Jevon C

P.S. I just noticed it’s your `onQueryIdle` implementation that I’m following throughout the codebase!

> On Apr 4, 2025, at 10:00 PM, Jevon Cowell <jcow...@atlassian.com> wrote:
>
> Hey Jungtaek!
>
> Wanted to update the mailing list on my current approach in case others
> want something similar.
> I created an asynchronous poller that iterates through all active queries and
> checks whether the isTriggering boolean value is true.
>
> Here’s an example code snippet:
>
> ```java
> import java.lang.ref.WeakReference;
> import java.util.Iterator;
> import java.util.function.Consumer;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.streaming.StreamingQuery;
>
> // Checks every active streaming query and tells subscribers whether any
> // of them appears to be triggering.
> public static void checkAndUpdateSubscribers(SparkSession sparkSession) {
>     StreamingQuery[] activeQueries = sparkSession.streams().active();
>
>     boolean anyTriggering = false;
>
>     for (StreamingQuery query : activeQueries) {
>         if (query.isActive() && query.recentProgress().length > 0) {
>             // Heuristic: the query counts as "triggering" if its most
>             // recent progress report processed at least one input row.
>             boolean isTriggering = query.lastProgress() != null
>                 && query.lastProgress().numInputRows() > 0;
>
>             if (isTriggering) {
>                 anyTriggering = true;
>                 break;
>             }
>         }
>     }
>
>     notifySubscribers(anyTriggering);
> }
>
> // `subscribers` (declared elsewhere, not shown) holds
> // WeakReference<Consumer<Boolean>> entries.
> private static void notifySubscribers(boolean isTriggering) {
>     Iterator<WeakReference<Consumer<Boolean>>> iterator = subscribers.iterator();
>
>     while (iterator.hasNext()) {
>         WeakReference<Consumer<Boolean>> weakRef = iterator.next();
>         Consumer<Boolean> consumer = weakRef.get();
>
>         if (consumer != null) {
>             consumer.accept(isTriggering);
>         } else {
>             // The subscriber was garbage-collected; drop the stale reference.
>             iterator.remove();
>         }
>     }
> }
> ```
>
> I have yet to look at QueryListener’s implementation, but I’ll take a
> stab at it.
>
> Regards,
> Jevon C
>
>> On Mar 27, 2025, at 6:04 PM, Jungtaek Lim <kabhwan.opensou...@gmail.com>
>> wrote:
>>
>> Hi Jevon,
>>
>> > From testing, I see that `onQueryIdle` does not trigger when a query is
>> > waiting for the next trigger interval.
>>
>> Yeah, it's based on triggers - if no trigger has been fired, the event
>> cannot be sent.
>>
>> > I wanted to get thoughts on whether it’s worth implementing a new
>> > QueryListener method (something like `onQueryWait`) that will report when
>> > a streaming query is awaiting a new trigger.
>>
>> If what you said about "onQueryWait" is not hard to implement and doesn't
>> cause a perf regression, I think this is the ideal behavior of "onQueryIdle",
>> and you are welcome to modify the criteria of onQueryIdle rather than
>> introducing a new event.
>> It just needs to coordinate with the current trigger and not
>> produce the idle event if it somehow starts executing a microbatch - this is
>> tricky (as now we are talking about threading), but if there is an easy way
>> to make it work, that would be ideal.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Thu, Mar 27, 2025 at 5:50 AM Jevon Cowell <jcow...@atlassian.com.invalid>
>> wrote:
>>> Hello!
>>>
>>> This is my first time ever using a mailing list, so I apologize if I’m
>>> not conforming to any standards or rules (and please correct me where
>>> obvious). I’m looking to inquire about Spark’s StreamingQueryListener.
>>>
>>> I currently have a Spark Streaming job with a trigger interval of 10
>>> minutes in a cluster. I want to periodically execute maintenance jobs
>>> (OPTIMIZE, DELETE, VACUUM) in the same cluster to save on compute
>>> resources. Ideally, I don’t want all of these jobs running concurrently or
>>> while the Spark Streaming job is processing data. I want to implement a
>>> `StreamingQueryListener` to detect when any streaming queries are running
>>> and delay the execution of the maintenance jobs. From testing, I see that
>>> `onQueryIdle` does not trigger when a query is waiting for the next trigger
>>> interval. Before diving into the Apache Spark code, I wanted to get
>>> thoughts on whether it’s worth implementing a new QueryListener method
>>> (something like `onQueryWait`) that will report when a streaming query is
>>> awaiting a new trigger.
>>>
>>> Thoughts? Is this too naive?
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
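The `onQueryTriggerStart` proposal at the top of this thread (an opt-in event with a minimum interval between events, so the ListenerBus is not flooded) comes down to a small publish-or-skip decision each time a trigger starts. Below is a minimal Java sketch of that decision, assuming one instance per query and a caller-supplied wall-clock time in milliseconds. `TriggerStartThrottle`, `shouldPublish`, and the constructor parameters are hypothetical names for illustration; they are not Spark APIs or configuration keys, and the real `ProgressReporter` code is Scala and may look quite different.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical gate for a proposed onQueryTriggerStart event (not a Spark API):
// decides whether a trigger start should be posted to the listener bus.
final class TriggerStartThrottle {
    private final boolean enabled;         // proposal: disabled by default
    private final long minIntervalMillis;  // proposal: user-configurable minimum interval
    // Time of the last published event; Long.MIN_VALUE means "never published".
    private final AtomicLong lastPublishedMillis = new AtomicLong(Long.MIN_VALUE);

    TriggerStartThrottle(boolean enabled, long minIntervalMillis) {
        if (minIntervalMillis < 0) {
            throw new IllegalArgumentException("minIntervalMillis must be >= 0");
        }
        this.enabled = enabled;
        this.minIntervalMillis = minIntervalMillis;
    }

    // Returns true if an event for a trigger starting at nowMillis should be published.
    boolean shouldPublish(long nowMillis) {
        if (!enabled) {
            return false;
        }
        while (true) {
            long last = lastPublishedMillis.get();
            boolean neverPublished = last == Long.MIN_VALUE;
            // The neverPublished check also avoids overflow in nowMillis - last.
            if (!neverPublished && nowMillis - last < minIntervalMillis) {
                return false; // too soon after the previous event: skip this one
            }
            // compareAndSet ensures two concurrent callers cannot both publish
            // for the same window; on a lost race, re-read and re-check.
            if (lastPublishedMillis.compareAndSet(last, nowMillis)) {
                return true;
            }
        }
    }
}
```

For example, with a 60-second minimum interval, a query on a 10-minute trigger interval like the one described in this thread would have every trigger start published, while a query triggering every few seconds would produce roughly one event per minute. The atomic compare-and-set is a defensive choice in case the check is ever called from more than one thread; if it is only called from a query's own execution thread, a plain field would do.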