Hi,
I would like to < unit test > a job flink with Kafka as source (and Sink). I am
trying to use the library scalatest-embedded-kafka to simulate a Kafka for my
test.
For example, I would like to get data (string stream) from Kafka, convert it
intro uppercase and put it into another topic.
Now, I am just trying to use Flink's kafka consumer to read into a topic (with
embedded kafka).
Here is the code for example :
```scala
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.scalatest.{Matchers, WordSpec}
import scala.util.Random
object SimpleFlinkKafkaTest {
SimpleFlinkKafkaTest
val kafkaPort = 9092
val zooKeeperPort = 2181
val groupId = Random.nextInt(1000000).toString
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("zookeeper.connect", "localhost:2181")
props.put("auto.offset.reset", "earliest")
props.put("group.id", groupId)
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
val propsMap = Map(
"bootstrap.servers" -> "localhost:9092",
"zookeeper.connect" -> "localhost:2181",
"auto.offset.reset" -> "earliest",
"group.id" -> groupId,
"key.deserializer" ->
"org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" ->
"org.apache.kafka.common.serialization.StringDeserializer"
)
val inputString = "mystring"
val expectedString = "MYSTRING"
}
class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {
"runs with embedded kafka" should {
"work" in {
implicit val config = EmbeddedKafkaConfig(
kafkaPort = SimpleFlinkKafkaTest.kafkaPort,
zooKeeperPort = SimpleFlinkKafkaTest.zooKeeperPort,
customConsumerProperties = SimpleFlinkKafkaTest.propsMap
)
withRunningKafka {
publishStringMessageToKafka("input-topic",
SimpleFlinkKafkaTest.inputString)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val kafkaConsumer = new FlinkKafkaConsumer011(
"input-topic",
new SimpleStringSchema,
SimpleFlinkKafkaTest.props
)
val inputStream = env.addSource(kafkaConsumer)
val outputStream = inputStream.map { msg =>
msg.toUpperCase
}
outputStream.writeAsText("test.csv", WriteMode.OVERWRITE)
env.execute()
consumeFirstStringMessageFrom("output-topic") shouldEqual
SimpleFlinkKafkaTest.expectedString
}
}
}
}
```
The flink process si running but nothing happen. I try ot write into a text
file to see any output but there is nothing into the file.
Any idea ? Does anybody use this library to test a Flink Job using Kafka ?
Thanks in advance,
Thomas