On Sat, 11 Jul 2020 at 08:55, Bharath Rupireddy
<bharath.rupireddyforpostg...@gmail.com> wrote:
>
> Thanks Vignesh for the review. Addressed the comments in the 0006 patch.
>
> > we can create a local variable and use it in place of
> > cstate->pcdata->curr_data_block.
>
> Done.
>
> > + if (cstate->raw_buf_index + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1))
> > + AdjustFieldInfo(cstate, 1);
> > +
> > + memcpy(&fld_count,
> > &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index],
> > sizeof(fld_count));
> >
> > Should this be like below, as the remaining size can fit in the current block:
> > if (cstate->raw_buf_index + sizeof(fld_count) >= DATA_BLOCK_SIZE)
> >
> > + if ((cstate->raw_buf_index + sizeof(fld_size)) >= (DATA_BLOCK_SIZE - 1))
> > + {
> > + AdjustFieldInfo(cstate, 2);
> > + *new_block_pos = pcshared_info->cur_block_pos;
> > + }
> >
> > Same as above.
>
> Yes, you are right. Changed.
>
> > + movebytes = DATA_BLOCK_SIZE - cstate->raw_buf_index;
> > +
> > + cstate->pcdata->curr_data_block->skip_bytes = movebytes;
> > +
> > + data_block = &pcshared_info->data_blocks[block_pos];
> > +
> > + if (movebytes > 0)
> >
> > Instead of the above check, we can have an assert check for movebytes.
>
> No, we can't use an assert here. In the edge case where the current data
> block is filled exactly to DATA_BLOCK_SIZE, movebytes will be 0, but we
> still need to get a new data block. The movebytes > 0 check lets us skip
> the memmove in that case (a rough sketch of this flow is further down in
> this mail).
>
> > + if (mode == 1)
> > + {
> > + cstate->pcdata->curr_data_block = data_block;
> > + cstate->raw_buf_index = 0;
> > + }
> > + else if(mode == 2)
> > + {
> > + ParallelCopyDataBlock *prev_data_block = NULL;
> > + prev_data_block = cstate->pcdata->curr_data_block;
> > + prev_data_block->following_block = block_pos;
> > + cstate->pcdata->curr_data_block = data_block;
> > +
> > + if (prev_data_block->curr_blk_completed == false)
> > + prev_data_block->curr_blk_completed = true;
> > +
> > + cstate->raw_buf_index = 0;
> > + }
> >
> > This code is common for both; keep it in the common flow and remove
> > if (mode == 1)
> > cstate->pcdata->curr_data_block = data_block;
> > cstate->raw_buf_index = 0;
>
> Done.
>
> > +#define CHECK_FIELD_COUNT \
> > +{\
> > + if (fld_count == -1) \
> > + { \
> > + if (IsParallelCopy() && \
> > + !IsLeader()) \
> > + return true; \
> > + else if (IsParallelCopy() && \
> > + IsLeader()) \
> > + { \
> > + if (cstate->pcdata->curr_data_block->data[cstate->raw_buf_index +
> > sizeof(fld_count)] != 0) \
> > + ereport(ERROR, \
> > + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \
> > + errmsg("received copy data after EOF marker"))); \
> > + return true; \
> > + } \
> >
> > We only copy sizeof(fld_count). Shouldn't we check fld_count !=
> > cstate->max_fields? Am I missing something here?
>
> The fld_count != cstate->max_fields check is done after the above checks.
>
> > + if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size)
> > + {
> > + cstate->raw_buf_index = cstate->raw_buf_index + fld_size;
> > + }
> >
> > We can keep the check like cstate->raw_buf_index + fld_size < ..., for
> > better readability and consistency.
>
> I think this is okay. It reads naturally: if the bytes available in the
> current data block are greater than or equal to fld_size, then the tuple
> lies in the current data block.
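>
> To make the movebytes edge case concrete, here is a rough sketch of the
> block-switch flow as I understand it. This is illustrative only, not the
> actual patch code; the identifiers follow the hunks quoted above, and
> ParallelCopyShmInfo is just my shorthand here for the shared-state
> structure.
>
> /* Sketch: switch the leader to a new shared data block when a field
>  * header would cross the current block boundary. */
> static void
> AdjustFieldInfoSketch(CopyState cstate, ParallelCopyShmInfo *pcshared_info,
>                       uint32 new_block_pos)
> {
>     ParallelCopyDataBlock *curr_block = cstate->pcdata->curr_data_block;
>     ParallelCopyDataBlock *new_block = &pcshared_info->data_blocks[new_block_pos];
>     uint32      movebytes = DATA_BLOCK_SIZE - cstate->raw_buf_index;
>
>     /* Record how many trailing bytes of this block were carried over. */
>     curr_block->skip_bytes = movebytes;
>
>     /*
>      * movebytes is legitimately 0 when the current block is filled to
>      * exactly DATA_BLOCK_SIZE, so this cannot be an Assert; the check
>      * only avoids copying a zero-length partial field header.
>      */
>     if (movebytes > 0)
>         memcpy(&new_block->data[0],
>                &curr_block->data[cstate->raw_buf_index],
>                movebytes);
>
>     /* Continue reading from the start of the new block. */
>     cstate->pcdata->curr_data_block = new_block;
>     cstate->raw_buf_index = 0;
> }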
>
> > +static pg_attribute_always_inline void
> > +CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo,
> > + Oid typioparam, int32 typmod, uint32 *new_block_pos,
> > + int m, ParallelCopyTupleInfo *tuple_start_info_ptr,
> > + ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size)
> >
> > flinfo, typioparam & typmod are not used; we can remove these parameters.
>
> Done.
>
> > +static pg_attribute_always_inline void
> > +CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo,
> > + Oid typioparam, int32 typmod, uint32 *new_block_pos,
> > + int m, ParallelCopyTupleInfo *tuple_start_info_ptr,
> > + ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size)
> >
> > I felt this function need not be an inline function.
>
> Yes. Changed.
>
> > + /* binary format */
> > + /* for paralle copy leader, fill in the error
> >
> > There are some typos; run a spell check.
>
> Done.
>
> > + /* raw_buf_index should never cross data block size,
> > + * as the required number of data blocks would have
> > + * been obtained in the above while loop.
> > + */
> >
> > In a few places the commenting style should be changed to the postgres style.
>
> Changed.
>
> > + if (cstate->pcdata->curr_data_block == NULL)
> > + {
> > + block_pos = WaitGetFreeCopyBlock(pcshared_info);
> > +
> > + cstate->pcdata->curr_data_block =
> > &pcshared_info->data_blocks[block_pos];
> > +
> > + cstate->raw_buf_index = 0;
> > +
> > + readbytes = CopyGetData(cstate,
> > &cstate->pcdata->curr_data_block->data, 1, DATA_BLOCK_SIZE);
> > +
> > + elog(DEBUG1, "LEADER - bytes read from file %d", readbytes);
> > +
> > + if (cstate->reached_eof)
> > + return true;
> > + }
> >
> > There are many empty lines; these are not required.
>
> Removed.
>
> > +
> > + fld_count = (int16) pg_ntoh16(fld_count);
> > +
> > + CHECK_FIELD_COUNT;
> > +
> > + cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_count);
> > + new_block_pos = pcshared_info->cur_block_pos;
> >
> > You can run pg_indent once for the changes.
>
> I ran pg_indent and observed that there are many places getting modified
> by it. If we need to run pg_indent on copy.c for parallel copy alone,
> then first we need to run it on the plain copy.c, take those changes,
> and then run it for all the parallel copy files. I think we had better
> run pg_indent for all the parallel copy patches once and for all, maybe
> just before we finish up all the code reviews.
>
> > + if (mode == 1)
> > + {
> > + cstate->pcdata->curr_data_block = data_block;
> > + cstate->raw_buf_index = 0;
> > + }
> > + else if(mode == 2)
> > + {
> >
> > Could use macros for 1 & 2 for better readability.
>
> Done.
>
> > +
> > + if (following_block_id == -1)
> > + break;
> > + }
> > +
> > + if (following_block_id != -1)
> > + pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts, 1);
> > +
> > + *line_size = *line_size +
> > tuple_end_info_ptr->offset + 1;
> > + }
> >
> > We could calculate the size as we parse and identify one record; if we
> > do it that way, this can be removed.
>
> Done.
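>
> To spell out what "calculate the size as we parse" would look like, here
> is a minimal sketch. It is illustrative only, not the actual patch code,
> and it ignores fields that span data blocks; the variable names follow
> the quoted hunks.
>
> /*
>  * Sketch: accumulate the tuple's size while its binary fields are
>  * parsed, instead of deriving it from the start/end offsets after
>  * walking the chained data blocks.
>  */
> *line_size = sizeof(fld_count);     /* the tuple's field-count word */
>
> for (m = 0; m < fld_count; m++)
> {
>     /* fld_size for this attribute has just been read and byte-swapped */
>     *line_size += sizeof(fld_size);
>
>     /* payload bytes of the attribute; fld_size is -1 for a NULL */
>     if (fld_size > 0)
>         *line_size += fld_size;
> }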

Hi Bharath,

I was looking forward to reviewing this patch set, but unfortunately it
shows a reject in copy.c and might need a rebase. I was applying it on
master on top of commit cd22d3cdb9bd9963c694c01a8c0232bbae3ddcfb.

--
Regards,
Rafia Sabih