On Thu, 20 Feb 2020 at 18:43, David Fetter <da...@fetter.org> wrote:>
> On Thu, Feb 20, 2020 at 02:36:02PM +0100, Tomas Vondra wrote:
> > I think the wc2 is showing that maybe instead of parallelizing the
> > parsing, we might instead try using a different tokenizer/parser and
> > make the implementation more efficient instead of just throwing more
> > CPUs on it.
>
> That was what I had in mind.
>
> > I don't know if our code is similar to what wc does, maytbe parsing
> > csv is more complicated than what wc does.
>
> CSV parsing differs from wc in that there are more states in the state
> machine, but I don't see anything fundamentally different.

The trouble with a state machine based approach is that the state
transitions form a dependency chain, which means that at best the
processing rate will be 4-5 cycles per byte (L1 latency to fetch the
next state).

I whipped together a quick prototype that uses SIMD and bitmap
manipulations to do the equivalent of CopyReadLineText() in csv mode
including quotes and escape handling, this runs at 0.25-0.5 cycles per
byte.

Regards,
Ants Aasma
#include <stddef.h>
#include <stdlib.h>
#include <stdio.h>
#include <inttypes.h>
#include <immintrin.h>
#include <string.h>
#include <time.h>

#define likely(x)       __builtin_expect((x),1)
#define unlikely(x)     __builtin_expect((x),0)

/*
 * Create a bitmap of matching characters in the next 64 bytes
 **/
static inline uint64_t
find_chars(__m256i *data, char c)
{
	const __m256i mask = _mm256_set1_epi8(c);
	uint64_t result = (uint32_t) _mm256_movemask_epi8(_mm256_cmpeq_epi8(data[0], mask));
	result |= ((uint64_t) _mm256_movemask_epi8(_mm256_cmpeq_epi8(data[1], mask))) << 32;
	return result;
}

/*
 * Creates a bitmap of unpaired escape characters
 **/
static inline uint64_t
find_unpaired_escapes(uint64_t escapes)
{
	// TODO: handle unpaired escape from end of last iteration
	uint64_t p, e, r;
	p = escapes;
	e = escapes;
	r = escapes;
	while (e) {
		p = e;
		e = (e << 1) & escapes;
		r ^= e;
	}
	return r & p;
}

/*
 * Creates a bitmap mask of quoted sections given locations of 
 * quote chatacters.
 **/
static inline uint64_t
find_quote_mask(uint64_t quote_bits, uint64_t *prev_inside_quote)
{
	uint64_t mask = _mm_cvtsi128_si64(_mm_clmulepi64_si128(
			_mm_set_epi64x(0ULL, quote_bits), _mm_set1_epi8(0xFF), 0));
	mask ^= *prev_inside_quote;
	*prev_inside_quote = ((int64_t) mask) >> 63;
	return mask;
}

/*
 * Parses len bytes from buf according to csv rules and writes start positions of
 * records to output. Returns number of rows found.
 **/
int64_t
parseIntoLines(char *buf, size_t len, size_t *output)
{
	__m256i* input = (__m256i*) buf;
	uint64_t prev_inside_quote = 0;
	size_t pos = 0;
	uint64_t numfound = 0;

	*output++ = 0;
	numfound++;

	while (pos < len - 64) {
		uint64_t quotes = find_chars(input, '"');
		uint64_t escapes = find_chars(input, '\\');
		uint64_t unpaired_escapes = find_unpaired_escapes(escapes);
		uint64_t unescaped_quotes = quotes & ~(unpaired_escapes << 1);
		uint64_t newlines = find_chars(input, '\n');
		uint64_t quote_mask = find_quote_mask(unescaped_quotes, &prev_inside_quote);
		uint64_t tokenpositions = newlines & ~quote_mask;
		uint64_t carriages = find_chars(input, '\r') & ~quote_mask;
		if (unlikely(carriages != 0))
			exit(1);

		uint64_t offset = 0;
		while (tokenpositions > 0) {
			int numchars = __builtin_ctzll(tokenpositions);
			tokenpositions >>= numchars;
			tokenpositions >>= 1;
			offset += numchars + 1;
			*output++ = pos + offset;
			numfound++;
		}

		pos += 64;
		input += 2;
	}
	// TODO: handle tail
	return numfound;
}

int main(int argc, char *argv[])
{
	char *buf;
	uint64_t *lines;
	uint64_t iters = 1;

	if (argc < 2)
	{
		printf("Usage: simdcopy csvfile [iterations]\n");
		return 1;
	}
	if (argc > 2)
	{
		iters = atol(argv[2]);
	}

	buf = aligned_alloc(64, 1024*1024*1024);
	lines = aligned_alloc(8, 128*1024*1024*sizeof(uint64_t));

	if (!buf || !lines)
		return 1;

	FILE *f = fopen(argv[1], "r");
	if (!f)
		return 1;

#define READBLOCK (1024*1024)
	size_t len = 0;
	while (len < sizeof(buf) - READBLOCK)
	{
		size_t result = fread(buf + len, 1, READBLOCK, f);
		if (!result)
			break;
		len += result;
	}
	fclose(f);

	struct timespec start;
	struct timespec end;

	printf("Parsing %lu bytes, %lu times\n", len, iters);
	uint64_t numfound;
	clock_gettime(CLOCK_MONOTONIC, &start);
	for (uint64_t i = 0; i < iters; i++) {
		numfound = parseIntoLines(buf, len, lines);
	}
	clock_gettime(CLOCK_MONOTONIC, &end);

	double delta = (end.tv_sec - start.tv_sec) + (1.e-9)*(end.tv_nsec - start.tv_nsec);

	printf("Found %lu rows in %lu bytes in %f milliseconds\n", numfound, len*iters, delta*1000);
	printf("  Speed: %0.3f GB/s\n", len/delta/1e9*iters);

	return 0;
}

Reply via email to