from __future__ import (absolute_import, division, print_function) import mantid.simpleapi as mantid import os from isis_powder.routines import common, yaml_parser from isis_powder.routines.RunDetails import RunDetails from isis_powder.polaris_routines import polaris_advanced_config def calculate_absorb_corrections(ws_to_correct, multiple_scattering): mantid.MaskDetectors(ws_to_correct, SpectraList=list(range(0, 55))) absorb_dict = polaris_advanced_config.absorption_correction_params geometry_json = {'Shape': 'Cylinder', 'Height': absorb_dict["cylinder_sample_height"], 'Radius': absorb_dict["cylinder_sample_radius"], 'Center': absorb_dict["cylinder_position"]} material_json = {'ChemicalFormula': absorb_dict["chemical_formula"]} mantid.SetSample(InputWorkspace=ws_to_correct, Geometry=geometry_json, Material=material_json) ws_to_correct = mantid.ConvertUnits(InputWorkspace=ws_to_correct, OutputWorkspace=ws_to_correct, Target="TOF") ws_to_correct = mantid.MayersSampleCorrection(InputWorkspace=ws_to_correct, OutputWorkspace=ws_to_correct, MultipleScattering=multiple_scattering) ws_to_correct = mantid.ConvertUnits(InputWorkspace=ws_to_correct, OutputWorkspace=ws_to_correct, Target="dSpacing") return ws_to_correct def get_run_details(run_number_string, inst_settings): run_number = common.generate_run_numbers(run_number_string=run_number_string) if isinstance(run_number, list): # Only take first run number run_number = run_number[0] yaml_dict = yaml_parser.get_run_dictionary(run_number_string=run_number, file_path=inst_settings.cal_mapping_file) if inst_settings.chopper_on: chopper_config = yaml_dict["chopper_on"] else: chopper_config = yaml_dict["chopper_off"] label = yaml_dict["label"] offset_file_name = yaml_dict["offset_file_name"] empty_runs = chopper_config["empty_run_numbers"] vanadium_runs = chopper_config["vanadium_run_numbers"] grouping_full_path = os.path.normpath(os.path.expanduser(inst_settings.calibration_dir)) grouping_full_path = os.path.join(grouping_full_path, inst_settings.grouping_file_name) in_calib_dir = os.path.join(inst_settings.calibration_dir, label) offsets_file_full_path = os.path.join(in_calib_dir, offset_file_name) # Generate the name of the splined file we will either be loading or saving splined_vanadium_name = _generate_splined_van_filename(chopper_on=inst_settings.chopper_on, vanadium_run_string=vanadium_runs, offset_file_name=offset_file_name) splined_vanadium = os.path.join(in_calib_dir, splined_vanadium_name) run_details = RunDetails(run_number=run_number) run_details.user_input_run_number = run_number_string run_details.empty_runs = empty_runs run_details.vanadium_run_numbers = vanadium_runs run_details.label = label run_details.offset_file_path = offsets_file_full_path run_details.grouping_file_path = grouping_full_path run_details.splined_vanadium_file_path = splined_vanadium return run_details def split_into_tof_d_spacing_groups(run_details, processed_spectra): d_spacing_output = [] tof_output = [] run_number = str(run_details.user_input_run_number) for name_index, ws in enumerate(processed_spectra): d_spacing_out_name = run_number + "-ResultD-" + str(name_index + 1) tof_out_name = run_number + "-ResultTOF-" + str(name_index + 1) d_spacing_output.append(mantid.ConvertUnits(InputWorkspace=ws, OutputWorkspace=d_spacing_out_name, Target="dSpacing")) tof_output.append(mantid.ConvertUnits(InputWorkspace=ws, OutputWorkspace=tof_out_name, Target="TOF")) # Group the outputs d_spacing_group_name = run_number + "-Results-D-Grp" d_spacing_group = mantid.GroupWorkspaces(InputWorkspaces=d_spacing_output, OutputWorkspace=d_spacing_group_name) tof_group_name = run_number + "-Results-TOF-Grp" tof_group = mantid.GroupWorkspaces(InputWorkspaces=tof_output, OutputWorkspace=tof_group_name) return d_spacing_group, tof_group def process_vanadium_for_focusing(bank_spectra, mask_path, spline_number): bragg_masking_list = _read_masking_file(mask_path) masked_workspace_list = _apply_bragg_peaks_masking(bank_spectra, mask_list=bragg_masking_list) output = common.spline_vanadium_for_focusing(focused_vanadium_spectra=masked_workspace_list, num_splines=spline_number) common.remove_intermediate_workspace(masked_workspace_list) return output def _apply_bragg_peaks_masking(workspaces_to_mask, mask_list): output_workspaces = list(workspaces_to_mask) for ws_index, (bank_mask_list, workspace) in enumerate(zip(mask_list, output_workspaces)): output_name = "masked_vanadium-" + str(ws_index + 1) for mask_params in bank_mask_list: output_workspaces[ws_index] = mantid.MaskBins(InputWorkspace=output_workspaces[ws_index], OutputWorkspace=output_name, XMin=mask_params[0], XMax=mask_params[1]) return output_workspaces def _generate_splined_van_filename(chopper_on, vanadium_run_string, offset_file_name): output_string = "SplinedVan_" + str(vanadium_run_string) + "_chopper" output_string += "On" if chopper_on else "Off" output_string += '_' + offset_file_name output_string += ".nxs" return output_string def _read_masking_file(masking_file_path): all_banks_masking_list = [] bank_masking_list = [] ignore_line_prefixes = (' ', '\n', '\t', '#') # Matches whitespace or # symbol with open(masking_file_path) as mask_file: for line in mask_file: if line.startswith(ignore_line_prefixes): # Push back onto new bank if bank_masking_list: all_banks_masking_list.append(bank_masking_list) bank_masking_list = [] else: # Parse and store in current list line.rstrip() bank_masking_list.append(line.split()) if bank_masking_list: all_banks_masking_list.append(bank_masking_list) return all_banks_masking_list