Hello --

I'm hoping to get some help with my setup: something isn't working quite
right and I'm not sure what's missing. I thought I would try copying a
documented example in case I'd missed something in my own code, so I pulled
the example from the DataStream API overview here:
https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/datastream/overview/

I updated it slightly to this:

import java.time.Duration;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.util.Collector;


public class main {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}


Since I ultimately intend to use Scala, I'm using sbt to build and run the
Java code. With that, I've got the following in my build.sbt file:

ThisBuild / scalaVersion     := "3.7.1"
ThisBuild / version          := "0.1.0-SNAPSHOT"
ThisBuild / scalacOptions    += "-release:17"
ThisBuild / javacOptions     ++= Seq("--release", "17")


val flinkVersion = "2.0.0"
val flinkConnectorVersion = "4.0.0-2.0"

lazy val websiteExample = project
  .in(file("website_example"))
  .settings(
    name := "flink-testing",
    libraryDependencies ++= Seq(
      "org.apache.flink" % "flink-core" % flinkVersion,
      "org.apache.flink" % "flink-streaming-java" % flinkVersion,
      "org.apache.flink" % "flink-clients" % flinkVersion,
      "org.apache.flink" % "flink-runtime" % flinkVersion,
      "org.apache.flink" % "flink-datastream" % flinkVersion
    )
  )


Everything compiles fine, but when running I get this error:

[error] org.apache.flink.util.FlinkException: Failed to execute job 'Window WordCount'.

Which traces back to this error:

[error] Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory
[error]  at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
This makes me think I'm missing a dependency that provides
SimpleUdfStreamOperatorFactory, but as best I can tell that class should
already be covered by the flink-runtime artifact, right?
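To check that assumption, I was going to grep the jars sbt resolved for the class name from the stack trace. The cache path below is my local Coursier cache, so it's an assumption about where the jars live; adjust as needed:

```shell
# Hypothetical path: Coursier's default cache on my machine; sbt may have
# resolved the Flink jars somewhere else on yours.
CACHE="$HOME/.cache/coursier/v1/https/repo1.maven.org/maven2/org/apache/flink"
found=0
for j in "$CACHE"/*/*/*.jar; do
  [ -f "$j" ] || continue
  # unzip -l lists the jar's entries; grep for the missing class.
  if unzip -l "$j" 2>/dev/null | grep -q 'SimpleUdfStreamOperatorFactory'; then
    echo "found in: $j"
    found=1
  fi
done
[ "$found" -eq 1 ] || echo "class not found in any cached Flink jar"
```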

So, my question is: what might I be missing here to get this to work?
Perhaps we can also update the website documentation to help out others who
run into this issue.
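To double-check whether the class is visible at all on my application classpath, I also put together this tiny probe. The class name is copied verbatim from the stack trace; nothing else here is Flink-specific, and running it via sbt (e.g. with runMain) is my assumption about how to get the same classpath as the job:

```java
// Minimal classpath probe: reports whether the class from the stack trace
// is visible to the application classloader.
public class ClassCheck {
    public static void main(String[] args) {
        String cls = "org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory";
        try {
            // Class.forName uses the caller's classloader by default.
            Class.forName(cls);
            System.out.println("found: " + cls);
        } catch (ClassNotFoundException e) {
            System.out.println("missing: " + cls);
        }
    }
}
```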


In case this helps, I've also got this code:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.datastream.DataStreamSource


@main def main(): Unit = {
  val env = StreamExecutionEnvironment
    .getExecutionEnvironment
    .setParallelism(1)

  val data: DataStreamSource[String] = env.fromData(
    "this", "is a", "test"
  )

  data.print()

  val p = env.getExecutionPlan
  println(p)

  env.execute()
}

which, when executed, fails with the same error as the example above but
might be a simpler reproduction.
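In case it's relevant, I'm launching everything through `sbt run`. One thing I still plan to try, purely on my guess that sbt's in-process layered classloaders could be interfering at runtime, is forking the run into its own JVM:

```scala
lazy val websiteExample = project
  .in(file("website_example"))
  .settings(
    // ...existing settings as above...
    // Assumption on my part: forking gives the app a plain JVM classpath
    // instead of sbt's layered classloaders.
    run / fork := true
  )
```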

Thanks in advance,
--Jeremie
