In my test application, I create a DStream by connecting to a socket.
I then want to count the records in the DStream that match the contents
of a separate reference RDD.
Below is the Java code for my application.
======
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class TestSparkStreaming {
    public static void main(String[] args) {
        // Function to pair each input String with a fixed value
        class StringToPair implements PairFunction<String, String, String> {
            String value_;

            StringToPair(String value) {
                value_ = value;
            }

            @Override
            public Tuple2<String, String> call(String arg0) throws Exception {
                return new Tuple2<String, String>(arg0, value_);
            }
        }

        JavaStreamingContext jssc =
            new JavaStreamingContext("local", "TestSparkStreaming", new Duration(1000));
        JavaReceiverInputDStream<String> networkevents =
            jssc.socketTextStream("localhost", 9999);

        // Pair each input line with "world"
        JavaPairDStream<String, String> streamEvents =
            networkevents.mapToPair(new StringToPair("world"));

        // Construct the "hello" -> "spark" pair for input lines to match against
        JavaSparkContext sc = new JavaSparkContext(new SparkConf());
        List<String> list = Arrays.asList("hello");
        JavaRDD<String> reference = sc.parallelize(list);
        final JavaPairRDD<String, String> referenceData =
            reference.mapToPair(new StringToPair("spark"));

        // Look up each input key in the reference RDD and emit (key, number of matches)
        class MatchInputLine implements PairFunction<Tuple2<String, String>, String, Long> {
            @Override
            public Tuple2<String, Long> call(Tuple2<String, String> t) throws Exception {
                final String inputKey = t._1;
                final String inputValue = t._2;
                final List<String> ret = referenceData.lookup(inputKey);
                return new Tuple2<String, Long>(inputKey,
                    new Long((ret != null) ? ret.size() : 0));
            }
        }

        // Construct an output DStream of (key, number of matches)
        JavaPairDStream<String, Long> joinedStream =
            streamEvents.mapToPair(new MatchInputLine());

        // Sum the match counts per key
        class Count implements Function2<Long, Long, Long> {
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        }

        JavaPairDStream<String, Long> aggregatedJoinedStream =
            joinedStream.reduceByKey(new Count());

        // Print the output
        aggregatedJoinedStream.count().print();

        jssc.start();
        jssc.awaitTermination();
    }
}
======
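To make the intent of the matching step concrete, here is a minimal plain-Java sketch of what I expect it to compute for a single input line. It does not use Spark at all: the class MatchSketch and the method matchLine are made-up names for illustration, and a HashMap stands in for the reference RDD, so this only shows the expected result, not how Spark would execute it.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the matching step; no Spark involved.
class MatchSketch {
    // Returns (line, number of reference values stored under that line).
    static Map.Entry<String, Long> matchLine(Map<String, List<String>> reference, String line) {
        List<String> hits = reference.getOrDefault(line, Collections.<String>emptyList());
        return new SimpleEntry<String, Long>(line, (long) hits.size());
    }

    public static void main(String[] args) {
        // Assumed reference data: "hello" -> ["spark"], mirroring referenceData above.
        Map<String, List<String>> reference = new HashMap<String, List<String>>();
        reference.put("hello", Arrays.asList("spark"));

        System.out.println(matchLine(reference, "hello")); // one match expected
        System.out.println(matchLine(reference, "foo"));   // no match expected
    }
}
```

So for the input line "hello" I expect a pair with a match count of 1, and for any other line a count of 0.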
I'm testing on Windows in local mode (Spark 1.0.0). After I start the socket
server (the "nc" program mentioned in Spark's documentation) and submit the
packaged jar to Spark, I expect to see output when I type "hello".
However, I don't see any output. I see the messages below in the console
where I submitted the jar.
======
14/06/18 18:17:48 INFO JobScheduler: Added jobs for time 1403086668000 ms
14/06/18 18:17:48 INFO MemoryStore: ensureFreeSpace(12) called with curMem=0, maxMem=1235327385
14/06/18 18:17:48 INFO MemoryStore: Block input-0-1403086668400 stored as bytes to memory (size 12.0 B, free 1178.1 MB)
14/06/18 18:17:48 INFO BlockManagerInfo: Added input-0-1403086668400 in memory on PEK-WKST68449:60769 (size: 12.0 B, free: 1178.1 MB)
14/06/18 18:17:48 INFO BlockManagerMaster: Updated info of block input-0-1403086668400
14/06/18 18:17:48 INFO SendingConnection: Initiating connection to [PEK-WKST68449/10.101.3.75:60769]
14/06/18 18:17:48 INFO ConnectionManager: Accepted connection from [PEK-WKST68449/10.101.3.75]
14/06/18 18:17:48 INFO SendingConnection: Connected to [PEK-WKST68449/10.101.3.75:60769], 1 messages pending
14/06/18 18:17:48 WARN BlockManager: Block input-0-1403086668400 already exists on this machine; not re-adding it
14/06/18 18:17:48 INFO SendingConnection: Initiating connection to [/127.0.0.1:60789]
14/06/18 18:17:48 INFO ConnectionManager: Accepted connection from [127.0.0.1/127.0.0.1]
14/06/18 18:17:48 INFO SendingConnection: Connected to [/127.0.0.1:60789], 1 messages pending
14/06/18 18:17:48 INFO BlockGenerator: Pushed block input-0-1403086668400
14/06/18 18:17:49 INFO ReceiverTracker: Stream 0 received 1 blocks
14/06/18 18:17:49 INFO JobScheduler: Added jobs for time 1403086669000 ms
======
I see one batch under "Waiting Batches" in Spark's monitoring UI. I'm not
sure whether that's related to the problem.
Can you suggest what the problem might be? I guess this is a basic question
about the reduce function.
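For reference, this is my understanding of what reduceByKey followed by count() should produce for one batch, written as a plain-Java sketch without Spark. The class ReduceSketch and its methods are made-up names for illustration, and the three input pairs are an assumed batch, not real output. If I read the API correctly, count() on the reduced stream gives the number of distinct keys in the batch rather than the total number of matches.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for reduceByKey(new Count()) and count() on one batch.
class ReduceSketch {
    // Merge values that share a key by adding them.
    static Map<String, Long> reduceByKey(List<Map.Entry<String, Long>> pairs) {
        Map<String, Long> reduced = new LinkedHashMap<String, Long>();
        for (Map.Entry<String, Long> p : pairs) {
            reduced.merge(p.getKey(), p.getValue(), Long::sum);
        }
        return reduced;
    }

    // Number of (key, value) records left after the reduce.
    static long count(Map<String, Long> reduced) {
        return reduced.size();
    }

    public static void main(String[] args) {
        // Assumed batch: "hello" typed twice (one match each), "foo" once (no match).
        List<Map.Entry<String, Long>> batch = Arrays.<Map.Entry<String, Long>>asList(
            new SimpleEntry<String, Long>("hello", 1L),
            new SimpleEntry<String, Long>("hello", 1L),
            new SimpleEntry<String, Long>("foo", 0L));

        Map<String, Long> reduced = reduceByKey(batch);
        System.out.println(reduced);        // {hello=2, foo=0}
        System.out.println(count(reduced)); // 2
    }
}
```

With that batch I would expect the printed count to be 2 (one record per key), so even a count of 1 for a single "hello" line would confirm that the pipeline runs.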
I would appreciate any help. Thank you!