Hello,

Does anyone have a simple example of Flink with Kafka written in Scala?
I've been struggling to make my test program work. Below is my program, which has an error in addSink (the KafkaWordCountProducer part is copied from the Spark sample program):

import java.util.HashMap

import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
      .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))
      .print
    println("Hi TestKafka")
    env.execute("Test Kafka")
  }
}

object KafkaWordCountProducer {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Kafka producer properties (broker list and key/value serializers)
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send a batch of random-number messages every second
    while (true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt)
          .map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
      Thread.sleep(1000)
    }
  }
}

There is a compilation error:

[error] .................TestKafka.scala:15: type mismatch;
[error]  found   : org.apache.flink.streaming.util.serialization.SimpleStringSchema
[error]  required: org.apache.flink.streaming.util.serialization.SerializationSchema[String,Array[Byte]]
[error]   .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))

I changed SimpleStringSchema to SerializationSchema, but that still doesn't compile. I am trying to transition from Spark to Flink, but there are far fewer Flink samples than Spark ones. It would be very helpful to have a KafkaWordCount example in Scala similar to the one in Spark.

Thanks,
Wendong
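P.S. In case it helps to clarify what I'm after, here is a rough sketch of what I'm trying to get to compile, reusing the same imports as above. It is only my guess from the compiler message: I'm assuming SerializationSchema[String, Array[Byte]] exposes a single serialize(element) method I can implement, and StringToBytesSchema below is a helper class I made up for illustration, not something from the Flink API. I also split the source, the print, and the sink into separate calls, since addSink does not return a DataStream to keep chaining on.

// Hypothetical helper, not part of the Flink API: turns each String record
// into UTF-8 bytes to match the SerializationSchema[String, Array[Byte]]
// type the compiler asks for (the method name serialize is my assumption).
class StringToBytesSchema extends SerializationSchema[String, Array[Byte]] {
  def serialize(element: String): Array[Byte] = element.getBytes("UTF-8")
}

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read strings from Kafka; SimpleStringSchema stays on the source side,
    // where a deserialization schema is expected.
    val stream = env.addSource(
      new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))

    // Print and write back to Kafka as two separate sinks on the same stream.
    stream.print()
    stream.addSink(
      new KafkaSink[String]("localhost:2181", "test", new StringToBytesSchema))

    env.execute("Test Kafka")
  }
}

If the Kafka connector already ships a schema that implements SerializationSchema[String, Array[Byte]], I would of course rather use that than roll my own.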