Hi,

Regarding your last question: sorry, I don't know about ActiveMQ connectors.

I'm not sure exactly how you are implementing your SourceFunction. Generally
speaking, the `run()` method is executed in one thread, while other operations
such as checkpointing and timers (if any) are executed from another thread. To
synchronise between them, the user is expected to acquire the checkpoint lock
in the `run()` method, as documented in [1].
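
To illustrate the locking pattern, here is a minimal sketch in plain Java. It
does not use any Flink classes: `checkpointLock` stands in for the object that
`SourceContext#getCheckpointLock()` would give you, and `CheckpointLockDemo`,
`pendingAcks`, `snapshot()` and `demo()` are names I made up for this example.
The point is only that the emitting thread and the checkpointing thread
synchronise on the same object, so a snapshot never sees a half-applied update.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the checkpoint-lock pattern; no Flink classes are used.
class CheckpointLockDemo {
    // Stands in for the object returned by SourceContext#getCheckpointLock().
    private final Object checkpointLock = new Object();
    // Messages emitted but not yet acknowledged, keyed by a hypothetical message id.
    private final Map<Long, String> pendingAcks = new HashMap<>();
    // Number of messages emitted so far; only modified under the lock.
    private long emitted = 0;

    // Source thread: "emit" a message and update the state as one atomic step.
    void run(int maxMessages) {
        for (int i = 0; i < maxMessages; i++) {
            synchronized (checkpointLock) {
                pendingAcks.put(emitted, "msg-" + emitted);
                emitted++;
            }
        }
    }

    // Checkpointing thread: read the state while holding the same lock.
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {emitted, pendingAcks.size()};
        }
    }

    // Runs the source in its own thread and snapshots from the calling thread.
    // Returns true if every snapshot saw the counter and the map in agreement.
    static boolean demo(int messages) {
        CheckpointLockDemo source = new CheckpointLockDemo();
        Thread sourceThread = new Thread(() -> source.run(messages));
        sourceThread.start();
        boolean consistent = true;
        while (sourceThread.isAlive()) {
            long[] s = source.snapshot();
            consistent &= (s[0] == s[1]);
        }
        long[] last = source.snapshot();
        return consistent && last[0] == messages && last[1] == messages;
    }

    public static void main(String[] args) {
        System.out.println("consistent snapshots: " + demo(100_000));
    }
}
```

Without the `synchronized` blocks, the plain `HashMap` could be read while it is
being modified, which is exactly the race you are worried about.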

Note that if you want your state (your `HashMap`) to actually be checkpointed,
it must either already be defined as Flink managed state (like `ListState` in
the example [1]), or you must copy the content of your `HashMap` to Flink
managed state during the `snapshotState` call.
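
The copy-on-snapshot idea can be sketched as follows, again in plain Java
without Flink on the classpath. The `managed` list is only a stand-in for
something like a `ListState`, and the method names merely mirror
`snapshotState` and `initializeState` from `CheckpointedFunction`; the class
name, the helper methods and the message ids are all hypothetical. In a real
source you would also need to make sure the map is not modified concurrently
while it is copied, for example by updating it only under the checkpoint lock.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of copying a working HashMap into "managed" state.
class PendingAckStateDemo {
    // Working state used while the source runs; not checkpointed by itself.
    private final Map<String, String> pending = new HashMap<>();
    // Stand-in for Flink managed state (e.g. a ListState of entries).
    private final List<Map.Entry<String, String>> managed = new ArrayList<>();

    void track(String id, String payload) {
        pending.put(id, payload);
    }

    // Analogue of snapshotState(): replace the managed state with a copy of the map.
    void snapshotState() {
        managed.clear();
        for (Map.Entry<String, String> e : pending.entrySet()) {
            managed.add(new SimpleImmutableEntry<String, String>(e.getKey(), e.getValue()));
        }
    }

    // Analogue of initializeState() on recovery: rebuild the map from managed state.
    void initializeState(List<Map.Entry<String, String>> restored) {
        pending.clear();
        for (Map.Entry<String, String> e : restored) {
            pending.put(e.getKey(), e.getValue());
        }
    }

    List<Map.Entry<String, String>> checkpointedEntries() {
        return new ArrayList<>(managed);
    }

    Map<String, String> pendingCopy() {
        return new HashMap<>(pending);
    }

    // Tracks two messages, snapshots, tracks a third, then "recovers" a new instance.
    static Map<String, String> demo() {
        PendingAckStateDemo source = new PendingAckStateDemo();
        source.track("id-1", "a");
        source.track("id-2", "b");
        source.snapshotState();
        source.track("id-3", "c"); // added after the snapshot, so not part of it
        PendingAckStateDemo recovered = new PendingAckStateDemo();
        recovered.initializeState(source.checkpointedEntries());
        return recovered.pendingCopy();
    }

    public static void main(String[] args) {
        System.out.println("restored ids: " + demo().keySet());
    }
}
```

Only what was copied during the snapshot survives the simulated recovery; the
entry added afterwards exists only in the plain `HashMap` and is lost.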

Also keep in mind that we are in the process of reimplementing the source
interfaces [2], and Flink 1.11 will probably offer a new and better API for
that (SourceReader instead of SourceFunction).

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

> On 29 Jan 2020, at 13:08, OskarM <pent...@gmail.com> wrote:
> 
> Hi all,
> 
> I am using Flink with Bahir's Apache ActiveMQ connector. However it's quite
> dated and poses many limitations, most notably the source supports only
> ByteMessages, does not support parallelism and has a bug that is only fixed
> in a snapshot version.
> 
> So I started implementing my own SourceFunction (still with parallelism of
> only 1) based on AMQSource.
> I want it to support Flink's checkpointing and make it work with ActiveMQ
> acks.
> AMQSource uses ordinary HashMap to store Messages to be acked in the broker
> and this is where my question arises.
> 
> Is the HashMap safe to use here?
> 
> Please correct me if I'm wrong, but my understanding is that /run/ method is
> executed in one thread and /acknowledgeIDs/ in another so there is a
> possibility of thread race (even if we assume all the message ids are
> unique).
> 
> Also, do you know of any ActiveMQ specific (or JMS in general), more
> up-to-date connectors I could use which do not have the issues mentioned
> above?
> 
> Thanks,
> Oskar
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
