Hi,

I have written a simple test program as below

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.connectors.kafka
object md_streaming
{
  private var zookeeperUrl = "rhes75:2181"
  private var requestConsumerId = null
  private var impressionConsumerId = null
  private var clickConsumerId = null
  private var conversionConsumerId = null
  private var requestTopicName = null
  private var impressionTopicName = null
  private var clickTopicName = null
  private var conversionTopicName = null
  private var requestThreads = 0
  private var impressionThreads = 0
  private var clickThreads = 0
  private var conversionThreads = 0
  private var flinkAppName = "md_streaming"
  private var bootstrapServers = "rhes75:9092, rhes75:9093, rhes75:9094"
  private var schemaRegistryURL = "http://rhes75:8081";
  private var zookeeperConnect = "rhes75:2181"
  private var zookeeperConnectionTimeoutMs = "10000"
  private var rebalanceBackoffMS = "15000"
  private var zookeeperSessionTimeOutMs = "15000"
  private var autoCommitIntervalMS = "12000"
  private var topicsValue = "final"
  private var memorySet = "F"
  private var enableHiveSupport = null
  private var enableHiveSupportValue = "true"
  private var sparkStreamingReceiverMaxRateValue = "0"
  private var checkpointdir = "/checkpoint"
  private var hbaseHost = "rhes75"
  private var zookeeperHost = "rhes564"
  private var zooKeeperClientPort = "2181"
  private var writeDirectory = "hdfs://rhes75:9000/tmp/flink/"
  private var fileName = "md_streaming.txt"
  private val maxServingDelay = 60
  private val servingSpeedFactor = 600f
  private var batchInterval = 2
  private val countWindowLength = 4 // window size in sec
  private val countWindowFrequency =  2   // window trigger interval in sec
  private val earlyCountThreshold = 50
  private val writeToElasticsearch = false // set to true to write results
to Elasticsearch
  private val elasticsearchHost = "" // look-up hostname in Elasticsearch
log output
  private val elasticsearchPort = 9300

  def main(args: Array[String])
  {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", bootstrapServers)
    properties.setProperty("zookeeper.connect", zookeeperConnect)
    properties.setProperty("group.id", flinkAppName)
    properties.setProperty("auto.offset.reset", "latest")

    val myConsumer = env.addSource(new
FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(),
properties))

    myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
    env.addSource(myConsumer).print()
    //val sink = kafkaSource.writeAsText(writeDirectory+fileName,
FileSystem.WriteMode.OVERWRITE)
    //env.execute("Flink Kafka Example writing to "+writeDirectory+fileName)
    env.execute("Flink simple output")
  }
}

However, when compiling I am getting the following errors

[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:98:
not found: type CustomWatermarkEmitter
[error]     myConsumer.assignTimestampsAndWatermarks(new
CustomWatermarkEmitter())
[error]                                                  ^
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:99:
type mismatch;
[error]  found   :
org.apache.flink.streaming.api.datastream.DataStreamSource[String]
[error]  required:
org.apache.flink.streaming.api.functions.source.SourceFunction[?]
[error]     env.addSource(myConsumer).print()
[error]                   ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Jul 29, 2018 9:31:05 AM
Completed compiling
Sun Jul 29 09:31:05 BST 2018 , Running in **Standalone mode**
Could not build the program from JAR file.

I don't see why it is failing. Appreciate any suggestions.

Regards,

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Reply via email to