@ChenZiHong-Gavin
Collaborator

@ChenZiHong-Gavin ChenZiHong-Gavin commented Nov 20, 2025

This PR refactors the GraphGen engine to a streaming-first architecture with explicit back-pressure, eliminating the long-standing risk of out-of-memory (OOM) crashes when datasets no longer fit in RAM.
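For readers unfamiliar with the term, back-pressure here means that a fast producer is made to wait whenever the consumer falls behind, so the amount of data held in memory stays bounded. The sketch below illustrates the idea with the standard library only; `stream_map` and `max_pending` are hypothetical names chosen for illustration and are not taken from this PR's code.

```python
import queue
import threading
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")

_DONE = object()  # sentinel marking the end of the stream


def stream_map(
    source: Iterable[T], fn: Callable[[T], R], max_pending: int = 4
) -> Iterator[R]:
    """Lazily apply fn to source, buffering at most max_pending results."""
    buffer: queue.Queue = queue.Queue(maxsize=max_pending)

    def produce() -> None:
        try:
            for item in source:
                # put() blocks while the buffer is full: this is the back-pressure.
                buffer.put(fn(item))
        finally:
            # Always signal completion. Error propagation is omitted in this
            # sketch: a failure in fn ends the stream early.
            buffer.put(_DONE)

    # Daemon thread so an abandoned (partially consumed) stream cannot block exit.
    threading.Thread(target=produce, daemon=True).start()

    while True:
        result = buffer.get()
        if result is _DONE:
            return
        yield result


if __name__ == "__main__":
    for value in stream_map(range(5), lambda x: x * x, max_pending=2):
        print(value)
```

With this shape, peak memory depends on `max_pending` rather than on the size of the dataset, which is the property a streaming-first design relies on.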

@ChenZiHong-Gavin ChenZiHong-Gavin changed the title from "feat: add bucket for map and all-reduce" to "feat: introduce streaming ops and backpressure to prevent OOM" Nov 20, 2025
@ChenZiHong-Gavin
Collaborator Author

ChenZiHong-Gavin commented Nov 21, 2025

Ray (https://docs.ray.io/en/latest/) may be a better fit.

Comment on lines +82 to +98
# if not path.exists():
#     raise FileNotFoundError(f"[Read] input_path not found: {input_file}")
#
# if allowed_suffix is None:
#     support_suffix = set(_MAPPING.keys())
# else:
#     support_suffix = {s.lower().lstrip(".") for s in allowed_suffix}
#
# # single file
# if path.is_file():
#     suffix = path.suffix.lstrip(".").lower()
#     if suffix not in support_suffix:
#         logger.warning(
#             "[Read] Skip file %s (suffix '%s' not in allowed_suffix %s)",
#             path,
#             suffix,
#             support_suffix,
Comment on lines +108 to +118
# for p in path.rglob("*"):
#     if p.is_file() and p.suffix.lstrip(".").lower() in support_suffix:
#         try:
#             suffix = p.suffix.lstrip(".").lower()
#             reader = _build_reader(suffix, cache_dir)
#             logger.info("[Reader] Reading file %s", p)
#             docs = reader.read(str(p))
#             if docs:
#                 yield docs
#         except Exception:  # pylint: disable=broad-except
#             logger.exception("[Reader] Error reading %s", p)
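For reference, the per-file streaming pattern in the commented-out lines above can be written as a small standalone generator. This is a minimal sketch under stated assumptions, not the PR's implementation: `READERS`, `_read_lines`, and `iter_docs` are hypothetical stand-ins for `_MAPPING` and `_build_reader`, whose definitions are not shown in this conversation.

```python
import logging
from pathlib import Path
from typing import Callable, Dict, Iterator, List, Optional

logger = logging.getLogger(__name__)


def _read_lines(path: Path) -> List[dict]:
    """Hypothetical reader: one document per non-empty line of a text file."""
    text = path.read_text(encoding="utf-8")
    return [{"source": str(path), "text": ln} for ln in text.splitlines() if ln.strip()]


# Hypothetical suffix -> reader table standing in for the PR's _MAPPING.
READERS: Dict[str, Callable[[Path], List[dict]]] = {"txt": _read_lines}


def iter_docs(
    input_path: str, allowed_suffix: Optional[List[str]] = None
) -> Iterator[List[dict]]:
    """Yield the documents of one file at a time instead of loading all files."""
    path = Path(input_path)
    # Because this is a generator, the check runs on the first next() call.
    if not path.exists():
        raise FileNotFoundError(f"input_path not found: {input_path}")

    if allowed_suffix is None:
        supported = set(READERS)
    else:
        supported = {s.lower().lstrip(".") for s in allowed_suffix} & set(READERS)

    candidates = [path] if path.is_file() else path.rglob("*")
    for p in candidates:
        suffix = p.suffix.lstrip(".").lower()
        if not p.is_file() or suffix not in supported:
            continue
        try:
            docs = READERS[suffix](p)
        except Exception:  # pylint: disable=broad-except
            # Skip unreadable files rather than aborting the whole stream.
            logger.exception("Error reading %s", p)
            continue
        if docs:
            yield docs
```

Only one file's documents are held at a time, so a consumer that processes each batch before requesting the next keeps memory use roughly proportional to the largest single file.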
Comment on lines +1 to +3
from ray.data.datasource import (
    Datasource, ReadTask
)
import pyarrow as pa
from typing import List, Dict, Any, Optional, Union

from typing import Iterator, List, Optional
'cached_at': time.time()
})
logger.info(f"Cached scan result for: {path}")
except OSError:
2 participants