Unverified Commit 9d3268e7 authored by mvdbeek

Ensure query columns are unique

parent 8e8d8999
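
Why the label scheme changes: the cached-job search adds one labeled column per input value, and the old labels interpolated the input's database id (`{k}_{v}`). When the same dataset is supplied more than once for the same parameter (e.g. in a repeated input), both k and v repeat, so the statement ends up with duplicate column names. Labeling by the value's position in the input list (`{k}_{value_index}`) makes every column name unique. A minimal sketch of the collision in plain SQLAlchemy; the table and names here are illustrative, not Galaxy's models:

    from sqlalchemy import Column, Integer, MetaData, Table, select

    metadata = MetaData()
    job = Table("job", metadata, Column("id", Integer), Column("dataset_id", Integer))

    k = "input1"                          # parameter name
    input_list = [{"id": 5}, {"id": 5}]   # the same dataset id supplied twice

    # Old scheme: both labels render as "input1_5" -> ambiguous result columns.
    old_labels = [job.c.dataset_id.label(f"{k}_{tv['id']}") for tv in input_list]
    assert old_labels[0].name == old_labels[1].name == "input1_5"

    # New scheme: the position in the input list disambiguates the columns.
    new_labels = [
        job.c.dataset_id.label(f"{k}_{value_index}")
        for value_index, tv in enumerate(input_list)
    ]
    assert [c.name for c in new_labels] == ["input1_0", "input1_1"]

    stmt = select(job.c.id, *new_labels)  # column names are now unique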
+69 −46
@@ -466,9 +466,9 @@ class JobSearch:
        data_types = []
        used_ids: List = []
        for k, input_list in input_data.items():
            # k will be matched against the JobParameter.name column. This can be prefixed depending on whethter
            # k will be matched against the JobParameter.name column. This can be prefixed depending on whether
            # the input is in a repeat, or not (section and conditional)
            for type_values in input_list:
            for value_index, type_values in enumerate(input_list):
                t = type_values["src"]
                v = type_values["id"]
                requested_ids.append(v)
@@ -476,14 +476,25 @@ class JobSearch:
                identifier = type_values["identifier"]
                if t == "hda":
                    stmt = self._build_stmt_for_hda(
                        stmt, data_conditions, used_ids, k, v, identifier, require_name_match=require_name_match
                        stmt,
                        data_conditions,
                        used_ids,
                        k,
                        v,
                        identifier,
                        require_name_match=require_name_match,
                        value_index=value_index,
                    )
                elif t == "ldda":
                    stmt = self._build_stmt_for_ldda(stmt, data_conditions, used_ids, k, v)
                    stmt = self._build_stmt_for_ldda(stmt, data_conditions, used_ids, k, v, value_index=value_index)
                elif t == "hdca":
                    stmt = self._build_stmt_for_hdca(stmt, data_conditions, used_ids, k, v, user.id)
                    stmt = self._build_stmt_for_hdca(
                        stmt, data_conditions, used_ids, k, v, user.id, value_index=value_index
                    )
                elif t == "dce":
                    stmt = self._build_stmt_for_dce(stmt, data_conditions, used_ids, k, v, user.id)
                    stmt = self._build_stmt_for_dce(
                        stmt, data_conditions, used_ids, k, v, user.id, value_index=value_index
                    )
                else:
                    log.error("Unknown input data type %s", t)
                    return None
@@ -670,13 +681,15 @@ class JobSearch:
        )
        return final_ordered_stmt

    def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier, require_name_match=True):
    def _build_stmt_for_hda(
        self, stmt, data_conditions, used_ids, k, v, identifier, value_index, require_name_match=True
    ):
        a = aliased(model.JobToInputDatasetAssociation)
        b = aliased(model.HistoryDatasetAssociation)
        c = aliased(model.HistoryDatasetAssociation)
        d = aliased(model.JobParameter)
        e = aliased(model.HistoryDatasetAssociationHistory)
        labeled_col = a.dataset_id.label(f"{k}_{v}")
        labeled_col = a.dataset_id.label(f"{k}_{value_index}")
        stmt = stmt.add_columns(labeled_col)
        used_ids.append(labeled_col)
        stmt = stmt.join(a, a.job_id == model.Job.id)
@@ -725,16 +738,18 @@ class JobSearch:
        )
        return stmt

    def _build_stmt_for_ldda(self, stmt, data_conditions, used_ids, k, v):
    def _build_stmt_for_ldda(self, stmt, data_conditions, used_ids, k, v, value_index):
        a = aliased(model.JobToInputLibraryDatasetAssociation)
        labeled_col = a.ldda_id.label(f"{k}_{v}")
        labeled_col = a.ldda_id.label(f"{k}_{value_index}")
        stmt = stmt.add_columns(labeled_col)
        stmt = stmt.join(a, a.job_id == model.Job.id)
        data_conditions.append(and_(a.name == k, a.ldda_id == v))
        used_ids.append(labeled_col)
        return stmt

    def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v, user_id, require_name_match=True):
    def _build_stmt_for_hdca(
        self, stmt, data_conditions, used_ids, k, v, user_id, value_index, require_name_match=True
    ):
        # Strategy for efficiently finding equivalent HDCAs:
        # 1. Determine the structural depth of the target HDCA by its collection_type.
        # 2. For the target HDCA (identified by 'v'):
@@ -763,9 +778,12 @@ class JobSearch:
        )
        depth = collection_type.count(":") if collection_type else 0

        a = aliased(model.JobToInputDatasetCollectionAssociation, name=f"job_to_input_dataset_collection_1_{k}_{v}")
        a = aliased(
            model.JobToInputDatasetCollectionAssociation, name=f"job_to_input_dataset_collection_1_{k}_{value_index}"
        )
        hdca_input = aliased(
            model.HistoryDatasetCollectionAssociation, name=f"history_dataset_collection_association_1_{k}_{v}"
            model.HistoryDatasetCollectionAssociation,
            name=f"history_dataset_collection_association_1_{k}_{value_index}",
        )

        _hdca_target_cte_ref = aliased(model.HistoryDatasetCollectionAssociation, name="_hdca_target_cte_ref")
@@ -798,7 +816,7 @@ class JobSearch:
            .where(_hdca_target_cte_ref.id == v)
            .distinct()
        )
        reference_all_dataset_ids_cte = reference_all_dataset_ids_select.cte(f"ref_all_ds_ids_{k}_{v}")
        reference_all_dataset_ids_cte = reference_all_dataset_ids_select.cte(f"ref_all_ds_ids_{k}_{value_index}")
        # --- END NEW CTE ---

        # CTE 1: signature_elements_cte (for the reference HDCA)
@@ -829,7 +847,7 @@ class JobSearch:
            _hda_cte_ref, _hda_cte_ref.id == _leaf_cte_ref.hda_id
        )
        signature_elements_select = signature_elements_select.where(_hdca_target_cte_ref.id == v)
        signature_elements_cte = signature_elements_select.cte(f"signature_elements_{k}_{v}")
        signature_elements_cte = signature_elements_select.cte(f"signature_elements_{k}_{value_index}")

        # CTE 2: reference_full_signature_cte
        # This CTE aggregates the path signature strings of the reference HDCA into a
@@ -843,7 +861,7 @@ class JobSearch:
                ).label("signature_array")
            )
            .select_from(signature_elements_cte)
            .cte(f"reference_full_signature_{k}_{v}")
            .cte(f"reference_full_signature_{k}_{value_index}")
        )

        candidate_hdca = aliased(model.HistoryDatasetCollectionAssociation, name="candidate_hdca")
@@ -883,7 +901,7 @@ class JobSearch:
            .where(candidate_hda.dataset_id.in_(select(reference_all_dataset_ids_cte.c.ref_dataset_id_for_overlap)))
        )
        candidate_hdca_pre_filter_ids_cte = candidate_hdca_pre_filter_ids_select.cte(
            f"cand_hdca_pre_filter_ids_{k}_{v}"
            f"cand_hdca_pre_filter_ids_{k}_{value_index}"
        )
        # --- END NEW CTE ---

@@ -919,7 +937,7 @@ class JobSearch:
            candidate_hda, candidate_hda.id == _leaf_candidate_dce.hda_id
        )
        candidate_signature_elements_cte = candidate_signature_elements_select.cte(
            f"candidate_signature_elements_{k}_{v}"
            f"candidate_signature_elements_{k}_{value_index}"
        )

        # CTE 4: candidate_full_signatures_cte
@@ -936,7 +954,7 @@ class JobSearch:
            )
            .select_from(candidate_signature_elements_cte)
            .group_by(candidate_signature_elements_cte.c.candidate_hdca_id)
            .cte(f"candidate_full_signatures_{k}_{v}")
            .cte(f"candidate_full_signatures_{k}_{value_index}")
        )

        # CTE 5: equivalent_hdca_ids_cte
@@ -948,13 +966,13 @@ class JobSearch:
                candidate_full_signatures_cte.c.full_signature_array
                == select(reference_full_signature_cte.c.signature_array).scalar_subquery()
            )
            .cte(f"equivalent_hdca_ids_{k}_{v}")
            .cte(f"equivalent_hdca_ids_{k}_{value_index}")
        )

        # Main query `stmt` construction
        # This section joins the base job statement with the associations and then filters
        # by the HDCAs identified as equivalent in the CTEs.
        labeled_col = a.dataset_collection_id.label(f"{k}_{v}")
        labeled_col = a.dataset_collection_id.label(f"{k}_{value_index}")
        stmt = stmt.add_columns(labeled_col)
        stmt = stmt.join(a, a.job_id == model.Job.id)

@@ -972,7 +990,7 @@ class JobSearch:
        data_conditions.append(a.name == k)
        return stmt

    def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id):
    def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v, user_id, value_index):
        dce_root_target = self.sa_session.get_one(model.DatasetCollectionElement, v)

        # Determine if the target DCE points to an HDA or a child collection
@@ -987,15 +1005,18 @@ class JobSearch:
            depth = collection_type.count(":") if collection_type else 0

            # Aliases for the target DCE's collection structure
            _dce_target_root_ref = aliased(model.DatasetCollectionElement, name=f"_dce_target_root_ref_{k}_{v}")
            _dce_target_root_ref = aliased(
                model.DatasetCollectionElement, name=f"_dce_target_root_ref_{k}_{value_index}"
            )
            _dce_target_child_collection_ref = aliased(
                model.DatasetCollection, name=f"_dce_target_child_collection_ref_{k}_{v}"
                model.DatasetCollection, name=f"_dce_target_child_collection_ref_{k}_{value_index}"
            )
            # List of aliases for each potential nested level of DatasetCollectionElements
            _dce_target_level_list = [
                aliased(model.DatasetCollectionElement, name=f"_dce_target_level_{k}_{v}_{i}") for i in range(depth + 1)
                aliased(model.DatasetCollectionElement, name=f"_dce_target_level_{k}_{value_index}_{i}")
                for i in range(depth + 1)
            ]
            _hda_target_ref = aliased(model.HistoryDatasetAssociation, name=f"_hda_target_ref_{k}_{v}")
            _hda_target_ref = aliased(model.HistoryDatasetAssociation, name=f"_hda_target_ref_{k}_{value_index}")

            # --- CTE: reference_dce_all_dataset_ids_cte ---
            # This CTE (Common Table Expression) identifies all distinct dataset IDs
@@ -1028,7 +1049,7 @@ class JobSearch:
                .where(_dce_target_root_ref.id == v)
                .distinct()
            )
            reference_all_dataset_ids_cte = reference_all_dataset_ids_select.cte(f"ref_all_ds_ids_{k}_{v}")
            reference_all_dataset_ids_cte = reference_all_dataset_ids_select.cte(f"ref_all_ds_ids_{k}_{value_index}")

            # --- CTE: reference_dce_signature_elements_cte ---
            # This CTE generates a "path signature string" for each individual element
@@ -1069,7 +1090,7 @@ class JobSearch:
                _hda_target_ref, _hda_target_ref.id == _leaf_target_dce_ref.hda_id
            ).where(_dce_target_root_ref.id == v)
            reference_dce_signature_elements_cte = reference_dce_signature_elements_select.cte(
                f"ref_dce_sig_els_{k}_{v}"
                f"ref_dce_sig_els_{k}_{value_index}"
            )

            # --- CTE: reference_full_signature_cte ---
@@ -1093,7 +1114,7 @@ class JobSearch:
                    ),  # Count elements based on path_signature_string
                )
                .select_from(reference_dce_signature_elements_cte)
                .cte(f"ref_dce_full_sig_{k}_{v}")
                .cte(f"ref_dce_full_sig_{k}_{value_index}")
            )

            # --- Aliases for Candidate Dataset Collection Structure ---
@@ -1101,14 +1122,14 @@ class JobSearch:
            # in the database, which will be compared against the reference.
            candidate_dce_root = aliased(model.DatasetCollectionElement, name=f"candidate_dce_root_{k}_{v}")
            candidate_dce_child_collection = aliased(
                model.DatasetCollection, name=f"candidate_dce_child_collection_{k}_{v}"
                model.DatasetCollection, name=f"candidate_dce_child_collection_{k}_{value_index}"
            )
            candidate_dce_level_list = [
                aliased(model.DatasetCollectionElement, name=f"candidate_dce_level_{k}_{v}_{i}")
                aliased(model.DatasetCollectionElement, name=f"candidate_dce_level_{k}_{value_index}_{i}")
                for i in range(depth + 1)
            ]
            candidate_hda = aliased(model.HistoryDatasetAssociation, name=f"candidate_hda_{k}_{v}")
            candidate_history = aliased(model.History, name=f"candidate_history_{k}_{v}")
            candidate_hda = aliased(model.HistoryDatasetAssociation, name=f"candidate_hda_{k}_{value_index}")
            candidate_history = aliased(model.History, name=f"candidate_history_{k}_{value_index}")

            # --- CTE: candidate_dce_pre_filter_ids_cte (Initial Candidate Filtering) ---
            # This CTE performs a first pass to quickly narrow down potential candidate
@@ -1146,7 +1167,7 @@ class JobSearch:
                .where(candidate_hda.dataset_id.in_(select(reference_all_dataset_ids_cte.c.ref_dataset_id_for_overlap)))
            )
            candidate_dce_pre_filter_ids_cte = candidate_dce_pre_filter_ids_select.cte(
                f"cand_dce_pre_filter_ids_{k}_{v}"
                f"cand_dce_pre_filter_ids_{k}_{value_index}"
            )

            # --- CTE: candidate_dce_signature_elements_cte ---
@@ -1195,7 +1216,7 @@ class JobSearch:
                .where(or_(candidate_history.published == true(), candidate_history.user_id == user_id))
            )
            candidate_dce_signature_elements_cte = candidate_dce_signature_elements_select.cte(
                f"cand_dce_sig_els_{k}_{v}"
                f"cand_dce_sig_els_{k}_{value_index}"
            )

            # --- CTE: candidate_pre_signatures_cte (Candidate Aggregation for Comparison) ---
@@ -1218,7 +1239,7 @@ class JobSearch:
                )
                .select_from(candidate_dce_signature_elements_cte)
                .group_by(candidate_dce_signature_elements_cte.c.candidate_dce_id)
                .cte(f"cand_dce_pre_sig_{k}_{v}")
                .cte(f"cand_dce_pre_sig_{k}_{value_index}")
            )

            # --- CTE: filtered_cand_dce_by_dataset_ids_cte (Filtering by Element Count and Dataset ID Array) ---
@@ -1238,7 +1259,7 @@ class JobSearch:
                        == reference_full_signature_cte.c.ordered_dataset_id_array,
                    )
                )
                .cte(f"filtered_cand_dce_{k}_{v}")
                .cte(f"filtered_cand_dce_{k}_{value_index}")
            )

            # --- CTE: final_candidate_signatures_cte (Final Full Signature Calculation for Matched Candidates) ---
@@ -1261,16 +1282,17 @@ class JobSearch:
                    )
                )
                .group_by(candidate_dce_signature_elements_cte.c.candidate_dce_id)
                .cte(f"final_cand_dce_full_sig_{k}_{v}")
                .cte(f"final_cand_dce_full_sig_{k}_{value_index}")
            )

            # --- Main Query Construction for Dataset Collection Elements ---
            # This section joins the main `stmt` (representing jobs) with the CTEs
            # to filter jobs whose input DCE matches the reference DCE's full signature.
            a = aliased(
                model.JobToInputDatasetCollectionElementAssociation, name=f"job_to_input_dce_association_{k}_{v}"
                model.JobToInputDatasetCollectionElementAssociation,
                name=f"job_to_input_dce_association_{k}_{value_index}",
            )
            labeled_col = a.dataset_collection_element_id.label(f"{k}_{v}")
            labeled_col = a.dataset_collection_element_id.label(f"{k}_{value_index}")
            stmt = stmt.add_columns(labeled_col)
            stmt = stmt.join(a, a.job_id == model.Job.id)

@@ -1302,17 +1324,18 @@ class JobSearch:

            # Aliases for the "left" side (job to input DCE path)
            a = aliased(
                model.JobToInputDatasetCollectionElementAssociation, name=f"job_to_input_dce_association_{k}_{v}"
                model.JobToInputDatasetCollectionElementAssociation,
                name=f"job_to_input_dce_association_{k}_{value_index}",
            )
            dce_left = aliased(model.DatasetCollectionElement, name=f"dce_left_{k}_{v}")
            hda_left = aliased(model.HistoryDatasetAssociation, name=f"hda_left_{k}_{v}")
            dce_left = aliased(model.DatasetCollectionElement, name=f"dce_left_{k}_{value_index}")
            hda_left = aliased(model.HistoryDatasetAssociation, name=f"hda_left_{k}_{value_index}")

            # Aliases for the "right" side (target DCE path in the main query)
            dce_right = aliased(model.DatasetCollectionElement, name=f"dce_right_{k}_{v}")
            hda_right = aliased(model.HistoryDatasetAssociation, name=f"hda_right_{k}_{v}")
            dce_right = aliased(model.DatasetCollectionElement, name=f"dce_right_{k}_{value_index}")
            hda_right = aliased(model.HistoryDatasetAssociation, name=f"hda_right_{k}_{value_index}")

            # Start joins from job → input DCE association → first-level DCE (left side)
            labeled_col = a.dataset_collection_element_id.label(f"{k}_{v}")
            labeled_col = a.dataset_collection_element_id.label(f"{k}_{value_index}")
            stmt = stmt.add_columns(labeled_col)
            stmt = stmt.join(a, a.job_id == model.Job.id)
            stmt = stmt.join(dce_left, dce_left.id == a.dataset_collection_element_id)
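
The same indexing applies to the CTE and alias names built inside _build_stmt_for_hdca and _build_stmt_for_dce: every name that previously embedded the value id (f"..._{k}_{v}") now embeds the position, since a repeated value would otherwise register two CTEs or aliases under a single SQL name. A hedged sketch of that collision, with names mirroring the f-strings in the hunks above:

    k, v = "input1", 42        # the same HDCA id passed twice to one parameter
    repeated_values = [v, v]

    # Old scheme: both CTEs collide on "ref_all_ds_ids_input1_42".
    old_cte_names = [f"ref_all_ds_ids_{k}_{item}" for item in repeated_values]
    assert len(set(old_cte_names)) == 1

    # New scheme: value_index keeps every CTE name distinct.
    new_cte_names = [f"ref_all_ds_ids_{k}_{i}" for i, _ in enumerate(repeated_values)]
    assert new_cte_names == ["ref_all_ds_ids_input1_0", "ref_all_ds_ids_input1_1"]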
+17 −0
@@ -1097,6 +1097,23 @@ class TestToolsApi(ApiTestCase, TestsTools):
            job_details = self.dataset_populator.get_job_details(copied_job_id, full=True).json()
            assert job_details["copied_from_job_id"] == outputs_one["jobs"][0]["id"]

    @skip_without_tool("cat_list")
    @requires_new_history
    def test_run_cat_list_use_cached_job_repeated_input(self):
        with self.dataset_populator.test_history_for(
            self.test_run_cat_list_use_cached_job_repeated_input
        ) as history_id:
            # Run simple non-upload tool with an input data parameter.
            input_value = dataset_to_param(self.dataset_populator.new_dataset(history_id=history_id))
            inputs = {"input1": {"batch": False, "values": [input_value, input_value]}}
            outputs_one = self._run("cat_list", history_id, inputs, assert_ok=True, wait_for_job=True)
            outputs_two = self._run(
                "cat_list", history_id, inputs, assert_ok=True, wait_for_job=True, use_cached_job=True
            )
            copied_job_id = outputs_two["jobs"][0]["id"]
            job_details = self.dataset_populator.get_job_details(copied_job_id, full=True).json()
            assert job_details["copied_from_job_id"] == outputs_one["jobs"][0]["id"]

    @skip_without_tool("collection_creates_list")
    @requires_new_history
    def test_run_collection_creates_list_use_cached_job(self):
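
The new test above covers exactly this case: one HDA passed twice to cat_list's repeated input, run once normally and once with use_cached_job=True, asserting the second job is copied from the first. A hedged sketch of the payload it builds (the id is illustrative; dataset_to_param is assumed to return an {"src": "hda", "id": ...} reference):

    # Hypothetical encoded dataset id, standing in for dataset_to_param(...)
    input_value = {"src": "hda", "id": "f2db41e1fa331b3e"}
    inputs = {"input1": {"batch": False, "values": [input_value, input_value]}}
    # Before this fix the cached-job lookup labeled both values "input1_<id>";
    # with value_index they become the distinct columns input1_0 and input1_1.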