On Sun, Jul 12, 2020 at 5:48 PM Bharath Rupireddy
<bharath.rupireddyforpostg...@gmail.com> wrote:
>
> >
> > Hi Bharath,
> >
> > I was looking forward to review this patch-set but unfortunately it is
> > showing a reject in copy.c, and might need a rebase.
> > I was applying on master over the commit-
> > cd22d3cdb9bd9963c694c01a8c0232bbae3ddcfb.
> >
>
> Thanks for showing interest. Please find the patch set rebased to
> latest commit b1e48bbe64a411666bb1928b9741e112e267836d.
>
A few comments:
====================
0001-Copy-code-readjustment-to-support-parallel-copy
I am not sure converting the code to macros is a good idea; it makes
this code harder to read. Also, there are a few changes that I am not
sure are necessary.

1.
+/*
+ * CLEAR_EOL_FROM_COPIED_DATA - Clear EOL from the copied data.
+ */
+#define CLEAR_EOL_FROM_COPIED_DATA(copy_line_data, copy_line_pos, copy_line_size) \
+{ \
+	/* \
+	 * If we didn't hit EOF, then we must have transferred the EOL marker \
+	 * to line_buf along with the data. Get rid of it. \
+	 */ \
+	switch (cstate->eol_type) \
+	{ \
+		case EOL_NL: \
+			Assert(copy_line_size >= 1); \
+			Assert(copy_line_data[copy_line_pos - 1] == '\n'); \
+			copy_line_data[copy_line_pos - 1] = '\0'; \
+			copy_line_size--; \
+			break; \
+		case EOL_CR: \
+			Assert(copy_line_size >= 1); \
+			Assert(copy_line_data[copy_line_pos - 1] == '\r'); \
+			copy_line_data[copy_line_pos - 1] = '\0'; \
+			copy_line_size--; \
+			break; \
+		case EOL_CRNL: \
+			Assert(copy_line_size >= 2); \
+			Assert(copy_line_data[copy_line_pos - 2] == '\r'); \
+			Assert(copy_line_data[copy_line_pos - 1] == '\n'); \
+			copy_line_data[copy_line_pos - 2] = '\0'; \
+			copy_line_size -= 2; \
+			break; \
+		case EOL_UNKNOWN: \
+			/* shouldn't get here */ \
+			Assert(false); \
+			break; \
+	} \
+}

In the original code, we use only len and buffer, whereas here we use
position, length/size, and buffer. Is that really required, or can we
do with just len and buffer?

2.
+/*
+ * INCREMENTPROCESSED - Increment the lines processed.
+ */
+#define INCREMENTPROCESSED(processed) \
+processed++;
+
+/*
+ * GETPROCESSED - Get the lines processed.
+ */
+#define GETPROCESSED(processed) \
+return processed;
+

I don't like converting the above to macros. I don't think converting
such things to macros buys us much.

0002-Framework-for-leader-worker-in-parallel-copy
3.
/*
+ * Copy data block information.
+ */
+typedef struct ParallelCopyDataBlock

Can you add a few comments atop this data structure to explain how it
is used?

4.
+ * ParallelCopyLineBoundary is common data structure between leader & worker,
+ * this is protected by the following sequence in the leader & worker.
+ * Leader should operate in the following order:
+ * 1) update first_block, start_offset & cur_lineno in any order.
+ * 2) update line_size.
+ * 3) update line_state.
+ * Worker should operate in the following order:
+ * 1) read line_size.
+ * 2) only one worker should choose one line for processing, this is handled by
+ * using pg_atomic_compare_exchange_u32, worker will change the sate to
+ * LINE_WORKER_PROCESSING only if line_state is LINE_LEADER_POPULATED.
+ * 3) read first_block, start_offset & cur_lineno in any order.
+ */
+typedef struct ParallelCopyLineBoundary

Here, you have described how the workers and the leader should operate
to make sure access to the data is sane. However, you have not
explained what the problem would be if they don't do so, and it is not
apparent to me. Also, the purpose of this data structure is not very
clear from the comments.

5.
+/*
+ * Circular queue used to store the line information.
+ */
+typedef struct ParallelCopyLineBoundaries
+{
+	/* Position for the leader to populate a line. */
+	uint32		leader_pos;

I don't think the variable needs to be named leader_pos; it is okay to
name it 'pos', as the comment above it explains its usage.

7.
+#define DATA_BLOCK_SIZE RAW_BUF_SIZE
+#define RINGSIZE (10 * 1000)
+#define MAX_BLOCKS_COUNT 1000
+#define WORKER_CHUNK_COUNT 50	/* should be mod of RINGSIZE */

It would be good if you could write a few comments to explain why you
have chosen these default values.

8.
ParallelCopyCommonKeyData: shall we name this
SerializedParallelCopyState or something like that? For example, see
SerializedSnapshotData, which is used to pass snapshot information to
workers.

9.
+CopyCommonInfoForWorker(CopyState cstate, ParallelCopyCommonKeyData *shared_cstate)

If you agree with point 8, then let's name this
SerializeParallelCopyState. Also, if there are more usages of similar
types in the patch, let's change those as well.

10.
+ * in the DSM. The specified number of workers will then be launched.
+ *
+ */
+static ParallelContext*
+BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid)

There is no need for an extra line containing only '*' in the above
multi-line comment.

11.
BeginParallelCopy(..)
{
..
+	EstimateLineKeysStr(pcxt, cstate->null_print);
+	EstimateLineKeysStr(pcxt, cstate->null_print_client);
+	EstimateLineKeysStr(pcxt, cstate->delim);
+	EstimateLineKeysStr(pcxt, cstate->quote);
+	EstimateLineKeysStr(pcxt, cstate->escape);
..
}

Why do we need to do this separately for each variable of cstate?
Can't we serialize them along with the other members of
SerializedParallelCopyState (the new name for
ParallelCopyCommonKeyData)?

12.
BeginParallelCopy(..)
{
..
+	LaunchParallelWorkers(pcxt);
+	if (pcxt->nworkers_launched == 0)
+	{
+		EndParallelCopy(pcxt);
+		elog(WARNING,
+			 "No workers available, copy will be run in non-parallel mode");
..
}

I don't see the need to issue a WARNING if we are not able to launch
workers. We don't do that in other cases where we fail to launch
workers.

13.
+}
+/*
+ * ParallelCopyMain -
..
+}
+/*
+ * ParallelCopyLeader

A blank line is required before the start of each new function.

-- 
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
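Regarding point 1: below is a minimal sketch, not taken from the patch, of how the EOL stripping could be written as a plain static inline function that needs only the buffer and its length. The function name clear_eol_from_line is hypothetical, standard assert() stands in for PostgreSQL's Assert(), and the EolType enum is redeclared so the snippet compiles on its own. It also assumes the line is contiguous in memory, that the caller passes a pointer to its first byte, and that there is room for a terminator at data[len] (as with a StringInfo).

```c
#include <assert.h>

/* Redeclared here (assumption) so the sketch is self-contained. */
typedef enum EolType
{
	EOL_UNKNOWN,
	EOL_NL,
	EOL_CR,
	EOL_CRNL
} EolType;

/*
 * clear_eol_from_line: hypothetical helper that strips the trailing EOL
 * marker using only the buffer and its length, then re-terminates the
 * buffer with '\0'.  Returns the new length.
 */
static inline int
clear_eol_from_line(char *data, int len, EolType eol_type)
{
	switch (eol_type)
	{
		case EOL_NL:
			assert(len >= 1 && data[len - 1] == '\n');
			len -= 1;
			break;
		case EOL_CR:
			assert(len >= 1 && data[len - 1] == '\r');
			len -= 1;
			break;
		case EOL_CRNL:
			assert(len >= 2 && data[len - 2] == '\r' && data[len - 1] == '\n');
			len -= 2;
			break;
		case EOL_UNKNOWN:
			/* shouldn't get here; leave the buffer untouched */
			assert(0);
			return len;
	}
	data[len] = '\0';			/* terminator sits right after the data */
	return len;
}
```

For example, calling it on "1,foo\r\n" with len 7 and EOL_CRNL returns 5 and leaves "1,foo" in the buffer. A function also gets type checking from the compiler and avoids the macro's hidden dependency on a variable named cstate being in scope.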