yiguolei commented on code in PR #30746: URL: https://github.com/apache/doris/pull/30746#discussion_r1479105367
########## be/src/vec/exec/scan/scanner_context.cpp: ########## @@ -220,138 +162,224 @@ std::string ScannerContext::parent_name() { vectorized::BlockUPtr ScannerContext::get_free_block() { vectorized::BlockUPtr block; if (_free_blocks.try_dequeue(block)) { + std::lock_guard<std::mutex> fl(_free_blocks_lock); DCHECK(block->mem_reuse()); - _free_blocks_memory_usage->add(-block->allocated_bytes()); - _serving_blocks_num++; + _free_blocks_memory_usage -= block->allocated_bytes(); + _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); return block; } - block = vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size, - true /*ignore invalid slots*/); - - COUNTER_UPDATE(_newly_create_free_blocks_num, 1); - - _serving_blocks_num++; - return block; + _newly_create_free_blocks_num->update(1); + return vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size, + true /*ignore invalid slots*/); } -void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block) { - _serving_blocks_num--; - if (block->mem_reuse()) { - // Only put blocks with schema to free blocks, because colocate blocks - // need schema. 
- _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16); +void ScannerContext::return_free_block(vectorized::BlockUPtr block) { + std::lock_guard<std::mutex> fl(_free_blocks_lock); + if (block->mem_reuse() && _free_blocks_memory_usage < _max_bytes_in_queue) { block->clear_column_data(); - _free_blocks_memory_usage->add(block->allocated_bytes()); + _free_blocks_memory_usage += block->allocated_bytes(); + _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); _free_blocks.enqueue(std::move(block)); } } -void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) { - std::lock_guard l(_transfer_lock); - auto old_bytes_in_queue = _cur_bytes_in_queue; - for (auto& b : blocks) { - auto st = validate_block_schema(b.get()); - if (!st.ok()) { - set_status_on_error(st, false); +bool ScannerContext::empty_in_queue(int id) { + std::lock_guard<std::mutex> l(_transfer_lock); + return _blocks_queue.empty(); +} + +void ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) { + _scanner_sched_counter->update(1); + _num_scheduled_scanners++; + _scanner_scheduler->submit(shared_from_this(), scan_task); +} + +void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> scan_task) { + Status st = validate_block_schema(scan_task->current_block.get()); + if (!st.ok()) { + scan_task->set_status(st); + } + // We can only know the block size after reading at least one block + // Just take the size of first block as `_estimated_block_size` + if (scan_task->first_block) { + std::lock_guard<std::mutex> fl(_free_blocks_lock); + size_t block_size = scan_task->current_block->allocated_bytes(); + _free_blocks_memory_usage += block_size; + _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); + scan_task->first_block = false; + if (block_size > _estimated_block_size) { + _estimated_block_size = block_size; } - _cur_bytes_in_queue += b->allocated_bytes(); - _blocks_queue.push_back(std::move(b)); } - 
- blocks.clear(); - if (_dependency) { - _dependency->set_ready(); + std::lock_guard<std::mutex> l(_transfer_lock); + if (!scan_task->status_ok()) { + _process_status = scan_task->get_status(); + } + if (_last_scale_up_time == 0) { + _last_scale_up_time = UnixMillis(); + } + if (_blocks_queue.empty() && _last_fetch_time != 0) { + // there's no block in queue before current block, so the consumer is waiting + _total_wait_block_time += UnixMillis() - _last_fetch_time; + } + if (scan_task->is_eos()) { Review Comment: The eos check should be moved to where the block is fetched from the queue, so that all the logic is unified. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org