Hi
I am now trying to use Spark to do tcpdump pcap file analysis. The first step is to read the file and parse the content into a DataFrame according to the analysis requirements. I've made a public folder available to all executors so that they can access it directly like a local file system.

Here is the main code:

filenames = ["/mdata/400m.pcap"]
# filenames = ["/mdata/400m.pcap", "/mdata/400m.pcap1", "/mdata/400m.pcap2",
#              "/mdata/400m.pcap3", "/mdata/400m.pcap4", "/mdata/400m.pcap5",
#              "/mdata/400m.pcap6"]
tsharkFilter = conf.tsharkFilter
tsharkArgs = conf.tsharkArgs
workerAmount = conf.workerAmount

parallelTasks = spark.sparkContext.parallelize(filenames)
allSplitedTasks = parallelTasks.flatMap(lambda x: SplitToTasksByExecutorAmount(x, workerAmount))
allSplitedTasks = allSplitedTasks.map(lambda x: addTsharkArgs(x, tsharkFilter=tsharkFilter, tsharkArgs=tsharkArgs))
out = allSplitedTasks.flatMap(readPieces)

The file-parsing part is here:

from subprocess import Popen, PIPE

def readPieces(param):
    try:
        filename = param['filename']
        # batchID = param['batchID']
        startPos = param['startPos']
        endPos = param['endPos']
        count = param['count']
        tsharkFilter = param['tsharkFilter'] if 'tsharkFilter' in param else None
        tsharkArgs = param['tsharkArgs'] if 'tsharkArgs' in param else None

        with open(filename, "rb") as f:
            if endPos == 0:
                endPos = f.seek(0, 2)   # seek to EOF to learn the file size
            f.seek(0)
            hdr = f.read(24)            # keep the 24-byte pcap global header
            f.seek(startPos)
            readLen = endPos - startPos
            content = f.read(readLen)   # read only this task's byte range

        if count:
            cmd = ["tshark", "-r", "-", "-c", str(count)]
        else:
            cmd = ["tshark", "-r", "-"]
        if tsharkArgs:
            cmd.extend(tsharkArgs)
        if tsharkFilter:
            cmd.extend(tsharkFilter)

        # feed the global header plus the byte range to tshark via stdin
        childProcess = Popen(cmd, stdin=PIPE, stdout=PIPE)
        raw = b''.join([hdr, content])
        outStr = childProcess.communicate(input=raw)[0]
        print(outStr)
        lines = outStr.split(b'\n')
        return [x.split(b'\t') for x in lines if x != b'']
    except Exception as e:
        return [[str(e)]]

The SplitToTasksByExecutorAmount function goes through the file and outputs a list of dictionary elements (a simplified sketch is at the end of this mail), so I suppose multiple executors would each read the file from a different startPos and only read up to endPos. Each element looks like this:

{"filename": filename, "batchID": batchID, "startPos": startPos, "endPos": endPos, "count": count}

Then, when the application is running, I can only see a single tshark process running across all my k8s nodes. If I add more filenames in the main code, the number of running tshark processes equals the number of filenames.

Is there some global lock somewhere in Spark so that the same file can only be read by a single executor at a time? Is it possible to enable multiple executors to read the same file at the same time?

Thanks,
Shen Jun
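PS: To make the splitting step concrete, here is a simplified sketch of the kind of task list SplitToTasksByExecutorAmount produces. It is illustrative only: it just cuts the payload after the 24-byte pcap global header into workerAmount even byte ranges, and the exact offset handling in my real function is omitted.

import os

def SplitToTasksByExecutorAmount(filename, workerAmount, count=None):
    # Cut the pcap payload (everything after the 24-byte global header)
    # into workerAmount byte ranges, one task dictionary per range.
    fileSize = os.path.getsize(filename)
    pieceLen = (fileSize - 24) // workerAmount
    tasks = []
    for batchID in range(workerAmount):
        startPos = 24 + batchID * pieceLen
        # the last piece runs to the end of the file
        endPos = fileSize if batchID == workerAmount - 1 else startPos + pieceLen
        tasks.append({"filename": filename,
                      "batchID": batchID,
                      "startPos": startPos,
                      "endPos": endPos,
                      "count": count})
    return tasks

So for one file and workerAmount pieces, I would expect several byte ranges of the same file to be processed by different executors at the same time.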