On Thu, 20 Feb 2020 at 18:43, David Fetter <da...@fetter.org> wrote:
>
> On Thu, Feb 20, 2020 at 02:36:02PM +0100, Tomas Vondra wrote:
> > I think the wc2 is showing that maybe instead of parallelizing the
> > parsing, we might instead try using a different tokenizer/parser and
> > make the implementation more efficient instead of just throwing more
> > CPUs on it.
>
> That was what I had in mind.
>
> > I don't know if our code is similar to what wc does, maybe parsing
> > csv is more complicated than what wc does.
>
> CSV parsing differs from wc in that there are more states in the state
> machine, but I don't see anything fundamentally different.
The trouble with a state-machine-based approach is that the state transitions form a dependency chain, which means that at best the processing rate will be 4-5 cycles per byte (the L1 latency to fetch the next state).

I whipped together a quick prototype that uses SIMD and bitmap manipulations to do the equivalent of CopyReadLineText() in CSV mode, including quote and escape handling; it runs at 0.25-0.5 cycles per byte.

Regards,
Ants Aasma
#include <stddef.h> #include <stdlib.h> #include <stdio.h> #include <inttypes.h> #include <immintrin.h> #include <string.h> #include <time.h> #define likely(x) __builtin_expect((x),1) #define unlikely(x) __builtin_expect((x),0) /* * Create a bitmap of matching characters in the next 64 bytes **/ static inline uint64_t find_chars(__m256i *data, char c) { const __m256i mask = _mm256_set1_epi8(c); uint64_t result = (uint32_t) _mm256_movemask_epi8(_mm256_cmpeq_epi8(data[0], mask)); result |= ((uint64_t) _mm256_movemask_epi8(_mm256_cmpeq_epi8(data[1], mask))) << 32; return result; } /* * Creates a bitmap of unpaired escape characters **/ static inline uint64_t find_unpaired_escapes(uint64_t escapes) { // TODO: handle unpaired escape from end of last iteration uint64_t p, e, r; p = escapes; e = escapes; r = escapes; while (e) { p = e; e = (e << 1) & escapes; r ^= e; } return r & p; } /* * Creates a bitmap mask of quoted sections given locations of * quote chatacters. **/ static inline uint64_t find_quote_mask(uint64_t quote_bits, uint64_t *prev_inside_quote) { uint64_t mask = _mm_cvtsi128_si64(_mm_clmulepi64_si128( _mm_set_epi64x(0ULL, quote_bits), _mm_set1_epi8(0xFF), 0)); mask ^= *prev_inside_quote; *prev_inside_quote = ((int64_t) mask) >> 63; return mask; } /* * Parses len bytes from buf according to csv rules and writes start positions of * records to output. Returns number of rows found. 
**/ int64_t parseIntoLines(char *buf, size_t len, size_t *output) { __m256i* input = (__m256i*) buf; uint64_t prev_inside_quote = 0; size_t pos = 0; uint64_t numfound = 0; *output++ = 0; numfound++; while (pos < len - 64) { uint64_t quotes = find_chars(input, '"'); uint64_t escapes = find_chars(input, '\\'); uint64_t unpaired_escapes = find_unpaired_escapes(escapes); uint64_t unescaped_quotes = quotes & ~(unpaired_escapes << 1); uint64_t newlines = find_chars(input, '\n'); uint64_t quote_mask = find_quote_mask(unescaped_quotes, &prev_inside_quote); uint64_t tokenpositions = newlines & ~quote_mask; uint64_t carriages = find_chars(input, '\r') & ~quote_mask; if (unlikely(carriages != 0)) exit(1); uint64_t offset = 0; while (tokenpositions > 0) { int numchars = __builtin_ctzll(tokenpositions); tokenpositions >>= numchars; tokenpositions >>= 1; offset += numchars + 1; *output++ = pos + offset; numfound++; } pos += 64; input += 2; } // TODO: handle tail return numfound; } int main(int argc, char *argv[]) { char *buf; uint64_t *lines; uint64_t iters = 1; if (argc < 2) { printf("Usage: simdcopy csvfile [iterations]\n"); return 1; } if (argc > 2) { iters = atol(argv[2]); } buf = aligned_alloc(64, 1024*1024*1024); lines = aligned_alloc(8, 128*1024*1024*sizeof(uint64_t)); if (!buf || !lines) return 1; FILE *f = fopen(argv[1], "r"); if (!f) return 1; #define READBLOCK (1024*1024) size_t len = 0; while (len < sizeof(buf) - READBLOCK) { size_t result = fread(buf + len, 1, READBLOCK, f); if (!result) break; len += result; } fclose(f); struct timespec start; struct timespec end; printf("Parsing %lu bytes, %lu times\n", len, iters); uint64_t numfound; clock_gettime(CLOCK_MONOTONIC, &start); for (uint64_t i = 0; i < iters; i++) { numfound = parseIntoLines(buf, len, lines); } clock_gettime(CLOCK_MONOTONIC, &end); double delta = (end.tv_sec - start.tv_sec) + (1.e-9)*(end.tv_nsec - start.tv_nsec); printf("Found %lu rows in %lu bytes in %f milliseconds\n", numfound, len*iters, 
delta*1000); printf(" Speed: %0.3f GB/s\n", len/delta/1e9*iters); return 0; }