Annnnd... it's NOT working.
Here's the code:
// Map to just get the bytes part out...
val bytes = kafkaStream.map { case (key, messageBytes) => messageBytes }
val things = bytes.flatMap(bytesArrayToThings) // convert to a thing
val srcDestinations = things.map(thing =>
(ipToString(thing.getSourceIp), Set(ipToString(thing.getDestinationIp))))
// up to this point works fine.
// this fails to print
val srcDestinationSets = srcDestinations.reduceByKey(
  (exist: Set[String], addl: Set[String]) => exist ++ addl)
What it does...
From a Kafka message containing many "things", convert the message to an
array of said "things", flatMap them out to a stream of one "thing" at a
time, then pull out and make a tuple of (SourceIP, Set(DestinationIP)).
ALL THAT WORKS. If I do a "srcDestinations.print()" I get output like the
following every 5 seconds, which is my batch interval.
-------------------------------------------
Time: 1402582000000 ms
-------------------------------------------
(10.30.51.216,Set(10.20.1.1))
(10.20.11.3,Set(10.10.61.98))
(10.20.11.3,Set(10.10.61.95))
...
What I want is a SET of (sourceIP -> Set(all the destination IPs)). I'm
using a set because, as you can see above, the same source may have the same
destination multiple times, and I want to eliminate dupes on the destination
side.
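To make the goal concrete, the intended merge can be sketched on plain Scala
collections, with no Spark or Kafka involved. This is only an illustration of
the result expected for one batch: the object name SrcDestMerge, the value
name merged, and the sample pairs (including the deliberate duplicate) are
made up for the example and are not part of the real job.

```scala
// Illustration only: a plain Seq stands in for one batch of the stream.
// SrcDestMerge, merged, and the sample data are hypothetical.
object SrcDestMerge {
  val srcDestinations: Seq[(String, Set[String])] = Seq(
    ("10.30.51.216", Set("10.20.1.1")),
    ("10.20.11.3", Set("10.10.61.98")),
    ("10.20.11.3", Set("10.10.61.95")),
    ("10.20.11.3", Set("10.10.61.98")) // duplicate destination on purpose
  )

  // Group the pairs by source IP, then union the destination sets per key.
  // The union is the same (exist ++ addl) function passed to reduceByKey.
  val merged: Map[String, Set[String]] =
    srcDestinations
      .groupBy { case (src, _) => src }
      .map { case (src, pairs) => src -> pairs.map(_._2).reduce(_ ++ _) }

  def main(args: Array[String]): Unit =
    merged.foreach(println)
}
```

Each source IP ends up with exactly one entry, and the duplicate destination
collapses because the values are sets. That is the shape of output hoped for
from "srcDestinationSets.print()" in every batch.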
When I call the reduceByKey() method, I never get any output. When I do a
"srcDestinationSets.print()" NOTHING EVER PRINTS. Ever. Never.
What am I doing wrong? (The same happens for "reduceByKeyAndWindow(...,
Seconds(5))".)
I'm sure this is something I've done, but I cannot figure out what it was.
Help, please?