This is an automated email from the ASF dual-hosted git repository. jshao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push: new e9acd15a8 [#5112] feat(core): support pre event for event listener systems (#5110) e9acd15a8 is described below commit e9acd15a8586f75a2e95a95753030620906aa20b Author: FANNG <xiaoj...@datastrato.com> AuthorDate: Tue Oct 15 15:42:28 2024 +0800 [#5112] feat(core): support pre event for event listener systems (#5110) ### What changes were proposed in this pull request? support pre event for event listener systems 1. add new `PreEvent` to represent Pre event and only `SYNC` event listeners could process `PreEvent` 2. keep `Event` as post event to keep compatibility. 3. `EventBus` dispatch event to corresponding event listeners. ### Why are the changes needed? Fix: #5112 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? add UT --- .../gravitino/listener/AsyncQueueListener.java | 49 +++++--- .../org/apache/gravitino/listener/EventBus.java | 37 ++++-- .../listener/EventListenerPluginWrapper.java | 37 +++++- .../listener/api/EventListenerPlugin.java | 30 +++-- .../api/event/{Event.java => BaseEvent.java} | 4 +- .../apache/gravitino/listener/api/event/Event.java | 57 +-------- .../gravitino/listener/api/event/PreEvent.java | 31 +++++ .../gravitino/listener/DummyEventListener.java | 37 ++++-- .../listener/TestEventListenerManager.java | 133 ++++++++++++++++++--- .../listener/api/event/TestCatalogEvent.java | 24 ++-- .../listener/api/event/TestFilesetEvent.java | 26 ++-- .../listener/api/event/TestMetalakeEvent.java | 20 ++-- .../listener/api/event/TestPartitionEvent.java | 24 ++-- .../listener/api/event/TestSchemaEvent.java | 20 ++-- .../listener/api/event/TestTableEvent.java | 24 ++-- .../listener/api/event/TestTopicEvent.java | 20 ++-- 16 files changed, 375 insertions(+), 198 deletions(-) diff --git a/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java b/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java index 641bc3eb5..18043964d 100644 --- a/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java +++ b/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java @@ -29,7 +29,9 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.gravitino.listener.api.EventListenerPlugin; +import org.apache.gravitino.listener.api.event.BaseEvent; import org.apache.gravitino.listener.api.event.Event; +import org.apache.gravitino.listener.api.event.PreEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +46,7 @@ public class AsyncQueueListener implements EventListenerPlugin { private static final String NAME_PREFIX = "async-queue-listener-"; private final List<EventListenerPlugin> eventListeners; - private final BlockingQueue<Event> queue; + private final BlockingQueue<BaseEvent> queue; private final Thread asyncProcessor; private final int dispatcherJoinSeconds; private final AtomicBoolean stopped = new AtomicBoolean(false); @@ -68,20 +70,13 @@ public class AsyncQueueListener implements EventListenerPlugin { } @Override - public void onPostEvent(Event event) { - if (stopped.get()) { - LOG.warn( - "{} drop event: {}, since AsyncQueueListener is stopped", - asyncQueueListenerName, - event.getClass().getSimpleName()); - return; - } - - if (queue.offer(event)) { - return; - } + public void onPreEvent(PreEvent event) { + enqueueEvent(event); + } - logDropEventsIfNecessary(); + @Override + public void onPostEvent(Event event) { + enqueueEvent(event); } @Override @@ -117,8 +112,14 @@ public class AsyncQueueListener implements EventListenerPlugin { private void processEvents() { while (!Thread.currentThread().isInterrupted()) { try { - Event event = queue.take(); - this.eventListeners.forEach(listener -> listener.onPostEvent(event)); + BaseEvent baseEvent = queue.take(); + if (baseEvent instanceof PreEvent) { + this.eventListeners.forEach(listener -> listener.onPreEvent((PreEvent) baseEvent)); + } else if (baseEvent instanceof Event) { + this.eventListeners.forEach(listener -> listener.onPostEvent((Event) baseEvent)); + } else { + LOG.warn("Unknown event type: {}", baseEvent.getClass().getSimpleName()); + } } catch (InterruptedException e) { LOG.warn("{} event dispatcher thread is interrupted.", asyncQueueListenerName); break; @@ -154,4 +155,20 @@ public class AsyncQueueListener implements EventListenerPlugin { } } } + + private void enqueueEvent(BaseEvent baseEvent) { + if (stopped.get()) { + LOG.warn( + "{} drop event: {}, since AsyncQueueListener is stopped", + asyncQueueListenerName, + baseEvent.getClass().getSimpleName()); + return; + } + + if (queue.offer(baseEvent)) { + return; + } + + logDropEventsIfNecessary(); + } } diff --git a/core/src/main/java/org/apache/gravitino/listener/EventBus.java b/core/src/main/java/org/apache/gravitino/listener/EventBus.java index 6b18f9a5a..d851dc292 100644 --- a/core/src/main/java/org/apache/gravitino/listener/EventBus.java +++ b/core/src/main/java/org/apache/gravitino/listener/EventBus.java @@ -21,8 +21,11 @@ package org.apache.gravitino.listener; import com.google.common.annotations.VisibleForTesting; import java.util.List; +import org.apache.gravitino.exceptions.ForbiddenException; import org.apache.gravitino.listener.api.EventListenerPlugin; +import org.apache.gravitino.listener.api.event.BaseEvent; import org.apache.gravitino.listener.api.event.Event; +import org.apache.gravitino.listener.api.event.PreEvent; /** * The {@code EventBus} class serves as a mechanism to dispatch events to registered listeners. It @@ -34,26 +37,32 @@ public class EventBus { // EventListenerPluginWrapper, // which are meant for synchronous event listening, or AsyncQueueListener, designed for // asynchronous event processing. - private final List<EventListenerPlugin> postEventListeners; + private final List<EventListenerPlugin> eventListeners; /** * Constructs an EventBus with a predefined list of event listeners. * - * @param postEventListeners A list of {@link EventListenerPlugin} instances that are to be - * registered with this EventBus for event dispatch. + * @param eventListeners A list of {@link EventListenerPlugin} instances that are to be registered + * with this EventBus for event dispatch. */ - public EventBus(List<EventListenerPlugin> postEventListeners) { - this.postEventListeners = postEventListeners; + public EventBus(List<EventListenerPlugin> eventListeners) { + this.eventListeners = eventListeners; } /** * Dispatches an event to all registered listeners. Each listener processes the event based on its * implementation, which could be either synchronous or asynchronous. * - * @param event The event to be dispatched to all registered listeners. + * @param baseEvent The event to be dispatched to all registered listeners. */ - public void dispatchEvent(Event event) { - postEventListeners.forEach(postEventListener -> postEventListener.onPostEvent(event)); + public void dispatchEvent(BaseEvent baseEvent) { + if (baseEvent instanceof PreEvent) { + dispatchPreEvent((PreEvent) baseEvent); + } else if (baseEvent instanceof Event) { + dispatchPostEvent((Event) baseEvent); + } else { + throw new RuntimeException("Unknown event type:" + baseEvent.getClass().getSimpleName()); + } } /** @@ -64,7 +73,15 @@ public class EventBus { * EventBus. */ @VisibleForTesting - List<EventListenerPlugin> getPostEventListeners() { - return postEventListeners; + List<EventListenerPlugin> getEventListeners() { + return eventListeners; + } + + private void dispatchPostEvent(Event postEvent) { + eventListeners.forEach(eventListener -> eventListener.onPostEvent(postEvent)); + } + + private void dispatchPreEvent(PreEvent preEvent) throws ForbiddenException { + eventListeners.forEach(eventListener -> eventListener.onPreEvent(preEvent)); } } diff --git a/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java b/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java index a14833963..8e0a2ffbc 100644 --- a/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java +++ b/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java @@ -21,8 +21,11 @@ package org.apache.gravitino.listener; import com.google.common.annotations.VisibleForTesting; import java.util.Map; +import org.apache.gravitino.exceptions.ForbiddenException; import org.apache.gravitino.listener.api.EventListenerPlugin; +import org.apache.gravitino.listener.api.event.BaseEvent; import org.apache.gravitino.listener.api.event.Event; +import org.apache.gravitino.listener.api.event.PreEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,11 +70,27 @@ public class EventListenerPluginWrapper implements EventListenerPlugin { try { userEventListener.onPostEvent(event); } catch (Exception e) { - LOG.warn( - "Event listener {} process event {} failed,", - listenerName, - event.getClass().getSimpleName(), - e); + printExceptionInEventProcess(listenerName, event, e); + } + } + + @Override + public void onPreEvent(PreEvent preEvent) { + try { + userEventListener.onPreEvent(preEvent); + } catch (ForbiddenException e) { + if (Mode.SYNC.equals(mode())) { + LOG.warn( + "Event listener {} process pre event {} throws ForbiddenException, will skip the " + + "operation.", + listenerName, + preEvent.getClass().getSimpleName(), + e); + throw e; + } + printExceptionInEventProcess(listenerName, preEvent, e); + } catch (Exception e) { + printExceptionInEventProcess(listenerName, preEvent, e); } } @@ -79,4 +98,12 @@ public class EventListenerPluginWrapper implements EventListenerPlugin { EventListenerPlugin getUserEventListener() { return userEventListener; } + + private void printExceptionInEventProcess(String listenerName, BaseEvent baseEvent, Exception e) { + LOG.warn( + "Event listener {} process event {} failed,", + listenerName, + baseEvent.getClass().getSimpleName(), + e); + } } diff --git a/core/src/main/java/org/apache/gravitino/listener/api/EventListenerPlugin.java b/core/src/main/java/org/apache/gravitino/listener/api/EventListenerPlugin.java index 8a0b8d982..06d5b4440 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/EventListenerPlugin.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/EventListenerPlugin.java @@ -21,7 +21,9 @@ package org.apache.gravitino.listener.api; import java.util.Map; import org.apache.gravitino.annotation.DeveloperApi; +import org.apache.gravitino.exceptions.ForbiddenException; import org.apache.gravitino.listener.api.event.Event; +import org.apache.gravitino.listener.api.event.PreEvent; /** * Defines an interface for event listeners that manage the lifecycle and state of a plugin, @@ -95,17 +97,29 @@ public interface EventListenerPlugin { void stop() throws RuntimeException; /** - * Handles events generated after the completion of an operation. Implementers are responsible for - * processing these events, which may involve additional logic to respond to the operation - * outcomes. + * Handle post-events generated after the completion of an operation. * - * <p>This method provides a hook for post-operation event processing, allowing plugins to react - * or adapt based on the event details. + * <p>This method provides a hook for post-operation event processing, you couldn't change the + * resource in the event. * - * @param event The event to be processed. - * @throws RuntimeException Indicates issues encountered during event processing. + * @param postEvent The post event to be processed. + * @throws RuntimeException Indicates issues encountered during event processing, this has no + * affect to the operation. */ - void onPostEvent(Event event) throws RuntimeException; + default void onPostEvent(Event postEvent) throws RuntimeException {} + + /** + * Handle pre-events generated before the operation. + * + * <p>This method handles pre-operation events in SYNC or ASYNC mode, any changes to resources in + * the event will affect the subsequent operations. + * + * @param preEvent The pre event to be processed. + * @throws ForbiddenException The subsequent operation will be skipped if and only if the event + * listener throwing {@code org.apache.gravitino.exceptions.ForbiddenException} and the event + * listener is SYNC mode, the exception will be ignored and logged only in other conditions. + */ + default void onPreEvent(PreEvent preEvent) throws ForbiddenException {} /** * Specifies the default operational mode for event processing by the plugin. The default diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/Event.java b/core/src/main/java/org/apache/gravitino/listener/api/event/BaseEvent.java similarity index 96% copy from core/src/main/java/org/apache/gravitino/listener/api/event/Event.java copy to core/src/main/java/org/apache/gravitino/listener/api/event/BaseEvent.java index 89e233b43..973323a05 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/Event.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/BaseEvent.java @@ -29,7 +29,7 @@ import org.apache.gravitino.annotation.DeveloperApi; * should provide specific details related to their individual event types. */ @DeveloperApi -public abstract class Event { +public abstract class BaseEvent { private final String user; @Nullable private final NameIdentifier identifier; private final long eventTime; @@ -42,7 +42,7 @@ public abstract class Event { * @param identifier The resource identifier associated with this event. This may refer to various * types of resources such as a metalake, catalog, schema, or table, etc. */ - protected Event(String user, NameIdentifier identifier) { + protected BaseEvent(String user, NameIdentifier identifier) { this.user = user; this.identifier = identifier; this.eventTime = System.currentTimeMillis(); diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/Event.java b/core/src/main/java/org/apache/gravitino/listener/api/event/Event.java index 89e233b43..7dba616d4 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/Event.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/Event.java @@ -19,64 +19,13 @@ package org.apache.gravitino.listener.api.event; -import javax.annotation.Nullable; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.annotation.DeveloperApi; -/** - * The abstract base class for all events. It encapsulates common information such as the user who - * generated the event and the identifier for the resource associated with the event. Subclasses - * should provide specific details related to their individual event types. - */ +/** Represents a post event. */ @DeveloperApi -public abstract class Event { - private final String user; - @Nullable private final NameIdentifier identifier; - private final long eventTime; - - /** - * Constructs an Event instance with the specified user and resource identifier details. - * - * @param user The user associated with this event. It provides context about who triggered the - * event. - * @param identifier The resource identifier associated with this event. This may refer to various - * types of resources such as a metalake, catalog, schema, or table, etc. - */ +public abstract class Event extends BaseEvent { protected Event(String user, NameIdentifier identifier) { - this.user = user; - this.identifier = identifier; - this.eventTime = System.currentTimeMillis(); - } - - /** - * Retrieves the user associated with this event. - * - * @return A string representing the user associated with this event. - */ - public String user() { - return user; - } - - /** - * Retrieves the resource identifier associated with this event. - * - * <p>For list operations within a namespace, the identifier is the identifier corresponds to that - * namespace. For metalake list operation, identifier is null. - * - * @return A NameIdentifier object that represents the resource, like a metalake, catalog, schema, - * table, etc., associated with the event. - */ - @Nullable - public NameIdentifier identifier() { - return identifier; - } - - /** - * Returns the timestamp when the event was created. - * - * @return The event creation time in milliseconds since epoch. - */ - public long eventTime() { - return eventTime; + super(user, identifier); } } diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/PreEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/PreEvent.java new file mode 100644 index 000000000..52e26aec3 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/PreEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents a pre event. */ +@DeveloperApi +public abstract class PreEvent extends BaseEvent { + protected PreEvent(String user, NameIdentifier identifier) { + super(user, identifier); + } +} diff --git a/core/src/test/java/org/apache/gravitino/listener/DummyEventListener.java b/core/src/test/java/org/apache/gravitino/listener/DummyEventListener.java index 17e3e4249..4ec7ab715 100644 --- a/core/src/test/java/org/apache/gravitino/listener/DummyEventListener.java +++ b/core/src/test/java/org/apache/gravitino/listener/DummyEventListener.java @@ -24,14 +24,17 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import lombok.Getter; +import org.apache.gravitino.exceptions.ForbiddenException; import org.apache.gravitino.listener.api.EventListenerPlugin; import org.apache.gravitino.listener.api.event.Event; +import org.apache.gravitino.listener.api.event.PreEvent; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; public class DummyEventListener implements EventListenerPlugin { Map<String, String> properties; - @Getter LinkedList<Event> events = new LinkedList<>(); + @Getter LinkedList<Event> postEvents = new LinkedList<>(); + @Getter LinkedList<PreEvent> preEvents = new LinkedList<>(); @Override public void init(Map<String, String> properties) { @@ -46,7 +49,17 @@ public class DummyEventListener implements EventListenerPlugin { @Override public void onPostEvent(Event event) { - this.events.add(event); + postEvents.add(event); + } + + @Override + public void onPreEvent(PreEvent preEvent) { + if (preEvent.equals(TestEventListenerManager.DUMMY_FORBIDDEN_PRE_EVENT_INSTANCE)) { + throw new ForbiddenException(""); + } else if (preEvent.equals(TestEventListenerManager.DUMMY_EXCEPTION_PRE_EVENT_INSTANCE)) { + throw new RuntimeException(""); + } + preEvents.add(preEvent); } @Override @@ -54,18 +67,26 @@ public class DummyEventListener implements EventListenerPlugin { return Mode.SYNC; } - public Event popEvent() { - Assertions.assertTrue(events.size() > 0, "No events to pop"); - return events.removeLast(); + public Event popPostEvent() { + Assertions.assertTrue(postEvents.size() > 0, "No events to pop"); + return postEvents.removeLast(); } public static class DummyAsyncEventListener extends DummyEventListener { - public List<Event> tryGetEvents() { + public List<Event> tryGetPostEvents() { + Awaitility.await() + .atMost(20, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> getPostEvents().size() > 0); + return getPostEvents(); + } + + public List<PreEvent> tryGetPreEvents() { Awaitility.await() .atMost(20, TimeUnit.SECONDS) .pollInterval(10, TimeUnit.MILLISECONDS) - .until(() -> getEvents().size() > 0); - return getEvents(); + .until(() -> getPreEvents().size() > 0); + return getPreEvents(); } @Override diff --git a/core/src/test/java/org/apache/gravitino/listener/TestEventListenerManager.java b/core/src/test/java/org/apache/gravitino/listener/TestEventListenerManager.java index d0dda8878..fd7a61272 100644 --- a/core/src/test/java/org/apache/gravitino/listener/TestEventListenerManager.java +++ b/core/src/test/java/org/apache/gravitino/listener/TestEventListenerManager.java @@ -26,22 +26,42 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.exceptions.ForbiddenException; import org.apache.gravitino.listener.DummyEventListener.DummyAsyncEventListener; import org.apache.gravitino.listener.DummyEventListener.DummyAsyncIsolatedEventListener; import org.apache.gravitino.listener.api.EventListenerPlugin; import org.apache.gravitino.listener.api.event.Event; +import org.apache.gravitino.listener.api.event.PreEvent; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; public class TestEventListenerManager { - static class DummyEvent extends Event { - protected DummyEvent(String user, NameIdentifier identifier) { + + static class DummyPostEvent extends Event { + + protected DummyPostEvent(String user, NameIdentifier identifier) { super(user, identifier); } } - private static final DummyEvent DUMMY_EVENT_INSTANCE = - new DummyEvent("user", NameIdentifier.of("a", "b")); + static class DummyPreEvent extends PreEvent { + + protected DummyPreEvent(String user, NameIdentifier identifier) { + super(user, identifier); + } + } + + private static final DummyPostEvent DUMMY_POST_EVENT_INSTANCE = + new DummyPostEvent("user", NameIdentifier.of("a", "b")); + + private static final DummyPreEvent DUMMY_PRE_EVENT_INSTANCE = + new DummyPreEvent("user2", NameIdentifier.of("a2", "b2")); + + public static final DummyPreEvent DUMMY_FORBIDDEN_PRE_EVENT_INSTANCE = + new DummyPreEvent("user3", NameIdentifier.of("a3", "b3")); + + public static final DummyPreEvent DUMMY_EXCEPTION_PRE_EVENT_INSTANCE = + new DummyPreEvent("user4", NameIdentifier.of("a4", "b4")); @Test void testSyncListener() { @@ -54,9 +74,10 @@ public class TestEventListenerManager { eventListenerManager.start(); EventBus eventBus = eventListenerManager.createEventBus(); - eventBus.dispatchEvent(DUMMY_EVENT_INSTANCE); - List<EventListenerPlugin> listeners = eventBus.getPostEventListeners(); + // test post event + eventBus.dispatchEvent(DUMMY_POST_EVENT_INSTANCE); + List<EventListenerPlugin> listeners = eventBus.getEventListeners(); Assertions.assertEquals(2, listeners.size()); Set<String> names = listeners.stream() @@ -66,7 +87,27 @@ public class TestEventListenerManager { EventListenerPluginWrapper wrapper = (EventListenerPluginWrapper) listener; EventListenerPlugin userListener = wrapper.getUserEventListener(); Assertions.assertTrue(userListener instanceof DummyEventListener); - checkEvents(((DummyEventListener) userListener).getEvents()); + checkPostEvents(((DummyEventListener) userListener).getPostEvents()); + Assertions.assertEquals( + 0, ((DummyEventListener) userListener).getPreEvents().size()); + return ((DummyEventListener) userListener).properties.get("name"); + }) + .collect(Collectors.toSet()); + Assertions.assertEquals(ImmutableSet.of(sync1, sync2), names); + + // test pre event + eventBus.dispatchEvent(DUMMY_PRE_EVENT_INSTANCE); + names = + listeners.stream() + .map( + listener -> { + Assertions.assertTrue(listener instanceof EventListenerPluginWrapper); + EventListenerPluginWrapper wrapper = (EventListenerPluginWrapper) listener; + EventListenerPlugin userListener = wrapper.getUserEventListener(); + Assertions.assertTrue(userListener instanceof DummyEventListener); + checkPreEvents(((DummyEventListener) userListener).getPreEvents()); + Assertions.assertEquals( + 0, ((DummyEventListener) userListener).getPostEvents().size()); return ((DummyEventListener) userListener).properties.get("name"); }) .collect(Collectors.toSet()); @@ -84,10 +125,11 @@ public class TestEventListenerManager { EventListenerManager eventListenerManager = new EventListenerManager(); eventListenerManager.init(properties); eventListenerManager.start(); - EventBus eventBus = eventListenerManager.createEventBus(); - eventBus.dispatchEvent(DUMMY_EVENT_INSTANCE); - List<EventListenerPlugin> listeners = eventBus.getPostEventListeners(); + + // Test post event + eventBus.dispatchEvent(DUMMY_POST_EVENT_INSTANCE); + List<EventListenerPlugin> listeners = eventBus.getEventListeners(); Assertions.assertEquals(1, listeners.size()); Assertions.assertTrue(listeners.get(0) instanceof AsyncQueueListener); @@ -102,12 +144,27 @@ public class TestEventListenerManager { EventListenerPlugin userListener = ((EventListenerPluginWrapper) shareQueueListener).getUserEventListener(); Assertions.assertTrue(userListener instanceof DummyAsyncEventListener); - checkEvents(((DummyAsyncEventListener) userListener).tryGetEvents()); + checkPostEvents(((DummyAsyncEventListener) userListener).tryGetPostEvents()); + Assertions.assertEquals( + 0, ((DummyAsyncEventListener) userListener).getPreEvents().size()); return ((DummyAsyncEventListener) userListener).properties.get("name"); }) .collect(Collectors.toSet()); Assertions.assertEquals(ImmutableSet.of(async1, async2), sharedQueueListenerNames); + // Test pre event + eventBus.dispatchEvent(DUMMY_PRE_EVENT_INSTANCE); + shareQueueListeners.forEach( + shareQueueListener -> { + Assertions.assertTrue(shareQueueListener instanceof EventListenerPluginWrapper); + EventListenerPlugin userListener = + ((EventListenerPluginWrapper) shareQueueListener).getUserEventListener(); + Assertions.assertTrue(userListener instanceof DummyAsyncEventListener); + checkPreEvents(((DummyAsyncEventListener) userListener).tryGetPreEvents()); + Assertions.assertEquals( + 0, ((DummyAsyncEventListener) userListener).getPostEvents().size()); + }); + eventListenerManager.stop(); } @@ -122,8 +179,8 @@ public class TestEventListenerManager { eventListenerManager.start(); EventBus eventBus = eventListenerManager.createEventBus(); - eventBus.dispatchEvent(DUMMY_EVENT_INSTANCE); - List<EventListenerPlugin> listeners = eventBus.getPostEventListeners(); + eventBus.dispatchEvent(DUMMY_POST_EVENT_INSTANCE); + List<EventListenerPlugin> listeners = eventBus.getEventListeners(); Assertions.assertEquals(2, listeners.size()); Set<String> isolatedListenerNames = @@ -141,12 +198,49 @@ public class TestEventListenerManager { ((EventListenerPluginWrapper) internalListeners.get(0)) .getUserEventListener(); Assertions.assertTrue(userListener instanceof DummyAsyncEventListener); - checkEvents(((DummyAsyncEventListener) userListener).tryGetEvents()); + checkPostEvents(((DummyAsyncEventListener) userListener).tryGetPostEvents()); + Assertions.assertEquals( + 0, ((DummyAsyncEventListener) userListener).getPreEvents().size()); return ((DummyAsyncEventListener) userListener).properties.get("name"); }) .collect(Collectors.toSet()); Assertions.assertEquals(ImmutableSet.of(async1, async2), isolatedListenerNames); + eventBus.dispatchEvent(DUMMY_PRE_EVENT_INSTANCE); + listeners.forEach( + listener -> { + Assertions.assertTrue(listener instanceof AsyncQueueListener); + AsyncQueueListener asyncQueueListener = (AsyncQueueListener) listener; + List<EventListenerPlugin> internalListeners = asyncQueueListener.getEventListeners(); + Assertions.assertEquals(1, internalListeners.size()); + Assertions.assertTrue(internalListeners.get(0) instanceof EventListenerPluginWrapper); + EventListenerPlugin userListener = + ((EventListenerPluginWrapper) internalListeners.get(0)).getUserEventListener(); + Assertions.assertTrue(userListener instanceof DummyAsyncEventListener); + checkPreEvents(((DummyAsyncEventListener) userListener).tryGetPreEvents()); + Assertions.assertEquals( + 0, ((DummyAsyncEventListener) userListener).getPostEvents().size()); + }); + + eventListenerManager.stop(); + } + + @Test + void testForbiddenPreEvent() { + String sync1 = "sync1"; + String sync2 = "sync2"; + Map<String, String> properties = createSyncEventListenerConfig(sync1, sync2); + + EventListenerManager eventListenerManager = new EventListenerManager(); + eventListenerManager.init(properties); + eventListenerManager.start(); + + EventBus eventBus = eventListenerManager.createEventBus(); + + Assertions.assertThrowsExactly( + ForbiddenException.class, () -> eventBus.dispatchEvent(DUMMY_FORBIDDEN_PRE_EVENT_INSTANCE)); + + Assertions.assertDoesNotThrow(() -> eventBus.dispatchEvent(DUMMY_EXCEPTION_PRE_EVENT_INSTANCE)); eventListenerManager.stop(); } @@ -206,8 +300,15 @@ public class TestEventListenerManager { return config; } - private void checkEvents(List<Event> events) { + private void checkPostEvents(List<Event> events) { + Assertions.assertEquals(1, events.size()); + Assertions.assertEquals(DUMMY_POST_EVENT_INSTANCE, events.get(0)); + events.clear(); + } + + private void checkPreEvents(List<PreEvent> events) { Assertions.assertEquals(1, events.size()); - Assertions.assertEquals(DUMMY_EVENT_INSTANCE, events.get(0)); + Assertions.assertEquals(DUMMY_PRE_EVENT_INSTANCE, events.get(0)); + events.clear(); } } diff --git a/core/src/test/java/org/apache/gravitino/listener/api/event/TestCatalogEvent.java b/core/src/test/java/org/apache/gravitino/listener/api/event/TestCatalogEvent.java index ae5407329..d20508943 100644 --- a/core/src/test/java/org/apache/gravitino/listener/api/event/TestCatalogEvent.java +++ b/core/src/test/java/org/apache/gravitino/listener/api/event/TestCatalogEvent.java @@ -65,7 +65,7 @@ public class TestCatalogEvent { NameIdentifier identifier = NameIdentifier.of("metalake", catalog.name()); dispatcher.createCatalog( identifier, catalog.type(), catalog.provider(), catalog.comment(), catalog.properties()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateCatalogEvent.class, event.getClass()); CatalogInfo catalogInfo = ((CreateCatalogEvent) event).createdCatalogInfo(); @@ -76,7 +76,7 @@ public class TestCatalogEvent { void testLoadCatalogEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", catalog.name()); dispatcher.loadCatalog(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadCatalogEvent.class, event.getClass()); CatalogInfo catalogInfo = ((LoadCatalogEvent) event).loadedCatalogInfo(); @@ -88,7 +88,7 @@ public class TestCatalogEvent { NameIdentifier identifier = NameIdentifier.of("metalake", catalog.name()); CatalogChange catalogChange = CatalogChange.setProperty("a", "b"); dispatcher.alterCatalog(identifier, catalogChange); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterCatalogEvent.class, event.getClass()); CatalogInfo catalogInfo = ((AlterCatalogEvent) event).updatedCatalogInfo(); @@ -102,7 +102,7 @@ public class TestCatalogEvent { void testDropCatalogEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", catalog.name()); dispatcher.dropCatalog(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropCatalogEvent.class, event.getClass()); Assertions.assertEquals(true, ((DropCatalogEvent) event).isExists()); @@ -112,7 +112,7 @@ public class TestCatalogEvent { void testListCatalogEvent() { Namespace namespace = Namespace.of("metalake"); dispatcher.listCatalogs(namespace); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListCatalogEvent.class, event.getClass()); Assertions.assertEquals(namespace, ((ListCatalogEvent) event).namespace()); @@ -122,7 +122,7 @@ public class TestCatalogEvent { void testListCatalogInfoEvent() { Namespace namespace = Namespace.of("metalake"); dispatcher.listCatalogsInfo(namespace); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListCatalogEvent.class, event.getClass()); Assertions.assertEquals(namespace, ((ListCatalogEvent) event).namespace()); @@ -140,7 +140,7 @@ public class TestCatalogEvent { catalog.provider(), catalog.comment(), catalog.properties())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateCatalogFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -154,7 +154,7 @@ public class TestCatalogEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.loadCatalog(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadCatalogFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -168,7 +168,7 @@ public class TestCatalogEvent { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.alterCatalog(identifier, catalogChange)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterCatalogFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -183,7 +183,7 @@ public class TestCatalogEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.dropCatalog(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropCatalogFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -195,7 +195,7 @@ public class TestCatalogEvent { Namespace namespace = Namespace.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listCatalogs(namespace)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(ListCatalogFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, ((ListCatalogFailureEvent) event).exception().getClass()); @@ -207,7 +207,7 @@ public class TestCatalogEvent { Namespace namespace = Namespace.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listCatalogsInfo(namespace)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(ListCatalogFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, ((ListCatalogFailureEvent) event).exception().getClass()); diff --git a/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java b/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java index efc073b19..321088711 100644 --- a/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java +++ b/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java @@ -75,7 +75,7 @@ public class TestFilesetEvent { fileset.type(), fileset.storageLocation(), fileset.properties()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateFilesetEvent.class, event.getClass()); FilesetInfo filesetInfo = ((CreateFilesetEvent) event).createdFilesetInfo(); @@ -86,7 +86,7 @@ public class TestFilesetEvent { void testLoadFilesetEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", fileset.name()); dispatcher.loadFileset(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadFilesetEvent.class, event.getClass()); FilesetInfo filesetInfo = ((LoadFilesetEvent) event).loadedFilesetInfo(); @@ -98,7 +98,7 @@ public class TestFilesetEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", fileset.name()); FilesetChange change = FilesetChange.setProperty("a", "b"); dispatcher.alterFileset(identifier, change); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterFilesetEvent.class, event.getClass()); FilesetInfo filesetInfo = ((AlterFilesetEvent) event).updatedFilesetInfo(); @@ -111,7 +111,7 @@ public class TestFilesetEvent { void testDropFilesetEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", fileset.name()); dispatcher.dropFileset(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropFilesetEvent.class, event.getClass()); Assertions.assertTrue(((DropFilesetEvent) event).isExists()); @@ -121,7 +121,7 @@ public class TestFilesetEvent { void testListFilesetEvent() { Namespace namespace = Namespace.of("metalake", "catalog"); dispatcher.listFilesets(namespace); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListFilesetEvent.class, event.getClass()); Assertions.assertEquals(namespace, ((ListFilesetEvent) event).namespace()); @@ -136,7 +136,7 @@ public class TestFilesetEvent { fileset.type(), fileset.storageLocation(), fileset.properties()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateFilesetEvent.class, event.getClass()); FilesetInfo filesetInfo = ((CreateFilesetEvent) event).createdFilesetInfo(); @@ -152,7 +152,7 @@ public class TestFilesetEvent { CallerContext callerContext = CallerContext.builder().withContext(contextMap).build(); CallerContext.CallerContextHolder.set(callerContext); String fileLocation = dispatcher.getFileLocation(identifier, "test"); - Event event1 = dummyEventListener.popEvent(); + Event event1 = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event1.identifier()); Assertions.assertEquals(GetFileLocationEvent.class, event1.getClass()); String actualFileLocation = ((GetFileLocationEvent) event1).actualFileLocation(); @@ -180,7 +180,7 @@ public class TestFilesetEvent { fileset.type(), fileset.storageLocation(), fileset.properties())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateFilesetFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -194,7 +194,7 @@ public class TestFilesetEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "fileset"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.loadFileset(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadFilesetFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -207,7 +207,7 @@ public class TestFilesetEvent { FilesetChange change = FilesetChange.setProperty("a", "b"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.alterFileset(identifier, change)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterFilesetFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -221,7 +221,7 @@ public class TestFilesetEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "fileset"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.dropFileset(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropFilesetFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -233,7 +233,7 @@ public class TestFilesetEvent { Namespace namespace = Namespace.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listFilesets(namespace)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListFilesetFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -247,7 +247,7 @@ public class TestFilesetEvent { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.getFileLocation(identifier, "/test")); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(GetFileLocationFailureEvent.class, event.getClass()); Assertions.assertEquals( diff --git a/core/src/test/java/org/apache/gravitino/listener/api/event/TestMetalakeEvent.java b/core/src/test/java/org/apache/gravitino/listener/api/event/TestMetalakeEvent.java index a31ce9338..319ac641f 100644 --- a/core/src/test/java/org/apache/gravitino/listener/api/event/TestMetalakeEvent.java +++ b/core/src/test/java/org/apache/gravitino/listener/api/event/TestMetalakeEvent.java @@ -63,7 +63,7 @@ public class TestMetalakeEvent { void testCreateMetalakeEvent() { NameIdentifier identifier = NameIdentifier.of("metalake"); dispatcher.createMetalake(identifier, metalake.comment(), metalake.properties()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateMetalakeEvent.class, event.getClass()); MetalakeInfo metalakeInfo = ((CreateMetalakeEvent) event).createdMetalakeInfo(); @@ -74,7 +74,7 @@ public class TestMetalakeEvent { void testLoadMetalakeEvent() { NameIdentifier identifier = NameIdentifier.of("metalake"); dispatcher.loadMetalake(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadMetalakeEvent.class, event.getClass()); MetalakeInfo metalakeInfo = ((LoadMetalakeEvent) event).loadedMetalakeInfo(); @@ -86,7 +86,7 @@ public class TestMetalakeEvent { NameIdentifier identifier = NameIdentifier.of("metalake"); MetalakeChange metalakeChange = MetalakeChange.setProperty("a", "b"); dispatcher.alterMetalake(identifier, metalakeChange); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterMetalakeEvent.class, event.getClass()); MetalakeInfo metalakeInfo = ((AlterMetalakeEvent) event).updatedMetalakeInfo(); @@ -100,7 +100,7 @@ public class TestMetalakeEvent { void testDropMetalakeEvent() { NameIdentifier identifier = NameIdentifier.of("metalake"); dispatcher.dropMetalake(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropMetalakeEvent.class, event.getClass()); Assertions.assertTrue(((DropMetalakeEvent) event).isExists()); @@ -109,7 +109,7 @@ public class TestMetalakeEvent { @Test void testListMetalakeEvent() { dispatcher.listMetalakes(); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertNull(event.identifier()); Assertions.assertEquals(ListMetalakeEvent.class, event.getClass()); } @@ -122,7 +122,7 @@ public class TestMetalakeEvent { () -> failureDispatcher.createMetalake( identifier, metalake.comment(), metalake.properties())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateMetalakeFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -136,7 +136,7 @@ public class TestMetalakeEvent { NameIdentifier identifier = NameIdentifier.of(metalake.name()); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.loadMetalake(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadMetalakeFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -150,7 +150,7 @@ public class TestMetalakeEvent { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.alterMetalake(identifier, metalakeChange)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterMetalakeFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -166,7 +166,7 @@ public class TestMetalakeEvent { NameIdentifier identifier = NameIdentifier.of(metalake.name()); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.dropMetalake(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropMetalakeFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -177,7 +177,7 @@ public class TestMetalakeEvent { void testListMetalakeFailureEvent() { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listMetalakes()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertNull(event.identifier()); Assertions.assertEquals(ListMetalakeFailureEvent.class, event.getClass()); Assertions.assertEquals( diff --git a/core/src/test/java/org/apache/gravitino/listener/api/event/TestPartitionEvent.java b/core/src/test/java/org/apache/gravitino/listener/api/event/TestPartitionEvent.java index 408330a40..a1aa8aab2 100644 --- a/core/src/test/java/org/apache/gravitino/listener/api/event/TestPartitionEvent.java +++ b/core/src/test/java/org/apache/gravitino/listener/api/event/TestPartitionEvent.java @@ -110,7 +110,7 @@ public class TestPartitionEvent { void testAddPartitionEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); dispatcher.addPartition(identifier, partition); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AddPartitionEvent.class, event.getClass()); PartitionInfo partitionInfo = ((AddPartitionEvent) event).createdPartitionInfo(); @@ -121,7 +121,7 @@ public class TestPartitionEvent { void testDropPartitionEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); dispatcher.dropPartition(identifier, partition.name()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropPartitionEvent.class, event.getClass()); Assertions.assertEquals(false, ((DropPartitionEvent) event).isExists()); @@ -131,7 +131,7 @@ public class TestPartitionEvent { void testPartitionExistsEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); dispatcher.partitionExists(identifier, partition.name()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(PartitionExistsEvent.class, event.getClass()); Assertions.assertEquals(false, ((PartitionExistsEvent) event).isExists()); @@ -141,7 +141,7 @@ public class TestPartitionEvent { void testListPartitionEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); dispatcher.listPartitions(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(ListPartitionEvent.class, event.getClass()); Assertions.assertEquals(identifier, ((ListPartitionEvent) event).identifier()); @@ -151,7 +151,7 @@ public class TestPartitionEvent { void testListPartitionNamesEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); dispatcher.listPartitionNames(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(ListPartitionNamesEvent.class, event.getClass()); Assertions.assertEquals(identifier, ((ListPartitionNamesEvent) event).identifier()); @@ -161,7 +161,7 @@ public class TestPartitionEvent { void testPurgePartitionEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); dispatcher.purgePartition(identifier, partition.name()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(PurgePartitionEvent.class, event.getClass()); Assertions.assertEquals(identifier, ((PurgePartitionEvent) event).identifier()); @@ -173,7 +173,7 @@ public class TestPartitionEvent { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.addPartition(identifier, partition)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(AddPartitionFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, ((AddPartitionFailureEvent) event).exception().getClass()); @@ -187,7 +187,7 @@ public class TestPartitionEvent { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.dropPartition(identifier, partition.name())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(DropPartitionFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, @@ -201,7 +201,7 @@ public class TestPartitionEvent { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.partitionExists(identifier, partition.name())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(PartitionExistsFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, @@ -214,7 +214,7 @@ public class TestPartitionEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listPartitions(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(ListPartitionFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, @@ -227,7 +227,7 @@ public class TestPartitionEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listPartitionNames(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(ListPartitionNamesFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, @@ -241,7 +241,7 @@ public class TestPartitionEvent { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.purgePartition(identifier, partition.name())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(PurgePartitionFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, diff --git a/core/src/test/java/org/apache/gravitino/listener/api/event/TestSchemaEvent.java b/core/src/test/java/org/apache/gravitino/listener/api/event/TestSchemaEvent.java index d9af6a155..c2c0d7e44 100644 --- a/core/src/test/java/org/apache/gravitino/listener/api/event/TestSchemaEvent.java +++ b/core/src/test/java/org/apache/gravitino/listener/api/event/TestSchemaEvent.java @@ -66,7 +66,7 @@ public class TestSchemaEvent { void testCreateSchemaEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema"); dispatcher.createSchema(identifier, "", ImmutableMap.of()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateSchemaEvent.class, event.getClass()); SchemaInfo schemaInfo = ((CreateSchemaEvent) event).createdSchemaInfo(); @@ -77,7 +77,7 @@ public class TestSchemaEvent { void testLoadSchemaEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema"); dispatcher.loadSchema(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadSchemaEvent.class, event.getClass()); SchemaInfo schemaInfo = ((LoadSchemaEvent) event).loadedSchemaInfo(); @@ -88,7 +88,7 @@ public class TestSchemaEvent { void testListSchemaEvent() { Namespace namespace = Namespace.of("metalake", "catalog"); dispatcher.listSchemas(namespace); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(ListSchemaEvent.class, event.getClass()); Assertions.assertEquals(namespace, ((ListSchemaEvent) event).namespace()); } @@ -98,7 +98,7 @@ public class TestSchemaEvent { SchemaChange schemaChange = SchemaChange.setProperty("a", "b"); NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema"); dispatcher.alterSchema(identifier, schemaChange); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterSchemaEvent.class, event.getClass()); @@ -113,7 +113,7 @@ public class TestSchemaEvent { void testDropSchemaEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema"); dispatcher.dropSchema(identifier, true); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropSchemaEvent.class, event.getClass()); Assertions.assertEquals(true, ((DropSchemaEvent) event).cascade()); @@ -126,7 +126,7 @@ public class TestSchemaEvent { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.createSchema(identifier, schema.comment(), schema.properties())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateSchemaFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -139,7 +139,7 @@ public class TestSchemaEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.loadSchema(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadSchemaFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -154,7 +154,7 @@ public class TestSchemaEvent { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.alterSchema(identifier, schemaChange)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterSchemaFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -168,7 +168,7 @@ public class TestSchemaEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.dropSchema(identifier, true)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropSchemaFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -181,7 +181,7 @@ public class TestSchemaEvent { Namespace namespace = Namespace.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listSchemas(namespace)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListSchemaFailureEvent.class, event.getClass()); Assertions.assertEquals( diff --git a/core/src/test/java/org/apache/gravitino/listener/api/event/TestTableEvent.java b/core/src/test/java/org/apache/gravitino/listener/api/event/TestTableEvent.java index bf427f01f..11507c343 100644 --- a/core/src/test/java/org/apache/gravitino/listener/api/event/TestTableEvent.java +++ b/core/src/test/java/org/apache/gravitino/listener/api/event/TestTableEvent.java @@ -84,7 +84,7 @@ public class TestTableEvent { table.distribution(), table.sortOrder(), table.index()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateTableEvent.class, event.getClass()); TableInfo tableInfo = ((CreateTableEvent) event).createdTableInfo(); @@ -95,7 +95,7 @@ public class TestTableEvent { void testLoadTableEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", table.name()); dispatcher.loadTable(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadTableEvent.class, event.getClass()); TableInfo tableInfo = ((LoadTableEvent) event).loadedTableInfo(); @@ -107,7 +107,7 @@ public class TestTableEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", table.name()); TableChange change = TableChange.setProperty("a", "b"); dispatcher.alterTable(identifier, change); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterTableEvent.class, event.getClass()); TableInfo tableInfo = ((AlterTableEvent) event).updatedTableInfo(); @@ -120,7 +120,7 @@ public class TestTableEvent { void testDropTableEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", table.name()); dispatcher.dropTable(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropTableEvent.class, event.getClass()); Assertions.assertEquals(true, ((DropTableEvent) event).isExists()); @@ -130,7 +130,7 @@ public class TestTableEvent { void testPurgeTableEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", table.name()); dispatcher.purgeTable(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(PurgeTableEvent.class, event.getClass()); Assertions.assertEquals(true, ((PurgeTableEvent) event).isExists()); @@ -140,7 +140,7 @@ public class TestTableEvent { void testListTableEvent() { Namespace namespace = Namespace.of("metalake", "catalog"); dispatcher.listTables(namespace); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListTableEvent.class, event.getClass()); Assertions.assertEquals(namespace, ((ListTableEvent) event).namespace()); @@ -161,7 +161,7 @@ public class TestTableEvent { table.distribution(), table.sortOrder(), table.index())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateTableFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -174,7 +174,7 @@ public class TestTableEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "table", table.name()); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.loadTable(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadTableFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -187,7 +187,7 @@ public class TestTableEvent { TableChange change = TableChange.setProperty("a", "b"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.alterTable(identifier, change)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterTableFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -201,7 +201,7 @@ public class TestTableEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "table", table.name()); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.dropTable(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropTableFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -213,7 +213,7 @@ public class TestTableEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "table", table.name()); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.purgeTable(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(PurgeTableFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -225,7 +225,7 @@ public class TestTableEvent { Namespace namespace = Namespace.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listTables(namespace)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListTableFailureEvent.class, event.getClass()); Assertions.assertEquals( diff --git a/core/src/test/java/org/apache/gravitino/listener/api/event/TestTopicEvent.java b/core/src/test/java/org/apache/gravitino/listener/api/event/TestTopicEvent.java index cf6100648..268c628c5 100644 --- a/core/src/test/java/org/apache/gravitino/listener/api/event/TestTopicEvent.java +++ b/core/src/test/java/org/apache/gravitino/listener/api/event/TestTopicEvent.java @@ -65,7 +65,7 @@ public class TestTopicEvent { void testCreateTopicEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "topic"); dispatcher.createTopic(identifier, topic.comment(), null, topic.properties()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateTopicEvent.class, event.getClass()); TopicInfo topicInfo = ((CreateTopicEvent) event).createdTopicInfo(); @@ -76,7 +76,7 @@ public class TestTopicEvent { void testLoadTopicEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "topic"); dispatcher.loadTopic(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadTopicEvent.class, event.getClass()); TopicInfo topicInfo = ((LoadTopicEvent) event).loadedTopicInfo(); @@ -88,7 +88,7 @@ public class TestTopicEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "topic"); TopicChange topicChange = TopicChange.setProperty("a", "b"); dispatcher.alterTopic(identifier, topicChange); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterTopicEvent.class, event.getClass()); TopicInfo topicInfo = ((AlterTopicEvent) event).updatedTopicInfo(); @@ -101,7 +101,7 @@ public class TestTopicEvent { void testDropTopicEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "topic"); dispatcher.dropTopic(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropTopicEvent.class, event.getClass()); Assertions.assertEquals(true, ((DropTopicEvent) event).isExists()); @@ -111,7 +111,7 @@ public class TestTopicEvent { void testListTopicEvent() { Namespace namespace = Namespace.of("metalake", "catalog"); dispatcher.listTopics(namespace); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListTopicEvent.class, event.getClass()); Assertions.assertEquals(namespace, ((ListTopicEvent) event).namespace()); @@ -123,7 +123,7 @@ public class TestTopicEvent { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.createTopic(identifier, topic.comment(), null, topic.properties())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateTopicFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -136,7 +136,7 @@ public class TestTopicEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "topic"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.loadTopic(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadTopicFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -150,7 +150,7 @@ public class TestTopicEvent { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.alterTopic(identifier, topicChange)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterTopicFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -164,7 +164,7 @@ public class TestTopicEvent { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "topic"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.dropTopic(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropTopicFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -176,7 +176,7 @@ public class TestTopicEvent { Namespace namespace = Namespace.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listTopics(namespace)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListTopicFailureEvent.class, event.getClass()); Assertions.assertEquals(