Hi

Finally, I found a configuration parameter: spark.default.parallelism.
Changing this parameter finally changes the number of executors running tasks in 
parallel, although the log file still prints the "first 15 tasks ..." line.
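
In case it helps anyone else, it can be passed either on the command line 
(e.g. spark-submit --conf spark.default.parallelism=30 ...) or when building the 
session; the value 30 below is just an example, not a recommendation:

    from pyspark.sql import SparkSession

    # set before the context starts; parallelize() without numSlices falls back to this
    spark = (SparkSession.builder
             .config("spark.default.parallelism", "30")
             .getOrCreate())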


Anyway, my problem is solved.









------------------ Original ------------------
From: "????" <jun.shen....@qq.com>
Date: Tue, Dec 22, 2020 10:34 AM
To: "Sean Owen" <sro...@gmail.com>
Cc: "user" <user@spark.apache.org>
Subject: Re: No matter how many instances and cores configured for spark on k8s, only one executor is reading file



Hi Sean Owen,

Many thanks for your reply and suggestion.
I tried setting the second parameter of parallelize to 30, but it still keeps a 
single executor running the tasks.
However, if I add two lines, multiple executors run the tasks:
    parallelTasks = spark.sparkContext.parallelize(filenames, 30)
    allSplitedTasks = parallelTasks.flatMap(lambda x: SplitToTasksByExecutorAmount(x, workerAmount))
    allSplitedTasks = allSplitedTasks.map(lambda x: addTsharkArgs(x, tsharkFilter=tsharkFilter, tsharkArgs=tsharkArgs))
    listSplitedTasks = allSplitedTasks.collect()
    allSplitedTasks = spark.sparkContext.parallelize(listSplitedTasks)
    out = allSplitedTasks.flatMap(readPiecesNew)


The two added lines terminate the previous RDD via collect(), then create a new 
RDD based on the collected data. But then I see a new issue in the logs: at most 
15 tasks per batch.
20/12/22 02:07:06 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 8.4 KiB, free 3.4 GiB)
20/12/22 02:07:06 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 5.6 KiB, free 3.4 GiB)
20/12/22 02:07:06 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on shenjun-2ee1217688323457-driver-svc.default.svc:7079 (size: 5.6 KiB, free: 3.4 GiB)
20/12/22 02:07:06 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1223
20/12/22 02:07:06 INFO DAGScheduler: Submitting 80 missing tasks from ResultStage 1 (PythonRDD[3] at count at /mdata/iostat.py:354) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
20/12/22 02:07:06 INFO TaskSchedulerImpl: Adding task set 1.0 with 80 tasks


The "15 tasks" can be seen in the second-to-last line. Actually, no matter how 
many elements are in the RDD allSplitedTasks, this value is always 15.
Is there any way for a programmer to change it dynamically?
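
One option I'm considering (I haven't verified it on this job yet) is to pass the 
partition count explicitly to the second parallelize as well, since parallelize() 
falls back to spark.default.parallelism when numSlices is omitted; the "first 15 
tasks" wording itself also looks like the scheduler log simply truncating the 
partition list it prints, not a scheduling limit:

    # sketch only: give the second RDD one partition per split description
    allSplitedTasks = spark.sparkContext.parallelize(listSplitedTasks, len(listSplitedTasks))
    out = allSplitedTasks.flatMap(readPiecesNew)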







------------------ Original ------------------
From: "Sean Owen" <sro...@gmail.com>
Date: Mon, Dec 21, 2020 09:53 PM
To: "????" <jun.shen....@qq.com>
Cc: "user" <user@spark.apache.org>
Subject: Re: No matter how many instances and cores configured for spark on k8s, only one executor is reading file



Pass more partitions to the second argument of parallelize()?

On Mon, Dec 21, 2020 at 7:39 AM ???? <jun.shen....@qq.com> wrote:

Hi


I am now trying to use Spark for tcpdump pcap file analysis. The first step is to 
read the file and parse the content into a dataframe according to the analysis 
requirements.


I've made a public folder for all executors so that they can access it directly 
like a local file system. 
Here is the main code:
    filenames = ["/mdata/400m.pcap"]
    #filenames = ["/mdata/400m.pcap","/mdata/400m.pcap1","/mdata/400m.pcap2","/mdata/400m.pcap3","/mdata/400m.pcap4","/mdata/400m.pcap5","/mdata/400m.pcap6",]
    tsharkFilter = conf.tsharkFilter
    tsharkArgs = conf.tsharkArgs
    workerAmount = conf.workerAmount

    parallelTasks = spark.sparkContext.parallelize(filenames)
    allSplitedTasks = parallelTasks.flatMap(lambda x: SplitToTasksByExecutorAmount(x, workerAmount))
    allSplitedTasks = allSplitedTasks.map(lambda x: addTsharkArgs(x, tsharkFilter=tsharkFilter, tsharkArgs=tsharkArgs))
    out = allSplitedTasks.flatMap(readPieces)




Then, the file parsing part is here.
from subprocess import Popen, PIPE

def readPieces(param):
    try:
        filename = param['filename']
        #batchID = param['batchID']
        startPos = param['startPos']
        endPos = param['endPos']
        count = param['count']
        tsharkFilter = param['tsharkFilter'] if 'tsharkFilter' in param else None
        tsharkArgs = param['tsharkArgs'] if 'tsharkArgs' in param else None

        with open(filename, "rb") as f:
            if endPos == 0:
                endPos = f.seek(0, 2)
                f.seek(0)
            hdr = f.read(24)            # pcap global header
            f.seek(startPos)
            readLen = endPos - startPos
            content = f.read(readLen)
            if count:
                cmd = ["tshark", "-r", "-", "-c", str(count)]
            else:
                cmd = ["tshark", "-r", "-"]
            if tsharkArgs: cmd.extend(tsharkArgs)
            if tsharkFilter: cmd.extend(tsharkFilter)
            childProcess = Popen(cmd, stdin=PIPE, stdout=PIPE)
            raw = b''.join([hdr, content])
            outStr = childProcess.communicate(input=raw)[0]
            print(outStr)
            lines = outStr.split(b'\n')
            return [x.split(b'\t') for x in lines if x != b'']
    except Exception as e:
        return [[str(e)]]




The SplitToTasksByExecutorAmount function goes through the file and outputs a 
list of dictionary elements, so that, I suppose, multiple executors would read 
the file from different startPos values and only read up to endPos:
{"filename": filename, "batchID": batchID, "startPos": startPos, "endPos": endPos, "count": count}



Then, when the application is running, I can only see a single tshark process 
running across all my k8s nodes.
If I add more filenames to the main code, then the number of running tshark 
processes equals the number of filenames.


Is there some global lock somewhere in Spark so that the same file is only read 
by a single executor at a time? Is it possible to enable multiple executors to 
read the same file at the same time?




Thanks
Shen Jun
