docs/geneva/jobs/contexts.mdx

Geneva automatically packages and deploys your Python execution environment to its worker nodes. This ensures that distributed execution occurs in the same environment and dependencies as your prototype.

We currently support one processing backend: **Ray**. There are 3 ways to connect to a Ray cluster:

1. **Local Ray**: auto-create a cluster on your machine, best for prototyping.
2. **KubeRay**: create a cluster on demand in your Kubernetes cluster.
3. **External Ray cluster**: connect to a Ray cluster you already run.

## Ray Clusters

### Local Ray

To execute jobs without an external Ray cluster, you can just trigger the `Table.backfill` method. This will auto-create a Ray cluster on your machine. Because it's on your laptop/desktop, this is only suitable for prototyping on small datasets. But it is the easiest way to get started. Simply define the UDF, add a column, and trigger the job:

<CodeGroup>
```python Python icon="python"
tbl.backfill("filename_len")
```
</CodeGroup>
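
For reference, here is a fuller sketch of that flow end to end. It is only illustrative: the `@udf` decorator and the `add_columns` call below follow the patterns used elsewhere in the Geneva docs, and the column and UDF names are placeholders.

<CodeGroup>
```python Python icon="python"
import geneva
from geneva import udf

db = geneva.connect("s3://my-bucket/my-db")
tbl = db.get_table("my_table")

# A toy UDF that computes the length of each row's filename.
@udf
def filename_len(filename: str) -> int:
    return len(filename)

# Register a virtual column backed by the UDF, then backfill it.
# With no cluster context active, backfill auto-creates a local Ray cluster.
tbl.add_columns({"filename_len": filename_len})
tbl.backfill("filename_len")
```
</CodeGroup>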


### KubeRay

If you have a Kubernetes cluster with the KubeRay operator installed, you can use Geneva to automatically provision RayClusters. To do so, define a Geneva cluster, which describes the resource needs, Docker images, and other Ray configuration necessary to run your job. Make sure your Kubernetes cluster has enough compute to satisfy the resources the RayCluster requests. Here is an example Geneva cluster definition:

<CodeGroup>
```python Python icon="python"
import sys
import ray
import geneva
from geneva.cluster.builder import GenevaClusterBuilder
from geneva.cluster import K8sConfigMethod
from geneva.runners.ray.raycluster import get_ray_image

db = geneva.connect("s3://my-bucket/my-db")

ray_version = ray.__version__
python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
cluster_name = "my_geneva_cluster"
cluster_name = "my-geneva-cluster" # lowercase, numbers, hyphens only
service_account = "my_k8s_service_account" # k8s service account bound geneva runs as
k8s_namespace = "geneva" # k8s namespace

cluster = (
GenevaClusterBuilder()
.name(cluster_name)
.namespace(k8s_namespace)
.portforwarding(True) # required for kuberay to expose ray ports
.aws_config(region="us-east-1") # only required if using AWS
.config_method(K8sConfigMethod.LOCAL) # Load k8s config from `~/.kube/config`
# (other options include EKS_AUTH to load from AWS EKS, or IN_CLUSTER to load the
# config when running inside a pod in the cluster)
.head_group(
service_account=service_account,
image=get_ray_image(ray_version, python_version),
cpus=2,
node_selector={"geneva.lancedb.com/ray-head":""}, # k8s label required for head in your cluster
)
.add_cpu_worker_group(
cpus=4,
memory="8Gi",
service_account=service_account,
image=get_ray_image(ray_version, python_version),
node_selector={"geneva.lancedb.com/ray-worker-cpu":""}, # k8s label for cpu worker in your cluster
)
.add_gpu_worker_group(
cpus=2,
memory="8Gi",
gpus=1,
service_account=service_account,
image=get_ray_image(ray_version, python_version, gpu=True), # Note the GPU image here
node_selector={"geneva.lancedb.com/ray-worker-gpu":""}, # k8s label for gpu worker in your cluster
)
.build()
)

db.define_cluster("my_geneva_cluster", cluster)
db.define_cluster(cluster_name, cluster)
# define_cluster stores the cluster metadata in persistent storage. The Cluster can then be referenced by name and provisioned when creating an execution context.

table = db.get_table("my_table")
with db.context(cluster=cluster_name):
table.backfill("my_udf")
```
</CodeGroup>

See [the API docs](https://lancedb.github.io/geneva/api/cluster/) for all the parameters `GenevaClusterBuilder` can use.

### External Ray cluster
If you already have a Ray cluster, Geneva can execute jobs against it too. To connect, define a Geneva cluster that points at the address of your existing cluster. Here's an example:

<CodeGroup>
```python Python icon="python"
import geneva
from geneva.cluster.builder import GenevaClusterBuilder
from geneva.cluster import GenevaClusterType

db = geneva.connect(my_db_uri)
cluster_name = "my-geneva-external-cluster"

cluster = (
GenevaClusterBuilder()
.name(cluster_name)
.cluster_type(GenevaClusterType.EXTERNAL_RAY)
.ray_address("ray://my_ip:my_port")
.portforwarding(False) # This must be False when using an external Ray cluster
.build()
)
db.define_cluster(cluster_name, cluster)

```
</CodeGroup>
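
Once the external cluster is defined, running a job against it looks the same as with any other Geneva cluster: reference it by name in `db.context`. A brief sketch, assuming a table `my_table` with a UDF-backed column `my_udf` already exists:

<CodeGroup>
```python Python icon="python"
tbl = db.get_table("my_table")

# Geneva connects to the Ray address configured on the cluster definition
# instead of provisioning a new RayCluster.
with db.context(cluster=cluster_name):
    tbl.backfill("my_udf")
```
</CodeGroup>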

## Dependencies

Most UDFs require some dependencies: helper libraries like `pillow` for image processing, pre-trained models like `open-clip` to calculate embeddings, or even small config files. We have two ways to get them to workers:

1. Use defaults
2. Define a manifest

### Use Defaults
By default, Geneva packages your local environment and sends it to the Ray workers. This includes your local Python `site-packages` (as reported by `site.getsitepackages()`) and either the current workspace root (if you're in a Python repo) or the current working directory (if you're not). If you don't explicitly define a manifest, this is what happens.
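
If you want to check what those defaults would pick up on your machine, you can print the same locations with the standard library:

<CodeGroup>
```python Python icon="python"
import os
import site

# Directories whose contents are packaged and shipped to Ray workers
# when no manifest is defined.
print(site.getsitepackages())  # local site-packages
print(os.getcwd())             # current working directory (or workspace root in a repo)
```
</CodeGroup>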

### Define a Manifest

Sometimes you need more control over what the workers get access to. For example:
- you might need to include files from another directory, or another python package
- you might not want to send all your local dependencies (if your repo has lots of dependencies but your UDF will only need a few)
- you might need packages to be built separately for the worker's architecture (for example, you can't build `pyarrow` on a Mac and run it on a Linux Ray worker).
- you might want to reuse dependencies between two backfill jobs, so you know they are running with the same environment.

For these use cases, you can define a Manifest. Calling `define_manifest()` packages files in the local environment and stores the Manifest metadata and files in persistent storage. The Manifest can then be referenced by name, shared, and reused.

<CodeGroup>
```python Python icon="python"
manifest = (
GenevaManifestBuilder()
.pip(["lancedb", "numpy"])
.py_modules(["my_module"])
.build()
)

db.define_manifest(manifest_name, manifest)
```
</CodeGroup>

What goes into a Manifest, and how do you define it? (All of the methods below are on `GenevaManifestBuilder`.)

|Contents|How you can define it|
|---|---|
|Local python packages|Will be uploaded automatically, unless you set `.skip_site_packages(True)`.|
|Local working directory (or workspace root, if in a python repo)|Will be uploaded automatically.|
|Python packages to be installed with `pip`|Use `.pip(packages: list[str])` or `.add_pip(package: str)`. See [Ray's RuntimeEnv docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.runtime_env.RuntimeEnv.html) for details.|
|Local python packages outside of `site_packages`|Use `.py_modules(modules: list[str])` or `.add_py_module(module: str)`. See [Ray's RuntimeEnv docs](https://docs.ray.io/en/latest/ray-core/api/doc/ray.runtime_env.RuntimeEnv.html) for details.|
|Container image for head node|Use `.head_image(head_image: str)` or `default_head_image()` to use the default. Note that, if the image is also defined in the GenevaCluster, the image set here in the Manifest will take priority.|
|Container image for worker nodes|Use `.worker_image(worker_image: str)` or `default_worker_image()` to use the default for the current platform. As with the head image, this takes priority over any images set in the Cluster.|

If you want to see exactly what is being uploaded to the cluster, set `.delete_local_zips(False)` and `.local_zip_output_dir(path)` then examine the zip files in `path`.
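
As a worked example, here is a hedged sketch that combines several of the builder methods from the table above. The package list, module name, image tag, and output path are placeholders, not required values:

<CodeGroup>
```python Python icon="python"
manifest = (
    GenevaManifestBuilder()
    .skip_site_packages(True)            # don't ship local site-packages (e.g. ARM64 Mac -> x86-64 workers)
    .pip(["pillow", "open-clip-torch"])  # have Ray install these on each worker instead
    .add_py_module("my_module")          # a local package that lives outside site-packages
    .worker_image("my-registry/geneva-worker:latest")  # takes priority over the Cluster's worker image
    .local_zip_output_dir("/tmp/geneva-zips")  # keep the uploaded zips around...
    .delete_local_zips(False)                  # ...so you can inspect them
    .build()
)

db.define_manifest("my-explicit-manifest", manifest)
```
</CodeGroup>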

## Putting it all together: Execution Contexts

An execution context represents the concrete execution environment (Cluster and Manifest) used to execute a distributed job.

Calling `context` enters a context manager that provisions an execution cluster and runs the job using the Cluster and Manifest definitions you provide. Because you've already defined the cluster and manifest, you can reference them by name; providing a manifest is optional. When the context exits, the cluster is automatically de-provisioned.

<CodeGroup>
```python Python icon="python"
db = geneva.connect(my_db_uri)
tbl = db.get_table("my_table")

# Providing a manifest is optional; if not provided, it will work as described in "Use defaults" above.
with db.context(cluster=cluster_name, manifest=manifest_name):
tbl.backfill("embedding")
```
</CodeGroup>