Source code for ewoksid13.tasks.diffmap

import json
import logging
from pathlib import Path

import h5py
import matplotlib.image
import numpy as np
from blissdata.beacon.data import BeaconData
from blissdata.redis_engine.store import DataStore
from ewokscore import Task
from ewoksxrpd.tasks.utils.data_utils import create_hdf5_link
from pyFAI.io.integration_config import WorkerConfig
from scipy.ndimage import zoom
from silx.io import h5py_utils

logger = logging.getLogger(__name__)


def _generate_available_h5group_name(parent: h5py.Group, base_name: str) -> str:
    """Generate an available group name in the parent group based on the base_name."""
    if base_name not in parent:
        return base_name

    i = 1
    while f"{base_name}_{i:02}" in parent:
        i += 1
    return f"{base_name}_{i:02}"


[docs] class CreateDiffMapFile( Task, input_names=[ "nxdata_url", ], optional_input_names=[ "lima_name", "master_filename", "scan_nb", "external_output_filename", "output_filename", "nxprocess_name", "integration_options", "x_nb_points", "y_nb_points", "scan_memory_url", "normalization_counter", "dark_value", "save_img", "do_diffmap", ], output_names=[ "nxdata_url", "filename_img", ], ): """ Creates a DiffMap NexusProcess compliant with pyFAI-diffmap/view API after an already integrated dataset Input names: nxdata_url : url to stored integrated data. Normally it's stored in the PROCESSED_DATA scan file, example scan_file.h5::16.1/eiger_integrate/integrated Optional input names: lima_name: name of the detector device. For example: "eiger" master_filename: RAW_DATA master filename, kmap information may be stored here scan_nb: number of the scan external_output_filename: external file where to write the diffmap nexus process (usually it's the PROCESSED_DATA scan file) output_filename: file where to write a link to the diffmap nexus process (usually it's the PROCESSED_DATA master file of collection) nxprocess_name: name of the nexus process. If not provided, will be set to "<lima_name>_diffmap" integration_options: dictionary to write as a configuration group x_nb_points: number of points along the x direction. If not provided, will be searched as kmap paramaters in master_filename y_nb_points: number of points along the y direction. If not provided, will be searched as kmap paramaters in master_filename do_diffmap: if False, skip this process * nxdata_url: where to read (integrated_data) * external_output_filename: where to save the diffmap * output_filename: where to link the diffmap """ NXDATA_DIFFMAP_NAME = "diffmap"
[docs] def run(self): self.outputs.nxdata_url = None if not self.get_input_value("do_diffmap", True): return nxdata_url: str = self.inputs.nxdata_url input_filename, nxdata_integrate_path = nxdata_url.split("::") external_output_filename: str = self.get_input_value( "external_output_filename", None ) output_filename: str = self.get_input_value("output_filename", None) if not external_output_filename and not output_filename: external_output_filename = input_filename output_filename = input_filename elif not external_output_filename: external_output_filename = output_filename elif not output_filename: output_filename = external_output_filename # Open the integrated dataset from nxdata_url mode = "r+" if external_output_filename == input_filename else "r" with h5py_utils.open_item( filename=input_filename, name="/", mode=mode, ) as file_input: nxdata_integrate = file_input[nxdata_integrate_path] dset_integrated = nxdata_integrate[nxdata_integrate.attrs["signal"]] if dset_integrated.ndim != 2: logger.error( "Multidimensional diffmap is not implemented. Skipping diffmap." ) return # Shape of dataset nb_frames, nbpt_rad = dset_integrated.shape # Get the shape of the diffraction map x_nb_points = self.get_input_value("x_nb_points", None) y_nb_points = self.get_input_value("y_nb_points", None) master_filename: str = self.get_input_value("master_filename", None) scan_nb = self._get_scan_nb(file_input) if x_nb_points is None or y_nb_points is None: if not master_filename or not scan_nb: raise RuntimeError( "kmap parameters must be provided either as x_nb_points and y_nb_points or as master_filename and scan_number." ) with h5py_utils.open_item( filename=master_filename, name=f"{scan_nb}.1" ) as scan_entry: instrument = scan_entry["instrument"] if "kmap_parameters" not in instrument: raise RuntimeError("Missing kmap parameters in master file.") kmap_parameters = instrument["kmap_parameters"] x_nb_points = kmap_parameters["x_nb_points"][()] y_nb_points = kmap_parameters["y_nb_points"][()] # This block makes Ctrl+C scans break if nb_frames != (x_nb_points * y_nb_points): raise ValueError( f"Number of frames {nb_frames} does not match with kmap points {x_nb_points}*{y_nb_points}={x_nb_points*y_nb_points}" ) # Lima/detector name lima_name: str = self.get_input_value( "lima_name", nxdata_integrate.parent.get("detector_name", None) ) if isinstance(lima_name, h5py.Dataset): lima_name = lima_name[()] if isinstance(lima_name, bytes): lima_name = lima_name.decode("utf-8") nxprocess_diffmap_name = self.get_input_value("nxprocess_name", None) if not nxprocess_diffmap_name and not lima_name: logger.warning( "There is neither nxprocess_name provided and lima_name, a None value will be used for the diffmap process name." ) nxprocess_diffmap_name = ( nxprocess_diffmap_name or f"{lima_name}_{self.NXDATA_DIFFMAP_NAME}" ) # Open the external file the result into external_output_filename with h5py_utils.open_item( filename=external_output_filename, name="/", mode="r+" ) as file_output: scan_entry_name = Path(nxdata_integrate.parent.parent.name).name scan_entry = file_output[scan_entry_name] nxprocess_diffmap_name = _generate_available_h5group_name( parent=scan_entry, base_name=nxprocess_diffmap_name ) nxprocess_diffmap = scan_entry.create_group(nxprocess_diffmap_name) nxprocess_diffmap.attrs.update( { "NX_class": "NXprocess", "program": "diffraction-map", "version": "1.0", "default": self.NXDATA_DIFFMAP_NAME, "nxdata_integrate": nxdata_integrate_path, } ) nxprocess_diffmap["dim0"] = y_nb_points nxprocess_diffmap["dim0"].attrs["axis"] = "motory" nxprocess_diffmap["dim1"] = x_nb_points nxprocess_diffmap["dim1"].attrs["axis"] = "motorx" nxprocess_diffmap["offset"] = 0 integration_options: dict = self.get_input_value( "integration_options", {} ) if integration_options.get( "mask_file", None ) and integration_options.get("do_mask", True): mask_file = integration_options.get("mask_file") else: mask_file = "" nxprocess_diffmap["mask_file"] = mask_file nxprocess_diffmap["name"] = "ROI diffraction map" # NxNote: configuration config = nxprocess_diffmap.create_group("configuration") config.attrs["NX_class"] = "NXnote" pyfai_config = WorkerConfig.from_dict(integration_options) config["data"] = json.dumps( pyfai_config.as_dict(), indent=2, separators=(",\r\n", ": ") ) config["type"] = "text/json" # NxData: result nxdata_diffmap = nxprocess_diffmap.create_group( self.NXDATA_DIFFMAP_NAME ) nxdata_diffmap.attrs["NX_class"] = "NXdata" nxdata_diffmap["ypoints"] = np.arange(y_nb_points) nxdata_diffmap["xpoints"] = np.arange(x_nb_points) # Copy the radial information axes_integrate = nxdata_integrate.attrs.get("axes", [""]) nxprocess_diffmap["dim2"] = nbpt_rad radial_name = axes_integrate[-1] nxprocess_diffmap["dim2"].attrs.update( { "axis": "diffraction", "name": radial_name, } ) nxdata_integrated_axis = nxdata_integrate[radial_name] virtual_source = h5py.VirtualSource(nxdata_integrated_axis) virtual_layout = h5py.VirtualLayout( shape=nxdata_integrated_axis.shape, dtype=nxdata_integrated_axis.dtype, ) virtual_layout[:] = virtual_source[:] nxdata_diffmap.create_virtual_dataset(radial_name, virtual_layout) # Reshape and write the diffmap intensity_integrated = dset_integrated[:] dset_intensity_diffmap = nxdata_diffmap.create_dataset( name="intensity", data=np.reshape( intensity_integrated, (y_nb_points, x_nb_points, nbpt_rad), ), dtype="float32", chunks=(1, 1, nbpt_rad), fillvalue=np.nan, ) # NxData attrs nxdata_diffmap.attrs.update( { "interpretation": "image", "signal": "intensity", "axes": [radial_name, "ypoints", "xpoints"], } ) # Map with shifted dimensions layout = h5py.VirtualLayout( shape=(nbpt_rad, y_nb_points, x_nb_points), dtype=dset_intensity_diffmap.dtype, ) source = h5py.VirtualSource(dset_intensity_diffmap) for i in range(y_nb_points): for j in range(x_nb_points): layout[:, i, j] = source[i, j] nxdata_diffmap.create_virtual_dataset( "map", layout, fillvalue=np.nan ).attrs["interpretation"] = "image" # Soft-link to the NXdata integrated create_hdf5_link( parent=nxprocess_diffmap, link_name=Path(nxdata_integrate.name).name, target=nxdata_integrate, relative=True, ) # Check if normalization of integrated data is possible norm_values = self.get_normalization_values() if norm_values is not None and len(norm_values) != nb_frames: logger.error( f"Normalization values array has different number of frames ({len(norm_values)}) than the integrated data ({nb_frames})" ) norm_values = None if norm_values is not None: # Save normalization values dset_normalization_values = nxdata_diffmap.create_dataset( name=self.get_input_value("normalization_counter"), data=norm_values, dtype="float32", chunks=True, fillvalue=np.nan, ) dset_normalization_values.attrs["interpretation"] = "scalar" # Dark value is the normalization counter value when the beam is off dark_value = self.get_input_value("dark_value", 0.0) nxdata_diffmap["dark_value"] = dark_value norm_values -= dark_value integrate_intensity_norm = np.where( norm_values[:, np.newaxis] == 0, 0.0, dset_integrated[:] / norm_values[:, np.newaxis], ) dset_diffmap_intensity_norm = nxdata_diffmap.create_dataset( name="intensity_norm", data=np.reshape( integrate_intensity_norm, (y_nb_points, x_nb_points, nbpt_rad), ), dtype="float32", chunks=(1, 1, nbpt_rad), fillvalue=np.nan, ) # Map-norm with shifted dimensions layout = h5py.VirtualLayout( shape=(nbpt_rad, y_nb_points, x_nb_points), dtype=dset_diffmap_intensity_norm.dtype, ) source = h5py.VirtualSource(dset_diffmap_intensity_norm) for i in range(y_nb_points): for j in range(x_nb_points): layout[:, i, j] = source[i, j] nxdata_diffmap.create_virtual_dataset( "map_norm", layout, fillvalue=np.nan ).attrs["interpretation"] = "image" nxdata_diffmap.attrs["signal"] = ( "map" if norm_values is None else "map_norm" ) # If the master_filename is provided, we may get a link to the lima measurement if master_filename and scan_nb and lima_name: if lima_name not in scan_entry["measurement"]: scan_entry["measurement"][lima_name] = h5py.ExternalLink( filename=master_filename, path=f"{scan_nb}.1/measurement/{lima_name}", ) # Link the diffmap nexus process to output_filename if output_filename != external_output_filename: with h5py_utils.open_item( filename=output_filename, name=scan_entry_name, mode="r+", ) as scan_entry: create_hdf5_link( parent=scan_entry, link_name=nxprocess_diffmap_name, target=nxprocess_diffmap, relative=True, ) # Save a .png image dset_to_save = ( nxdata_diffmap["map"] if norm_values is None else nxdata_diffmap["map_norm"] ) filename_img = self.save_image( external_output_filename=external_output_filename, intensity_dset=dset_to_save[:], ) self.outputs.nxdata_url = f"{external_output_filename}::{scan_entry_name}/{nxprocess_diffmap_name}/{self.NXDATA_DIFFMAP_NAME}" self.outputs.filename_img = filename_img
def _get_scan_nb(self, file_input: h5py.Group) -> int: nxdata_url: str = self.inputs.nxdata_url _, nxdata_integrate_path = nxdata_url.split("::") nxdata_integrate = file_input[nxdata_integrate_path] nxprocess_integrate = nxdata_integrate.parent scan_entry = nxprocess_integrate.parent scan_entry_name = Path(scan_entry.name).name return int(scan_entry_name.split(".")[0])
[docs] def get_normalization_values(self) -> np.ndarray: """Provides a way to read the monitor array either from a blissdata stream or from the file.""" normalization_counter = self.get_input_value("normalization_counter", None) if normalization_counter is None: return # Retrieve counter data from blissdata stream or from the file scan_memory_url = self.get_input_value("scan_memory_url", None) if scan_memory_url: try: datastore = DataStore(url=BeaconData().get_redis_data_db()) except Exception as e: logger.error( f"Failed to connect to Redis datastore: {e}. Needs a BEACON_HOST environment variable." ) return scan = datastore.load_scan(key=scan_memory_url) while int(scan.state) != 4: scan.update() stream = next( ( stream for stream_name, stream in scan.streams.items() if normalization_counter in stream_name ), None, ) if stream is None: logger.error( f"Normalization counter {normalization_counter} not found in scan.streams." ) return None return stream[:] else: master_filename: str = self.get_input_value("master_filename", None) scan_number: int = self.get_input_value("scan_nb", None) if not master_filename or not scan_number: logger.error( "No master filename or scan number provided for normalization counter." ) return with h5py_utils.open_item( filename=master_filename, name=f"{scan_number}.1" ) as scan_entry: measurement = scan_entry["measurement"] if normalization_counter not in measurement: logger.error( f"Normalization counter {normalization_counter} not found in {measurement}." ) return return measurement[normalization_counter][:]
[docs] def save_image( self, external_output_filename: str, intensity_dset: np.ndarray, flip: bool = True, ) -> str: if self.get_input_value("save_img", True): average_intensity = np.nanmean(intensity_dset, axis=0) sh0, sh1 = average_intensity.shape # Downgrade the image to a maximum of 100 Kb average_intensity = average_intensity.astype("float32") max_size_bytes = 1e5 # 100Kb sh0_new = int(np.sqrt(max_size_bytes / 4 * sh0 / sh1)) sh1_new = int(np.sqrt(max_size_bytes / 4 * sh1 / sh0)) factor = (sh0_new / sh0, sh1_new / sh1) average_intensity = zoom(average_intensity, factor) nxprocess_name = self.get_input_value( "nxprocess_name", f"{self.inputs.lima_name}_diffmap" ) # Save the image as a .png file subdir_gallery = Path(external_output_filename).parent.joinpath("gallery") subdir_gallery.mkdir(parents=True, exist_ok=True) name = Path(external_output_filename).name.replace( ".h5", f"_{nxprocess_name}.png" ) filename_img = str(subdir_gallery.joinpath(name)) if flip: average_intensity = np.flipud(average_intensity) try: matplotlib.image.imsave( fname=filename_img, arr=average_intensity, cmap="viridis", dpi=100, origin="lower", ) return filename_img except Exception as e: logger.error(f"Failed to save image: {e}") return ""