
i'm still trying to get myself to engage this nettensors code more ... it's hard to stay with. there was an idea that likely those rotor motors are too weak if not geared; this is quite possible, i don't know. i'm worried i'll make this nettensors code even messier. here's the total current function blob. this is not a cohesive function that makes sense; it's a collection of my amnesiatic attempts to implement it by copy-pasting a different one in.

# (assumes torch, os, psutil, tqdm are imported at module scope)
def read_many(self, offset_lengths, progress, validate_sorted=True):
    if validate_sorted:
        # offset_lengths is indexed like a tensor below, so validate order via
        # argsort rather than list.sort(), which would choke on tensor rows
        order = offset_lengths[:,0].argsort()
        assert (offset_lengths[order] == offset_lengths).all()
    OP_FETCH = 1
    OP_PLACE = 2
    OP_OUTPUT = 4
    offset_length_tail_idx_ops = torch.zeros([offset_lengths.shape[0]*2, 5])
    OFFSET, LENGTH, TAIL, IDX, OP = range(offset_length_tail_idx_ops.shape[-1])
    op_ct = 0
    #results = torch.empty(len(o
    results = [None] * len(offset_lengths)
    # accumulators used by the older copy-pasted path near the bottom,
    # initialized here so that path can run at all
    fetches = []
    placements = []
    fetch_outputs = []
    place_outputs = []
    check_holes_on_left = []
    tails = (offset_lengths[:,0] + offset_lengths[:,1]).clamp(max=len(self.mmap))
    aligned_offsets = offset_lengths[:,0] // self.blksize; aligned_offsets *= self.blksize
    aligned_tails = (tails - 1); aligned_tails //= self.blksize; aligned_tails += 1; aligned_tails *= self.blksize
    torch.clamp(aligned_tails, max=self.size(), out=aligned_tails)
    cls = type(self.fetchers)
    avail_disk_space = (psutil.disk_usage(self.fn).free + cls.sparse_usage) * self.fetchers.usage_frac - cls.sparse_usage
    min_hole = 0
    pbar = range(len(offset_lengths))
    if progress:
        pbar = tqdm.tqdm(pbar, total=len(offset_lengths), desc=progress, leave=False, unit='rd')
    idx = 0
    while idx < len(offset_lengths):
    #for idx in pbar:#range(len(offset_lengths)):
        #offset, length = offset_lengths[idx]
        #tail = min(offset + length, len(self.mmap))
        #aligned_offset = (offset // self.blksize) * self.blksize
        #aligned_tail = min(self.size(), (((tail - 1) // self.blksize) + 1) * self.blksize)
        aligned_offset = aligned_offsets[idx].item()
        next_hole = self._next_sparse(max(aligned_offset, min_hole), os.SEEK_HOLE)

        # 1/3: COMPLETELY CACHED ITEMS
        # this description of the subset looks correct
        cached_idcs = (tails[idx:] < next_hole).nonzero()[:,0]
        num_cached_idcs = cached_idcs.shape[0]
        if num_cached_idcs > 0:
            next_idx = idx + num_cached_idcs
            assert (cached_idcs < next_idx).all()
            next_op_ct = op_ct + num_cached_idcs
            offset_length_tail_idx_ops[op_ct:next_op_ct,[OFFSET,LENGTH]] = offset_lengths[idx:next_idx]
            offset_length_tail_idx_ops[op_ct:next_op_ct,TAIL] = tails[idx:next_idx]
            offset_length_tail_idx_ops[op_ct:next_op_ct,IDX] = cached_idcs
            offset_length_tail_idx_ops[op_ct:next_op_ct,OP] = OP_OUTPUT
            op_ct = next_op_ct
            idx = next_idx
        next_data = self._next_sparse(next_hole, os.SEEK_DATA)

        # 2/3: COMPLETELY UNCACHED ITEMS
        missing_idcs = (tails[idx:] < next_data).nonzero()[:,0]
        #missing_idcs = (next_hole < tails[idx:] and next_data > offset_lengths[idx:,0]).nonzero()[:,0]
        #missing_idcs = (next_hole < tails[idx:]).nonzero()[:,0]
        # here we are handling all undownloaded indices before the next cached ones
        # there could be many pages between them that don't need to be fetched
        num_missing_idcs = missing_idcs.shape[0]
        if num_missing_idcs > 0:
            # uncached data
            next_idx = idx + num_missing_idcs
            assert (missing_idcs < next_idx).all()
            # but most likely there could be some idcs that are not missing ... ones where tail < next_hole?
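            # checking that: phase 1 already consumed everything with tail < next_hole,
            # so tails[idx:] >= next_hole here; the case to worry about may rather be
            # offset < next_hole <= tail -- an item partly cached on its left that this
            # "missing" mask would sweep up as if it were fully uncached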
            missing_offset_lengths = offset_lengths[idx:next_idx]
            if missing_offset_lengths[:,1].sum() > avail_disk_space:
                # no more disk space: emit plain fetches that bypass the cache
                # note: SMALL BUG here in that it checks the tensor size instead of the surrounding pages actually fetched
                # (also: aligned_tail is only assigned later in phase 3, so on a first
                # pass this warning would NameError)
                if not cls.warned_space:
                    import warnings
                    warnings.warn(
                        '\nCACHE FULL CACHE FULL' +
                        '\nRequested=' + str(tqdm.tqdm.format_sizeof(aligned_tail - aligned_offset, 'B', 1024)) +
                        ' Cached=' + str(tqdm.tqdm.format_sizeof(cls.sparse_usage, 'B', 1024)) +
                        ' Free=' + str(tqdm.tqdm.format_sizeof(psutil.disk_usage(self.fn).free, 'B', 1024)) +
                        '\n' + os.path.dirname(self.fn) +
                        '\nCACHE FULL CACHE FULL',
                        stacklevel=5)
                    cls.warned_space = True
                next_op_ct = op_ct + num_missing_idcs
                offset_length_tail_idx_ops[op_ct:next_op_ct,[OFFSET,LENGTH]] = missing_offset_lengths
                offset_length_tail_idx_ops[op_ct:next_op_ct,TAIL] = tails[idx:next_idx]
                offset_length_tail_idx_ops[op_ct:next_op_ct,IDX] = missing_idcs
                offset_length_tail_idx_ops[op_ct:next_op_ct,OP] = OP_FETCH | OP_OUTPUT
                op_ct = next_op_ct
                idx = next_idx
                continue
            # now we want to group these into those that do not have an empty page between
            # them, so we can fetch everything underneath them at once
            # note that there may be a tensor that is partly filled, and may have further holes farther along, at the right edge
            #if num_missing_idcs > 1:
            if True:
                # add fetches, placements, and outputs covering all pages with tensors wholly in them
                # each tensor has an aligned offset and aligned tail already
                # it might be more useful to consider abstract alignment points that information would be rounded to
                # we could calculate empty regions from the differences between the offsets and tails
                # empty regions that don't contain pages would be elided away
                # empty regions that do contain pages would be aligned
                # alternatively one could compare aligned_offsets and aligned_tails for equality and overlap
                aligned_start = aligned_offsets[idx]
                aligned_end = min(aligned_tails[next_idx-1], next_hole)
                #empty_regions = missing_offset_lengths[:-1, 0]
                assert not (tails[idx:next_idx-1] > offset_lengths[idx+1:next_idx,0]).any()
                #mergeable_regions = aligned_tails[idx:next_idx-1] >= aligned_offsets[idx+1:next_idx]
                #merge_bounds = mergeable_regions[:-1] != mergeable_regions[1:]
                # so here first we compare the preceding tails to the following offsets
                # mergeable_regions then provides a bool for each space between regions that represents the two being the same. so there's 1 less bool and it's the spaces between regions.
                # merge_bounds then looks for these regions of sameness within mergeable_regions. so merge_bounds has a length of 2 fewer than originally, and it relates to comparisons between adjacent pairs of spaces between regions.
                # the data starts and ends with useful groups ..
                # but this may not yet be represented in merge_bounds
                # might want to special case length==2 somewhere for ease
                # it's notable that mergeable_regions[x] indicates if True that the region can be merged with its neighbors
                # but that if False it indicates that the region needs to be treated as its own single group
                region_mask = aligned_tails[idx:next_idx-1] < aligned_offsets[idx+1:next_idx]
                region_bounds = region_mask.nonzero()[:,0]
                region_bounds += idx
                # region_bounds says which offsets+1 cannot be replaced by their preceding offsets (if idx shifted by 1)
                # that is, which tails cannot be replaced by their following tails
                # T T F F T F T   true if the tail at that index is less than the offset of the following -- ie a gap follows
                # we want head + tail from 0 and 1 because a gap follows them
                # then we want head from 2, and tail from 4
                # then head from 5, and tail from 6
                # 0 1 2 3 4 5 6
                # 0, 1, 4, 6
                # head and tail from: -1+1=0, 0
                #                      0+1=1, 1
                #                      1+1=2, 4
                #                      4+1=5, 6
                # note these are double indices
                next_op_ct = op_ct + region_bounds.shape[0] + 1
                offset_length_tail_idx_ops[op_ct:next_op_ct-1,TAIL] = aligned_tails[region_bounds]
                offset_length_tail_idx_ops[next_op_ct-1,TAIL] = aligned_end
                # why is the offset of one, one more than the tail of the preceding one
                # maybe an indexing operation not performed?
                #offset_length_tail_idx_ops[op_ct+1:next_op_ct,OFFSET] = offset_length_tail_idx_ops[op_ct:next_op_ct-1,TAIL]
                #offset_length_tail_idx_ops[op_ct+1:next_op_ct,OFFSET] += 1
                region_bounds += 1
                offset_length_tail_idx_ops[op_ct+1:next_op_ct,OFFSET] = aligned_offsets[region_bounds]
                offset_length_tail_idx_ops[op_ct,OFFSET] = aligned_start
                offset_length_tail_idx_ops[op_ct:next_op_ct,OP] = OP_FETCH | OP_PLACE
                #if mergeable_regions[0]:
                #    # first pair is mergeable
                #    # merge_bounds[0] represents the end of the first merge
                #else:
                #    # first pair is not mergeable
                #    # merge_bounds[0] represents the start of the first merge
                #merged_aligned_offsets = aligned_offsets[
                ## how to now merge them; i guess extract the start and end
                # what remains after all the merged placements above is the last one
                # it may not need to be fully fetched
                # ... it calculates a bool regarding whether the last tail is distinct from the second-to-last
                # but there are only n-1 calcs ... one is added at the start with the initial offset ...
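                # a sanity sketch of the grouping above with made-up numbers (assuming idx == 0):
                #   aligned_offsets = [ 0,  8, 16, 20, 28, 40]
                #   aligned_tails   = [ 4, 12, 20, 28, 36, 44]
                #   region_mask = tails[:-1] < offsets[1:]  ->  [T, T, F, F, T]
                #   region_bounds = [0, 1, 4]
                #   TAIL column gets tails[[0, 1, 4]] = [4, 12, 36], then aligned_end last
                #   OFFSET column gets aligned_start = 0 first, then offsets[[1, 2, 5]] = [8, 16, 40]
                #   so the merged fetch groups come out as (0,4), (8,12), (16,36), (40,aligned_end)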
                # consider [oop
                # #torch.minimum(offset_length_tail_idx_ops[next_op_ct-1,TAIL], next_hole)
                ##### now under the condition of num_missing_idcs > 1,
                # now,
                # ops have hopefully been set to place everything up to next_hole
                # it may be such that tails[next_idx-1] > next_hole in which case more must be done
                # we can maybe do hole on left for every placement
                #if aligned_offset - 1 >= min_hole:
                #    hole_on_left = self._next_sparse(aligned_offset - 1, os.SEEK_HOLE) < aligned_offset
                #else:
                #    hole_on_left = False
                #
                # hole_on_left = self._next_sparse(

                # 3/3: 1 PARTLY CACHED ITEM, possibly with multiple scattered holes inside 1 item
                #next_data = self._next_sparse(next_hole, os.SEEK_DATA)
                tail = tails[next_idx-1]
                aligned_tail = aligned_tails[next_idx-1]
                aligned_offset = offset_length_tail_idx_ops[next_op_ct-1,TAIL]
                length = aligned_tail - aligned_offset
                while next_data < tail:
                    assert next_data - next_hole <= length
                    length = next_data - next_hole
                    offset_length_tail_idx_ops[next_op_ct,OFFSET] = next_hole
                    offset_length_tail_idx_ops[next_op_ct,TAIL] = next_data
                    offset_length_tail_idx_ops[next_op_ct,OP] = OP_FETCH | OP_PLACE
                    next_op_ct += 1
                    cls.sparse_usage += length
                    next_hole = self._next_sparse(next_data, os.SEEK_HOLE)
                    next_data = self._next_sparse(next_hole, os.SEEK_DATA)
                if next_hole < tail:
                    length = aligned_tail - next_hole
                    offset_length_tail_idx_ops[next_op_ct,OFFSET] = next_hole
                    offset_length_tail_idx_ops[next_op_ct,TAIL] = aligned_tail
                    offset_length_tail_idx_ops[next_op_ct,OP] = OP_FETCH | OP_PLACE
                    next_op_ct += 1
                    cls.sparse_usage += length
                    next_hole = self._next_sparse(aligned_tail, os.SEEK_HOLE)
                offset_length_tail_idx_ops[op_ct:next_op_ct,LENGTH] = offset_length_tail_idx_ops[op_ct:next_op_ct,TAIL]
                offset_length_tail_idx_ops[op_ct:next_op_ct,LENGTH] -= offset_length_tail_idx_ops[op_ct:next_op_ct,OFFSET]
                # note: the per-hole cls.sparse_usage += length above looks like it double
                # counts against this bulk sum over the same ops
                cls.sparse_usage += offset_length_tail_idx_ops[op_ct:next_op_ct,LENGTH].sum()
                op_ct = next_op_ct
                min_hole = max(next_hole, min_hole)
                # (idx is never advanced on this path -- presumably idx = next_idx still
                # belongs here before looping)
        #next_op_ct = op_ct + offset_length_tail_idx_ops  # truncated line left over from an edit

        # everything below is the older single-item implementation this was copy-pasted
        # from; it references offset/length from the commented-out per-item unpack above
        # and never advances idx
        if next_hole < tail:
            # data not cached
            if cls.sparse_usage + aligned_tail - aligned_offset > (psutil.disk_usage(self.fn).free + cls.sparse_usage) * self.fetchers.usage_frac:
                # no more disk space
                if not cls.warned_space:
                    import warnings
                    warnings.warn(
                        '\nCACHE FULL CACHE FULL' +
                        '\nRequested=' + str(tqdm.tqdm.format_sizeof(aligned_tail - aligned_offset, 'B', 1024)) +
                        ' Cached=' + str(tqdm.tqdm.format_sizeof(cls.sparse_usage, 'B', 1024)) +
                        ' Free=' + str(tqdm.tqdm.format_sizeof(psutil.disk_usage(self.fn).free, 'B', 1024)) +
                        '\n' + os.path.dirname(self.fn) +
                        '\nCACHE FULL CACHE FULL',
                        stacklevel=5)
                    cls.warned_space = True
                fetch_outputs.append([len(fetches), idx])
                fetches.append([offset, length])
                continue
                #return super().read(offset, length, progress=progress)
            hole_on_left = self._next_sparse(max(aligned_offset - 1, min_hole), os.SEEK_HOLE) < aligned_offset
            length = aligned_tail - aligned_offset
            next_data = self._next_sparse(next_hole, os.SEEK_DATA)
            while next_data < tail:
                assert next_data - next_hole <= length
                length = next_data - next_hole
                placements.append([len(fetches), next_hole, next_data])
                fetches.append([next_hole, length])
                #self.mmap[next_hole:next_data] = super().read(next_hole, length, progress=progress)
                cls.sparse_usage += length
                next_hole = self._next_sparse(next_data, os.SEEK_HOLE)
                next_data = self._next_sparse(next_hole, os.SEEK_DATA)
            if next_hole < tail:
                length = aligned_tail - next_hole
                placements.append([len(fetches), next_hole, aligned_tail])
                fetches.append([next_hole, length])
                #self.mmap[next_hole:aligned_tail] = super().read(next_hole, length, progress=progress)
                cls.sparse_usage += length
            # updated this while sleepy
            # on docker vms i found the memory mapper filling extra blocks with 0s
            # this new code tries to ensure data is correct when that happens
            # i've also updated the pagesize calculation so this might happen less
            next_hole = self._next_sparse(aligned_tail, os.SEEK_HOLE)
            extra_0s_right = min(next_hole, next_data)
            while extra_0s_right > aligned_tail:
                length = extra_0s_right - aligned_tail
                placements.append([len(fetches), aligned_tail, extra_0s_right])
                fetches.append([aligned_tail, length])
                #self.mmap[aligned_tail:extra_0s_right] = super().read(aligned_tail, length, progress=progress)
                cls.sparse_usage += length
                next_hole = self._next_sparse(extra_0s_right, os.SEEK_HOLE)
                extra_0s_right = min(next_hole, next_data)
            min_hole = max(next_hole, min_hole)
            if hole_on_left:
                check_holes_on_left.append(aligned_offset)
            # if self._next_sparse(aligned_offset - 1, os.SEEK_HOLE) >= aligned_offset:
        place_outputs.append([offset, tail, idx])
        #return self.mmap[offset:tail]
    if progress:
        pbar.close()
    if len(fetches):
        fetches = super().read_many(fetches, progress=progress, validate_sorted=False)
    for fetchidx, start, end in placements:
        self.mmap[start:end] = fetches[fetchidx]
    for fetchidx, resultidx in fetch_outputs:
        results[resultidx] = fetches[fetchidx]
    for start, end, resultidx in place_outputs:
        results[resultidx] = self.mmap[start:end]
    for check_hole in check_holes_on_left:
        if self._next_sparse(check_hole - 1, os.SEEK_HOLE) >= check_hole:
            # a hole on the left disappeared
            # this could be resolved by walking holes on the left or storing auxiliary data regarding allocated regions
            # the former is space efficient and the latter time efficient; they could be combined as well
            os.unlink(self.fn)
            raise Exception(
                'Your memory mapper is writing data below the cached region '
                'even when aligned to the pagesize and blocksize. '
                'The current code generates corrupt cached runs of 0s in this situation.')
    return results
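aside, for my own reference: the primitive everything above leans on is just lseek with SEEK_HOLE/SEEK_DATA over the sparse cache file. a minimal standalone sketch of only that part, assuming _next_sparse wraps os.lseek roughly like this (the names here are made up for illustration, not from the class):

import os

def next_sparse(fd, offset, whence):
    # next hole or data boundary at or after offset; treat ENXIO
    # (seeking for data past the last data) as pointing at end-of-file
    try:
        return os.lseek(fd, offset, whence)
    except OSError:
        return os.fstat(fd).st_size

def cached_runs(fn):
    # yield [start, end) runs of allocated data in a sparse file
    fd = os.open(fn, os.O_RDONLY)
    try:
        size = os.fstat(fd).st_size
        offset = 0
        while offset < size:
            data = next_sparse(fd, offset, os.SEEK_DATA)
            if data >= size:
                break
            hole = next_sparse(fd, data, os.SEEK_HOLE)
            yield (data, hole)
            offset = hole
    finally:
        os.close(fd)

a fully-cached item is then one whose [offset, tail) lies inside a single data run, which is what the tails < next_hole test approximates one window at a time.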