Lin,
Il giorno mer 8 set 2021 alle ore 09:40 Lin Lin <[email protected]> ha
scritto:
>
> > I also share this problem, because if you want to efficiently implement
> > message filtering you need to do it in the broker side.
> >
> > I am not sure that making the full Dispatcher pluggable is a good idea,
> > because the code is too complex and also
> > it really depends on the internals of the Broker.
> >
> > If we make this pluggable that we must define a limited private but
> > "stable" API.
> >
> > My suggestion is to define particular needs and then add features to make
> > pluggable single specific parts
> > of the dispatcher.
> >
> > For instance I would add some support for "Message filtering", leaving
> the
> > implementation of the "filter" to a plugin.
> > This way you could implement filtering using JMS rules, or using other
> > metadata or security related information
> >
> > Regards
> >
> > Enrico
> >
>
>
>
> Hi, Enrico:
>
> Thank you for your feedback.
>
> We now have this method AbstractBaseDispatcher#filterEntriesForConsumer
> I think we can plug-in this method. Do you think this is okay?
>
> Provider:
> ```
> public interface EntriesFilterProvider {
>
> // Use `EntriesFilterProvider` to create `EntriesFilter`
> EntriesFilter createEntriesFilter(Subscription subscription);
>
> static EntriesFilterProvider
> createEntriesFilterProvider(ServiceConfiguration serviceConfiguration)
> {
> // According to `EntriesFilterProviderClassName`, create
> Provider through reflection
> }
>
> }
> ```
>
> Add an interface for filtering:
>
> public interface EntriesFilter {
> filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper,
> int entryWrapperOffset,
> List<Entry> entries, EntryBatchSizes batchSizes,
> SendMessageInfo sendMessageInfo,
> EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor,
> boolean isReplayRead)
>
I believe that this is still using too many internal Pulsar classes.
In your usecases, what do you need for filtering ?
In the usecases I know I need:
- the Message (metadata, headers, payload)
- something about the Consumer (this should connect to adding some
"metadata" to the Consumer and to the Subcription, but this would be
another story)
What about:
public interface MessageFilter {
enum FilterOutcome {
ACCEPT, -> deliver to the Consumer
REJECT, -> skip the message
SYSTEM -> use standard system processing
}
public FilterOutcome filterMessages(List<MessageWrapper> messages,
FilterContext context) throws Exception;
}
interface MessageWrapper {
....allow to access Message payload, metadata, headers...
}
interface FilterContext {
...isReplayRead,
...access acks
...access ManagedCursor
}
This way the implementation of the filter will not use internal APIs that
evolve in Pulsar sometimes even in point releases.
Enrico
> }
>
>
> Regards
>
> Lin
>