Skip to content

Commit 0d9a922

Browse files
authored
Simplify and extend fsspec.parquet for filters and multi-file (#1945)
1 parent 25b805d commit 0d9a922

File tree

4 files changed

+278
-112
lines changed

4 files changed

+278
-112
lines changed

fsspec/caching.py

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import math
77
import os
88
import threading
9-
import warnings
109
from collections import OrderedDict
1110
from collections.abc import Callable
1211
from concurrent.futures import Future, ThreadPoolExecutor
@@ -622,7 +621,7 @@ def __init__(
622621
fetcher: Fetcher,
623622
size: int,
624623
data: dict[tuple[int, int], bytes] | None = None,
625-
strict: bool = True,
624+
strict: bool = False,
626625
**_: Any,
627626
):
628627
super().__init__(blocksize, fetcher, size)
@@ -646,50 +645,65 @@ def __init__(
646645
else:
647646
self.data = {}
648647

648+
@property
649+
def size(self):
650+
return sum(_[1] - _[0] for _ in self.data)
651+
652+
@size.setter
653+
def size(self, value):
654+
pass
655+
656+
@property
657+
def nblocks(self):
658+
return len(self.data)
659+
660+
@nblocks.setter
661+
def nblocks(self, value):
662+
pass
663+
649664
def _fetch(self, start: int | None, stop: int | None) -> bytes:
650665
if start is None:
651666
start = 0
652667
if stop is None:
653668
stop = self.size
669+
self.total_requested_bytes += stop - start
654670

655671
out = b""
656-
for (loc0, loc1), data in self.data.items():
657-
# If self.strict=False, use zero-padded data
658-
# for reads beyond the end of a "known" buffer
672+
started = False
673+
loc_old = 0
674+
for loc0, loc1 in sorted(self.data):
675+
if (loc0 <= start < loc1) and (loc0 <= stop <= loc1):
676+
# entirely within the block
677+
off = start - loc0
678+
self.hit_count += 1
679+
return self.data[(loc0, loc1)][off : off + stop - start]
680+
if stop <= loc0:
681+
break
682+
if started and loc0 > loc_old:
683+
# a gap where we need data
684+
self.miss_count += 1
685+
if self.strict:
686+
raise ValueError
687+
out += b"\x00" * (loc0 - loc_old)
659688
if loc0 <= start < loc1:
689+
# found the start
690+
self.hit_count += 1
660691
off = start - loc0
661-
out = data[off : off + stop - start]
662-
if not self.strict or loc0 <= stop <= loc1:
663-
# The request is within a known range, or
664-
# it begins within a known range, and we
665-
# are allowed to pad reads beyond the
666-
# buffer with zero
667-
out += b"\x00" * (stop - start - len(out))
668-
self.hit_count += 1
669-
return out
670-
else:
671-
# The request ends outside a known range,
672-
# and we are being "strict" about reads
673-
# beyond the buffer
674-
start = loc1
675-
break
676-
677-
# We only get here if there is a request outside the
678-
# known parts of the file. In an ideal world, this
679-
# should never happen
680-
if self.fetcher is None:
681-
# We cannot fetch the data, so raise an error
682-
raise ValueError(f"Read is outside the known file parts: {(start, stop)}. ")
683-
# We can fetch the data, but should warn the user
684-
# that this may be slow
685-
warnings.warn(
686-
f"Read is outside the known file parts: {(start, stop)}. "
687-
f"IO/caching performance may be poor!"
688-
)
689-
logger.debug(f"KnownPartsOfAFile cache fetching {start}-{stop}")
690-
self.total_requested_bytes += stop - start
692+
out = self.data[(loc0, loc1)][off : off + stop - start]
693+
started = True
694+
elif start < loc0 and stop > loc1:
695+
# the whole block
696+
self.hit_count += 1
697+
out += self.data[(loc0, loc1)]
698+
elif loc0 <= stop <= loc1:
699+
# end block
700+
self.hit_count += 1
701+
return out + self.data[(loc0, loc1)][: stop - loc0]
702+
loc_old = loc1
691703
self.miss_count += 1
692-
return out + super()._fetch(start, stop)
704+
if started and not self.strict:
705+
return out + b"\x00" * (stop - loc_old)
706+
raise ValueError
693707

694708

695709
class UpdatableLRU(Generic[P, T]):

0 commit comments

Comments
 (0)