Hello,

I am currently working on my master's and have run into a difficult problem.


Background (for context): I am trying to connect different data stream
processors. To do so, I am using Flink's internal mechanisms for creating
custom sinks and sources that receive from and send to other data stream
processors. Inside those custom sinks and sources I start a separate process
(message-buffer-process) and use it to communicate and buffer data. My
implementation is built with Maven and could be added as a dependency.


Problem: I have already tested my implementation by adding it as a dependency
to a simple Flink word-count example. Running that test inside an IDE works
perfectly fine. But when I package the Flink word-count example and try to run
it with "./flink run " or by uploading and submitting it as a job, it tells me
that my buffer-process class could not be found:

In German: "Fehler: Hauptklasse 
de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess konnte nicht 
gefunden oder geladen werden"

Roughly translated: "Error: Main class 
de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess could not be 
found or loaded"


Code snippets:

Example - Adding my custom sink to send data to another data stream processor:

dataStream.addSink(
        (SinkFunction)DSPConnectorFactory
                .getInstance()
                .createSinkConnector(
                        new DSPConnectorConfig
                                .Builder("localhost", 9656)
                                .withDSP("flink")
                                .withBufferConnectorString("buffer-connection-string")
                                .withHWM(20)
                                .withTimeout(10000)
                                .build()));



The way I am trying to start the separate buffer-process:
JavaProcessBuilder.exec(MessageBufferProcess.class, connectionString, addSentMessagesFrame);

What JavaProcessBuilder.exec looks like:
// Needs java.io.File and java.io.IOException to be imported.
public static Process exec(Class<?> javaClass, String connectionString,
        boolean addSentMessagesFrame) throws IOException, InterruptedException {
        // Locate the java executable of the JVM that is currently running.
        String javaHome = System.getProperty("java.home");
        String javaBin = javaHome +
                File.separator + "bin" +
                File.separator + "java";
        // Reuse the classpath of the current JVM for the child process.
        String classpath = System.getProperty("java.class.path");
        String className = javaClass.getCanonicalName();

        System.out.println("Trying to build process " + classpath + " " + className);

        ProcessBuilder builder = new ProcessBuilder(
                javaBin, "-cp", classpath, className, connectionString,
                Boolean.toString(addSentMessagesFrame));

        // Forward the child's stdout and stderr to the parent's streams.
        builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        builder.redirectError(ProcessBuilder.Redirect.INHERIT);

        Process process = builder.start();
        return process;
}
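
One thing I could still check is whether the jar that contains
MessageBufferProcess is listed in java.class.path at all when the job runs on
the cluster. The following is only a diagnostic sketch under an assumption I
have not verified: that the job jar is loaded by a separate class loader and so
may be missing from java.class.path. ClasspathProbe, codeSourceOf and
classpathIncluding are hypothetical names made up for this example; they are
not part of Flink or of my project.

```java
import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.CodeSource;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only.
final class ClasspathProbe {

    // Returns the jar or directory a class was loaded from, or null if that
    // is unknown (for example, JDK classes loaded by the bootstrap loader).
    static String codeSourceOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return null;
        }
        URL location = source.getLocation();
        try {
            return new File(location.toURI()).getPath();
        } catch (URISyntaxException | IllegalArgumentException e) {
            return null; // not a plain file location
        }
    }

    // Returns java.class.path, extended by the location of clazz if that
    // location is known and not already one of the entries.
    static String classpathIncluding(Class<?> clazz) {
        String base = System.getProperty("java.class.path", "");
        String entry = codeSourceOf(clazz);
        if (entry == null) {
            return base;
        }
        for (String existing : base.split(Pattern.quote(File.pathSeparator))) {
            if (existing.equals(entry)) {
                return base;
            }
        }
        return base.isEmpty() ? entry : base + File.pathSeparator + entry;
    }

    public static void main(String[] args) {
        System.out.println("java.class.path = " + System.getProperty("java.class.path"));
        System.out.println("loaded from     = " + codeSourceOf(ClasspathProbe.class));
        System.out.println("child classpath = " + classpathIncluding(ClasspathProbe.class));
    }
}
```

Calling codeSourceOf(MessageBufferProcess.class) from inside the sink and
printing the result next to java.class.path should show whether the two differ
between the IDE run and the "./flink run" submission.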

I also tried running the message-buffer process separately, from another Maven
project and its packaged .jar file. That worked perfectly fine too, which is
why I assume that my approach is not suitable for running inside Flink.
Did I miss something, or does starting a process this way simply not work
within Flink's context? I hope the information I have given is sufficient to
help you understand my issue. If you need any more information, feel free to
message me!

Thanks for any help!

With best regards
