Loading utils/PG3_62140_rel_offset.dat 0 → 100644 +43120 −0 File added.Preview size limit exceeded, changes collapsed. Show changes utils/pg3_calib_doctor.py +225 −124 Original line number Diff line number Diff line Loading @@ -9,39 +9,69 @@ import sys import glob import urllib import plotly.graph_objects as go from plotly.subplots import make_subplots sys.path.append("/SNS/users/y8z/miniconda/envs/slack/lib/python3.13/site-packages/") from slack_sdk import WebClient def plot_masked_spectra(run_num, out_dir, bad_pixels, diag_dict, diag_input, output_html=None): def plot_diagnostics(run_num, out_dir, num_masks, bad_diag_input, good_diag_input, rel_offset, diag_dict, output_html=None): """ Plots all masked spectra files for a given run using Plotly. Parameters: - run_num: The run number as a string (e.g., '60785'). - out_dir: The output directory where the HTML file will be saved. - bad_pixels: A boolean inferring whether to plot unmasked bad pixels (True) or masked good pixels (False). - diag_dict: Dictionary containing diagnostic information for the sample environment. - diag_input: List containing diagnostic input values. - output_html: Optional name for the output HTML file. Creates a combined diagnostic HTML with three subplots: 1. Unmasked bad pixels spectra 2. Masked good pixels spectra 3. Relative d-spacing offset per pixel index Parameters ---------- run_num : str or int Run number. out_dir : str Directory where .dat files are saved and the HTML will be written. num_masks : int Total number of masked pixels. bad_diag_input : list or None [num_masks, num_bad_unmasked], or None if no unmasked bad pixels. good_diag_input : list or None [num_masks, num_good_masked], or None if no masked good pixels. rel_offset : np.ndarray Full array of relative d-spacing offsets for all pixels (0.0 for masked pixels). diag_dict : dict Diagnostic thresholds dictionary. output_html : str, optional Output HTML file path. """ if output_html is None: if bad_pixels: output_html = os.path.join(out_dir, f"pg3_{run_num}_unmasked_bad_pixels_plot.html") else: output_html = os.path.join(out_dir, f"pg3_{run_num}_masked_good_pixels_plot.html") output_html = os.path.join(out_dir, f"pg3_{run_num}_diagnostics_plot.html") bad_count = bad_diag_input[1] if bad_diag_input is not None else 0 good_count = good_diag_input[1] if good_diag_input is not None else 0 # Pre-compute outlier count for subplot title num_outliers = 0 if len(rel_offset) > 0: _unmasked = rel_offset[rel_offset != 0] if len(_unmasked) > 0: _Q1 = np.percentile(_unmasked, 5) _Q3 = np.percentile(_unmasked, 95) _IQR = _Q3 - _Q1 num_outliers = int(np.sum((_unmasked > _Q3 + 1.5 * _IQR) | (_unmasked < _Q1 - 1.5 * _IQR))) fig = make_subplots( rows=3, cols=1, subplot_titles=[ f"Unmasked Bad Pixels (Count: {bad_count})", f"Masked Good Pixels (Count: {good_count})", f"Relative d-spacing Offset per Pixel (Outliers: {num_outliers})" ], vertical_spacing=0.08 ) # Match files like: pg3_60785_6140_masked_*.dat if bad_pixels: pattern = os.path.join(out_dir, f"PG3_{run_num}_unmasked_bad_pixels_*.dat") else: pattern = os.path.join(out_dir, f"PG3_{run_num}_masked_good_pixels_*.dat") def add_spectra_traces(pattern, row): file_paths = glob.glob(pattern) # Sort files numerically by the spectrum number at the end def get_spectrum_num(path): try: # Extract number between last '_' and '.dat' return int(os.path.basename(path).split('_')[-1].split('.')[0]) except (ValueError, IndexError): return 0 Loading @@ -49,24 +79,15 @@ def plot_masked_spectra(run_num, out_dir, bad_pixels, diag_dict, diag_input, out file_paths.sort(key=get_spectrum_num) if not file_paths: print(f"[Error] No files found matching: {pattern}") return fig = go.Figure() for path in file_paths: file_name = os.path.basename(path) # Extract the ID for the legend spectrum_id = file_name.split('_')[-1].replace('.dat', '') x_vals = [] y_vals = [] x_vals, y_vals = [], [] try: with open(path, 'r', encoding='utf-8') as f: # Read all lines lines = f.readlines() # Skip header: Line 1 (comments) and Line 2 (spectrum number) for line in lines[2:]: line = line.strip() if not line: Loading @@ -78,65 +99,143 @@ def plot_masked_spectra(run_num, out_dir, bad_pixels, diag_dict, diag_input, out y_vals.append(float(parts[1])) except ValueError: continue if x_vals: fig.add_trace(go.Scatter( fig.add_trace( go.Scatter( x=x_vals, y=y_vals, mode='lines', name=f"Spectrum {spectrum_id}", hovertemplate='X: %{x}<br>Y: %{y}<extra></extra>' )) ), row=row, col=1 ) except Exception as e: print(f"Error reading {file_name}: {e}") if bad_pixels: f_title = f"Unmasked Bad Pixels for PG3 Run Number {run_num}; Total # of Masks: {diag_input[0]}" f_title += f", Total # of Unmasked Bad Pixels: {diag_input[1]}" else: f_title = f"Masked Good Pixels for PG3 Run Number {run_num}; Total # of Masks: {diag_input[0]}" f_title += f", Total # of Masked Good Pixels: {diag_input[1]}" add_spectra_traces( os.path.join(out_dir, f"PG3_{run_num}_unmasked_bad_pixels_*.dat"), row=1 ) add_spectra_traces( os.path.join(out_dir, f"PG3_{run_num}_masked_good_pixels_*.dat"), row=2 ) # Ensure both panels always render axes even when empty for row in (1, 2): fig.add_trace( go.Scatter(x=[], y=[], mode='lines', showlegend=False, hoverinfo='skip'), row=row, col=1 ) if len(rel_offset) > 0: masked_indices = np.where(rel_offset == 0)[0] unmasked_indices = np.where(rel_offset != 0)[0] unmasked_offset = rel_offset[unmasked_indices] Q1 = np.percentile(unmasked_offset, 5) Q3 = np.percentile(unmasked_offset, 95) IQR = Q3 - Q1 upper = Q3 + 1.5 * IQR lower = Q1 - 1.5 * IQR outlier_mask = (unmasked_offset > upper) | (unmasked_offset < lower) if len(masked_indices) > 0: fig.add_trace( go.Scatter( x=masked_indices.tolist(), y=[0.0] * len(masked_indices), mode='markers', name='Masked Pixels', marker=dict(size=4, color='black'), hovertemplate='Pixel: %{x}<br>Masked (offset=0)<extra></extra>' ), row=3, col=1 ) fig.add_trace( go.Scatter( x=unmasked_indices[~outlier_mask].tolist(), y=unmasked_offset[~outlier_mask].tolist(), mode='markers', name='Rel. Offset', marker=dict(size=5, color='royalblue'), hovertemplate='Pixel: %{x}<br>|Rel. Offset|: %{y:.6f}<extra></extra>' ), row=3, col=1 ) if outlier_mask.any(): fig.add_trace( go.Scatter( x=unmasked_indices[outlier_mask].tolist(), y=unmasked_offset[outlier_mask].tolist(), mode='markers', name='Outlier Pixels', marker=dict(size=9, color='red', symbol='x'), hovertemplate='Pixel: %{x}<br>|Rel. Offset|: %{y:.6f}<extra></extra>' ), row=3, col=1 ) x_range = [0, len(rel_offset) - 1] for bound, label in ((upper, f'Upper bound ({upper:.5f})'), (lower, f'Lower bound ({lower:.5f})')): fig.add_trace( go.Scatter( x=x_range, y=[bound, bound], mode='lines', name=label, line=dict(dash='dash', color='tomato', width=1.5), hovertemplate=f'{label}<extra></extra>' ), row=3, col=1 ) fig.update_layout( title=f_title, xaxis_title="d (Å)", yaxis_title="Counts", title=f"PG3 Calibration Diagnostics — Run {run_num} | Total Masks: {num_masks}", height=1800, hovermode="closest", legend=dict( title="Spectrum ID", traceorder="normal" ) legend=dict(title="Spectrum / Offset", traceorder="normal") ) fig.update_xaxes(title_text="d (Å)", row=1, col=1) fig.update_xaxes(title_text="d (Å)", row=2, col=1) fig.update_xaxes(title_text="Detector Pixel Index", row=3, col=1) fig.update_yaxes(title_text="Counts", row=1, col=1) fig.update_yaxes(title_text="Counts", row=2, col=1) fig.update_yaxes(title_text="|Relative Offset|", row=3, col=1) fig.write_html(output_html) print(f"[Info] Successfully saved plot to {output_html}") print(f"[Info] Successfully saved combined diagnostics plot to {output_html}") # --- Warning conditions --- condt_1 = num_masks < diag_dict["NumMasks"][0] - diag_dict["NumMasks"][1] condt_2 = num_masks > diag_dict["NumMasks"][0] + diag_dict["NumMasks"][1] condt_bad = bad_diag_input is not None and bad_diag_input[1] > diag_dict["NumBadUnmasked"] condt_good = good_diag_input is not None and good_diag_input[1] > 0 post_warning = condt_1 or condt_2 or condt_bad or condt_good condt_1 = diag_input[0] < diag_dict["NumMasks"][0] - diag_dict["NumMasks"][1] condt_2 = diag_input[0] > diag_dict["NumMasks"][0] + diag_dict["NumMasks"][1] if bad_pixels: condt_3 = diag_input[1] > diag_dict["NumBadUnmasked"] post_warning = condt_1 or condt_2 or condt_3 if post_warning: trigger_msg = "" if condt_3: trigger_msg += f"⚠️ Trigger: Number of unmasked bad pixels ({diag_input[1]}) " trigger_msg += f"exceeds expected threshold ({diag_dict['NumBadUnmasked']})." if condt_1 or condt_2: if condt_bad: trigger_msg += ( f"⚠️ Trigger: Number of unmasked bad pixels ({bad_diag_input[1]}) " f"exceeds expected threshold ({diag_dict['NumBadUnmasked']})." ) if condt_good: if trigger_msg: trigger_msg += "\n" trigger_msg += f"⚠️ Trigger: Number of masks ({diag_input[0]}) is outside expected range " trigger_msg += f"({diag_dict['NumMasks'][0]} ± {diag_dict['NumMasks'][1]})." else: post_warning = True trigger_msg = f"⚠️ Trigger: Number of masked good pixels ({diag_input[1]}) is non-zero." trigger_msg += f"⚠️ Trigger: Number of masked good pixels ({good_diag_input[1]}) is non-zero." if condt_1 or condt_2: trigger_msg = f"\n⚠️ Trigger: Number of masks ({diag_input[0]}) is outside expected range " trigger_msg += f"({diag_dict['NumMasks'][0]} ± {diag_dict['NumMasks'][1]})." if trigger_msg: trigger_msg += "\n" trigger_msg += ( f"⚠️ Trigger: Number of masks ({num_masks}) is outside expected range " f"({diag_dict['NumMasks'][0]} ± {diag_dict['NumMasks'][1]})." ) if post_warning: sltk = "xoxb-970229573620-6616766768435-KI4xnOfT7roQo9fUpLSSAVjn" client = WebClient(token=sltk) channel_id = "C06J5CYTQ3V" file_size = os.path.getsize(output_html) Loading Loading @@ -197,6 +296,14 @@ def pg3_calib_doctor(run_num, cal_file, manual_mask, out_dir, diag_dict): OutputWorkspace=pg3_input, XMin=300 ) ApplyDiffCal( InstrumentWorkspace=pg3_input, CalibrationWorkspace="_cal" ) MaskDetectors( Workspace=pg3_input, MaskedWorkspace="_mask" ) ConvertUnits( InputWorkspace=pg3_input, OutputWorkspace="pg3_wksp_d", Loading Loading @@ -238,6 +345,14 @@ def pg3_calib_doctor(run_num, cal_file, manual_mask, out_dir, diag_dict): ] def get_chunk_index(i): for idx, chunk in enumerate(pg3_chunks): low, high = chunk[0] if low <= i < high: return idx return -1 def get_chunk_index_1(i): for idx, chunk in enumerate(pg3_chunks_1): low, high = chunk[0] if low <= i < high: Loading @@ -258,7 +373,7 @@ def pg3_calib_doctor(run_num, cal_file, manual_mask, out_dir, diag_dict): num_masks = 0 rel_offset = [] for i in range(num_hist): idx = get_chunk_index(i) idx = get_chunk_index_1(i) d_range_1 = pg3_chunks_1[idx][1] num_peaks_1 = pg3_chunks_1[idx][2] d_range_2 = pg3_chunks_2[idx][1] Loading @@ -275,6 +390,7 @@ def pg3_calib_doctor(run_num, cal_file, manual_mask, out_dir, diag_dict): if mtd["_mask"].readY(i)[0] == 1.0: num_masks += 1 idx = get_chunk_index(i) # Alignment diagnostics if mtd["_mask"].readY(i)[0] == 1.0: rel_offset.append(0.0) Loading Loading @@ -314,8 +430,6 @@ def pg3_calib_doctor(run_num, cal_file, manual_mask, out_dir, diag_dict): SpectrumList=bad_pixels_unmasked, OneSpectrumPerFile=True ) diag_input = [num_masks, len(bad_pixels_unmasked)] plot_masked_spectra(run_num, run_out_dir, True, diag_dict, diag_input) if len(good_pixels_masked) > 0: ExtractSpectra( InputWorkspace="pg3_wksp_d", Loading @@ -328,27 +442,14 @@ def pg3_calib_doctor(run_num, cal_file, manual_mask, out_dir, diag_dict): SpectrumList=good_pixels_masked, OneSpectrumPerFile=True ) diag_input = [num_masks, len(good_pixels_masked)] plot_masked_spectra(run_num, run_out_dir, False, diag_dict, diag_input) data = np.array(rel_offset) data = np.abs(data) original_indices = np.where(data != 0)[0] data = data[original_indices] # --- Outlier detection using IQR method --- Q1 = np.percentile(data, 5) Q3 = np.percentile(data, 95) IQR = Q3 - Q1 lower = Q1 - 1.5 * IQR upper = Q3 + 1.5 * IQR rel_offset_arr = np.array(rel_offset) mean = np.mean(data) std = np.std(data) bad_diag_input = [num_masks, len(bad_pixels_unmasked)] if bad_pixels_unmasked else None good_diag_input = [num_masks, len(good_pixels_masked)] if good_pixels_masked else None outlier_mask = (data < lower) | (data > upper) outliers = data[outlier_mask] outlier_indices_original = original_indices[outlier_mask] plot_diagnostics(run_num, run_out_dir, num_masks, bad_diag_input, good_diag_input, rel_offset_arr, diag_dict) return Loading Loading
utils/PG3_62140_rel_offset.dat 0 → 100644 +43120 −0 File added.Preview size limit exceeded, changes collapsed. Show changes
utils/pg3_calib_doctor.py +225 −124 Original line number Diff line number Diff line Loading @@ -9,39 +9,69 @@ import sys import glob import urllib import plotly.graph_objects as go from plotly.subplots import make_subplots sys.path.append("/SNS/users/y8z/miniconda/envs/slack/lib/python3.13/site-packages/") from slack_sdk import WebClient def plot_masked_spectra(run_num, out_dir, bad_pixels, diag_dict, diag_input, output_html=None): def plot_diagnostics(run_num, out_dir, num_masks, bad_diag_input, good_diag_input, rel_offset, diag_dict, output_html=None): """ Plots all masked spectra files for a given run using Plotly. Parameters: - run_num: The run number as a string (e.g., '60785'). - out_dir: The output directory where the HTML file will be saved. - bad_pixels: A boolean inferring whether to plot unmasked bad pixels (True) or masked good pixels (False). - diag_dict: Dictionary containing diagnostic information for the sample environment. - diag_input: List containing diagnostic input values. - output_html: Optional name for the output HTML file. Creates a combined diagnostic HTML with three subplots: 1. Unmasked bad pixels spectra 2. Masked good pixels spectra 3. Relative d-spacing offset per pixel index Parameters ---------- run_num : str or int Run number. out_dir : str Directory where .dat files are saved and the HTML will be written. num_masks : int Total number of masked pixels. bad_diag_input : list or None [num_masks, num_bad_unmasked], or None if no unmasked bad pixels. good_diag_input : list or None [num_masks, num_good_masked], or None if no masked good pixels. rel_offset : np.ndarray Full array of relative d-spacing offsets for all pixels (0.0 for masked pixels). diag_dict : dict Diagnostic thresholds dictionary. output_html : str, optional Output HTML file path. """ if output_html is None: if bad_pixels: output_html = os.path.join(out_dir, f"pg3_{run_num}_unmasked_bad_pixels_plot.html") else: output_html = os.path.join(out_dir, f"pg3_{run_num}_masked_good_pixels_plot.html") output_html = os.path.join(out_dir, f"pg3_{run_num}_diagnostics_plot.html") bad_count = bad_diag_input[1] if bad_diag_input is not None else 0 good_count = good_diag_input[1] if good_diag_input is not None else 0 # Pre-compute outlier count for subplot title num_outliers = 0 if len(rel_offset) > 0: _unmasked = rel_offset[rel_offset != 0] if len(_unmasked) > 0: _Q1 = np.percentile(_unmasked, 5) _Q3 = np.percentile(_unmasked, 95) _IQR = _Q3 - _Q1 num_outliers = int(np.sum((_unmasked > _Q3 + 1.5 * _IQR) | (_unmasked < _Q1 - 1.5 * _IQR))) fig = make_subplots( rows=3, cols=1, subplot_titles=[ f"Unmasked Bad Pixels (Count: {bad_count})", f"Masked Good Pixels (Count: {good_count})", f"Relative d-spacing Offset per Pixel (Outliers: {num_outliers})" ], vertical_spacing=0.08 ) # Match files like: pg3_60785_6140_masked_*.dat if bad_pixels: pattern = os.path.join(out_dir, f"PG3_{run_num}_unmasked_bad_pixels_*.dat") else: pattern = os.path.join(out_dir, f"PG3_{run_num}_masked_good_pixels_*.dat") def add_spectra_traces(pattern, row): file_paths = glob.glob(pattern) # Sort files numerically by the spectrum number at the end def get_spectrum_num(path): try: # Extract number between last '_' and '.dat' return int(os.path.basename(path).split('_')[-1].split('.')[0]) except (ValueError, IndexError): return 0 Loading @@ -49,24 +79,15 @@ def plot_masked_spectra(run_num, out_dir, bad_pixels, diag_dict, diag_input, out file_paths.sort(key=get_spectrum_num) if not file_paths: print(f"[Error] No files found matching: {pattern}") return fig = go.Figure() for path in file_paths: file_name = os.path.basename(path) # Extract the ID for the legend spectrum_id = file_name.split('_')[-1].replace('.dat', '') x_vals = [] y_vals = [] x_vals, y_vals = [], [] try: with open(path, 'r', encoding='utf-8') as f: # Read all lines lines = f.readlines() # Skip header: Line 1 (comments) and Line 2 (spectrum number) for line in lines[2:]: line = line.strip() if not line: Loading @@ -78,65 +99,143 @@ def plot_masked_spectra(run_num, out_dir, bad_pixels, diag_dict, diag_input, out y_vals.append(float(parts[1])) except ValueError: continue if x_vals: fig.add_trace(go.Scatter( fig.add_trace( go.Scatter( x=x_vals, y=y_vals, mode='lines', name=f"Spectrum {spectrum_id}", hovertemplate='X: %{x}<br>Y: %{y}<extra></extra>' )) ), row=row, col=1 ) except Exception as e: print(f"Error reading {file_name}: {e}") if bad_pixels: f_title = f"Unmasked Bad Pixels for PG3 Run Number {run_num}; Total # of Masks: {diag_input[0]}" f_title += f", Total # of Unmasked Bad Pixels: {diag_input[1]}" else: f_title = f"Masked Good Pixels for PG3 Run Number {run_num}; Total # of Masks: {diag_input[0]}" f_title += f", Total # of Masked Good Pixels: {diag_input[1]}" add_spectra_traces( os.path.join(out_dir, f"PG3_{run_num}_unmasked_bad_pixels_*.dat"), row=1 ) add_spectra_traces( os.path.join(out_dir, f"PG3_{run_num}_masked_good_pixels_*.dat"), row=2 ) # Ensure both panels always render axes even when empty for row in (1, 2): fig.add_trace( go.Scatter(x=[], y=[], mode='lines', showlegend=False, hoverinfo='skip'), row=row, col=1 ) if len(rel_offset) > 0: masked_indices = np.where(rel_offset == 0)[0] unmasked_indices = np.where(rel_offset != 0)[0] unmasked_offset = rel_offset[unmasked_indices] Q1 = np.percentile(unmasked_offset, 5) Q3 = np.percentile(unmasked_offset, 95) IQR = Q3 - Q1 upper = Q3 + 1.5 * IQR lower = Q1 - 1.5 * IQR outlier_mask = (unmasked_offset > upper) | (unmasked_offset < lower) if len(masked_indices) > 0: fig.add_trace( go.Scatter( x=masked_indices.tolist(), y=[0.0] * len(masked_indices), mode='markers', name='Masked Pixels', marker=dict(size=4, color='black'), hovertemplate='Pixel: %{x}<br>Masked (offset=0)<extra></extra>' ), row=3, col=1 ) fig.add_trace( go.Scatter( x=unmasked_indices[~outlier_mask].tolist(), y=unmasked_offset[~outlier_mask].tolist(), mode='markers', name='Rel. Offset', marker=dict(size=5, color='royalblue'), hovertemplate='Pixel: %{x}<br>|Rel. Offset|: %{y:.6f}<extra></extra>' ), row=3, col=1 ) if outlier_mask.any(): fig.add_trace( go.Scatter( x=unmasked_indices[outlier_mask].tolist(), y=unmasked_offset[outlier_mask].tolist(), mode='markers', name='Outlier Pixels', marker=dict(size=9, color='red', symbol='x'), hovertemplate='Pixel: %{x}<br>|Rel. Offset|: %{y:.6f}<extra></extra>' ), row=3, col=1 ) x_range = [0, len(rel_offset) - 1] for bound, label in ((upper, f'Upper bound ({upper:.5f})'), (lower, f'Lower bound ({lower:.5f})')): fig.add_trace( go.Scatter( x=x_range, y=[bound, bound], mode='lines', name=label, line=dict(dash='dash', color='tomato', width=1.5), hovertemplate=f'{label}<extra></extra>' ), row=3, col=1 ) fig.update_layout( title=f_title, xaxis_title="d (Å)", yaxis_title="Counts", title=f"PG3 Calibration Diagnostics — Run {run_num} | Total Masks: {num_masks}", height=1800, hovermode="closest", legend=dict( title="Spectrum ID", traceorder="normal" ) legend=dict(title="Spectrum / Offset", traceorder="normal") ) fig.update_xaxes(title_text="d (Å)", row=1, col=1) fig.update_xaxes(title_text="d (Å)", row=2, col=1) fig.update_xaxes(title_text="Detector Pixel Index", row=3, col=1) fig.update_yaxes(title_text="Counts", row=1, col=1) fig.update_yaxes(title_text="Counts", row=2, col=1) fig.update_yaxes(title_text="|Relative Offset|", row=3, col=1) fig.write_html(output_html) print(f"[Info] Successfully saved plot to {output_html}") print(f"[Info] Successfully saved combined diagnostics plot to {output_html}") # --- Warning conditions --- condt_1 = num_masks < diag_dict["NumMasks"][0] - diag_dict["NumMasks"][1] condt_2 = num_masks > diag_dict["NumMasks"][0] + diag_dict["NumMasks"][1] condt_bad = bad_diag_input is not None and bad_diag_input[1] > diag_dict["NumBadUnmasked"] condt_good = good_diag_input is not None and good_diag_input[1] > 0 post_warning = condt_1 or condt_2 or condt_bad or condt_good condt_1 = diag_input[0] < diag_dict["NumMasks"][0] - diag_dict["NumMasks"][1] condt_2 = diag_input[0] > diag_dict["NumMasks"][0] + diag_dict["NumMasks"][1] if bad_pixels: condt_3 = diag_input[1] > diag_dict["NumBadUnmasked"] post_warning = condt_1 or condt_2 or condt_3 if post_warning: trigger_msg = "" if condt_3: trigger_msg += f"⚠️ Trigger: Number of unmasked bad pixels ({diag_input[1]}) " trigger_msg += f"exceeds expected threshold ({diag_dict['NumBadUnmasked']})." if condt_1 or condt_2: if condt_bad: trigger_msg += ( f"⚠️ Trigger: Number of unmasked bad pixels ({bad_diag_input[1]}) " f"exceeds expected threshold ({diag_dict['NumBadUnmasked']})." ) if condt_good: if trigger_msg: trigger_msg += "\n" trigger_msg += f"⚠️ Trigger: Number of masks ({diag_input[0]}) is outside expected range " trigger_msg += f"({diag_dict['NumMasks'][0]} ± {diag_dict['NumMasks'][1]})." else: post_warning = True trigger_msg = f"⚠️ Trigger: Number of masked good pixels ({diag_input[1]}) is non-zero." trigger_msg += f"⚠️ Trigger: Number of masked good pixels ({good_diag_input[1]}) is non-zero." if condt_1 or condt_2: trigger_msg = f"\n⚠️ Trigger: Number of masks ({diag_input[0]}) is outside expected range " trigger_msg += f"({diag_dict['NumMasks'][0]} ± {diag_dict['NumMasks'][1]})." if trigger_msg: trigger_msg += "\n" trigger_msg += ( f"⚠️ Trigger: Number of masks ({num_masks}) is outside expected range " f"({diag_dict['NumMasks'][0]} ± {diag_dict['NumMasks'][1]})." ) if post_warning: sltk = "xoxb-970229573620-6616766768435-KI4xnOfT7roQo9fUpLSSAVjn" client = WebClient(token=sltk) channel_id = "C06J5CYTQ3V" file_size = os.path.getsize(output_html) Loading Loading @@ -197,6 +296,14 @@ def pg3_calib_doctor(run_num, cal_file, manual_mask, out_dir, diag_dict): OutputWorkspace=pg3_input, XMin=300 ) ApplyDiffCal( InstrumentWorkspace=pg3_input, CalibrationWorkspace="_cal" ) MaskDetectors( Workspace=pg3_input, MaskedWorkspace="_mask" ) ConvertUnits( InputWorkspace=pg3_input, OutputWorkspace="pg3_wksp_d", Loading Loading @@ -238,6 +345,14 @@ def pg3_calib_doctor(run_num, cal_file, manual_mask, out_dir, diag_dict): ] def get_chunk_index(i): for idx, chunk in enumerate(pg3_chunks): low, high = chunk[0] if low <= i < high: return idx return -1 def get_chunk_index_1(i): for idx, chunk in enumerate(pg3_chunks_1): low, high = chunk[0] if low <= i < high: Loading @@ -258,7 +373,7 @@ def pg3_calib_doctor(run_num, cal_file, manual_mask, out_dir, diag_dict): num_masks = 0 rel_offset = [] for i in range(num_hist): idx = get_chunk_index(i) idx = get_chunk_index_1(i) d_range_1 = pg3_chunks_1[idx][1] num_peaks_1 = pg3_chunks_1[idx][2] d_range_2 = pg3_chunks_2[idx][1] Loading @@ -275,6 +390,7 @@ def pg3_calib_doctor(run_num, cal_file, manual_mask, out_dir, diag_dict): if mtd["_mask"].readY(i)[0] == 1.0: num_masks += 1 idx = get_chunk_index(i) # Alignment diagnostics if mtd["_mask"].readY(i)[0] == 1.0: rel_offset.append(0.0) Loading Loading @@ -314,8 +430,6 @@ def pg3_calib_doctor(run_num, cal_file, manual_mask, out_dir, diag_dict): SpectrumList=bad_pixels_unmasked, OneSpectrumPerFile=True ) diag_input = [num_masks, len(bad_pixels_unmasked)] plot_masked_spectra(run_num, run_out_dir, True, diag_dict, diag_input) if len(good_pixels_masked) > 0: ExtractSpectra( InputWorkspace="pg3_wksp_d", Loading @@ -328,27 +442,14 @@ def pg3_calib_doctor(run_num, cal_file, manual_mask, out_dir, diag_dict): SpectrumList=good_pixels_masked, OneSpectrumPerFile=True ) diag_input = [num_masks, len(good_pixels_masked)] plot_masked_spectra(run_num, run_out_dir, False, diag_dict, diag_input) data = np.array(rel_offset) data = np.abs(data) original_indices = np.where(data != 0)[0] data = data[original_indices] # --- Outlier detection using IQR method --- Q1 = np.percentile(data, 5) Q3 = np.percentile(data, 95) IQR = Q3 - Q1 lower = Q1 - 1.5 * IQR upper = Q3 + 1.5 * IQR rel_offset_arr = np.array(rel_offset) mean = np.mean(data) std = np.std(data) bad_diag_input = [num_masks, len(bad_pixels_unmasked)] if bad_pixels_unmasked else None good_diag_input = [num_masks, len(good_pixels_masked)] if good_pixels_masked else None outlier_mask = (data < lower) | (data > upper) outliers = data[outlier_mask] outlier_indices_original = original_indices[outlier_mask] plot_diagnostics(run_num, run_out_dir, num_masks, bad_diag_input, good_diag_input, rel_offset_arr, diag_dict) return Loading