Hello,
I am currently working on my master's and have run into a difficult problem.

Background (for context):
I am trying to connect different data stream processors. To do so, I use Flink's mechanism for creating custom sinks and sources to receive data from and send data to other data stream processors. Inside those custom sinks and sources I start a separate process (the message-buffer process), which they communicate with and buffer data in. My implementation is built with Maven and could be added to other projects as a dependency.

Problem:
I have already tested my implementation by adding it as a dependency to a simple Flink word-count example. Run from within an IDE, this works perfectly fine. But when I package that Flink word-count example and run it with "./flink run " or upload and submit it as a job, I am told that my buffer-process class could not be found.

In German:
"Fehler: Hauptklasse de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess konnte nicht gefunden oder geladen werden"

Roughly translated:
"Error: Main class de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess could not be found or loaded"

Code snippets:

Example: adding my custom sink to send data to another data stream processor:

    dataStream.addSink(
        (SinkFunction) DSPConnectorFactory
            .getInstance()
            .createSinkConnector(
                new DSPConnectorConfig
                    .Builder("localhost", 9656)
                    .withDSP("flink")
                    .withBufferConnectorString("buffer-connection-string")
                    .withHWM(20)
                    .withTimeout(10000)
                    .build()));

The way I am trying to start the separate buffer process:

    JavaProcessBuilder.exec(MessageBufferProcess.class, connectionString, addSentMessagesFrame);

What JavaProcessBuilder.exec looks like:

    public static Process exec(Class javaClass, String connectionString, boolean addSentMessagesFrame)
            throws IOException, InterruptedException {
        // Path to the java executable of the JVM that runs this code
        String javaHome = System.getProperty("java.home");
        String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
        // The child process reuses the classpath of the current JVM
        String classpath = System.getProperty("java.class.path");
        String className = javaClass.getCanonicalName();

        System.out.println("Trying to build process " + classpath + " " + className);

        ProcessBuilder builder = new ProcessBuilder(
            javaBin,
            "-cp",
            classpath,
            className,
            connectionString,
            Boolean.toString(addSentMessagesFrame));

        // Forward stdout and stderr of the child process to the parent
        builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        builder.redirectError(ProcessBuilder.Redirect.INHERIT);

        Process process = builder.start();
        return process;
    }

I also tried running the message-buffer process separately, both in another Maven project and from its packaged .jar file. That worked perfectly fine too. That is why I assume that my approach is not suitable for running in Flink. Did I miss something, or does starting a process this way simply not work within Flink's context?

I hope the information I have given is sufficient to understand my issue. If you need any more information, feel free to message me!

Thanks for any help!

With best regards