Hi Jark Wu,

Thanks for your answer.
Here is what I have so far:

import java.util.Properties

import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment, Types}
import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.table.sources
import org.apache.flink.table.sinks
//import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

object FlinkToSQL extends App {

  val b_servers = "..."
  val zk = "..."

  val properties = new Properties()
  properties.setProperty("bootstrap.servers", b_servers)
  properties.setProperty("zookeeper.connect", zk)
  properties.setProperty("group.id", "very_small_test")
  // ... more properties

  val kafkaSource: FlinkKafkaConsumerBase[String] =
    new FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), properties)
      .setStartFromTimestamp(0)

  val settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tableEnv = StreamTableEnvironment.create(env, settings)

  val schema = new Schema()
    .field("fullVisitorId", Types.STRING)
    .field("eventID", Types.STRING)
    .field("CustomDimensions", Types.STRING)
    .field("page", Types.STRING)

  tableEnv.connect(
    new Kafka()
      .version("universal")
      .topic("very_small_test")
      .properties(properties)
      .startFromEarliest()
  )
    .withFormat(
      new Json()
        .failOnMissingField(false)
        .deriveSchema()
    )
    .withSchema(schema)
    .inAppendMode()
    .registerTableSource("sql_table")

  tableEnv.connect(
    new Kafka()
      .properties(properties)
      .topic("mafh-output")
      .sinkPartitionerRoundRobin()
  )
    .withFormat(
      new Json()
        .failOnMissingField(false)
        .deriveSchema()
    )
    .withSchema(schema)
    .inAppendMode()
    .registerTableSink("sql_sink")

  val sqlStatement = "SELECT * from sql_table"
  val result: Table = tableEnv.sqlQuery(sqlStatement)

  result.insertInto("sql_sink")
  result.printSchema()
  System.out.println("======================= " + result.toString)

  tableEnv.execute("SQL test")
}

(A runtime check of what SPI discovery actually sees is sketched in the P.S. at the bottom of this mail.)

Here is my error message:

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
    at FlinkToSQL$.delayedEndpoint$FlinkToSQL$1(FlinkToSQL.scala:65)
    at FlinkToSQL$delayedInit$body.apply(FlinkToSQL.scala:15)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at FlinkToSQL$.main(FlinkToSQL.scala:15)
    at FlinkToSQL.main(FlinkToSQL.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
    ... 12 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: No context matches.
The following properties are requested:

connector.properties.0.key=security.protocol
connector.properties.0.value=SSL
connector.properties.1.key=key.deserializer
connector.properties.1.value=org.apache.kafka.common.serialization.ByteArrayDeserializer
connector.properties.2.key=value.deserializer
connector.properties.2.value=org.apache.kafka.common.serialization.ByteArrayDeserializer
connector.properties.3.key=zookeeper.connect
connector.properties.3.value=z-1.realtime-tracking-poc.3ubjky.c3.kafka.eu-west-1.amazonaws.com:2181,z-2.realtime-tracking-poc.3ubjky.c3.kafka.eu-west-1.amazonaws.com:2181,z-3.realtime-tracking-poc.3ubjky.c3.kafka.eu-west-1.amazonaws.com:2181
connector.properties.4.key=ssl.endpoint.identification.algorithm
connector.properties.4.value=
connector.properties.5.key=group.id
connector.properties.5.value=very_small_test
connector.properties.6.key=bootstrap.servers
connector.properties.6.value=b-2.realtime-tracking-poc.3ubjky.c3.kafka.eu-west-1.amazonaws.com:9094,b-1.realtime-tracking-poc.3ubjky.c3.kafka.eu-west-1.amazonaws.com:9094,b-3.realtime-tracking-poc.3ubjky.c3.kafka.eu-west-1.amazonaws.com:9094
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=very_small_test
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=fullVisitorId
schema.0.type=VARCHAR
schema.1.name=eventID
schema.1.type=VARCHAR
schema.2.name=CustomDimensions
schema.2.type=VARCHAR
schema.3.name=page
schema.3.type=VARCHAR
update-mode=append

The following factories have been considered:

org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory

    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
    ... 30 more

Command exiting with ret '1'

On Sat, 16 May 2020 at 04:57, Jark Wu <imj...@gmail.com> wrote:

> Hi,
>
> Could you share the SQL DDL and the full exception message? It might be
> that you are using the wrong `connector.version` or other options.
>
> Best,
> Jark
>
> On Fri, 15 May 2020 at 20:14, Martin Frank Hansen <m...@berlingskemedia.dk>
> wrote:
>
>> Hi,
>>
>> I am trying to connect to Kafka through Flink, but I am having some
>> difficulty getting the right table source factory.
>>
>> I currently get the error: NoMatchingTableFactoryException: Could not
>> find a suitable table factory for
>> 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
>> My sbt file looks like this:
>>
>> name := "writeToSQL"
>>
>> version := "0.1"
>>
>> scalaVersion := "2.11.12"
>>
>> val flinkVersion = "1.9.1"
>> val hadoopVersion = "3.0.0"
>>
>> libraryDependencies ++= Seq(
>>   "org.slf4j" % "slf4j-api" % "1.7.15" % "runtime",
>>   "org.apache.flink" %% "flink-connector-kafka" % flinkVersion % "compile",
>>   "org.apache.flink" %% "flink-sql-connector-kafka" % flinkVersion % "compile",
>>   "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>>   "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion % "provided",
>>   "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided",
>>   "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided",
>>   "org.apache.flink" % "flink-table-common" % flinkVersion % "provided",
>>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
>>   "org.apache.flink" % "flink-table" % flinkVersion % "compile",
>>   "org.apache.flink" % "flink-json" % flinkVersion % "compile",
>>   "org.slf4j" % "slf4j-log4j12" % "1.7.25" % "runtime"
>> )
>>
>> assemblyMergeStrategy in assembly := {
>>   case path if path.contains("META-INF/services") => MergeStrategy.concat
>>   case PathList("META-INF", _*) => MergeStrategy.discard
>>   case _ => MergeStrategy.first
>> }
>>
>> From the documentation
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#define-a-tablefactory
>> I can see what is missing, but I do not know how to solve it.
>>
>> The documentation says the following:
>>
>> Define a TableFactory
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#define-a-tablefactory>
>>
>> A TableFactory allows the creation of different table-related instances
>> from string-based properties. All available factories are called to match
>> against the given set of properties and a corresponding factory class.
>>
>> Factories leverage Java's Service Provider Interfaces (SPI)
>> <https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html> for
>> discovery. This means that every dependency and JAR file should contain a
>> file org.apache.flink.table.factories.TableFactory in the
>> META-INF/services resource directory that lists all available table
>> factories that it provides.
>>
>> But how do I do that? I thought the sbt file would take care of this.
>>
>> Any help is highly appreciated!
>>
>> Best regards
>>
>> Martin Frank Hansen

-- 
Martin Frank Hansen
Data Engineer, Digital Service
M: +45 25 57 14 18
E: m...@berlingskemedia.dk
Pilestræde 34 | DK-1147 København K | T: +45 33 75 75 75 | berlingskemedia.dk
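
P.S. To double-check what SPI discovery can actually see at runtime, I assume a small snippet like the one below would help; it is just plain java.util.ServiceLoader (which, as far as I can tell, is the same mechanism Flink's TableFactoryService uses), run with the same classpath as the job. If no Kafka factory shows up in the output, the connector jar is probably not packaged or shipped correctly:

import java.util.ServiceLoader
import org.apache.flink.table.factories.TableFactory
import scala.collection.JavaConverters._

object ListTableFactories extends App {
  // Print every TableFactory that Java SPI can discover on the classpath.
  // A Kafka table source/sink factory should appear in this list; if it
  // does not, the service file was likely lost while assembling the fat jar.
  ServiceLoader.load(classOf[TableFactory]).iterator().asScala
    .foreach(factory => println(factory.getClass.getName))
}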
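
And regarding the META-INF/services question above: as far as I understand, the file SPI looks for is just a plain-text list of factory class names, one per line, in a resource named META-INF/services/org.apache.flink.table.factories.TableFactory. After assembly, the fat jar should still contain such a file with entries along these lines (the two class names are my guess at what flink-json and the universal Kafka connector register in 1.9, so please verify against the actual jars):

org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory

Since assemblyMergeStrategy is an ordinary pattern match, its cases are tried in order, so the "META-INF/services" concat case must stay above the generic META-INF discard case (as it already does in my build file); otherwise these entries would be silently dropped.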