I wrote a simple helper class; the Redis connections are initialized in
set and get methods to store and retrieve values from your map
to do this please share :). I am using the Redis Scala client.
object RedisHelper
Even though there are multiple instances of the map object, the transient
value object state is accessible across the object, so as the stream flows
in, the value can be updated based on application logic.
As I understand it, the ValueState defined in one map function is not
accessible in another map function. Also be aware that an instance of the
map function will be created for each tuple in your stream. I had a
similar use case where I had to pass in some state
I have a requirement where I want to do aggregation on one data stream
every 5 minutes and on a different data stream every 1 minute. I wrote some
example code to test this out, but the behavior is different from what I
expected: I expected the window2 to be called 5 times and window 1 to be called once,
> How heavy is the processing you are doing? 4500 events/second sounds not
> like a lot of throughput.
Which Flink documentation were you following to set up your cluster? Can you point to it?
I am using the Flink connector to read from a Kafka stream. I ran into a
problem where the Flink job went down due to an application error and
stayed down for some time. Meanwhile the Kafka queue kept growing, as
expected with no consumer for the given group, and when I started Flink it
>> There are two files in the /usr/share/flink/conf directory, and I was
>> trying to do the rolling of application logs which goes to following
>> It does not seem to fully work if there is no data in the kafka stream,
>> the flink application emits this error and bails, could this be missed use
In your pom.xml, add the maven.plugins like this; you will also have to add
all the dependent artifacts. This works for me: if you run mvn clean
compile package, the resulting jar is a fat jar.
There are two files in the /usr/share/flink/conf directory, and I was
trying to set up rolling of the application logs, which go to the following
directory on the task nodes.
Changing the logback.xml and logback-yarn.xml has no
I am facing this exception repeatedly while trying to consume from a Kafka
topic. It seems it was reported in 1.0.0 and fixed in 1.0.0. How can I be
sure that it is fixed in the version of Flink that I am using? Does it
require me to install patch updates?
I fought with Kafka and Flink 0.10.2 (Scala version 2.11) and was never
able to get it working; I kept running into NoClassDefFoundError. After
moving to Flink 1.0.0 with Kafka (Scala version 2.11), things worked for
me. If moving to Flink 1.0.0 is an option for you, do so.
Answered based on my understanding.
Not a solution to your problem, but an alternative: I wrote my own sink
function where I handle all SQL activities (insert/update/select) and used
a third-party library for connection pooling. The code has been running
stably in production without any issues.
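
For illustration only, here is a minimal sketch of what such a hand-written
sink could look like. It is not the code from my production job: the Booking
type, the bookings table and the JdbcInsertSink name are made up for the
example, it assumes the Flink 1.0-era RichSinkFunction with invoke(value),
and it uses plain DriverManager where a connection pool would go.

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Hypothetical record type; not taken from the original post.
case class Booking(id: String, amount: Int)

object JdbcInsertSink {
  // Hypothetical table and columns, used only for this sketch.
  val InsertSql = "INSERT INTO bookings (id, amount) VALUES (?, ?)"
}

// jdbcUrl is supplied by the caller; a pooled DataSource could replace it.
class JdbcInsertSink(jdbcUrl: String) extends RichSinkFunction[Booking] {
  // Created in open() on the task manager, so never serialized with the job.
  @transient private var conn: Connection = _
  @transient private var insert: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(jdbcUrl)
    insert = conn.prepareStatement(JdbcInsertSink.InsertSql)
  }

  // Called once per record that reaches the sink.
  override def invoke(value: Booking): Unit = {
    insert.setString(1, value.id)
    insert.setInt(2, value.amount)
    insert.executeUpdate()
  }

  override def close(): Unit = {
    if (insert != null) insert.close()
    if (conn != null) conn.close()
  }
}
```

It would be attached with something like stream.addSink(new
JdbcInsertSink(url)). Opening the connection in open() rather than in the
constructor keeps the non-serializable JDBC objects out of the job graph.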
You can implement a join in Flink (which is an inner join) with the pseudo
code below. The join below is for a 5-minute interval. Yes, there will be
some corner cases where data arriving after 5 minutes will be missed by
the join window. I actually solved this problem by storing some data
I am trying to use an AWS EMR YARN cluster where the Flink code runs. In
one of the apply window functions, I try to set some values in Redis and it
fails. I have tried accessing the same Redis without any Flink code, and
get/set works, but from Flink I run into this exception. Any inputs on what might be
In your pom file you can specify which version of Scala you want to build
against. Also remember to add the Scala version to the artifactId of every
dependency that takes a Scala version. Some libraries are Scala agnostic;
for those you do not have to specify the Scala
Nice write-up. One question, though: my understanding of a keyed stream is
that it will fork n streams from one stream based on n keys. If that is
true, it can be depicted pictorially as well, and the apply function
can be shown to operate over the time period by clearly marking a time
I had a similar use case and ended up writing the aggregation logic in the
apply function; I could not find any better solution.
I have tested kafka with flink 1.0.0 and it works for me. Can't talk about kafka
You will have to include the dependent Jackson jar in the Flink server lib folder, or create a fat jar.
val stream: DataStream[String] = env
  .addSource(new FlinkKafkaConsumer08[String]("topic_name", new SimpleStringSchema, prop))
val event: DataStream[SomeEventObj] = stream.map(MyMapFunction)
val tenMinute:DataStream[AggEvents] =
Never mind, Till, figured out a way: instead of doing the aggregation in
reduce, I moved that logic to the apply of the window function.
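
Roughly, the idea looks like the sketch below. This is not my actual job:
SomeEvent, AggEvent and their fields are made up for the example, and it
assumes the Flink 1.0-era Scala API (timeWindow on a keyed stream and the
function form of apply).

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical event types, not taken from the original post.
case class SomeEvent(key: String, value: Int)
case class AggEvent(key: String, total: Int, count: Int)

object WindowApplyAggregation {
  // Pure aggregation logic, kept separate so it can be checked without a cluster.
  def aggregate(key: String, events: Iterable[SomeEvent]): AggEvent =
    AggEvent(key, events.map(_.value).sum, events.size)

  // apply receives every element of one key's window at once,
  // so the whole aggregation can be done in a single place.
  def tenMinute(events: DataStream[SomeEvent]): DataStream[AggEvent] =
    events
      .keyBy(_.key)
      .timeWindow(Time.minutes(10))
      .apply { (key: String, window: TimeWindow, in: Iterable[SomeEvent], out: Collector[AggEvent]) =>
        out.collect(aggregate(key, in))
      }
}
```

The trade-off is that apply buffers the window contents until the window
fires, whereas reduce aggregates incrementally as elements arrive.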
I have a keyed input stream on a DataStream[(String, Int)] and wrote a
reduce on the keyedStream. The reduce is a simple one, summing up the
integer values for the same key.
val stream: DataStream[(String, Int)] = ??? // source not shown
val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
Never mind, I understand what is going on, Aljoscha: for each unique key the value count is reset to 0.
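
To make the per-key behavior concrete, here is a small plain-Scala sketch
(no Flink API involved; PerKeyCount and runningCounts are names made up for
the example). It only mimics the idea that keyed state holds one counter per
key, so the first element of every new key starts again from 0.

```scala
object PerKeyCount {
  // One counter per key, each starting at 0, similar in spirit to keyed state.
  def runningCounts(keys: Seq[String]): Seq[(String, Int)] = {
    val start = (Map.empty[String, Int], Vector.empty[(String, Int)])
    val (_, out) = keys.foldLeft(start) {
      case ((state, acc), key) =>
        // A key seen for the first time has no state yet, so it counts from 0.
        val next = state.getOrElse(key, 0) + 1
        (state.updated(key, next), acc :+ (key -> next))
    }
    out
  }

  def main(args: Array[String]): Unit =
    runningCounts(Seq("a", "b", "a", "c", "a")).foreach(println)
}
```

Printing the counter for a stream where most keys are new therefore shows 1
over and over, which looks like the counter never increments.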
I wrote the code below, which should increment a counter for the data in
the datastream, but each time I print the counter the value seems to be
reinitialised to 0 and it is not incrementing. Any thoughts?
class BookingCntFlatMapFunction extends
This kind of class-not-found exception is a little misleading. The real
problem is not that the class is missing but that there is a version
compatibility mismatch among the different libraries in use, so you will
have to go back and check if there is any version
The easier way to debug this would be to add prints in the
projectjoinresultmapper and see what data you are getting. Is it possible
that your original dataset has duplicate rows?
If it is a one-time thing, you could run the Kafka consumer script, which
has the --from-beginning option, redirect the output as a socket stream,
and consume it in Flink.
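
On the Flink side this could be as small as the sketch below. The host, the
port and the SocketReplay name are assumptions for the example, and
something such as netcat would have to sit between the consumer script and
the socket; that part is not shown here.

```scala
import org.apache.flink.streaming.api.scala._

object SocketReplay {
  // Defaults are placeholders; pass host and port as program arguments.
  def endpoint(args: Array[String]): (String, Int) = {
    val host = if (args.length > 0) args(0) else "localhost"
    val port = if (args.length > 1) args(1).toInt else 9999
    (host, port)
  }

  def main(args: Array[String]): Unit = {
    val (host, port) = endpoint(args)
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Each line written to the socket becomes one String element.
    val lines: DataStream[String] = env.socketTextStream(host, port)
    lines.print()
    env.execute("socket-replay")
  }
}
```

This is only suitable for a one-off replay, since a socket source gives no
offsets to resume from if the job fails.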
There was a similar question before; the answer was to use
org.apache.flink.api.common.io.OutputFormat to do the conversion.
I got it working for 1.0.0.
Yep, the same issue as before (class not found) with Flink 0.10.2 and
Scala version 2.11. I was not able to use Scala 2.10 since the
flink_connector_kafka connector for 0.10.2 is not available.
What I noticed was that, if I remove the dependency on
flink-connector-kafka so it is clearly something to do with that
I am trying to use the Flink Kafka connector. For this I have specified
the Kafka connector dependency and created a fat jar, since the default
Flink installation does not contain the Kafka connector jars. I have made sure that
You could try this link.
You could, I suppose, write the dataset to a file sink and then read the file into a data stream.
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment =
try {
val data1: DataStream[String] = env.readTextFile("somefile.txt")
