Pass more partitions to the second argument of parallelize()?
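Since filenames has only one element, the RDD from parallelize() effectively has a single non-empty partition, and flatMap keeps its parent's partitioning, so all the split tasks end up on one executor. A minimal, untested sketch (identifiers taken from your snippet; the repartition() step is an extra suggestion on top of the numSlices hint, assuming workerAmount is the level of parallelism you want):

# Ask for more slices up front and redistribute the split tasks
# before the expensive tshark work.
parallelTasks = spark.sparkContext.parallelize(filenames, workerAmount)
allSplitedTasks = parallelTasks.flatMap(
    lambda x: SplitToTasksByExecutorAmount(x, workerAmount))
# With a single input file the split tasks still sit in one partition here,
# so spread them out explicitly.
allSplitedTasks = allSplitedTasks.repartition(workerAmount)
allSplitedTasks = allSplitedTasks.map(
    lambda x: addTsharkArgs(x, tsharkFilter=tsharkFilter, tsharkArgs=tsharkArgs))
out = allSplitedTasks.flatMap(readPieces)

You can check allSplitedTasks.getNumPartitions() to confirm the tasks are actually spread across partitions.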
On Mon, Dec 21, 2020 at 7:39 AM 沈俊 <jun.shen....@qq.com> wrote:

> Hi,
>
> I am now trying to use Spark to do tcpdump pcap file analysis. The first
> step is to read the file and parse the content into a DataFrame according
> to the analysis requirements.
>
> I've made a public folder for all executors so that they can access it
> directly like a local file system.
>
> Here is the main code:
>
> filenames = ["/mdata/400m.pcap"]
> #filenames = ["/mdata/400m.pcap", "/mdata/400m.pcap1", "/mdata/400m.pcap2",
> #             "/mdata/400m.pcap3", "/mdata/400m.pcap4", "/mdata/400m.pcap5",
> #             "/mdata/400m.pcap6"]
> tsharkFilter = conf.tsharkFilter
> tsharkArgs = conf.tsharkArgs
> workerAmount = conf.workerAmount
>
> parallelTasks = spark.sparkContext.parallelize(filenames)
> allSplitedTasks = parallelTasks.flatMap(
>     lambda x: SplitToTasksByExecutorAmount(x, workerAmount))
> allSplitedTasks = allSplitedTasks.map(
>     lambda x: addTsharkArgs(x, tsharkFilter=tsharkFilter, tsharkArgs=tsharkArgs))
> out = allSplitedTasks.flatMap(readPieces)
>
> Then, the file parsing part is here:
>
> from subprocess import Popen, PIPE
>
> def readPieces(param):
>     try:
>         filename = param['filename']
>         #batchID = param['batchID']
>         startPos = param['startPos']
>         endPos = param['endPos']
>         count = param['count']
>         tsharkFilter = param['tsharkFilter'] if 'tsharkFilter' in param else None
>         tsharkArgs = param['tsharkArgs'] if 'tsharkArgs' in param else None
>
>         with open(filename, "rb") as f:
>             if endPos == 0:
>                 endPos = f.seek(0, 2)
>             f.seek(0)
>             hdr = f.read(24)
>             f.seek(startPos)
>             readLen = endPos - startPos
>             content = f.read(readLen)
>         if count:
>             cmd = ["tshark", "-r", "-", "-c", str(count)]
>         else:
>             cmd = ["tshark", "-r", "-"]
>         if tsharkArgs: cmd.extend(tsharkArgs)
>         if tsharkFilter: cmd.extend(tsharkFilter)
>         childProcess = Popen(cmd, stdin=PIPE, stdout=PIPE)
>         raw = b''.join([hdr, content])
>         outStr = childProcess.communicate(input=raw)[0]
>         print(outStr)
>         lines = outStr.split(b'\n')
>         return [x.split(b'\t') for x in lines if x != b'']
>     except Exception as e:
>         return [[str(e)]]
>
> The SplitToTasksByExecutorAmount function goes through the file and outputs
> a list of dictionary elements, so I suppose multiple executors would read
> the file from different startPos values and only read up to endPos:
>
> {"filename": filename, "batchID": batchID, "startPos": startPos,
>  "endPos": endPos, "count": count}
>
> Then, when the application is running, I can only see a single tshark
> process running across all my k8s nodes. If I add more filenames in the
> main code, the number of running tshark processes equals the number of
> filenames.
>
> Is there some global lock somewhere in Spark so that the same file can
> only be read by a single executor at a time? Is it possible to enable
> multiple executors to read the same file at the same time?
>
> Thanks,
> Shen Jun
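P.S. For anyone reading along: SplitToTasksByExecutorAmount isn't shown above, so here is a rough, purely illustrative guess at what such a splitter could look like, based only on the dict layout described in the quoted message (standard libpcap file layout assumed: 24-byte global header, 16-byte per-record headers). Not the original poster's code.

import struct

def SplitToTasksByExecutorAmount(filename, workerAmount):
    """Hypothetical splitter: walk pcap records and emit roughly equal byte
    ranges that start and end on packet boundaries."""
    tasks = []
    with open(filename, "rb") as f:
        magic = f.read(4)
        # 0xa1b2c3d4 / 0xa1b23c4d magics stored little-endian -> '<', else '>'
        endian = "<" if magic in (b"\xd4\xc3\xb2\xa1", b"\x4d\x3c\xb2\xa1") else ">"
        f.seek(0, 2)
        fileSize = f.tell()
        targetLen = max(1, (fileSize - 24) // workerAmount)
        pos = 24                      # skip the 24-byte global header
        startPos, batchID, count = pos, 0, 0
        f.seek(pos)
        while pos < fileSize:
            recHdr = f.read(16)       # ts_sec, ts_usec, incl_len, orig_len
            if len(recHdr) < 16:
                break
            incl_len = struct.unpack(endian + "IIII", recHdr)[2]
            pos += 16 + incl_len
            f.seek(pos)
            count += 1
            if pos - startPos >= targetLen:
                tasks.append({"filename": filename, "batchID": batchID,
                              "startPos": startPos, "endPos": pos, "count": count})
                batchID += 1
                startPos, count = pos, 0
        if pos > startPos:
            tasks.append({"filename": filename, "batchID": batchID,
                          "startPos": startPos, "endPos": pos, "count": count})
    return tasks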