Hi there! I have a question about ingesting files from S3 when the ingestion is not driven by the ContinuousFileMonitoringFunction. For a variety of reasons I cannot use the ContinuousFileMonitoringFunction, so I'd like to use a Kafka source where each message contains both the S3 path and the metadata required to consume the S3 file.
It's easy enough to create a KafkaSource and an operator that generates the FileSplits, but reusing the ContinuousFileReaderOperator seems tricky. This operator requires a reference to the MailboxExecutor, which isn't available in the Scala stream APIs. So my question is: is it safe to write my own version of the ContinuousFileReaderOperator in Scala without the MailboxExecutor? In other words, is that pattern even necessary anymore? Or is it safe to convert to the Java APIs, whose transform function can provide a reference to the MailboxExecutor?

Thanks!
Darin Amos