Try passing a larger number of partitions as the second argument of parallelize()?
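
For example (untested sketch, reusing your own names):

    parallelTasks = spark.sparkContext.parallelize(filenames, workerAmount)
    allSplitedTasks = parallelTasks.flatMap(
        lambda x: SplitToTasksByExecutorAmount(x, workerAmount))
    # Sanity check: how many partitions do the split tasks actually sit in?
    print(allSplitedTasks.getNumPartitions())
    # With a single input file all of its split tasks can still land in one
    # partition, so redistributing them after the flatMap might also help:
    allSplitedTasks = allSplitedTasks.repartition(workerAmount)

As far as I know there is no lock in Spark that stops several executors from
reading the same file; seeing one tshark per filename usually just means all
of that file's split tasks ended up in the same partition, and therefore in a
single task.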

On Mon, Dec 21, 2020 at 7:39 AM 沈俊 <jun.shen....@qq.com> wrote:

> Hi,
>
> I am trying to use Spark to analyse tcpdump pcap files. The first step is
> to read the file and parse the content into a dataframe according to the
> analysis requirements.
>
> I've made a shared folder available to all executors so that they can
> access it directly like a local file system.
> Here is the main code:
>     filenames = ["/mdata/400m.pcap"]
>     # filenames = ["/mdata/400m.pcap", "/mdata/400m.pcap1", "/mdata/400m.pcap2",
>     #              "/mdata/400m.pcap3", "/mdata/400m.pcap4", "/mdata/400m.pcap5",
>     #              "/mdata/400m.pcap6"]
>     tsharkFilter = conf.tsharkFilter
>     tsharkArgs = conf.tsharkArgs
>     workerAmount = conf.workerAmount
>
>     parallelTasks = spark.sparkContext.parallelize(filenames)
>     allSplitedTasks = parallelTasks.flatMap(
>         lambda x: SplitToTasksByExecutorAmount(x, workerAmount))
>     allSplitedTasks = allSplitedTasks.map(
>         lambda x: addTsharkArgs(x, tsharkFilter=tsharkFilter, tsharkArgs=tsharkArgs))
>     out = allSplitedTasks.flatMap(readPieces)
>
> Then, the file parsing part is here:
> from subprocess import PIPE, Popen
>
> def readPieces(param):
>     try:
>         filename = param['filename']
>         # batchID = param['batchID']
>         startPos = param['startPos']
>         endPos = param['endPos']
>         count = param['count']
>         tsharkFilter = param.get('tsharkFilter')
>         tsharkArgs = param.get('tsharkArgs')
>
>         with open(filename, "rb") as f:
>             if endPos == 0:
>                 endPos = f.seek(0, 2)  # seek to EOF to learn the file size
>                 f.seek(0)
>             hdr = f.read(24)  # keep the 24-byte pcap global header
>             f.seek(startPos)
>             readLen = endPos - startPos
>             content = f.read(readLen)
>             if count:
>                 cmd = ["tshark", "-r", "-", "-c", str(count)]
>             else:
>                 cmd = ["tshark", "-r", "-"]
>             if tsharkArgs: cmd.extend(tsharkArgs)
>             if tsharkFilter: cmd.extend(tsharkFilter)
>             childProcess = Popen(cmd, stdin=PIPE, stdout=PIPE)
>             raw = b''.join([hdr, content])
>             outStr = childProcess.communicate(input=raw)[0]
>             print(outStr)
>             lines = outStr.split(b'\n')
>             return [x.split(b'\t') for x in lines if x != b'']
>     except Exception as e:
>         return [[str(e)]]
>
> The SplitToTasksByExecutorAmount function goes through the file and then
> outputs a list of dictionary elements, so that, I suppose, multiple
> executors would read the file from different startPos values and only read
> up to endPos:
> {"filename": filename, "batchID": batchID, "startPos": startPos,
>  "endPos": endPos, "count": count}
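>
> For illustration, a simplified sketch of that splitting (plain even byte
> ranges, just to show the shape of the output):
>
> import os
>
> def SplitToTasksByExecutorAmount(filename, workerAmount):
>     # Cut the file into workerAmount byte ranges. readPieces re-reads the
>     # 24-byte pcap global header itself, so each piece only needs its range.
>     fileSize = os.path.getsize(filename)
>     pieceLen = fileSize // workerAmount
>     tasks = []
>     for batchID in range(workerAmount):
>         startPos = batchID * pieceLen
>         endPos = fileSize if batchID == workerAmount - 1 else startPos + pieceLen
>         tasks.append({"filename": filename, "batchID": batchID,
>                       "startPos": startPos, "endPos": endPos, "count": 0})
>     return tasks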
>
> Then, when the application is running, I can only see a single tshark
> process running across all my k8s nodes.
> If I add more filenames to the main code, the number of running tshark
> processes equals the number of filenames.
>
> Is there some global lock somewhere in Spark so that the same file is only
> read by a single executor at a time?  Is it possible to enable multiple
> executors to read the same file at the same time?
>
>
> Thanks
> Shen Jun
>
>
>
