Commit 2be54fa2 authored by Islam, Fahima's avatar Islam, Fahima
Browse files

Upload New File

parent 1db51c77
Loading
Loading
Loading
Loading
+420 −0
Original line number Diff line number Diff line
import numpy as np
from detector_utils import extract_bank_data, get_pack_types, convert_to_euler_angles_xyz, count_total_pixels
from neutrons2lambda import convert2histogram
import glob

def compute_pixelPos_from_pixelID(pixelID, start_index, Nx, Ny, xwidth, yheight, dx, dy):
    """
    Compute the (x, y) positions from the given pixelID.

    Parameters:
    - pixelID (array or scalar): The pixel index array.
    - start_index (int): The starting pixel index.
    - Nx (int): Number of pixels in the x-direction.
    - Ny (int): Number of pixels in the y-direction.
    - xwidth (float): Total width of the detector.
    - yheight (float): Total height of the detector.
    - dx (float): Pixel size in the x direction.
    - dy (float): Pixel size in the y direction.

    Returns:
    - x (array or scalar): Computed x positions.
    - y (array or scalar): Computed y positions.
    """
    # Compute xindex and yindex
    xindex = (pixelID - start_index) // Ny
    yindex = (pixelID - start_index) % Ny
    # Compute x and y positions
    x = xindex * dx - (xwidth / 2)
    y = yindex * dy - (yheight / 2)

    return x, y


def calculate_start_indices(bank_data, selected_banks):
    """
    Calculates the start indices dynamically, ensuring bank1 starts at 0.
    If the first selected bank is not bank1, its start index is computed correctly.

    Parameters:
    - bank_data (dict): Dictionary containing detector bank data with keys as 'bankN'.
    - selected_banks (list): List of banks to include, e.g., ["bank82", "bank83", ..., "bank99"].

    Returns:
    - dict: A dictionary where keys are bank names and values are their corresponding start indices.
    """
    pack_types = get_pack_types()  # Get detector pack configurations
    start_indices = {}

    # Sort banks numerically (ensure correct order)
    sorted_bank_names = sorted(bank_data.keys(), key=lambda x: int(x.replace("bank", "")))

    # Initialize start index
    start_index = 0

    # Find the correct start index for the first selected bank
    for bank_name in sorted_bank_names:

        if bank_name in bank_data:

            pack_type = bank_data[bank_name]["pack_type"]
            Nx = pack_types[pack_type]["tube_count"]
            Ny = pack_types[pack_type]["pixels_in_tube"]
            total_pixels = Nx * Ny

            if bank_name == selected_banks[0]:  # Stop accumulating when reaching first selected bank
                break
            start_index += total_pixels
            # print(start_index)# Accumulate pixels of all previous banks

    # Assign start indices to the selected banks
    for bank_name in selected_banks:
        if bank_name in bank_data:
            pack_type = bank_data[bank_name]["pack_type"]
            Nx = pack_types[pack_type]["tube_count"]
            Ny = pack_types[pack_type]["pixels_in_tube"]
            total_pixels = Nx * Ny

            # Assign start index to this bank
            start_indices[bank_name] = start_index

            # Update start index for the next bank
            start_index += total_pixels

    return start_indices


def compute_pixelPos_for_allBank(bank_data, selected_banks=None):
    """
    Computes (x, y) positions for selected banks based on detector parameters.

    Parameters:
    - bank_data (dict): Dictionary containing all detector bank data.
    - all_pixelID (numpy array): Array of pixel IDs.
    - selected_banks (list, optional): List of banks to compute positions for. If None, uses all banks.

    Returns:
    - pixel_positions (dict): Dictionary where keys are bank names and values are lists of (x, y) positions.
    """
    pack_types = get_pack_types()  # Get detector pack configurations

    # If no banks are specified, use all banks
    if selected_banks is None:
        selected_banks = list(bank_data.keys())

    # Compute correct start indices for selected banks
    start_indices = calculate_start_indices(bank_data, selected_banks)

    pixel_positions = {}
    min_pixelID = 0
    max_pixelID = count_total_pixels(bank_data) - 1
    pixelID = np.arange(min_pixelID, max_pixelID + 1)

    for bank_name in selected_banks:  # Only iterate over selected banks
        if bank_name not in bank_data:
            continue  # Skip if the bank is not in the full dataset

        # Get correct start index
        start_index = start_indices[bank_name]

        pack = pack_types[bank_data[bank_name]["pack_type"]]
        xwidth = pack["width"]
        yheight = pack["height"]
        Nx = pack["tube_count"]
        Ny = pack["pixels_in_tube"]
        dx, dy = xwidth / Nx, yheight / Ny  # Pixel size

        # Compute (x, y) positions
        x, y = compute_pixelPos_from_pixelID(
            pixelID[start_index:start_index + (Nx * Ny)],
            start_index, Nx, Ny, xwidth, yheight, dx, dy
        )
        pixel_positions[bank_name] = list(zip(x, y))

    return pixel_positions


def transform_to_sample_frame_and_compute_Lsd(x_det, y_det, bank_location, yzy_angles):
    """
    Transforms detector coordinates (x_det, y_det) into the sample frame and computes L_sd.

    Parameters:
    - x_det, y_det (numpy arrays): X and Y positions in the detector frame.
    - bank_location (tuple): (x_bank, y_bank, z_bank) position in the sample frame.
    - yzy_angles (tuple): Y-Z-Y rotation angles for the bank.

    Returns:
    - x_sample, y_sample, z_sample: Transformed coordinates in the sample frame.
    - L_sd: Sample-to-detector distance for each pixel.
    """
    # Convert to homogeneous coordinates (x, y, 0) in detector frame
    r_detector = np.vstack([x_det, y_det, np.zeros_like(x_det)])

    # Compute rotation matrix from YZY angles
    alpha, beta, gamma = convert_to_euler_angles_xyz(yzy_angles[::-1])  # Reverse angles
    alpha, beta, gamma = np.deg2rad([alpha, beta, gamma])
    #
    R_Y1 = np.array([
        [np.cos(alpha), 0, np.sin(alpha)],
        [0, 1, 0],
        [-np.sin(alpha), 0, np.cos(alpha)]
    ])

    R_Z = np.array([
        [np.cos(beta), -np.sin(beta), 0],
        [np.sin(beta), np.cos(beta), 0],
        [0, 0, 1]
    ])

    R_Y2 = np.array([
        [np.cos(gamma), 0, np.sin(gamma)],
        [0, 1, 0],
        [-np.sin(gamma), 0, np.cos(gamma)]
    ])

    # Final rotation matrix
    R = R_Y2 @ R_Z @ R_Y1

    # Transform detector positions into sample frame
    r_sample = R @ r_detector  # Rotate
    r_sample += np.array(bank_location).reshape(3, 1)  # Translate

    # Compute sample-to-detector distance L_sd
    L_sd = np.sqrt(r_sample[0, :]**2 + r_sample[1, :]**2 + r_sample[2, :]**2)

    return r_sample[0, :], r_sample[1, :], r_sample[2, :], L_sd


def compute_L_for_all_pixels(bank_data, L_ms, selected_banks=None):
    """
    Computes and returns a dictionary mapping each detector pixel ID to its total flight path L.

    Parameters:
    - bank_data (dict): Dictionary of detector bank information.
    - L_ms (float): Moderator-to-sample distance in meters.

    Returns:
    - pixel_to_L_map (dict): Mapping from pixel ID to total flight path L.
    """
    pixel_to_L_map = {}
    pack_types = get_pack_types()  # Get detector pack configurations
    # If no banks are specified, use all banks
    if selected_banks is None:
        selected_banks = list(bank_data.keys())

    # Compute correct start indices for selected banks
    start_indices = calculate_start_indices(bank_data, selected_banks)
    min_pixelID = 0
    max_pixelID = count_total_pixels(bank_data) - 1
    pixelID = np.arange(min_pixelID, max_pixelID + 1)
    # start_index = np.where(pixelID == min_pixelID)[0][0]
    for bank_name in selected_banks:  # Only iterate over selected banks
        if bank_name not in bank_data:
            continue  # Skip if the bank is not in the full dataset

        # Get correct start index
        start_index = start_indices[bank_name]

        pack = pack_types[bank_data[bank_name]["pack_type"]]
        xwidth = pack["width"]
        yheight = pack["height"]
        Nx = pack["tube_count"]
        Ny = pack["pixels_in_tube"]
        dx, dy = xwidth / Nx, yheight / Ny  # Pixel size
        bank_location = bank_data[bank_name]["location"]
        yzy_angles = bank_data[bank_name]["yzy_angles"]
        # Compute (x, y) positions
        x_det, y_det = compute_pixelPos_from_pixelID(pixelID[start_index:start_index+(Nx*Ny)], start_index, Nx, Ny, xwidth, yheight, dx, dy)


        _, _, _, L_sd = transform_to_sample_frame_and_compute_Lsd(x_det, y_det, bank_location, yzy_angles)

        # Compute total L for each detector pixel
        L_total = L_ms + L_sd

        # Store in dictionary
        for i, L_value in enumerate(L_total):
            pixel_to_L_map[start_index + i] = L_value


    return pixel_to_L_map

def compute_theta(x_sample, y_sample, z_sample):
    """
    Compute theta for each pixel position.
    """
    return 0.5 *np.arctan2(np.sqrt(x_sample**2 + y_sample**2), z_sample)



def compute_theta_for_all_pixels(bank_data, selected_banks=None):
    """
    Computes and returns a dictionary mapping each detector pixel ID to its total flight path L.

    Parameters:
    - bank_data (dict): Dictionary of detector bank information.
    - L_ms (float): Moderator-to-sample distance in meters.

    Returns:
    - pixel_to_L_map (dict): Mapping from pixel ID to total flight path L.
    """
    pixel_to_theta_map = {}
    pack_types = get_pack_types()  # Get detector pack configurations
    # If no banks are specified, use all banks
    if selected_banks is None:
        selected_banks = list(bank_data.keys())

    # Compute correct start indices for selected banks
    start_indices = calculate_start_indices(bank_data, selected_banks)
    min_pixelID = 0
    max_pixelID = count_total_pixels(bank_data) - 1
    pixelID = np.arange(min_pixelID, max_pixelID + 1)
    # start_index = np.where(pixelID == min_pixelID)[0][0]

    for bank_name in selected_banks:  # Only iterate over selected banks
        if bank_name not in bank_data:
            continue  # Skip if the bank is not in the full dataset

        # Get correct start index
        start_index = start_indices[bank_name]

        pack = pack_types[bank_data[bank_name]["pack_type"]]
        xwidth = pack["width"]
        yheight = pack["height"]
        Nx = pack["tube_count"]
        Ny = pack["pixels_in_tube"]
        dx, dy = xwidth / Nx, yheight / Ny  # Pixel size
        bank_location = bank_data[bank_name]["location"]
        yzy_angles = bank_data[bank_name]["yzy_angles"]
        # Compute (x, y) positions
        x_det, y_det = compute_pixelPos_from_pixelID(pixelID[start_index:start_index+(Nx*Ny)], start_index, Nx, Ny, xwidth, yheight, dx, dy)

        x_sample, y_sample, z_sample, L_sd = transform_to_sample_frame_and_compute_Lsd(x_det, y_det, bank_location, yzy_angles)

        # Compute total L for each detector pixel
        theta = compute_theta(x_sample, y_sample, z_sample)

        # Store in dictionary
        for i, theta_value in enumerate(theta):
            pixel_to_theta_map[start_index + i] = theta_value


    return pixel_to_theta_map



def compute_wavelength(TOF,  L_values):
    """
    Computes neutron wavelength from time-of-flight and total  for each event.

    Parameters:
    - TOF (numpy array): Time-of-flight in microseconds (µs) for each event.
    - pixelID (numpy array): Pixel IDs corresponding to TOF values.
    - pixel_to_L_map (dict): Dictionary mapping pixel IDs to total flight path L.

    Returns:
    - wavelength (numpy array): Wavelength for each neutron event in Ångströms (Å).
    """

    return 3956e-7 * (TOF / L_values)  # Ensures TOF and L_values have the same shape

def compute_Q(wavelength, theta):
    """
    Compute Q from wavelength and theta.
    """
    return (4 * np.pi / wavelength) * np.sin(theta)


# Define bank groups
BANK_GROUPS = {
    "group1": list(range(1, 15)),
    "group2": list(range(15, 38)),
    "group3": list(range(38, 52)),
    "group4": list(range(52, 64)),
    "group5": list(range(64, 82)),
    "group6": list(range(82, 100)),
}

def process_neutron_events(
    simdir,
    xml_file=None,  # Default to None
    group=None,
    first_bank=None,
    last_bank=None,
    single_bank=None,
    L_ms=19.75,
    q_bins=320,
    q_range_=(0,30)
):
    """
    Processes neutron scattering events for selected detector banks.

    Parameters:
    - simdir (str): Path to the simulation directory containing bank .npy files.
    - xml_file (str, optional): Path to the XML file. If None, assumes it is inside simdir.
    - group (str, optional): Group name (e.g., 'group6'). If provided, selects the corresponding banks.
    - first_bank (int, optional): First bank number (used if `group` is None).
    - last_bank (int, optional): Last bank number (used if `group` is None).
    - single_bank (int, optional): A single bank number to process (overrides all other selection methods).
    - L_ms (float): Moderator-to-sample distance in meters.
    - q_bins (int): Number of Q bins for histogram conversion.
    - q_range_ (tuple, optional): Min and max values for Q range in histogram conversion (default: (0, 30)).

    Returns:
    - q_bin (numpy.ndarray): Binned Q values.
    - I_bin (numpy.ndarray): Intensity values.
    - e_bin (numpy.ndarray): Error values.
    """

    # Use XML file from simdir if not provided
    if xml_file is None:
        xml_file = f"{simdir}/NOMAD_Definition.xml"

    # Determine selected banks
    if single_bank is not None:
        selected_banks = [f"bank{single_bank}"]
    elif group:
        selected_banks = [f"bank{i}" for i in BANK_GROUPS.get(group, [])]
    elif first_bank is not None and last_bank is not None:
        selected_banks = [f"bank{i}" for i in range(first_bank, last_bank + 1)]
    else:
        # Default: Select all banks (1 to 99)
        selected_banks = [f"bank{i}" for i in range(1, 100)]

    # Extract bank data
    bank_data = extract_bank_data(xml_file)

    # Get all bank files efficiently
    all_banks = set(glob.glob(f"{simdir}/*/bank*.npy"))

    # Filter bank files using set operations (efficient)
    detector = [f for f in all_banks if any(f"{bank}.npy" in f for bank in selected_banks)]

    if not detector:
        raise ValueError("No matching bank files found.")

    # Load and concatenate event data efficiently
    events = [np.load(npyf) for npyf in detector]
    events = np.concatenate(events)  # Convert to a single structured array

    # Compute pixel-specific values
    pixel_Lsd_bank = compute_L_for_all_pixels(bank_data, L_ms, selected_banks)
    pixel_theta_bank = compute_theta_for_all_pixels(bank_data, selected_banks)

    # Vectorized processing of events
    pid = events['pixelID']
    tof = events['tofChannelNo']
    p = events['p']

    # Get precomputed values for this event's pixels
    L_sd_event = np.array([pixel_Lsd_bank[p] for p in pid])
    theta_event = np.array([pixel_theta_bank[p] for p in pid])

    # Compute wavelength and Q (vectorized)
    wavelength_event = compute_wavelength(tof, L_sd_event)
    Q_values = compute_Q(wavelength_event, theta_event)
    print(Q_values.size)
    # Convert to histogram
    q_bin, I_bin, e_bin = convert2histogram(Q_values, p, q_bins, q_range_)

    return q_bin, I_bin, e_bin