Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,27 @@
Pipeline scripts and tools for reconstructing Electron Microscopy volumes.

### Setup
1. If necessary, [install miniconda](https://docs.conda.io/en/latest/miniconda.html).
1. If necessary, [install pixi](https://pixi.sh/latest/#installation).

### Create Environment and Install
### Create Environment and Install
```bash
git clone https://github.com/JaneliaSciComp/EM_recon_pipeline.git

cd EM_recon_pipeline

conda env create -f janelia_emrp.environment.yml
conda activate janelia_emrp
pixi install
```

### Running Scripts
```bash
# Run a script using the pixi environment
pixi run python src/python/janelia_emrp/fibsem/dat_converter.py --help

poetry install
# Or activate a shell with the environment
pixi shell
python src/python/janelia_emrp/fibsem/dat_converter.py --help
```

### Development Library Management
- Using conda with poetry as described
[here](https://ealizadeh.com/blog/guide-to-python-env-pkg-dependency-using-conda-poetry).
- To change/update dependencies, edit [pyproject.toml](pyproject.toml)
or use [poetry add](https://python-poetry.org/docs/cli/#add) and then run `poetry install`.
- Environment is managed using [pixi](https://pixi.sh/) with dependencies defined in [pyproject.toml](pyproject.toml).
- To add/update dependencies, edit the `[project]` or `[tool.pixi.dependencies]` sections in `pyproject.toml` and run `pixi install`.
10 changes: 0 additions & 10 deletions janelia_emrp.environment.yml

This file was deleted.

8,342 changes: 8,342 additions & 0 deletions pixi.lock

Large diffs are not rendered by default.

1,901 changes: 0 additions & 1,901 deletions poetry.lock

This file was deleted.

60 changes: 40 additions & 20 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,28 +1,48 @@
[tool.poetry]
[project]
name = "janelia-emrp"
version = "0.1.0"
version = "0.2.0"
description = "Pipeline scripts and tools for reconstructing Electron Microscopy volumes."
authors = ["Eric Trautman <[email protected]>"]
license = 'BSD-3-Clause'
authors = [{name = "Eric Trautman", email = "[email protected]"}]
license = "BSD-3-Clause"
requires-python = ">= 3.12"
dependencies = [
"fibsem-tools==7.0.5",
"dask-jobqueue", # For LSFCluster support (dask-janelia functionality vendored locally)
"xarray-multiscale>=0.3.3",
"render-python @ git+https://github.com/AllenInstitute/render-python.git",
"matplotlib",
"google-cloud-storage>=2.19.0",
"opencv-python>=4.10.0",
"basicpy"
]

[tool.poetry.dependencies]
python = "^3.9"
fibsem-tools = "0.3.1"
render-python = {git = "https://github.com/AllenInstitute/render-python.git"}
dask-janelia = "^0.1.4"
bokeh = "^2.3.3"
h5py = "^3.6.0"
pytest = "^7.1.1"
xarray-multiscale = "0.3.1"
pydantic = "^1.9.0"
numpy = "1.21.0"
scikit-image = "0.19.3"
[build-system]
build-backend = "hatchling.build"
requires = ["hatchling"]

[tool.poetry.dev-dependencies]
[tool.hatch.build.targets.wheel]
packages = ["src/python/janelia_emrp"]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.hatch.metadata]
allow-direct-references = true

[tool.pixi.workspace]
channels = ["conda-forge"]
platforms = ["osx-arm64", "osx-64", "linux-64", "win-64"]

[tool.pixi.pypi-dependencies]
janelia-emrp = { path = ".", editable = true }

[tool.pixi.dependencies]
python = ">=3.12"
urllib3 = ">=1.26,<2.0"
bokeh = ">=2.3.3,<4.0.0"
h5py = ">=3.6.0"
pandas = ">=1.5.0,<3.0.0"
pytest = ">=7.1.1"
pydantic = ">=2.7.0"
numpy = ">=1.26.0,<2.0.0"
scikit-image = ">=0.22.0"

# see https://docs.pytest.org/en/stable/how-to/capture-warnings.html
[tool.pytest.ini_options]
Expand Down
189 changes: 189 additions & 0 deletions src/python/janelia_emrp/cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"""
Dask cluster utilities for Janelia Research Campus compute cluster.

This module provides convenience functions for creating dask clusters,
originally from the dask-janelia package (https://github.com/janelia-cosem/dask-janelia).
Vendored here to avoid dependency version conflicts.
"""

from shutil import which
from distributed import LocalCluster
from dask_jobqueue import LSFCluster
import os
from pathlib import Path
import dask
from typing import Union, List, Dict, Any, Optional
import warnings

dask.config.set({"jobqueue.lsf.use-stdin": True})

threading_env_vars = [
"NUM_MKL_THREADS",
"OPENBLAS_NUM_THREADS",
"OPENMP_NUM_THREADS",
"OMP_NUM_THREADS",
]


def make_single_threaded_env_vars(threads: int) -> List[str]:
return [f"export {var}={threads}" for var in threading_env_vars]


def bsub_available() -> bool:
"""Check if the `bsub` shell command is available

Returns True if the `bsub` command is available on the path, False otherwise. This is used to check whether code is
running on the Janelia Compute Cluster.
"""
result = which("bsub") is not None
return result


def get_LSFCLuster(
threads_per_worker: int = 1,
walltime: str = "1:00",
death_timeout: str = "600s",
**kwargs,
) -> LSFCluster:
"""Create a dask_jobqueue.LSFCluster for use on the Janelia Research Campus compute cluster.

This function wraps the class dask_jobqueue.LSFCLuster and instantiates this class with some sensible defaults,
given how the Janelia cluster is configured.

This function will add environment variables that prevent libraries (OPENMP, MKL, BLAS) from running multithreaded routines with parallelism
that exceeds the number of requested cores.

Additional keyword arguments added to this function will be passed to the dask_jobqueue.LSFCluster constructor.

Parameters
----------
threads_per_worker: int
Number of cores to request from LSF. Directly translated to the `cores` kwarg to LSFCluster.
walltime: str
The expected lifetime of a worker. Defaults to one hour, i.e. "1:00"
death_timeout: str
The duration for the scheduler to wait for workers before flagging them as dead, e.g. "600s". For jobs with a large number of workers,
LSF may take a long time (minutes) to request workers. This timeout value must exceed that duration, otherwise the scheduler will
flag these slow-to-arrive workers as unresponsive and kill them.
**kwargs:
Additional keyword arguments passed to the LSFCluster constructor

Examples
--------

>>> cluster = get_LSFCLuster(threads_per_worker=2, project="scicompsoft", queue="normal")

"""

if "job_script_prologue" not in kwargs:
kwargs["job_script_prologue"] = []

kwargs["job_script_prologue"].extend(make_single_threaded_env_vars(threads_per_worker))

USER = os.environ["USER"]
HOME = os.environ["HOME"]

if "local_directory" not in kwargs:
# The default local scratch directory on the Janelia Cluster
kwargs["local_directory"] = f"/scratch/{USER}/"

if "log_directory" not in kwargs:
log_dir = f"{HOME}/.dask_distributed/"
Path(log_dir).mkdir(parents=False, exist_ok=True)
kwargs["log_directory"] = log_dir

# Memory is required by dask_jobqueue but not meaningful for slot-based LSF clusters
# Set a default that satisfies the library without affecting job submission
if "memory" not in kwargs:
kwargs["memory"] = "16GB"

cluster = LSFCluster(
cores=threads_per_worker,
walltime=walltime,
death_timeout=death_timeout,
**kwargs,
)
return cluster


def get_LocalCluster(threads_per_worker: int = 1, n_workers: int = 0, **kwargs):
"""
Creata a distributed.LocalCluster with defaults that make it more similar to a deployment on the Janelia Compute cluster.
This function is a light wrapper around the distributed.LocalCluster constructor.

Parameters
----------
n_workers: int
The number of workers to start the cluster with. This defaults to 0 here.
threads_per_worker: int
The number of threads to assign to each worker.
**kwargs:
Additional keyword arguments passed to the LocalCluster constructor
Examples
--------

>>> cluster = get_LocalCluster(threads_per_worker=8)
"""
return LocalCluster(
n_workers=n_workers, threads_per_worker=threads_per_worker, **kwargs
)


def get_cluster(
threads_per_worker: int = 1,
deployment: Optional[str] = None,
local_kwargs: Dict[str, Any] = {},
lsf_kwargs: Dict[str, Any] = {},
) -> Union[LSFCluster, LocalCluster]:

"""Convenience function to generate a dask cluster on either a local machine or the compute cluster.

Create a distributed.Client object backed by either a dask_jobqueue.LSFCluster (for use on the Janelia Compute Cluster)
or a distributed.LocalCluster (for use on a single machine). This function uses the output of the bsubAvailable function
to determine whether code is running on the compute cluster or not.
Additional keyword arguments given to this function will be forwarded to the constructor for the Client object.

Parameters
----------
threads_per_worker: int
Number of threads per worker. Defaults to 1.

deployment: str or None
Which deployment (LocalCluster or LSFCluster) to prefer. If deployment=None, then LSFCluster is preferred, but LocalCluster is used if
bsub is not available. If deployment='lsf' and bsub is not available, an error is raised.
local_kwargs: dict
Dictionary of keyword arguments for the distributed.LocalCluster constructor
lsf_kwargs: dict
Dictionary of keyword arguments for the dask_jobqueue.LSFCluster constructor
"""

if "cores" in lsf_kwargs:
warnings.warn(
"The `cores` kwarg for LSFCLuster has no effect. Use the `threads_per_worker` argument instead."
)

if "threads_per_worker" in local_kwargs:
warnings.warn(
"the `threads_per_worker` kwarg was found in `local_kwargs`. It will be overwritten with the `threads_per_worker` argument to this function."
)

if deployment is None:
if bsub_available():
cluster = get_LSFCLuster(threads_per_worker, **lsf_kwargs)
else:
cluster = get_LocalCluster(threads_per_worker, **local_kwargs)
elif deployment == "lsf":
if bsub_available():
cluster = get_LSFCLuster(threads_per_worker, **lsf_kwargs)
else:
raise EnvironmentError(
"You requested an LSFCluster but the command `bsub` is not available."
)
elif deployment == "local":
cluster = get_LocalCluster(threads_per_worker, **local_kwargs)
else:
raise ValueError(
f'deployment must be one of (None, "lsf", or "local"), not {deployment}'
)

return cluster
2 changes: 1 addition & 1 deletion src/python/janelia_emrp/dask_bag_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import dask.bag as dask_bag
import sys
import time
from dask_janelia import get_cluster
from janelia_emrp.cluster import get_cluster
from distributed import Client

logger = logging.getLogger()
Expand Down
2 changes: 1 addition & 1 deletion src/python/janelia_emrp/fibsem/dat_clipper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pathlib import Path

import numpy as np
from fibsem_tools.io.fibsem import OFFSET
from fibsem_tools.io.dat import OFFSET

from janelia_emrp.fibsem.dat_to_scheffer_8_bit import compress_and_save
from janelia_emrp.root_logger import init_logger
Expand Down
4 changes: 2 additions & 2 deletions src/python/janelia_emrp/fibsem/dat_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import math
import sys
import time
from dask_janelia import get_cluster
from janelia_emrp.cluster import get_cluster
from distributed import Client

from janelia_emrp.fibsem.cyx_dat import CYXDat, new_cyx_dat
Expand Down Expand Up @@ -409,7 +409,7 @@ def main(arg_list: list[str]):

args = parser.parse_args(arg_list)

convert_volume(volume_transfer_info=VolumeTransferInfo.parse_file(args.volume_transfer_info),
convert_volume(volume_transfer_info=VolumeTransferInfo.model_validate_json(Path(args.volume_transfer_info).read_text()),
num_workers=args.num_workers,
parent_work_dir=args.parent_work_dir,
first_dat=args.first_dat,
Expand Down
2 changes: 1 addition & 1 deletion src/python/janelia_emrp/fibsem/dat_to_h5_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import h5py
import numpy as np
import sys
from fibsem_tools.io.fibsem import OFFSET, MAGIC_NUMBER
from fibsem_tools.io.dat import OFFSET, MAGIC_NUMBER
from h5py import Dataset, Group
from xarray_multiscale import multiscale
from xarray_multiscale.reducers import windowed_mean
Expand Down
4 changes: 2 additions & 2 deletions src/python/janelia_emrp/fibsem/h5_raw_to_align.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import numpy as np
import sys
import time
from dask_janelia import get_cluster
from janelia_emrp.cluster import get_cluster
from distributed import Client

from janelia_emrp.fibsem.cyx_dat import CYXDat
Expand Down Expand Up @@ -288,7 +288,7 @@ def main(arg_list: list[str]):

args = parser.parse_args(arg_list)

convert_volume(volume_transfer_info=VolumeTransferInfo.parse_file(args.volume_transfer_info),
convert_volume(volume_transfer_info=VolumeTransferInfo.model_validate_json(Path(args.volume_transfer_info).read_text()),
num_workers=args.num_workers,
parent_work_dir=args.parent_work_dir,
first_h5=args.first_h5,
Expand Down
4 changes: 2 additions & 2 deletions src/python/janelia_emrp/fibsem/h5_to_render.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import dask.bag as db
import h5py
import renderapi
from dask_janelia import get_cluster
from janelia_emrp.cluster import get_cluster

from janelia_emrp.fibsem.dat_path import DatPath, new_dat_path
from janelia_emrp.fibsem.dat_to_h5_writer import DAT_FILE_NAME_KEY
Expand Down Expand Up @@ -571,7 +571,7 @@ def main(arg_list):

args = parser.parse_args(args=arg_list)

volume_transfer_info: VolumeTransferInfo = VolumeTransferInfo.parse_file(args.volume_transfer_info)
volume_transfer_info: VolumeTransferInfo = VolumeTransferInfo.model_validate_json(Path(args.volume_transfer_info).read_text())

if volume_transfer_info.cluster_root_paths is None:
raise ValueError(f"cluster_root_paths not defined in {args.volume_transfer_info}")
Expand Down
Loading