I’ve been thinking quite a bit today about this and about what an
implementation on the Spark side would look like.

After some deliberation I concluded:

- We should instead have an `onQueryTriggerStart` method whose event is
  published every time a MicroBatch is triggered.
- This should of course be disabled by default and have a minimum interval
  configuration that users can set. That minimum interval config should have
  a sane default that will not overwhelm the ListenerBus.
- We can follow the lead of `onQueryProgress` and `onQueryIdle` and leave the
  publishing logic inside `ProgressReporter.ProgressContext`
  <https://github.com/apache/spark/blob/75d80c7795ca71d24229010ab04ae740473126aa/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L130>,
  more specifically inside the `startTrigger`
  <https://github.com/apache/spark/blob/75d80c7795ca71d24229010ab04ae740473126aa/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L211>
  method.
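
To make the "disabled by default, with a minimum interval" idea concrete, here is a minimal sketch in plain Java. It is not Spark code: the class name `TriggerStartThrottle`, its constructor parameters, and the `LongConsumer` publisher are all hypothetical stand-ins for whatever the real config and ListenerBus wiring would look like.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/**
 * Hypothetical sketch (not Spark code): publishes a "trigger start" event
 * at most once per minimum interval, and only when explicitly enabled.
 */
final class TriggerStartThrottle {
    private final boolean enabled;          // assumed config flag, off by default in the proposal
    private final long minIntervalMs;       // assumed minimum-interval config
    private final LongConsumer publisher;   // stand-in for posting to the listener bus
    // Time of the last published event; Long.MIN_VALUE means nothing published yet.
    private final AtomicLong lastPublishedMs = new AtomicLong(Long.MIN_VALUE);

    TriggerStartThrottle(boolean enabled, long minIntervalMs, LongConsumer publisher) {
        this.enabled = enabled;
        this.minIntervalMs = minIntervalMs;
        this.publisher = publisher;
    }

    /** Called at the start of every micro-batch; returns true if an event was published. */
    boolean onTriggerStart(long batchId, long nowMs) {
        if (!enabled) {
            return false;
        }
        long last = lastPublishedMs.get();
        boolean due = last == Long.MIN_VALUE || nowMs - last >= minIntervalMs;
        // compareAndSet ensures only one caller publishes for a given interval.
        if (due && lastPublishedMs.compareAndSet(last, nowMs)) {
            publisher.accept(batchId);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TriggerStartThrottle throttle = new TriggerStartThrottle(
            true, 1000L, id -> System.out.println("trigger start, batch " + id));
        throttle.onTriggerStart(0L, 0L);     // first event is always published
        throttle.onTriggerStart(1L, 500L);   // suppressed: inside the minimum interval
        throttle.onTriggerStart(2L, 1000L);  // published: the interval has elapsed
    }
}
```

Passing the clock value in as `nowMs` keeps the sketch deterministic to test; a real implementation would presumably read it from the trigger clock instead.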

I decided not to pursue a modification of onQueryIdle, since extending it to
my use case would complicate it, and exposing that a query is actively
triggering is what I’m aiming for.
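
On the consumer side, a start event could be paired with the existing events to gate maintenance jobs. The sketch below is plain Java with hypothetical names (`TriggerGate`, `onTriggerStart`, `onTriggerEnd`, `runIfQuiet`). It assumes the end of a trigger is signalled by an existing callback such as `onQueryProgress` or `onQueryIdle`; whether every trigger reliably ends with one of those events is an assumption here, not something confirmed above.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical consumer-side sketch: tracks which queries are mid-trigger
 * so that maintenance work can be skipped while any of them is busy.
 */
final class TriggerGate {
    // Ids of queries that have started a trigger and not yet reported its end.
    private final Set<UUID> triggering = ConcurrentHashMap.newKeySet();

    /** Would be called from the proposed trigger-start callback. */
    void onTriggerStart(UUID queryId) {
        triggering.add(queryId);
    }

    /** Would be called from progress, idle, or terminated callbacks (assumption). */
    void onTriggerEnd(UUID queryId) {
        triggering.remove(queryId);
    }

    /** True when no tracked query is currently inside a trigger. */
    boolean isQuiet() {
        return triggering.isEmpty();
    }

    /** Runs the task only if no query is triggering; returns whether it ran. */
    boolean runIfQuiet(Runnable maintenance) {
        if (!isQuiet()) {
            return false;
        }
        maintenance.run();
        return true;
    }
}
```

The check in `runIfQuiet` is only a snapshot: a trigger can still start while the maintenance task is running, so this reduces overlap rather than ruling it out.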

I’ve started an initial implementation on a fork here 
<https://github.com/JevonCowell/spark/pull/1> and created a Jira Ticket here 
<https://issues.apache.org/jira/browse/SPARK-51731> since I’m not sure if the 
mailing list is the proper place to discuss this. 

Regards,
Jevon C

P.S. I just noticed it’s your onQueryIdle implementation that I’m following 
throughout the codebase! 

> On Apr 4, 2025, at 10:00 PM, Jevon Cowell <jcow...@atlassian.com> wrote:
> 
> Hey Jungtaek!
> 
> Wanted to update the mailing list on my current approach in case others 
> wanted something similar.
> I created an asynchronous poller that iterates through all active queries 
> and checks if the isTriggering boolean value is true.
> 
> Here’s an example code snippet: 
> 
> ```java
> public static void checkAndUpdateSubscribers(SparkSession sparkSession) {
>     StreamingQuery[] activeQueries = sparkSession.streams().active();
> 
>     boolean anyTriggering = false;
> 
>     for (StreamingQuery query : activeQueries) {
>         if (query.isActive() && query.recentProgress().length > 0) {
>             boolean isTriggering = query.lastProgress() != null &&
>                                    query.lastProgress().numInputRows() > 0;
> 
>             if (isTriggering) {
>                 anyTriggering = true;
>                 break;
>             }
>         }
>     }
> 
>     notifySubscribers(anyTriggering);
> }
> 
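> // `subscribers` is a collection of weak references declared elsewhere
> // (not shown in this snippet).
> private static void notifySubscribers(boolean isTriggering) {
>     Iterator<WeakReference<Consumer<Boolean>>> iterator = subscribers.iterator();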
> 
>     while (iterator.hasNext()) {
>         WeakReference<Consumer<Boolean>> weakRef = iterator.next();
>         Consumer<Boolean> consumer = weakRef.get();
> 
>         if (consumer != null) {
>             consumer.accept(isTriggering);
>         } else {
>             iterator.remove();
>         }
>     }
> }
> ```
> 
> I have yet to look at QueryListener’s implementation, but I’ll take a 
> stab at it.
> 
> Regards,
> Jevon C
> 
>> On Mar 27, 2025, at 6:04 PM, Jungtaek Lim <kabhwan.opensou...@gmail.com> 
>> wrote:
>> 
>> 
>> Hi Jevon,
>> 
>> > From testing, I see that `onQueryIdle` does not trigger when a query is 
>> > waiting for the next trigger interval.
>> 
>> Yeah it's based on trigger - if no trigger has been triggered, the event 
>> cannot be sent.
>> 
>> > I wanted to get thoughts on whether it’s worth implementing a new 
>> > QueryListener method (something like `onQueryWait`) that will report when 
>> > a streaming query is awaiting a new trigger.
>> 
>> If it's not hard to implement what you described as "onQueryWait", and it 
>> doesn't cause a perf regression, I think this is the ideal behavior of 
>> "onQueryIdle" and you are welcome to modify the criteria of onQueryIdle 
>> rather than introducing a new event. It just needs to coordinate with the 
>> current trigger and not produce the idle event if it somehow starts 
>> executing a microbatch - this is tricky (as now we are talking about 
>> threading), but if there is an easy way to make it work, that would be 
>> ideal.
>> 
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>> 
>> On Thu, Mar 27, 2025 at 5:50 AM Jevon Cowell <jcow...@atlassian.com.invalid> 
>> wrote:
>>> Hello!
>>> 
>>> This is my first time ever utilizing a mailing list, so I apologize if I’m 
>>> not conforming to any standards or rules (and please correct me where 
>>> obvious). I’m looking to inquire about Spark’s StreamingQueryListener.
>>> 
>>> I currently have a Spark Streaming job with a trigger interval of 10 
>>> minutes in a cluster. I want to periodically execute maintenance jobs 
>>> (OPTIMIZE, DELETE, VACUUM) in the same cluster to save on compute 
>>> resources. Ideally, I don’t want all of these jobs running concurrently or 
>>> when the Spark Streaming job is processing data. I want to implement a 
>>> `StreamingQueryListener` to detect when any streaming queries are running 
>>> and delay the execution of the maintenance jobs. From testing, I see that 
>>> `onQueryIdle` does not trigger when a query is waiting for the next trigger 
>>> interval. Before diving into the Apache Spark code, I wanted to get 
>>> thoughts on whether it’s worth implementing a new QueryListener method 
>>> (something like `onQueryWait`) that will report when a streaming query is 
>>> awaiting a new trigger.
>>> 
>>> Thoughts? Is this too naive?
