Unverified Commit b48c73a6 authored by mvdbeek's avatar mvdbeek
Browse files

Add support for bulk dbkey and datatype changes

parent c2aa3b73
Loading
Loading
Loading
Loading
+31 −12
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@ from typing import (
    Union,
)

from celery import group
from pydantic import (
    Extra,
    Field,
@@ -1618,24 +1619,42 @@ class HistoryItemOperator:
        self, item: HistoryItemModel, params: ChangeDatatypeOperationParams, trans: ProvidesHistoryContext
    ):
        if isinstance(item, HistoryDatasetAssociation):
            wrapped_task = self._change_item_datatype(item, params, trans)
            trans.sa_session.flush()
            if wrapped_task:
                wrapped_task.delay()

        elif isinstance(item, HistoryDatasetCollectionAssociation):
            wrapped_tasks = []
            for dataset_instance in item.dataset_instances:
                wrapped_task = self._change_item_datatype(dataset_instance, params, trans)
                if wrapped_task:
                    wrapped_tasks.append(wrapped_task)
            trans.sa_session.flush()
            group(wrapped_tasks).delay()

    def _change_item_datatype(
        self, item: HistoryDatasetAssociation, params: ChangeDatatypeOperationParams, trans: ProvidesHistoryContext
    ):
        """Validate and stage a datatype change for a single HDA.

        NOTE(review): this span is rendered from a diff whose +/- markers were
        stripped; the over-indented statements below appear to be the removed
        (pre-change) lines shown next to their replacements — confirm against
        the original commit before treating this as runnable code.
        """
        # Permission/validity checks; these run before any dataset state is touched.
        self.hda_manager.ensure_can_change_datatype(item)
        self.hda_manager.ensure_can_set_metadata(item)
        is_deferred = item.has_deferred_data
        # Mark the dataset as busy while metadata is (re)computed.
        item.dataset.state = item.dataset.states.SETTING_METADATA
            trans.sa_session.flush()
        if is_deferred:
            if params.datatype == "auto":  # if `auto` just keep the original guessed datatype
                item.update()  # TODO: remove this `update` when we can properly track the operation results to notify the history
            else:
                trans.app.datatypes_registry.change_datatype(item, params.datatype)
            # Deferred data has no materialized file; restore the DEFERRED state
            # instead of queuing a metadata task.
            item.dataset.state = item.dataset.states.DEFERRED
                trans.sa_session.flush()
        else:
                change_datatype.delay(dataset_id=item.id, datatype=params.datatype)
            # Return an immutable celery signature (`si`) so the caller can
            # dispatch it directly or combine several into a `group`.
            return change_datatype.si(dataset_id=item.id, datatype=params.datatype)

    def _change_dbkey(self, item: HistoryItemModel, params: ChangeDbkeyOperationParams):
        """Apply the requested dbkey to a dataset, or to every dataset inside a collection."""
        if isinstance(item, HistoryDatasetAssociation):
            targets = [item]
        elif isinstance(item, HistoryDatasetCollectionAssociation):
            targets = item.dataset_instances
        else:
            # Unknown item types are left untouched, matching the branch-less fallthrough.
            targets = []
        for dataset_instance in targets:
            dataset_instance.set_dbkey(params.dbkey)

    def _add_tags(self, item: HistoryItemModel, user: User, params: TagOperationParams):
        manager = self._get_item_manager(item)
+63 −2
Original line number Diff line number Diff line
@@ -1212,7 +1212,7 @@ class HistoryContentsApiBulkOperationTestCase(ApiTestCase):
            _, _, history_contents = self._create_test_history_contents(history_id)

            expected_dbkey = "apiMel3"
            # Change dbkey of all items (only datasets will be affected)
            # Change dbkey of all items
            payload = {
                "operation": "change_dbkey",
                "params": {
@@ -1220,7 +1220,7 @@ class HistoryContentsApiBulkOperationTestCase(ApiTestCase):
                    "dbkey": expected_dbkey,
                },
            }
            # All items should success (even collections)
            # All items should succeed
            expected_success_count = len(history_contents)
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            self._assert_bulk_success(bulk_operation_result, expected_success_count)
@@ -1229,6 +1229,30 @@ class HistoryContentsApiBulkOperationTestCase(ApiTestCase):
                if item["history_content_type"] == "dataset":
                    assert item["dbkey"] == expected_dbkey

    def test_bulk_dbkey_change_dataset_collection(self):
        """A bulk dbkey change targeting collections updates every dataset they contain."""
        with self.dataset_populator.test_history() as history_id:
            _, collection_ids, _ = self._create_test_history_contents(history_id)

            expected_dbkey = "apiMel3"
            request_payload = {
                "operation": "change_dbkey",
                "params": {
                    "type": "change_dbkey",
                    "dbkey": expected_dbkey,
                },
            }
            # Restrict the operation to collections; each one should report success.
            query = "q=history_content_type-eq&qv=dataset_collection"
            bulk_operation_result = self._apply_bulk_operation(history_id, request_payload, query)
            self._assert_bulk_success(bulk_operation_result, len(collection_ids))
            # The datasets nested inside the collections must now carry the new dbkey.
            for content_item in self._get_history_contents(history_id, query="?v=dev&keys=dbkey"):
                if content_item["history_content_type"] == "dataset":
                    assert content_item["dbkey"] == expected_dbkey

    def test_bulk_datatype_change(self):
        with self.dataset_populator.test_history() as history_id:
            num_datasets = 3
@@ -1266,6 +1290,43 @@ class HistoryContentsApiBulkOperationTestCase(ApiTestCase):
                assert item["data_type"] == "galaxy.datatypes.tabular.Tabular"
                assert "metadata_column_names" in item

    def test_bulk_datatype_change_collection(self):
        """A bulk datatype change targeting collections converts each contained dataset."""
        with self.dataset_populator.test_history() as history_id:
            _, collection_ids, _ = self._create_test_history_contents(history_id)

            # Precondition: every dataset starts as plain text with no column metadata.
            for content_item in self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata"):
                if content_item["history_content_type"] == "dataset":
                    assert content_item["extension"] == "txt"
                    assert content_item["data_type"] == "galaxy.datatypes.data.Text"
                    assert "metadata_column_names" not in content_item

            self.dataset_populator.wait_for_history_jobs(history_id)

            expected_datatype = "tabular"
            # Request the datatype change for the collections only.
            bulk_operation_result = self._apply_bulk_operation(
                history_id,
                {
                    "operation": "change_datatype",
                    "params": {
                        "type": "change_datatype",
                        "datatype": expected_datatype,
                    },
                },
                query="q=history_content_type-eq&qv=dataset_collection",
            )
            self._assert_bulk_success(bulk_operation_result, expected_success_count=len(collection_ids))

            # Wait for celery tasks to finish
            self.dataset_populator.wait_for_history(history_id)

            # The nested datasets must now be tabular with column metadata populated.
            for content_item in self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata"):
                if content_item["history_content_type"] == "dataset":
                    assert content_item["extension"] == "tabular"
                    assert content_item["data_type"] == "galaxy.datatypes.tabular.Tabular"
                    assert "metadata_column_names" in content_item

    def test_bulk_datatype_change_should_skip_set_metadata_on_deferred_data(self):
        with self.dataset_populator.test_history() as history_id:
            details = self.dataset_populator.create_deferred_hda(