i'm still trying to get myself to engage this nettensors code more ... it's
hard to stay with
there was an idea that those rotor motors are likely too weak if not geared;
this is quite possible, i don't know
worried i'll make this nettensors code even messier
here's the total current function blob. this is not a cohesive function that
makes sense; it's a collection of my amnesiac attempts to implement it by
copy-pasting a different one in.
# relies on module-level imports: os, psutil, torch, tqdm
def read_many(self, offset_lengths, progress, validate_sorted=True):
    if validate_sorted:
        # offset_lengths is an [N, 2] tensor of (offset, length) rows;
        # check the offsets are ascending (list(...).sort() is ambiguous on tensor rows)
        assert (offset_lengths[1:, 0] >= offset_lengths[:-1, 0]).all()
    OP_FETCH = 1
    OP_PLACE = 2
    OP_OUTPUT = 4
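    # the ops are bit flags combined with |; e.g. OP_FETCH | OP_OUTPUT == 5
    # marks a range that should be downloaded and returned to the caller,
    # while OP_FETCH | OP_PLACE == 3 marks one downloaded into the cache file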
    offset_length_tail_idx_ops = torch.zeros([offset_lengths.shape[0] * 2, 5])
    OFFSET, LENGTH, TAIL, IDX, OP = range(offset_length_tail_idx_ops.shape[-1])
    op_ct = 0
    #results = torch.empty(len(o
    results = [None] * len(offset_lengths)
    # lists still used by the older copy-pasted path near the end
    fetches = []
    placements = []
    fetch_outputs = []
    place_outputs = []
    check_holes_on_left = []
    tails = (offset_lengths[:, 0] + offset_lengths[:, 1]).clamp(max=len(self.mmap))
    aligned_offsets = offset_lengths[:, 0] // self.blksize
    aligned_offsets *= self.blksize
    aligned_tails = tails - 1
    aligned_tails //= self.blksize
    aligned_tails += 1
    aligned_tails *= self.blksize
    torch.clamp(aligned_tails, max=self.size(), out=aligned_tails)
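    # block alignment example: with blksize=4096, offset=5000 rounds down to
    # 4096 and tail=9000 rounds up to 12288, so whole blocks are fetched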
    cls = type(self.fetchers)
    avail_disk_space = (
        (psutil.disk_usage(self.fn).free + cls.sparse_usage)
        * self.fetchers.usage_frac
    ) - cls.sparse_usage
    min_hole = 0
    pbar = range(len(offset_lengths))
    if progress:
        pbar = tqdm.tqdm(pbar, total=len(offset_lengths),
                         desc=progress, leave=False, unit='rd')
    idx = 0
    while idx < len(offset_lengths):
        #for idx in pbar:#range(len(offset_lengths)):
        #offset, length = offset_lengths[idx]
        #tail = min(offset + length, len(self.mmap))
        #aligned_offset = (offset // self.blksize) * self.blksize
        #aligned_tail = min(self.size(), (((tail - 1) // self.blksize) + 1) * self.blksize)
        aligned_offset = aligned_offsets[idx].item()
        next_hole = self._next_sparse(max(aligned_offset, min_hole), os.SEEK_HOLE)
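        # (_next_sparse presumably wraps os.lseek: SEEK_HOLE finds the start of
        # the next unallocated run at or after the offset, SEEK_DATA the next
        # allocated one; in the sparse cache file the allocated runs are the
        # ranges already downloaded -- see the sketch after the blob)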
        # 1/3: COMPLETELY CACHED ITEMS  (this description of the subset looks correct)
        cached_idcs = (tails[idx:] < next_hole).nonzero()[:, 0]
        num_cached_idcs = cached_idcs.shape[0]
        if num_cached_idcs > 0:
            next_idx = idx + num_cached_idcs
            assert (cached_idcs < next_idx).all()
            next_op_ct = op_ct + num_cached_idcs
            offset_length_tail_idx_ops[op_ct:next_op_ct, [OFFSET, LENGTH]] = offset_lengths[idx:next_idx]
            offset_length_tail_idx_ops[op_ct:next_op_ct, TAIL] = tails[idx:next_idx]
            offset_length_tail_idx_ops[op_ct:next_op_ct, IDX] = cached_idcs
            offset_length_tail_idx_ops[op_ct:next_op_ct, OP] = OP_OUTPUT
            op_ct = next_op_ct
            idx = next_idx
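            # e.g. with idx=0, tails=[100, 200, 300] and next_hole=250, items 0
            # and 1 fall wholly inside cached data and become plain OP_OUTPUT rows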
        next_data = self._next_sparse(next_hole, os.SEEK_DATA)
        # 2/3: COMPLETELY UNCACHED ITEMS
        missing_idcs = (tails[idx:] < next_data).nonzero()[:, 0]
        #missing_idcs = (next_hole < tails[idx:] and next_data > offset_lengths[idx:, 0]).nonzero()[:, 0]
        #missing_idcs = (next_hole < tails[idx:]).nonzero()[:, 0]
        # here we are handling all undownloaded indices before the next cached ones
        # there could be many pages between them that don't need to be fetched
        num_missing_idcs = missing_idcs.shape[0]
        if num_missing_idcs > 0:
            # uncached data
            next_idx = idx + num_missing_idcs
            assert (missing_idcs < next_idx).all()
            # but most likely there could be some idcs that are not missing
            # ... ones where tail < next_hole?
            missing_offset_lengths = offset_lengths[idx:next_idx]
            if missing_offset_lengths[:, 1].sum() > avail_disk_space:
                # note: SMALL BUG here in that it checks the tensor size instead
                # of the surrounding pages actually fetched
                # no more disk space
                if not cls.warned_space:
                    import warnings
                    warnings.warn(
                        '\nCACHE FULL CACHE FULL' +
                        '\nRequested=' + str(tqdm.tqdm.format_sizeof(int(missing_offset_lengths[:, 1].sum()), 'B', 1024)) +
                        ' Cached=' + str(tqdm.tqdm.format_sizeof(cls.sparse_usage, 'B', 1024)) +
                        ' Free=' + str(tqdm.tqdm.format_sizeof(psutil.disk_usage(self.fn).free, 'B', 1024)) +
                        '\n' + os.path.dirname(self.fn) +
                        '\nCACHE FULL CACHE FULL',
                        stacklevel=5)
                    cls.warned_space = True
                next_op_ct = op_ct + num_missing_idcs
                offset_length_tail_idx_ops[op_ct:next_op_ct, [OFFSET, LENGTH]] = missing_offset_lengths
                offset_length_tail_idx_ops[op_ct:next_op_ct, TAIL] = tails[idx:next_idx]
                offset_length_tail_idx_ops[op_ct:next_op_ct, IDX] = missing_idcs
                offset_length_tail_idx_ops[op_ct:next_op_ct, OP] = OP_FETCH | OP_OUTPUT
                op_ct = next_op_ct
                idx = next_idx
                continue
            # now we want to group these into those that do not have an empty
            # page between them, so we can fetch everything underneath them at once
            # note that there may be a tensor that is partly filled, and may have
            # further holes farther along, at the right edge
            #if num_missing_idcs > 1:
            if True:
                # add fetches, placements, and outputs covering all pages with
                # tensors wholly in them
                # each tensor has an aligned offset and aligned tail already
                # it might be more useful to consider abstract alignment points
                # that information would be rounded to
                # we could calculate empty regions from the differences between
                # the offsets and tails
                # empty regions that don't contain pages would be elided away
                # empty regions that do contain pages would be aligned
                # alternatively one could compare aligned_offsets and
                # aligned_tails for equality and overlap
                aligned_start = aligned_offsets[idx]
                aligned_end = min(aligned_tails[next_idx - 1], next_hole)
                #empty_regions = missing_offset_lengths[:-1, 0]
                assert not (tails[idx:next_idx - 1] > offset_lengths[idx + 1:next_idx, 0]).any()
                #mergeable_regions = aligned_tails[idx:next_idx-1] >= aligned_offsets[idx+1:next_idx]
                #merge_bounds = mergeable_regions[:-1] != mergeable_regions[1:]
                # so here first we compare the preceding tails to the following offsets
                # mergeable_regions then provides a bool for each space between
                # regions that represents the two being the same. so there's 1
                # less bool and it's the spaces between regions.
                # merge_bounds then looks for these regions of sameness within
                # mergeable_regions. so merge_bounds has a length of 2 fewer than
                # originally, and it relates to comparisons between adjacent
                # pairs of spaces between regions.
                # the data starts and ends with useful groups .. but this may
                # not yet be represented in merge_bounds
                # might want to special case length==2 somewhere for ease
                # it's notable that mergeable_regions[x] indicates if True that
                # the region can be merged with its neighbors
                # but that if False it indicates that the region needs to be
                # treated as its own single group
                region_mask = aligned_tails[idx:next_idx - 1] < aligned_offsets[idx + 1:next_idx]
                region_bounds = region_mask.nonzero()[:, 0]
                region_bounds += idx
                # region_bounds says which offsets+1 cannot be replaced by their
                # preceding offsets (if idx shifted by 1)
                # that is, which tails cannot be replaced by their following tails
                # T T F F T F T  true if the tail at that index is less than the
                # offset of the following -- i.e. a gap follows
                # we want head + tail from 0 and 1 because a gap follows them
                # then we want head from 2, and tail from 4
                # then head from 5, and tail from 6
                #   0  1  2  3  4  5  6
                #   0, 1, 4, 6
                # head and tail from: -1+1=0, 0
                #                      0+1=1, 1
                #                      1+1=2, 4
                #                      4+1=5, 6
                # note these are double indices
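                # the same worked example as runnable torch (illustration only):
                #   mask = torch.tensor([1,1,0,0,1,0,1], dtype=torch.bool)
                #   bounds = mask.nonzero()[:,0]    # tensor([0, 1, 4, 6]) -> group tails
                #   heads = torch.cat([bounds.new_zeros(1), bounds[:-1] + 1])
                #                                   # tensor([0, 1, 2, 5]) -> group heads
                # the code below does the same with op rows: tails come from
                # region_bounds, the final tail is aligned_end, offsets come
                # from region_bounds + 1, and the first offset is aligned_start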
                next_op_ct = op_ct + region_bounds.shape[0] + 1
                offset_length_tail_idx_ops[op_ct:next_op_ct - 1, TAIL] = aligned_tails[region_bounds]
                offset_length_tail_idx_ops[next_op_ct - 1, TAIL] = aligned_end
                # why is the offset of one, one more than the tail of the
                # preceding one
                # maybe an indexing operation not performed?
                #offset_length_tail_idx_ops[op_ct+1:next_op_ct, OFFSET] = offset_length_tail_idx_ops[op_ct:next_op_ct-1, TAIL]
                #offset_length_tail_idx_ops[op_ct+1:next_op_ct, OFFSET] += 1
                region_bounds += 1
                offset_length_tail_idx_ops[op_ct + 1:next_op_ct, OFFSET] = aligned_offsets[region_bounds]
                offset_length_tail_idx_ops[op_ct, OFFSET] = aligned_start
                offset_length_tail_idx_ops[op_ct:next_op_ct, OP] = OP_FETCH | OP_PLACE
                #if mergeable_regions[0]:
                #    # first pair is mergeable
                #    # merge_bounds[0] represents the end of the first merge
                #else:
                #    # first pair is not mergeable
                #    # merge_bounds[0] represents the start of the first merge
                #merged_aligned_offsets = aligned_offsets[
                ## how to now merge them; i guess extract the start and end
                # what remains after all the merged placements above is the last one
                # it may not need to be fully fetched
                # ... it calculates a bool regarding whether the last tail is
                # distinct from the second-to-last
                # but there are only n-1 calcs ... one is added at the start
                # with the initial offset ...
                # consider [oop
                #
                #torch.minimum(offset_length_tail_idx_ops[next_op_ct-1,TAIL], next_hole)
                ##### now under the condition of num_missing_idcs > 1,
                # now,
                # ops have hopefully been set to place everything up to next_hole
                # it may be such that tail[next_idx-1] > next_hole in which case
                # more must be done
                # we can maybe do hole on left for every placement
                #if aligned_offset - 1 >= min_hole:
                #    hole_on_left = self._next_sparse(aligned_offset - 1, os.SEEK_HOLE) < aligned_offset
                #else:
                #    hole_on_left = False
                #
                # hole_on_left = self._next_sparse(
                # 3/3: 1 PARTLY CACHED ITEM, possibly with multiple scattered
                # holes inside 1 item
                #next_data = self._next_sparse(next_hole, os.SEEK_DATA)
                tail = tails[next_idx - 1]
                aligned_tail = aligned_tails[next_idx - 1]
                aligned_offset = offset_length_tail_idx_ops[next_op_ct - 1, TAIL]
                length = aligned_tail - aligned_offset
                while next_data < tail:
                    assert next_data - next_hole <= length
                    length = next_data - next_hole
                    offset_length_tail_idx_ops[next_op_ct, OFFSET] = next_hole
                    offset_length_tail_idx_ops[next_op_ct, TAIL] = next_data
                    offset_length_tail_idx_ops[next_op_ct, OP] = OP_FETCH | OP_PLACE
                    next_op_ct += 1
                    cls.sparse_usage += length
                    next_hole = self._next_sparse(next_data, os.SEEK_HOLE)
                    next_data = self._next_sparse(next_hole, os.SEEK_DATA)
                if next_hole < tail:
                    length = aligned_tail - next_hole
                    offset_length_tail_idx_ops[next_op_ct, OFFSET] = next_hole
                    offset_length_tail_idx_ops[next_op_ct, TAIL] = aligned_tail
                    offset_length_tail_idx_ops[next_op_ct, OP] = OP_FETCH | OP_PLACE
                    next_op_ct += 1
                    cls.sparse_usage += length
                    next_hole = self._next_sparse(aligned_tail, os.SEEK_HOLE)
                offset_length_tail_idx_ops[op_ct:next_op_ct, LENGTH] = offset_length_tail_idx_ops[op_ct:next_op_ct, TAIL]
                offset_length_tail_idx_ops[op_ct:next_op_ct, LENGTH] -= offset_length_tail_idx_ops[op_ct:next_op_ct, OFFSET]
                cls.sparse_usage += offset_length_tail_idx_ops[op_ct:next_op_ct, LENGTH].sum()
                op_ct = next_op_ct
                min_hole = max(next_hole, min_hole)
                # advance past the items handled above so the while loop terminates
                idx = next_idx
                # unfinished fragment:
                #next_op_ct = op_ct + offset_length_tail_idx_ops
        if next_hole < tail:
            # data not cached
            if cls.sparse_usage + aligned_tail - aligned_offset > (psutil.disk_usage(self.fn).free + cls.sparse_usage) * self.fetchers.usage_frac:
                # no more disk space
                if not cls.warned_space:
                    import warnings
                    warnings.warn(
                        '\nCACHE FULL CACHE FULL' +
                        '\nRequested=' + str(tqdm.tqdm.format_sizeof(aligned_tail - aligned_offset, 'B', 1024)) +
                        ' Cached=' + str(tqdm.tqdm.format_sizeof(cls.sparse_usage, 'B', 1024)) +
                        ' Free=' + str(tqdm.tqdm.format_sizeof(psutil.disk_usage(self.fn).free, 'B', 1024)) +
                        '\n' + os.path.dirname(self.fn) +
                        '\nCACHE FULL CACHE FULL',
                        stacklevel=5)
                    cls.warned_space = True
                fetch_outputs.append([len(fetches), idx])
                fetches.append([offset, length])
                continue
                #return super().read(offset, length, progress=progress)
            hole_on_left = self._next_sparse(max(aligned_offset - 1, min_hole), os.SEEK_HOLE) < aligned_offset
            length = aligned_tail - aligned_offset
            next_data = self._next_sparse(next_hole, os.SEEK_DATA)
            while next_data < tail:
                assert next_data - next_hole <= length
                length = next_data - next_hole
                placements.append([len(fetches), next_hole, next_data])
                fetches.append([next_hole, length])
                #self.mmap[next_hole:next_data] = super().read(next_hole, length, progress=progress)
                cls.sparse_usage += length
                next_hole = self._next_sparse(next_data, os.SEEK_HOLE)
                next_data = self._next_sparse(next_hole, os.SEEK_DATA)
            if next_hole < tail:
                length = aligned_tail - next_hole
                placements.append([len(fetches), next_hole, aligned_tail])
                fetches.append([next_hole, length])
                #self.mmap[next_hole:aligned_tail] = super().read(next_hole, length, progress=progress)
                cls.sparse_usage += length
            # updated this while sleepy
            # on docker vms i found the memory mapper filling extra blocks with 0s
            # this new code tries to ensure data is correct when that happens
            # i've also updated the pagesize calculation so this might happen less
            next_hole = self._next_sparse(aligned_tail, os.SEEK_HOLE)
            extra_0s_right = min(next_hole, next_data)
            while extra_0s_right > aligned_tail:
                length = extra_0s_right - aligned_tail
                placements.append([len(fetches), aligned_tail, extra_0s_right])
                fetches.append([aligned_tail, length])
                #self.mmap[aligned_tail:extra_0s_right] = super().read(aligned_tail, length, progress=progress)
                cls.sparse_usage += length
                next_hole = self._next_sparse(extra_0s_right, os.SEEK_HOLE)
                extra_0s_right = min(next_hole, next_data)
            min_hole = max(next_hole, min_hole)
            if hole_on_left:
                check_holes_on_left.append(aligned_offset)
                # if self._next_sparse(aligned_offset - 1, os.SEEK_HOLE) >= aligned_offset:
            place_outputs.append([offset, tail, idx])
            #return self.mmap[offset:tail]
    if progress:
        pbar.close()
    if len(fetches):
        fetches = super().read_many(fetches, progress=progress, validate_sorted=False)
    for fetchidx, start, end in placements:
        self.mmap[start:end] = fetches[fetchidx]
    for fetchidx, resultidx in fetch_outputs:
        results[resultidx] = fetches[fetchidx]
    for start, end, resultidx in place_outputs:
        results[resultidx] = self.mmap[start:end]
    for check_hole in check_holes_on_left:
        if self._next_sparse(check_hole - 1, os.SEEK_HOLE) >= check_hole:
            # a hole on the left disappeared
            # this could be resolved by walking holes on the left or storing
            # auxiliary data regarding allocated regions
            # the former is space efficient and the latter time efficient;
            # they could be combined as well
            os.unlink(self.fn)
            raise Exception(
                'Your memory mapper is writing data below the cached region ' +
                'even when aligned to the pagesize and blocksize. ' +
                'The current code generates corrupt cached runs of 0s in this situation.')
    return results
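for reference, the hole probing everything above leans on is just lseek with
SEEK_HOLE and SEEK_DATA on the sparse cache file. a minimal sketch of what
_next_sparse could look like, assuming the object holds an open descriptor in
self.fd (that attribute name is my guess, it doesn't appear in the blob):

import errno, os

def _next_sparse(self, offset, whence):
    # whence is os.SEEK_HOLE or os.SEEK_DATA; returns the start of the next
    # hole or data run at or after offset. lseek raises ENXIO when no such
    # run remains, so clamp to end-of-file in that case.
    try:
        return os.lseek(self.fd, offset, whence)
    except OSError as exc:
        if exc.errno != errno.ENXIO:
            raise
        return os.fstat(self.fd).st_size

note SEEK_HOLE always finds the implicit hole at end-of-file, so the ENXIO
branch mostly matters for SEEK_DATA probes at or past the last data.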