Hi, I'm new to Spark and am facing a peculiar problem.
I'm writing a simple Java driver program that creates a key/value data
structure and collects it once created. The problem is that when I increase
the number of iterations of the for loop that builds the ArrayList of Long
values (which I put into the key/value data structure and store in Spark as
a Java collection), the serialized size of the tasks also increases
proportionately.
e.g.:
Loop count: 10        Task size: 1120 bytes
Loop count: 10000     Task size: 33402 bytes
Loop count: 10000000  Task size: 453434 bytes
etc.
I can't understand why the task size increases. I ran the same example in
the Spark shell and noticed that the task size stays the same, irrespective
of the loop iteration count.
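The growth described above can be reproduced outside Spark with a small sketch. It assumes, without confirmation from the Spark side, that the list contents end up serialized together with each task, and it uses plain Java serialization, which may differ from the serializer the job is configured with. The class and method names (SerializedSizeDemo, buildNumbers, serializedSize) are made up for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone demo; not part of the driver program.
public class SerializedSizeDemo {

    // Builds the list the same way the driver loop does: 0, 1, ..., count - 1.
    static List<Long> buildNumbers(long count) {
        List<Long> numbers = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            numbers.add(i);
        }
        return numbers;
    }

    // Returns how many bytes plain Java serialization needs for the given object.
    static int serializedSize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        // Print the serialized size of the list for a few loop counts.
        for (long count : new long[] {10L, 10_000L, 100_000L}) {
            System.out.println("Loop count: " + count
                    + ", serialized list size: "
                    + serializedSize(buildNumbers(count)) + " bytes");
        }
    }
}
```

If the assumption holds, the size printed here should scale with the loop count in the same way the task size does. The sketch says nothing about why the Spark shell behaves differently.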
Driver code:

@Override
public void execute() {
    // do something
    List<Long> numbers = new ArrayList<Long>();
    JavaRDD<Long> distData = null;
    JavaPairRDD<String, Long> mapOfKeys = null;
    JavaRDD<String> keysRDD = null;

    // Maps each value n to the key "n,n+1".
    class ByKeyImpl implements Function<Long, String>, Serializable {
        private static final long serialVersionUID = 5749098182016143296L;

        public String call(Long paramT1) throws Exception {
            StringBuilder builder = new StringBuilder();
            builder.append(paramT1).append(',').append(paramT1 + 1);
            return builder.toString();
        }
    }

    System.out.println(" ************** STARTING BENCHMARK EXAMPLE ...*****************");
    while (true) {
        System.out.println(" ************** DO YOU WANT TO CONTINUE ? (YES/NO) *****************");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            String continueString = reader.readLine();
            if ("yes".equalsIgnoreCase(continueString)) {
                if (numbers.size() == 0) {
                    // List not populated
                    for (long i = 0; i < num; i++) {
                        numbers.add(i);
                    }
                }
                // at this time numbers has long values in it.
                // check for RDD if already created or not.
                if (distData == null) {
                    System.out.println("******************** NEW RDD CREATED.********************");
                    if (numPartitions > 0) {
                        distData = sc.parallelize(numbers, numPartitions);
                    } else {
                        distData = sc.parallelize(numbers);
                    }
                }
                // at this time, RDD is already present or newly created
                // check if map is null or not
                if (mapOfKeys == null) {
                    mapOfKeys = distData.keyBy(new ByKeyImpl());
                    keysRDD = mapOfKeys.keys();
                    keysRDD.persist(StorageLevel.MEMORY_ONLY());
                }
                System.out.println("******** DO YOU WANT TO COUNT OR COLLECT THE COLLECTION ? *******************");
                String inputOperation = reader.readLine();
                if ("count".equalsIgnoreCase(inputOperation)) {
                    long startTime = Calendar.getInstance().getTimeInMillis();
                    System.out.println(" START Time of Function ... *** " + startTime);
                    System.out.println("*************** KEYS COUNT IS **************** "
                            + keysRDD.count());
                    long endTime = Calendar.getInstance().getTimeInMillis();
                    System.out.println(" END Time of Function ... *** " + endTime
                            + " and difference is ************"
                            + ((endTime - startTime) / 1000) + " sec(s)...");
                } else if ("collect".equalsIgnoreCase(inputOperation)) {
                    long startTime = Calendar.getInstance().getTimeInMillis();
                    System.out.println(" START Time of Function ... *** " + startTime);
                    System.out.println("************* AFTER COLLECTING KEYS COUNT IS **************** "
                            + keysRDD.collect().size());
                    long endTime = Calendar.getInstance().getTimeInMillis();
                    System.out.println(" END Time of Function ... *** " + endTime
                            + " and difference is ************"
                            + ((endTime - startTime) / 1000) + " sec(s)...");
                }
            } else if ("no".equalsIgnoreCase(continueString)) {
                System.exit(0);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
In this code, the higher the loop iteration count, the more data collect /
count sends along with each task, so the task size grows. Is there any way
to reduce this?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Task-Size-Increases-when-using-loops-tp17694.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
