Hi

I am trying to use Spark to analyze tcpdump pcap files.  The first
step is to read a capture file and parse its content into a DataFrame
according to the analysis requirements.


I've made a public folder for all executors so that they can access it directly,
like a local file system.
Here is the main code:
    filenames = ["/mdata/400m.pcap"]
    # filenames = ["/mdata/400m.pcap", "/mdata/400m.pcap1", "/mdata/400m.pcap2",
    #              "/mdata/400m.pcap3", "/mdata/400m.pcap4", "/mdata/400m.pcap5",
    #              "/mdata/400m.pcap6"]
    tsharkFilter = conf.tsharkFilter
    tsharkArgs = conf.tsharkArgs
    workerAmount = conf.workerAmount

    parallelTasks = spark.sparkContext.parallelize(filenames)
    allSplitedTasks = parallelTasks.flatMap(lambda x: SplitToTasksByExecutorAmount(x, workerAmount))
    allSplitedTasks = allSplitedTasks.map(lambda x: addTsharkArgs(x, tsharkFilter=tsharkFilter, tsharkArgs=tsharkArgs))
    out = allSplitedTasks.flatMap(readPieces)
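
As a sanity check (debugging only, not part of the job), the partitioning of the
split tasks can be inspected like this:

    # Debugging only: how many partitions there are, and how many tasks land in each.
    print(allSplitedTasks.getNumPartitions())
    print(allSplitedTasks.glom().map(len).collect())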




Then, the file-parsing part is here:
from subprocess import Popen, PIPE

def readPieces(param):
    try:
        filename = param['filename']
        # batchID = param['batchID']
        startPos = param['startPos']
        endPos = param['endPos']
        count = param['count']
        tsharkFilter = param.get('tsharkFilter')
        tsharkArgs = param.get('tsharkArgs')

        with open(filename, "rb") as f:
            if endPos == 0:
                # endPos == 0 means "read to the end of the file"
                endPos = f.seek(0, 2)
                f.seek(0)
            # keep the 24-byte pcap global header so tshark sees a valid capture
            hdr = f.read(24)
            f.seek(startPos)
            readLen = endPos - startPos
            content = f.read(readLen)
            if count:
                cmd = ["tshark", "-r", "-", "-c", str(count)]
            else:
                cmd = ["tshark", "-r", "-"]
            if tsharkArgs: cmd.extend(tsharkArgs)
            if tsharkFilter: cmd.extend(tsharkFilter)
            childProcess = Popen(cmd, stdin=PIPE, stdout=PIPE)
            raw = b''.join([hdr, content])
            outStr = childProcess.communicate(input=raw)[0]
            print(outStr)
            lines = outStr.split(b'\n')
            return [x.split(b'\t') for x in lines if x != b'']
    except Exception as e:
        return [[str(e)]]
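
For local debugging, readPieces can also be called directly with a hand-built
dictionary (the values below are only an example; the tsharkArgs are standard
tshark field options):

    # Hypothetical local test, outside Spark.
    sample = {"filename": "/mdata/400m.pcap",
              "startPos": 24,   # first packet record, right after the 24-byte global header
              "endPos": 0,      # 0 means "read to the end of the file"
              "count": 100,     # let tshark stop after 100 packets
              "tsharkArgs": ["-T", "fields", "-e", "frame.time_epoch", "-e", "ip.src"],
              "tsharkFilter": None}
    rows = readPieces(sample)
    print(len(rows), rows[:3])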




The SplitToTasksByExecutorAmount function goes through the file and
outputs a list of dictionary elements, so that, I suppose, multiple
executors would each read the file from a different startPos and only read up to
endPos. Each element looks like:
{"filename": filename, "batchID": batchID, "startPos": startPos, "endPos": endPos, "count": count}



Then, when the application is running, I can only see a single tshark process
running across all my k8s nodes.
If I add more filenames in the main code, the number of running tshark processes
equals the number of filenames.
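
If it helps, which hosts the pieces actually run on can be checked from inside
the tasks, e.g. (debugging only):

    import socket
    # Debugging only: report which host each split task is evaluated on.
    hosts = allSplitedTasks.map(lambda x: socket.gethostname()).collect()
    print(sorted(set(hosts)))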


Is there some global lock somewhere in Spark so that the same file can only be
read by a single executor at a time?  Is it possible to enable multiple
executors to read the same file at the same time?




Thanks
Shen Jun
