"""
Script to convert the PICMUS database to the zea format.
For more information about the dataset, refer to the following link:
- The original dataset can be found at `this link <https://www.creatis.insa-lyon.fr/Challenge/IEEE_IUS_2016/download>`_.
"""
import logging
import os
from pathlib import Path
import h5py
import numpy as np
from zea import log
from zea.beamform.delays import compute_t0_delays_planewave
from zea.data.convert.utils import unzip
from zea.data.data_format import generate_zea_dataset
def convert(source_path, output_path, overwrite=False):
    """
    Converts and writes a single PICMUS file to the zea format.

    The acquisition is read from the ``US/US_DATASET0000`` group of the source
    file, rearranged into a single-frame array and written with
    ``generate_zea_dataset``. All transmits are treated as plane waves.

    Args:
        source_path (str, pathlike): The path to the original PICMUS file.
        output_path (str, pathlike): The path to the output file.
        overwrite (bool, optional): Set to True to overwrite existing file.
            Defaults to False, in which case an existing output file is left
            untouched and the conversion is skipped with a warning.

    Raises:
        ValueError: If the source file does not contain the data group.
    """
    # Check if output file already exists and remove
    if os.path.exists(output_path):
        if not overwrite:
            logging.warning("Output file already exists. Skipping conversion.")
            return
        os.remove(output_path)

    # Everything needed is copied into memory (``[:]``) inside the context
    # manager, so the HDF5 handle is always closed, also when an error occurs.
    with h5py.File(source_path, "r") as h5_file:
        # Get the group containing the dataset
        dataset = h5_file["US"]["US_DATASET0000"]

        if "data" not in dataset:
            raise ValueError("The file does not contain the data group.")

        # Extract I- and Q-data (shape (tx, el, ax))
        i_data = dataset["data"]["real"][:]
        q_data = dataset["data"]["imag"][:]

        center_frequency = int(dataset["modulation_frequency"][:][0])
        sampling_frequency = int(dataset["sampling_frequency"][:][0])
        # Transposed so that the coordinates are on the last axis, which is
        # what the projection onto the 3-element wave vector below requires.
        probe_geometry = np.transpose(dataset["probe_geometry"][:], (1, 0))
        sound_speed = float(dataset["sound_speed"][:][0])
        polar_angles = dataset["angles"][:]

    if np.abs(np.sum(q_data)) < 0.1:
        # Imaginary part is (numerically) empty: use only the I-data and add
        # a dummy channel dimension (shape (tx, el, ax, ch=1))
        raw_data = i_data[..., None]
    else:
        # Stack I- and Q-data (shape (tx, el, ax, ch=2))
        raw_data = np.stack([i_data, q_data], axis=-1)

    # Add dummy frame dimension (shape (frame=1, tx, el, ax, ch))
    raw_data = raw_data[None]
    # Swap the element and axial axes (shape (frame=1, tx, ax, el, ch))
    raw_data = np.transpose(raw_data, (0, 1, 3, 2, 4))

    _, n_tx, _, n_el, _ = raw_data.shape

    # Fix a mistake in one of the PICMUS files
    if center_frequency == 0:
        center_frequency = 5.208e6

    focus_distances = np.zeros((n_tx,), dtype=np.float32)
    azimuth_angles = np.zeros((n_tx,), dtype=np.float32)
    t0_delays = np.zeros((n_tx, n_el), dtype=np.float32)
    tx_apodizations = np.ones((n_tx, n_el), dtype=np.float32)
    initial_times = np.zeros((n_tx,))

    for n in range(n_tx):
        angle = polar_angles[n]
        # Unit propagation direction of the plane wave in the x-z plane.
        # The z-component is cos(angle); the previous constant ``np.cos(0)``
        # only gave the same result for probes whose elements all lie at z=0.
        v = np.array([np.sin(angle), 0, np.cos(angle)])

        # Offset derived from the smallest projection of the element
        # positions onto the wave direction.
        initial_times[n] = -np.min(np.sum(probe_geometry * v[None], axis=1)) / sound_speed

        t0_delays[n] = compute_t0_delays_planewave(
            probe_geometry=probe_geometry,
            polar_angles=angle,
            sound_speed=sound_speed,
        )

    generate_zea_dataset(
        path=output_path,
        raw_data=raw_data,
        center_frequency=center_frequency,
        sampling_frequency=sampling_frequency,
        probe_geometry=probe_geometry,
        initial_times=initial_times,
        sound_speed=sound_speed,
        t0_delays=t0_delays,
        focus_distances=focus_distances,
        polar_angles=polar_angles,
        azimuth_angles=azimuth_angles,
        tx_apodizations=tx_apodizations,
        probe_name="verasonics_l11_4v",
        description="PICMUS dataset converted to zea format",
    )
def convert_picmus(args):
    """
    Convert PICMUS HDF5 files under a source directory into the zea dataset format,
    preserving relative paths in the destination.

    Args:
        args (argparse.Namespace): An object with the following attributes.

            - src (str or Path): Path to the PICMUS source directory or archive.
            - dst (str or Path): Path to the output directory where converted .hdf5 files
              will be written.

    Raises:
        AssertionError: If `src` does not exist or if `dst` already exists.

    Note:
        - Scans `src` (after unzipping if needed) for `.hdf5` files containing IQ/RF data and
          converts each to the zea format.
        - Preserves the relative directory structure under `dst` and places each converted
          file in its own subdirectory named after the file stem.
        - Fails fast if `src` does not exist or if `dst` already exists.
        - A file that fails to convert is logged and skipped; the remaining files are
          still processed.
    """
    # Get the source and output directories
    base_dir = Path(args.src)
    dst = Path(args.dst)

    # Check that the source exists and that the destination is not in use yet
    assert base_dir.exists(), f"Source directory {base_dir} does not exist."
    assert not dst.exists(), f"Destination directory {dst} already exists, Exiting."

    # Unzip the PICMUS dataset if necessary
    base_dir = unzip(base_dir, "picmus")
    dst.mkdir(parents=True, exist_ok=False)

    # Traverse the source directory and convert all files
    for file in base_dir.rglob("*.hdf5"):
        str_file = str(file)

        # Select only the data files that actually contain rf or iq data
        # (There are also files containing the geometry of the phantoms or
        # images)
        is_data_file = str_file.endswith(("iq.hdf5", "rf.hdf5"))
        if not is_data_file or "img" in str_file:
            log.info("Skipping %s", file.name)
            continue

        log.info("Converting %s", file.name)

        # Find the folder relative to the base directory to retain the
        # folder structure in the output directory
        relative_file = dst / file.relative_to(base_dir)

        # Put each file in its own folder named after the file stem. This
        # makes it possible to use it as a dataset because it ensures there
        # are never different types of data file in the same folder.
        output_file = relative_file.parent / relative_file.stem / f"{relative_file.stem}.hdf5"

        try:
            # Create the output directory if it does not exist already
            output_file.parent.mkdir(parents=True, exist_ok=True)
            convert(file, output_file, overwrite=True)
        except Exception as exc:
            # Log first so the failure is reported even if cleanup has issues.
            log.error("Failed to convert %s", f"{str_file} ({exc!r})")

            # Best-effort cleanup. A partially written output file would make
            # rmdir() fail, and an error raised here would abort the whole
            # batch, so remove the file first and tolerate a directory that is
            # missing or still holds other content.
            try:
                if output_file.exists():
                    output_file.unlink()
                output_file.parent.rmdir()
            except OSError:
                pass