Loading lib/galaxy/managers/lddas.py +5 −54 Original line number Diff line number Diff line import logging from galaxy import model, util from galaxy import model from galaxy.managers import base as manager_base from galaxy.managers.datasets import DatasetAssociationManager from galaxy.structured_app import MinimalManagerApp Loading @@ -18,6 +18,7 @@ class LDDAManager(DatasetAssociationManager): """ Set up and initialize other managers needed by lddas. """ super().__init__(app) def get(self, trans, id, check_accessible=True): return manager_base.get_object(trans, id, Loading @@ -26,56 +27,6 @@ class LDDAManager(DatasetAssociationManager): check_accessible=check_accessible) def _set_permissions(self, trans, library_dataset, role_ids_dict): dataset = library_dataset.library_dataset_dataset_association.dataset new_access_roles_ids = role_ids_dict["DATASET_ACCESS"] new_manage_roles_ids = role_ids_dict["DATASET_MANAGE_PERMISSIONS"] new_modify_roles_ids = role_ids_dict["LIBRARY_MODIFY"] # ACCESS DATASET ROLES valid_access_roles = [] invalid_access_roles_ids = [] valid_roles_for_dataset, total_roles = trans.app.security_agent.get_valid_roles(trans, dataset) if new_access_roles_ids is None: trans.app.security_agent.make_dataset_public(dataset) else: for role_id in new_access_roles_ids: role = self.role_manager.get(trans, self.app, role_id) if role in valid_roles_for_dataset: valid_access_roles.append(role) else: invalid_access_roles_ids.append(role_id) if len(invalid_access_roles_ids) > 0: log.warning("The following roles could not be added to the dataset access permission: " + str(invalid_access_roles_ids)) access_permission = dict(access=valid_access_roles) trans.app.security_agent.set_dataset_permission(dataset, access_permission) # MANAGE DATASET ROLES valid_manage_roles = [] invalid_manage_roles_ids = [] new_manage_roles_ids = util.listify(new_manage_roles_ids) for role_id in new_manage_roles_ids: role = self.role_manager.get(trans, self.app, role_id) if role in valid_roles_for_dataset: valid_manage_roles.append(role) else: invalid_manage_roles_ids.append(role_id) if len(invalid_manage_roles_ids) > 0: log.warning("The following roles could not be added to the dataset manage permission: " + str(invalid_manage_roles_ids)) manage_permission = {trans.app.security_agent.permitted_actions.DATASET_MANAGE_PERMISSIONS: valid_manage_roles} trans.app.security_agent.set_dataset_permission(dataset, manage_permission) # MODIFY LIBRARY ITEM ROLES valid_modify_roles = [] invalid_modify_roles_ids = [] new_modify_roles_ids = util.listify(new_modify_roles_ids) for role_id in new_modify_roles_ids: role = self.role_manager.get(trans, self.app, role_id) if role in valid_roles_for_dataset: valid_modify_roles.append(role) else: invalid_modify_roles_ids.append(role_id) if len(invalid_modify_roles_ids) > 0: log.warning("The following roles could not be added to the dataset modify permission: " + str(invalid_modify_roles_ids)) modify_permission = {trans.app.security_agent.permitted_actions.LIBRARY_MODIFY: valid_modify_roles} trans.app.security_agent.set_library_item_permission(library_dataset, modify_permission) # Check Git history for an older broken implementation, but it was broken # and security related and had not test coverage so it was deleted. raise NotImplementedError() lib/galaxy/webapps/galaxy/api/datasets.py +14 −10 Original line number Diff line number Diff line Loading @@ -46,6 +46,10 @@ class DatasetsController(BaseGalaxyAPIController, UsesVisualizationMixin): def serializer_by_type(self): return {'dataset': self.hda_serializer, 'dataset_collection': self.hdca_serializer} @property def dataset_manager_by_type(self): return {'hda': self.hda_manager, 'ldda': self.ldda_manager} def _parse_serialization_params(self, kwd, default_view): view = kwd.get('view', None) keys = kwd.get('keys') Loading Loading @@ -124,14 +128,15 @@ class DatasetsController(BaseGalaxyAPIController, UsesVisualizationMixin): ) return [self.serializer_by_type[content.history_content_type].serialize_to_view(content, user=trans.user, trans=trans, view=view) for content in contents] @web.legacy_expose_api_anonymous @web.expose_api_anonymous_and_sessionless def show(self, trans, id, hda_ldda='hda', data_type=None, provider=None, **kwd): """ GET /api/datasets/{encoded_dataset_id} Displays information about and/or content of a dataset. """ # Get dataset. dataset = self.get_hda_or_ldda(trans, hda_ldda=hda_ldda, dataset_id=id) decoded_dataset_id = self.decode_id(id) dataset = self.dataset_manager_by_type[hda_ldda].get_accessible(decoded_dataset_id, trans.user) # Use data type to return particular type of data. if data_type == 'state': Loading Loading @@ -169,7 +174,8 @@ class DatasetsController(BaseGalaxyAPIController, UsesVisualizationMixin): Display user-facing storage details related to the objectstore a dataset resides in. """ dataset_instance = self.get_hda_or_ldda(trans, hda_ldda=hda_ldda, dataset_id=dataset_id) decoded_dataset_id = self.decode_id(dataset_id) dataset_instance = self.dataset_manager_by_type[hda_ldda].get_accessible(decoded_dataset_id, trans.user) dataset = dataset_instance.dataset object_store = self.app.object_store object_store_id = dataset.object_store_id Loading Loading @@ -201,13 +207,11 @@ class DatasetsController(BaseGalaxyAPIController, UsesVisualizationMixin): if payload: kwd.update(payload) hda_ldda = kwd.get('hda_ldda', 'hda') dataset_assoc = self.get_hda_or_ldda(trans, hda_ldda=hda_ldda, dataset_id=dataset_id) if hda_ldda == "hda": self.hda_manager.update_permissions(trans, dataset_assoc, **kwd) return self.hda_manager.serialize_dataset_association_roles(trans, dataset_assoc) else: self.ldda_manager.update_permissions(trans, dataset_assoc, **kwd) return self.ldda_manager.serialize_dataset_association_roles(trans, dataset_assoc) decoded_dataset_id = self.decode_id(dataset_id) dataset_manager = self.dataset_manager_by_type[hda_ldda] dataset = dataset_manager.get_accessible(decoded_dataset_id, trans.user) dataset_manager.update_permissions(trans, dataset, **payload) return dataset_manager.serialize_dataset_association_roles(trans, dataset) def _dataset_in_use_state(self, dataset): """ Loading lib/galaxy_test/api/test_datasets.py +23 −0 Original line number Diff line number Diff line Loading @@ -102,6 +102,29 @@ class DatasetsApiTestCase(ApiTestCase): self._assert_status_code_is(show_response, 200) self.__assert_matches_hda(hda1, show_response.json()) def test_show_permission_denied(self): hda = self.dataset_populator.new_dataset(self.history_id) self.dataset_populator.make_private(history_id=self.history_id, dataset_id=hda['id']) with self._different_user(): show_response = self._get(f"datasets/{hda['id']}") self._assert_status_code_is(show_response, 403) def test_admin_can_update_permissions(self): # Create private dataset hda = self.dataset_populator.new_dataset(self.history_id) dataset_id = hda['id'] self.dataset_populator.make_private(history_id=self.history_id, dataset_id=dataset_id) # Admin removes restrictions payload = {"action": "remove_restrictions"} update_response = self._put(f"datasets/{dataset_id}/permissions", payload, admin=True) self._assert_status_code_is_ok(update_response) # Other users can access the dataset with self._different_user(): show_response = self._get(f"datasets/{hda['id']}") self._assert_status_code_is_ok(show_response) def __assert_matches_hda(self, input_hda, query_hda): self._assert_has_keys(query_hda, "id", "name") assert input_hda["name"] == query_hda["name"] Loading Loading
lib/galaxy/managers/lddas.py +5 −54 Original line number Diff line number Diff line import logging from galaxy import model, util from galaxy import model from galaxy.managers import base as manager_base from galaxy.managers.datasets import DatasetAssociationManager from galaxy.structured_app import MinimalManagerApp Loading @@ -18,6 +18,7 @@ class LDDAManager(DatasetAssociationManager): """ Set up and initialize other managers needed by lddas. """ super().__init__(app) def get(self, trans, id, check_accessible=True): return manager_base.get_object(trans, id, Loading @@ -26,56 +27,6 @@ class LDDAManager(DatasetAssociationManager): check_accessible=check_accessible) def _set_permissions(self, trans, library_dataset, role_ids_dict): dataset = library_dataset.library_dataset_dataset_association.dataset new_access_roles_ids = role_ids_dict["DATASET_ACCESS"] new_manage_roles_ids = role_ids_dict["DATASET_MANAGE_PERMISSIONS"] new_modify_roles_ids = role_ids_dict["LIBRARY_MODIFY"] # ACCESS DATASET ROLES valid_access_roles = [] invalid_access_roles_ids = [] valid_roles_for_dataset, total_roles = trans.app.security_agent.get_valid_roles(trans, dataset) if new_access_roles_ids is None: trans.app.security_agent.make_dataset_public(dataset) else: for role_id in new_access_roles_ids: role = self.role_manager.get(trans, self.app, role_id) if role in valid_roles_for_dataset: valid_access_roles.append(role) else: invalid_access_roles_ids.append(role_id) if len(invalid_access_roles_ids) > 0: log.warning("The following roles could not be added to the dataset access permission: " + str(invalid_access_roles_ids)) access_permission = dict(access=valid_access_roles) trans.app.security_agent.set_dataset_permission(dataset, access_permission) # MANAGE DATASET ROLES valid_manage_roles = [] invalid_manage_roles_ids = [] new_manage_roles_ids = util.listify(new_manage_roles_ids) for role_id in new_manage_roles_ids: role = self.role_manager.get(trans, self.app, role_id) if role in valid_roles_for_dataset: valid_manage_roles.append(role) else: invalid_manage_roles_ids.append(role_id) if len(invalid_manage_roles_ids) > 0: log.warning("The following roles could not be added to the dataset manage permission: " + str(invalid_manage_roles_ids)) manage_permission = {trans.app.security_agent.permitted_actions.DATASET_MANAGE_PERMISSIONS: valid_manage_roles} trans.app.security_agent.set_dataset_permission(dataset, manage_permission) # MODIFY LIBRARY ITEM ROLES valid_modify_roles = [] invalid_modify_roles_ids = [] new_modify_roles_ids = util.listify(new_modify_roles_ids) for role_id in new_modify_roles_ids: role = self.role_manager.get(trans, self.app, role_id) if role in valid_roles_for_dataset: valid_modify_roles.append(role) else: invalid_modify_roles_ids.append(role_id) if len(invalid_modify_roles_ids) > 0: log.warning("The following roles could not be added to the dataset modify permission: " + str(invalid_modify_roles_ids)) modify_permission = {trans.app.security_agent.permitted_actions.LIBRARY_MODIFY: valid_modify_roles} trans.app.security_agent.set_library_item_permission(library_dataset, modify_permission) # Check Git history for an older broken implementation, but it was broken # and security related and had not test coverage so it was deleted. raise NotImplementedError()
lib/galaxy/webapps/galaxy/api/datasets.py +14 −10 Original line number Diff line number Diff line Loading @@ -46,6 +46,10 @@ class DatasetsController(BaseGalaxyAPIController, UsesVisualizationMixin): def serializer_by_type(self): return {'dataset': self.hda_serializer, 'dataset_collection': self.hdca_serializer} @property def dataset_manager_by_type(self): return {'hda': self.hda_manager, 'ldda': self.ldda_manager} def _parse_serialization_params(self, kwd, default_view): view = kwd.get('view', None) keys = kwd.get('keys') Loading Loading @@ -124,14 +128,15 @@ class DatasetsController(BaseGalaxyAPIController, UsesVisualizationMixin): ) return [self.serializer_by_type[content.history_content_type].serialize_to_view(content, user=trans.user, trans=trans, view=view) for content in contents] @web.legacy_expose_api_anonymous @web.expose_api_anonymous_and_sessionless def show(self, trans, id, hda_ldda='hda', data_type=None, provider=None, **kwd): """ GET /api/datasets/{encoded_dataset_id} Displays information about and/or content of a dataset. """ # Get dataset. dataset = self.get_hda_or_ldda(trans, hda_ldda=hda_ldda, dataset_id=id) decoded_dataset_id = self.decode_id(id) dataset = self.dataset_manager_by_type[hda_ldda].get_accessible(decoded_dataset_id, trans.user) # Use data type to return particular type of data. if data_type == 'state': Loading Loading @@ -169,7 +174,8 @@ class DatasetsController(BaseGalaxyAPIController, UsesVisualizationMixin): Display user-facing storage details related to the objectstore a dataset resides in. """ dataset_instance = self.get_hda_or_ldda(trans, hda_ldda=hda_ldda, dataset_id=dataset_id) decoded_dataset_id = self.decode_id(dataset_id) dataset_instance = self.dataset_manager_by_type[hda_ldda].get_accessible(decoded_dataset_id, trans.user) dataset = dataset_instance.dataset object_store = self.app.object_store object_store_id = dataset.object_store_id Loading Loading @@ -201,13 +207,11 @@ class DatasetsController(BaseGalaxyAPIController, UsesVisualizationMixin): if payload: kwd.update(payload) hda_ldda = kwd.get('hda_ldda', 'hda') dataset_assoc = self.get_hda_or_ldda(trans, hda_ldda=hda_ldda, dataset_id=dataset_id) if hda_ldda == "hda": self.hda_manager.update_permissions(trans, dataset_assoc, **kwd) return self.hda_manager.serialize_dataset_association_roles(trans, dataset_assoc) else: self.ldda_manager.update_permissions(trans, dataset_assoc, **kwd) return self.ldda_manager.serialize_dataset_association_roles(trans, dataset_assoc) decoded_dataset_id = self.decode_id(dataset_id) dataset_manager = self.dataset_manager_by_type[hda_ldda] dataset = dataset_manager.get_accessible(decoded_dataset_id, trans.user) dataset_manager.update_permissions(trans, dataset, **payload) return dataset_manager.serialize_dataset_association_roles(trans, dataset) def _dataset_in_use_state(self, dataset): """ Loading
lib/galaxy_test/api/test_datasets.py +23 −0 Original line number Diff line number Diff line Loading @@ -102,6 +102,29 @@ class DatasetsApiTestCase(ApiTestCase): self._assert_status_code_is(show_response, 200) self.__assert_matches_hda(hda1, show_response.json()) def test_show_permission_denied(self): hda = self.dataset_populator.new_dataset(self.history_id) self.dataset_populator.make_private(history_id=self.history_id, dataset_id=hda['id']) with self._different_user(): show_response = self._get(f"datasets/{hda['id']}") self._assert_status_code_is(show_response, 403) def test_admin_can_update_permissions(self): # Create private dataset hda = self.dataset_populator.new_dataset(self.history_id) dataset_id = hda['id'] self.dataset_populator.make_private(history_id=self.history_id, dataset_id=dataset_id) # Admin removes restrictions payload = {"action": "remove_restrictions"} update_response = self._put(f"datasets/{dataset_id}/permissions", payload, admin=True) self._assert_status_code_is_ok(update_response) # Other users can access the dataset with self._different_user(): show_response = self._get(f"datasets/{hda['id']}") self._assert_status_code_is_ok(show_response) def __assert_matches_hda(self, input_hda, query_hda): self._assert_has_keys(query_hda, "id", "name") assert input_hda["name"] == query_hda["name"] Loading