Since I think this may help others, I wanted to share what ended up
solving this problem.

The key thing is that the JVM needed to be forked, so this needed to be
added:
ThisBuild / fork := true
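
My understanding (happy to be corrected) is that without forking, sbt runs
the job inside its own JVM with layered classloaders, which appears to be
what broke Flink's classloading and caused the ClassNotFoundException
below; with fork := true the job runs in a fresh JVM with a plain runtime
classpath.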

final build.sbt file:

ThisBuild / scalaVersion     := "3.7.1"
ThisBuild / version          := "0.1.0-SNAPSHOT"
ThisBuild / scalacOptions    += "-release:17"
ThisBuild / javacOptions     ++= Seq("--release", "17")
ThisBuild / fork             := true


val flinkVersion = "2.0.0"
val flinkConnectorVersion = "4.0.0-2.0"


lazy val flink_simple_testing = project
  .in(file("flink_simple_testing"))
  .settings(
    name := "flink-testing",
    libraryDependencies ++= Seq(
      "org.apache.flink" % "flink-core" % flinkVersion,
      "org.apache.flink" % "flink-streaming-java" % flinkVersion,
      "org.apache.flink" % "flink-clients" % flinkVersion,
      "org.apache.flink" % "flink-runtime" % flinkVersion,
    )
  )
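
With fork enabled, running the job from the project root with something
like the following (the sbt project ID matches the lazy val name) works:

sbt flink_simple_testing/run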


final script file:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.datastream.DataStreamSource


@main def main(): Unit = {
  val env = StreamExecutionEnvironment
    .getExecutionEnvironment
    .setParallelism(1)

  val data: DataStreamSource[String] = env.fromData(
    "this", "is a", "test"
  )

  data.print()

  val p = env.getExecutionPlan
  println(p)

  env.execute()
}
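
In case it helps anyone verify their own setup: assuming nothing else is
off, a successful run should first print the execution plan as a JSON
string (from getExecutionPlan) and then the three input strings from the
print() sink, roughly:

{"nodes": [ ... ]}
this
is a
test

(the exact plan output differs by version; with parallelism above 1,
print() also prefixes each line with the subtask index).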





On Thu, Jul 3, 2025 at 3:27 PM Jeremie Doehla <jeremie.doe...@gmail.com>
wrote:

> Hello --
> I was hoping to get some help getting set up, as something isn't
> working quite right and I'm not sure what's missing.
> I thought I would just copy something cited as an example, in case I had
> missed something in my own code, so I pulled the example from here:
> https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/datastream/overview/
>
> Updated slightly to this:
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import java.time.Duration;
> import org.apache.flink.util.Collector;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
>
>
> public class main {
>
>     public static void main(String[] args) throws Exception {
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         DataStream<Tuple2<String, Integer>> dataStream = env
>                 .socketTextStream("localhost", 9999)
>                 .flatMap(new Splitter())
>                 .keyBy(value -> value.f0)
>                 .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
>                 .sum(1);
>
>         dataStream.print();
>
>         env.execute("Window WordCount");
>     }
>
>     public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
>             for (String word: sentence.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
>
> }
>
>
> As I'm ultimately intending to use Scala, I'm using sbt to try and
> run the Java code. With that, I've got the following setup in my build.sbt
> file:
>
> ThisBuild / scalaVersion     := "3.7.1"
> ThisBuild / version          := "0.1.0-SNAPSHOT"
> ThisBuild / scalacOptions    += "-release:17"
> ThisBuild / javacOptions     ++= Seq("--release", "17")
>
>
> val flinkVersion = "2.0.0"
> val flinkConnectorVersion = "4.0.0-2.0"
>
> lazy val websiteExample = project
>   .in(file("website_example"))
>   .settings(
>     name := "flink-testing",
>     libraryDependencies ++= Seq(
>       "org.apache.flink" % "flink-core" % flinkVersion,
>       "org.apache.flink" % "flink-streaming-java" % flinkVersion,
>       "org.apache.flink" % "flink-clients" % flinkVersion,
>       "org.apache.flink" % "flink-runtime" % flinkVersion,
>       "org.apache.flink" % "flink-datastream" % flinkVersion
>     )
>   )
>
>
> Everything compiles fine, but when running I get this error:
>
> [error] org.apache.flink.util.FlinkException: Failed to execute job
> 'Window WordCount'.
>
> Which traces back to this error:
>
> [error] Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory
>
> [error]  at
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
>
> Which makes me think I'm missing some package that would bring
> SimpleUdfStreamOperatorFactory into scope, but as best I can tell that
> should already be covered by the flink-runtime package, right?
>
> So, my question is what might I be missing here to get this to work?
> Perhaps we can update the website documentation to help others out that
> might run into this issue too.
>
>
> In case this helps, I've also got this code:
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.datastream.DataStreamSource
>
>
> @main def main(): Unit = {
>   val env = StreamExecutionEnvironment
>     .getExecutionEnvironment
>     .setParallelism(1)
>
>   val data: DataStreamSource[String] = env.fromData(
>     "this", "is a", "test"
>   )
>
>   data.print()
>
>   val p = env.getExecutionPlan
>   println(p)
>
>   env.execute()
> }
>
> which, when executed, returns the same error as the example mentioned
> above, but might be a bit simpler.
>
> Thanks in advance,
> --Jeremie
>
