Hi
Finally, I found a configuration parameter: spark.default.parallelism. Changing this parameter finally changes the number of executors running in parallel, although the log file still says "first 15 tasks ...". Anyway, my problem is solved.

------------------ Original ------------------
From: "Shen Jun" <jun.shen....@qq.com>;
Date: Tue, Dec 22, 2020 10:34 AM
To: "Sean Owen" <sro...@gmail.com>;
Cc: "user" <user@spark.apache.org>;
Subject: Re: No matter how many instances and cores configured for spark on k8s, only one executor is reading file

Hi Sean Owen,

Many thanks for your reply and suggestion. I tried setting the second parameter of parallelize() to 30, but it still keeps a single executor running the tasks. However, if I add two lines, I do get multiple executors running tasks:

parallelTasks = spark.sparkContext.parallelize(filenames, 30)
allSplitedTasks = parallelTasks.flatMap(lambda x: SplitToTasksByExecutorAmount(x, workerAmount))
allSplitedTasks = allSplitedTasks.map(lambda x: addTsharkArgs(x, tsharkFilter=tsharkFilter, tsharkArgs=tsharkArgs))
listSplitedTasks = allSplitedTasks.collect()
allSplitedTasks = spark.sparkContext.parallelize(listSplitedTasks)
out = allSplitedTasks.flatMap(readPiecesNew)

The two added lines terminate the previous RDD via the collect() method and then create a new RDD from the collected data. But then I see a new issue in the logs: at most 15 tasks per batch.

20/12/22 02:07:06 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 8.4 KiB, free 3.4 GiB)
20/12/22 02:07:06 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 5.6 KiB, free 3.4 GiB)
20/12/22 02:07:06 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on shenjun-2ee1217688323457-driver-svc.default.svc:7079 (size: 5.6 KiB, free: 3.4 GiB)
20/12/22 02:07:06 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1223
20/12/22 02:07:06 INFO DAGScheduler: Submitting 80 missing tasks from ResultStage 1 (PythonRDD[3] at count at /mdata/iostat.py:354) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
20/12/22 02:07:06 INFO TaskSchedulerImpl: Adding task set 1.0 with 80 tasks

The 15 tasks can be seen in the second-to-last line above. No matter how many elements are in the RDD allSplitedTasks, this value is always 15. Is there any way to change it dynamically from the program?
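For reference, a minimal sketch of the two knobs discussed in this thread (not taken from the original mails; the values and the toy task list are illustrative): spark.default.parallelism sets the default number of slices parallelize() uses when no second argument is given, while an explicit numSlices overrides it for a single RDD. The "(first 15 tasks are for partitions Vector(...))" log line appears to be only a truncated listing of partition ids; the same log shows the task set actually contained 80 tasks.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("pcap-analysis")                   # illustrative app name
         .config("spark.default.parallelism", "80")  # default slice count for parallelize()
         .getOrCreate())

# toy stand-in for the collected split tasks (listSplitedTasks above)
listSplitedTasks = [{"filename": "/mdata/400m.pcap", "batchID": i,
                     "startPos": 0, "endPos": 0, "count": None}
                    for i in range(80)]

# an explicit numSlices overrides spark.default.parallelism for this one RDD
allSplitedTasks = spark.sparkContext.parallelize(listSplitedTasks,
                                                 numSlices=len(listSplitedTasks))
print(allSplitedTasks.getNumPartitions())            # expect 80

The same setting can also be passed at submit time, e.g. spark-submit --conf spark.default.parallelism=80, since it has to be in place before the SparkContext starts.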
------------------ Original ------------------
From: "Sean Owen" <sro...@gmail.com>;
Date: Mon, Dec 21, 2020 09:53 PM
To: "Shen Jun" <jun.shen....@qq.com>;
Cc: "user" <user@spark.apache.org>;
Subject: Re: No matter how many instances and cores configured for spark on k8s, only one executor is reading file

Pass more partitions to the second argument of parallelize()?

On Mon, Dec 21, 2020 at 7:39 AM Shen Jun <jun.shen....@qq.com> wrote:

Hi,

I am now trying to use Spark to do tcpdump pcap file analysis. The first step is to read the file and parse its content into a dataframe according to the analysis requirements. I've made a public folder for all executors so that they can access it directly like a local file system.

Here is the main code:

filenames = ["/mdata/400m.pcap"]
#filenames = ["/mdata/400m.pcap", "/mdata/400m.pcap1", "/mdata/400m.pcap2", "/mdata/400m.pcap3", "/mdata/400m.pcap4", "/mdata/400m.pcap5", "/mdata/400m.pcap6"]
tsharkFilter = conf.tsharkFilter
tsharkArgs = conf.tsharkArgs
workerAmount = conf.workerAmount

parallelTasks = spark.sparkContext.parallelize(filenames)
allSplitedTasks = parallelTasks.flatMap(lambda x: SplitToTasksByExecutorAmount(x, workerAmount))
allSplitedTasks = allSplitedTasks.map(lambda x: addTsharkArgs(x, tsharkFilter=tsharkFilter, tsharkArgs=tsharkArgs))
out = allSplitedTasks.flatMap(readPieces)

Then the file-parsing part is here:

from subprocess import Popen, PIPE

def readPieces(param):
    try:
        filename = param['filename']
        #batchID = param['batchID']
        startPos = param['startPos']
        endPos = param['endPos']
        count = param['count']
        tsharkFilter = param['tsharkFilter'] if 'tsharkFilter' in param else None
        tsharkArgs = param['tsharkArgs'] if 'tsharkArgs' in param else None
        with open(filename, "rb") as f:
            if endPos == 0:
                endPos = f.seek(0, 2)   # endPos == 0 means "read to end of file"
            f.seek(0)
            hdr = f.read(24)            # 24-byte pcap global header
            f.seek(startPos)
            readLen = endPos - startPos
            content = f.read(readLen)
        if count:
            cmd = ["tshark", "-r", "-", "-c", str(count)]
        else:
            cmd = ["tshark", "-r", "-"]
        if tsharkArgs:
            cmd.extend(tsharkArgs)
        if tsharkFilter:
            cmd.extend(tsharkFilter)
        childProcess = Popen(cmd, stdin=PIPE, stdout=PIPE)
        raw = b''.join([hdr, content])  # prepend the global header so tshark sees a valid pcap stream
        outStr = childProcess.communicate(input=raw)[0]
        print(outStr)
        lines = outStr.split(b'\n')
        return [x.split(b'\t') for x in lines if x != b'']
    except Exception as e:
        return [[str(e)]]

The SplitToTasksByExecutorAmount function goes through the file and outputs a list of dictionary elements like the one below, so I suppose multiple executors would read the file from different startPos values and only read up to endPos.

{"filename": filename, "batchID": batchID, "startPos": startPos, "endPos": endPos, "count": count}

Then, when the application is running, I can see only a single tshark process running across all my k8s nodes. If I add more filenames to the main code, the number of running tshark processes equals the number of filenames. Is there some global lock somewhere in Spark so that the same file can only be read by a single executor at a time? Is it possible to enable multiple executors to read the same file at the same time?

Thanks
Shen Jun
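SplitToTasksByExecutorAmount itself is not shown in the mails above. Purely for illustration, here is a rough sketch of how such a splitter might walk the classic libpcap record headers so that each startPos/endPos range begins and ends on a packet boundary; this is an assumption about its internals, not the original code, and pcapng files would need different handling.

import struct

def SplitToTasksByExecutorAmount(filename, workerAmount):
    # Sketch only: assumes the classic libpcap format (24-byte global header,
    # 16-byte per-record headers) and workerAmount >= 1.
    tasks = []
    with open(filename, "rb") as f:
        magic = f.read(4)
        # files written little-endian start with bytes d4 c3 b2 a1
        endian = "<" if magic == b"\xd4\xc3\xb2\xa1" else ">"
        fileSize = f.seek(0, 2)
        chunkSize = max(1, (fileSize - 24) // max(1, workerAmount))
        pos = startPos = 24             # packet records begin after the global header
        batchID = count = 0
        f.seek(pos)
        while pos < fileSize:
            recHdr = f.read(16)         # ts_sec, ts_usec, incl_len, orig_len
            if len(recHdr) < 16:
                break
            inclLen = struct.unpack(endian + "I", recHdr[8:12])[0]
            pos += 16 + inclLen         # jump to the next record boundary
            f.seek(pos)
            count += 1
            if pos - startPos >= chunkSize:
                tasks.append({"filename": filename, "batchID": batchID,
                              "startPos": startPos, "endPos": pos, "count": count})
                startPos, batchID, count = pos, batchID + 1, 0
        if count:                       # leftover tail
            tasks.append({"filename": filename, "batchID": batchID,
                          "startPos": startPos, "endPos": pos, "count": count})
    return tasks

Each range then becomes a self-contained pcap stream once readPieces prepends the 24-byte global header, which is why the boundaries have to land on whole records rather than arbitrary byte offsets.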