"""Functionality to convert the CAMUS dataset to the zea format.
.. note::
Requires SimpleITK to be installed: ``pip install SimpleITK``.
The CAMUS (Cardiac Acquisitions for Multi-structure Ultrasound Segmentation)
dataset contains 2D echocardiographic sequences from 500 patients.
The sequences are stored in NIfTI (.nii.gz) format.
The dataset can be downloaded automatically using the ``--download`` flag::
python -m zea.data.convert camus <source_folder> <destination_folder> --download
**Links**:
- `Original dataset <https://humanheart-project.creatis.insa-lyon.fr/database/#collection/6373703d73e9f0047faa1bc8>`_
"""
from __future__ import annotations
import logging
import os
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
import numpy as np
import scipy
from skimage.transform import resize
from tqdm import tqdm
from zea import log
from zea.data.convert.utils import download_from_girder, sitk_load, unzip
from zea.data.data_format import generate_zea_dataset
from zea.func.tensor import translate
from zea.internal.utils import find_first_nonzero_index
# Girder collection ID for the CAMUS dataset
_CAMUS_COLLECTION_ID = "6373703d73e9f0047faa1bc8"
def process_camus(source_path, output_path, overwrite=False):
    """Converts a single CAMUS NIfTI sequence to the zea format.

    Loads the scan-converted (Cartesian) image sequence, reconstructs a polar
    image for every frame, rescales both sequences from the 8-bit range
    [0, 255] to [-60, 0] dB, and writes them to ``output_path``.

    Args:
        source_path (str, pathlike): The path to the original camus file.
        output_path (str, pathlike): The path to the output file.
        overwrite (bool, optional): Set to True to overwrite existing file.
            Defaults to False.
    """
    # Check if output file already exists and remove (or skip when not
    # overwriting), using pathlib consistently with the rest of the module.
    output_path = Path(output_path)
    if output_path.exists():
        if not overwrite:
            logging.warning("Output file already exists. Skipping conversion.")
            return
        output_path.unlink()

    # Open the file; sitk_load returns the image sequence plus metadata
    # (metadata is unused here).
    image_seq, _ = sitk_load(source_path)

    # Convert each frame to polar coordinates and stack along the frame axis.
    image_seq_polar = np.stack(
        [transform_sc_image_to_polar(image) for image in image_seq], axis=0
    )

    # Change range to [-60, 0] dB
    image_seq = translate(image_seq, (0, 255), (-60, 0))
    image_seq_polar = translate(image_seq_polar, (0, 255), (-60, 0))

    generate_zea_dataset(
        path=output_path,
        image=image_seq_polar,
        image_sc=image_seq,
        probe_name="generic",
        description="camus dataset converted to zea format",
    )
# Half-open [start, stop) patient-ID ranges for each dataset split.
splits = {"train": [1, 401], "val": [401, 451], "test": [451, 501]}


def get_split(patient_id: int) -> str:
    """
    Determine which dataset split a patient ID belongs to.

    Args:
        patient_id: Integer ID of the patient.

    Returns:
        The split name: "train", "val", or "test".

    Raises:
        ValueError: If the patient_id does not fall into any defined split range.
    """
    for split_name, (start, stop) in splits.items():
        if start <= patient_id < stop:
            return split_name
    raise ValueError(f"Did not find split for patient: {patient_id}")
def _process_task(task):
    """
    Worker entry point: unpack a (source, output) path pair and convert it.

    Ensures the destination directory exists, then delegates to
    ``process_camus``. Any exception raised during processing is logged and
    re-raised so the parent process can handle the failure.

    Args:
        task (tuple): (source_file_str, output_file_str)
            - source_file_str: filesystem path to the source CAMUS file as a string.
            - output_file_str: filesystem path for the ZEA output file as a string.
    """
    source_file, output_file = (Path(p) for p in task)

    # mkdir with exist_ok=True is idempotent, so concurrent workers are safe.
    output_file.parent.mkdir(parents=True, exist_ok=True)

    try:
        # process_camus must be importable in the worker process.
        process_camus(source_file, output_file, overwrite=False)
    except Exception:
        # Log and re-raise so the main process can handle it
        log.error("Error processing %s", source_file)
        raise
def download_camus(  # pragma: no cover
    destination: str | Path, patients: list[int] | None = None
) -> Path:
    """Fetch the CAMUS dataset (per-patient NIfTI files) from the Girder server.

    Args:
        destination: Directory where the dataset will be downloaded.
        patients: List of patient IDs to download (1-500).
            If None, all patients are downloaded.

    Returns:
        Path to the downloaded dataset directory.
    """
    # Delegate to the shared Girder helper; the CAMUS collection stores its
    # NIfTI files under the "database_nifti" top-level folder.
    return download_from_girder(
        dataset_name="CAMUS",
        collection_id=_CAMUS_COLLECTION_ID,
        top_folder_name="database_nifti",
        destination=destination,
        patients=patients,
    )
def convert_camus(args):
    """Convert the CAMUS dataset into zea HDF5 files across dataset splits.

    Processes files found under the CAMUS source folder (after unzipping or
    downloading if needed), assigns each patient to a train/val/test split,
    creates matching output paths, and executes per-file conversion tasks
    either serially or in parallel.

    Usage::

        python -m zea.data.convert camus <source_folder> <destination_folder>
        python -m zea.data.convert camus <source_folder> <destination_folder> --download

    Args:
        args (argparse.Namespace): An object with attributes:
            - src (str | Path): Path to the CAMUS archive or extracted folder,
              or a directory to download into when ``--download`` is set.
            - dst (str | Path): Root destination folder for ZEA HDF5 outputs;
              split subfolders will be created.
            - download (bool, optional): If True, download the dataset first from the
              Girder server.
            - no_hyperthreading (bool, optional): If True, run tasks serially instead
              of using a process pool.

    Raises:
        FileNotFoundError: If the source folder is missing and ``--download``
            is not set.
        FileExistsError: If any of the output split folders already exists.
    """
    camus_source_folder = Path(args.src)
    camus_output_folder = Path(args.dst)

    # Optionally download the dataset
    if getattr(args, "download", False):
        camus_source_folder = download_camus(camus_source_folder)
    elif not camus_source_folder.exists():
        raise FileNotFoundError(
            f"Source folder does not exist: {camus_source_folder}. "
            "Use --download to download the CAMUS dataset automatically."
        )
    else:
        # Look for either CAMUS_public.zip or folders database_nifti, database_split
        camus_source_folder = unzip(camus_source_folder, "camus")

    # Refuse to write into existing split folders. Raise a real exception
    # rather than using `assert`, which is stripped under `python -O`.
    for split in splits:
        split_folder = camus_output_folder / split
        if split_folder.exists():
            raise FileExistsError(
                f"Output folder {split_folder} exists. Exiting program."
            )

    # Clone the folder structure of the source to the per-split output folders
    # and build one (source, output) task per half-sequence file.
    tasks = []
    for source_file in camus_source_folder.glob("**/*_half_sequence.nii.gz"):
        # e.g. "patient0001_2CH_half_sequence.nii" -> "patient0001" -> 1
        patient = source_file.stem.split("_")[0]
        patient_id = int(patient.removeprefix("patient"))
        split = get_split(patient_id)
        output_file = camus_output_folder / split / source_file.relative_to(camus_source_folder)
        # Replace the double suffix .nii.gz with .hdf5
        output_file = output_file.with_suffix("").with_suffix(".hdf5")
        # make sure folder exists
        output_file.parent.mkdir(parents=True, exist_ok=True)
        tasks.append((str(source_file), str(output_file)))

    if not tasks:
        log.info("No files found to process.")
        return

    if getattr(args, "no_hyperthreading", False):
        log.info("no_hyperthreading is True — running tasks serially (no ProcessPoolExecutor)")
        # NOTE(review): the serial path logs failures and continues, while the
        # parallel path below propagates the first failure out of exe.map —
        # this asymmetry is preserved from the original behavior.
        for task in tqdm(tasks, desc="Processing files (serial)"):
            try:
                _process_task(task)
            except Exception as e:
                log.error("Task processing failed: %s", e)
        log.info("Processing finished for %d files (serial)", len(tasks))
        return

    # Submit tasks to the process pool and track progress
    with ProcessPoolExecutor() as exe:
        for _ in tqdm(exe.map(_process_task, tasks), total=len(tasks), desc="Processing files"):
            pass
    log.info("Processing finished for %d files", len(tasks))