Loading lib/galaxy/datatypes/qiime2.py +68 −79 Original line number Diff line number Diff line import io import ast import html import io import uuid as _uuid import zipfile Loading @@ -8,16 +8,17 @@ import yaml from galaxy.datatypes.binary import CompressedZipArchive from galaxy.datatypes.metadata import MetadataElement from galaxy.datatypes.tabular import Tabular from galaxy.datatypes.sniff import build_sniff_from_prefix from galaxy.datatypes.tabular import Tabular class _QIIME2ResultBase(CompressedZipArchive): """Base class for QIIME2Artifact and QIIME2Visualization""" MetadataElement(name="semantic_type", readonly=True) MetadataElement(name="semantic_type_simple", readonly=True, visible=False) MetadataElement(name="uuid", readonly=True) MetadataElement(name="format", optional=True, no_value='', readonly=True) MetadataElement(name="format", optional=True, no_value="", readonly=True) MetadataElement(name="version", readonly=True) def set_meta(self, dataset, overwrite=True, **kwd): Loading @@ -26,36 +27,32 @@ class _QIIME2ResultBase(CompressedZipArchive): if value: setattr(dataset.metadata, key, value) dataset.metadata.semantic_type_simple = \ _strip_properties(dataset.metadata.semantic_type) dataset.metadata.semantic_type_simple = _strip_properties(dataset.metadata.semantic_type) def set_peek(self, dataset, is_multi_byte=False): if dataset.metadata.semantic_type == 'Visualization': dataset.blurb = 'QIIME 2 Visualization' if dataset.metadata.semantic_type == "Visualization": dataset.blurb = "QIIME 2 Visualization" else: dataset.blurb = 'QIIME 2 Artifact' dataset.blurb = "QIIME 2 Artifact" dataset.peek = '\n'.join(map(': '.join, self._peek(dataset))) dataset.peek = "\n".join(map(": ".join, self._peek(dataset))) def display_peek(self, dataset): def make_row(item): return ('<tr><th>%s</th><td>%s</td></td>' % tuple(html.escape(x) for x in item)) return "<tr><th>%s</th><td>%s</td></td>" % tuple(html.escape(x) for x in item) table = ['<table cellspacing="0" cellpadding="2">'] table += list(map(make_row, self._peek(dataset, simple=True))) table += ['</table>'] table += ["</table>"] return ''.join(table) return "".join(table) def _peek(self, dataset, simple=False): peek = [ ('Type', dataset.metadata.semantic_type), ('UUID', dataset.metadata.uuid)] peek = [("Type", dataset.metadata.semantic_type), ("UUID", dataset.metadata.uuid)] if not simple: if dataset.metadata.semantic_type != 'Visualization': peek.append(('Format', dataset.metadata.format)) peek.append(('Version', dataset.metadata.version)) if dataset.metadata.semantic_type != "Visualization": peek.append(("Format", dataset.metadata.format)) peek.append(("Version", dataset.metadata.version)) return peek def _sniff(self, filename): Loading @@ -73,7 +70,7 @@ class QIIME2Artifact(_QIIME2ResultBase): def sniff(self, filename): metadata = self._sniff(filename) return metadata and metadata['semantic_type'] != 'Visualization' return metadata and metadata["semantic_type"] != "Visualization" class QIIME2Visualization(_QIIME2ResultBase): Loading @@ -81,7 +78,7 @@ class QIIME2Visualization(_QIIME2ResultBase): def sniff(self, filename): metadata = self._sniff(filename) return metadata and metadata['semantic_type'] == 'Visualization' return metadata and metadata["semantic_type"] == "Visualization" @build_sniff_from_prefix Loading @@ -97,16 +94,17 @@ class QIIME2Metadata(Tabular): on the second line (after the header). This is the most typical location and interacts best with the current implementation of Tabular. """ file_ext = "qiime2.tabular" is_subclass = False _TYPES_DIRECTIVE = '#q2:types' _TYPES_DIRECTIVE = "#q2:types" _search_lines = 2 def get_column_names(self, first_line=None): if first_line is None: return None return first_line.strip().split('\t') return first_line.strip().split("\t") def set_meta(self, dataset, **kwargs): """ Loading @@ -124,30 +122,26 @@ class QIIME2Metadata(Tabular): if line is None: return q2_types = line.strip().split('\t') q2_types = line.strip().split("\t") # The first column (q2:types) is always the IDs q2_types[0] = 'index' q2_types[0] = "index" if len(q2_types) < dataset.metadata.columns: # this is probably malformed, but easy to fix q2_types.extend([''] * (dataset.metadata.columns - len(q2_types))) for idx, (q2_type, col_type) in enumerate( zip(q2_types, dataset.metadata.column_types)): if q2_type == '': if col_type in ('float', 'int'): q2_types[idx] = 'numeric' q2_types.extend([""] * (dataset.metadata.columns - len(q2_types))) for idx, (q2_type, col_type) in enumerate(zip(q2_types, dataset.metadata.column_types)): if q2_type == "": if col_type in ("float", "int"): q2_types[idx] = "numeric" else: q2_types[idx] = 'categorical' q2_types[idx] = "categorical" else: if (q2_type == 'categorical' and col_type in ('float', 'int', 'list')): dataset.metadata.column_types[idx] = 'str' if q2_type == "categorical" and col_type in ("float", "int", "list"): dataset.metadata.column_types[idx] = "str" def sniff_prefix(self, file_prefix): for _, line in zip(range(self._search_lines), file_prefix.line_iterator()): for _, line in zip(range(self._search_lines), file_prefix.line_iterator()): if line.startswith(self._TYPES_DIRECTIVE): return True Loading Loading @@ -184,38 +178,38 @@ def _strip_properties(expression): # https://docs.python.org/3.9/library/ast.html#ast.unparse class _PredicateRemover(ast.NodeVisitor): binops = { ast.Add: ' + ', ast.Sub: ' - ', ast.Mult: ' * ', ast.Div: ' / ', ast.FloorDiv: ' // ', ast.Pow: ' ** ', ast.LShift: ' << ', ast.RShift: ' >> ', ast.BitOr: ' | ', ast.BitXor: ' ^ ', ast.BitAnd: ' & ', ast.MatMult: ' @ ' ast.Add: " + ", ast.Sub: " - ", ast.Mult: " * ", ast.Div: " / ", ast.FloorDiv: " // ", ast.Pow: " ** ", ast.LShift: " << ", ast.RShift: " >> ", ast.BitOr: " | ", ast.BitXor: " ^ ", ast.BitAnd: " & ", ast.MatMult: " @ ", } def __init__(self): self.expression = '' self.expression = "" def visit_Name(self, node): self.expression += node.id def visit_Subscript(self, node): self.visit(node.value) self.expression += '[' self.expression += "[" self.visit(node.slice) self.expression += ']' self.expression += "]" def visit_Tuple(self, node): trailing_comma = '' trailing_comma = "" for n in node.elts: self.expression += trailing_comma self.visit(n) trailing_comma = ', ' trailing_comma = ", " def visit_BinOp(self, node): self.visit(node.left) Loading @@ -230,63 +224,58 @@ def _get_metadata_from_archive(archive): metadata_contents = _get_metadata_contents(archive, uuid) return { 'uuid': uuid, 'version': framework_version, 'semantic_type': metadata_contents['type'], 'format': metadata_contents['format'] or '' "uuid": uuid, "version": framework_version, "semantic_type": metadata_contents["type"], "format": metadata_contents["format"] or "", } def _get_metadata_contents(path, uuid): with _open_file_in_archive(path, 'metadata.yaml', uuid) as fh: with _open_file_in_archive(path, "metadata.yaml", uuid) as fh: return yaml.safe_load(fh.read()) def _get_uuid(path): roots = set() for relpath in _iter_zip_root(path): if not relpath.startswith('.'): if not relpath.startswith("."): roots.add(relpath) if len(roots) == 0: raise ValueError("Archive does not have a visible root directory.") if len(roots) > 1: raise ValueError("Archive has multiple root directories: %r" % roots) raise ValueError("Archive has multiple root directories: %r" % roots) uuid = roots.pop() if not _is_uuid4(uuid): raise ValueError( "Archive root directory name %r is not a valid version 4 " "UUID." % uuid) raise ValueError("Archive root directory name %r is not a valid version 4 " "UUID." % uuid) return uuid def _get_versions(path, uuid): try: with _open_file_in_archive(path, 'VERSION', uuid) as fh: header, version_line, framework_version_line, eof = \ fh.read().split('\n') if header.strip() != 'QIIME 2': with _open_file_in_archive(path, "VERSION", uuid) as fh: header, version_line, framework_version_line, eof = fh.read().split("\n") if header.strip() != "QIIME 2": raise Exception() # GOTO except Exception version = version_line.split(':')[1].strip() framework_version = framework_version_line.split(':')[1].strip() version = version_line.split(":")[1].strip() framework_version = framework_version_line.split(":")[1].strip() return version, framework_version except Exception: raise ValueError("Archive does not contain a correctly formatted" " VERSION file.") raise ValueError("Archive does not contain a correctly formatted" " VERSION file.") def _open_file_in_archive(zip_path, path, uuid): relpath = '/'.join([uuid, path]) with zipfile.ZipFile(zip_path, mode='r') as zf: relpath = "/".join([uuid, path]) with zipfile.ZipFile(zip_path, mode="r") as zf: return io.TextIOWrapper(zf.open(relpath)) def _iter_zip_root(path): seen = set() with zipfile.ZipFile(path, mode='r') as zf: with zipfile.ZipFile(path, mode="r") as zf: for name in zf.namelist(): parts = name.split('/') # zip is always / for seperators parts = name.split("/") # zip is always / for seperators if len(parts) > 0: result = parts[0] if result not in seen: Loading test/unit/data/datatypes/test_qiime2.py +59 −48 Original line number Diff line number Diff line from galaxy.datatypes.qiime2 import (_strip_properties, QIIME2Artifact, QIIME2Visualization, QIIME2Metadata) from .util import MockDataset, get_input_files from galaxy.datatypes.qiime2 import ( _strip_properties, QIIME2Artifact, QIIME2Metadata, QIIME2Visualization, ) from .util import ( get_input_files, MockDataset, ) # Tests for QIIME2Artifact: def test_qza_sniff(): qza = QIIME2Artifact() with get_input_files('qiime2.qza') as input_files: with get_input_files("qiime2.qza") as input_files: assert qza.sniff(input_files[0]) is True def test_qza_set_meta(): qza = QIIME2Artifact() with get_input_files('qiime2.qza') as input_files: with get_input_files("qiime2.qza") as input_files: dataset = MockDataset(1) dataset.file_name = input_files[0] qza.set_meta(dataset) assert dataset.metadata.uuid == 'ba8c55e1-a2bc-47ea-beb2-b37b0b3b4032' assert dataset.metadata.version == '2022.2.1' assert dataset.metadata.format == 'SingleIntDirectoryFormat' assert dataset.metadata.semantic_type == 'SingleInt1' assert dataset.metadata.semantic_type_simple == 'SingleInt1' assert dataset.metadata.uuid == "ba8c55e1-a2bc-47ea-beb2-b37b0b3b4032" assert dataset.metadata.version == "2022.2.1" assert dataset.metadata.format == "SingleIntDirectoryFormat" assert dataset.metadata.semantic_type == "SingleInt1" assert dataset.metadata.semantic_type_simple == "SingleInt1" def test_qza_set_peek(): qza = QIIME2Artifact() with get_input_files('qiime2.qza') as input_files: with get_input_files("qiime2.qza") as input_files: dataset = MockDataset(1) dataset.file_name = input_files[0] qza.set_meta(dataset) qza.set_peek(dataset) assert dataset.peek == '''Type: SingleInt1 assert ( dataset.peek == """Type: SingleInt1 UUID: ba8c55e1-a2bc-47ea-beb2-b37b0b3b4032 Format: SingleIntDirectoryFormat Version: 2022.2.1''' Version: 2022.2.1""" ) # Tests for QIIME2Visualization: def test_qzv_sniff(): qzv = QIIME2Visualization() with get_input_files('qiime2.qzv') as input_files: with get_input_files("qiime2.qzv") as input_files: assert qzv.sniff(input_files[0]) is True def test_qzv_set_meta(): qzv = QIIME2Visualization() with get_input_files('qiime2.qzv') as input_files: with get_input_files("qiime2.qzv") as input_files: dataset = MockDataset(1) dataset.file_name = input_files[0] qzv.set_meta(dataset) assert dataset.metadata.uuid == '368ba1e7-3a7c-4dbc-98da-79f41aeece63' assert dataset.metadata.version == '2022.2.1' assert dataset.metadata.semantic_type == 'Visualization' assert dataset.metadata.semantic_type_simple == 'Visualization' assert dataset.metadata.uuid == "368ba1e7-3a7c-4dbc-98da-79f41aeece63" assert dataset.metadata.version == "2022.2.1" assert dataset.metadata.semantic_type == "Visualization" assert dataset.metadata.semantic_type_simple == "Visualization" def test_qzv_set_peek(): qzv = QIIME2Visualization() with get_input_files('qiime2.qzv') as input_files: with get_input_files("qiime2.qzv") as input_files: dataset = MockDataset(1) dataset.file_name = input_files[0] qzv.set_meta(dataset) qzv.set_peek(dataset) assert dataset.peek == '''Type: Visualization assert ( dataset.peek == """Type: Visualization UUID: 368ba1e7-3a7c-4dbc-98da-79f41aeece63 Version: 2022.2.1''' Version: 2022.2.1""" ) # Tets for QIIME2Metadata: def test_qiime2tabular_sniff(): q2md = QIIME2Metadata() with get_input_files('qiime2.tsv') as input_files: with get_input_files("qiime2.tsv") as input_files: assert q2md.sniff(input_files[0]) is True def test_qiime2tabular_sniff_false(): q2md = QIIME2Metadata() with get_input_files('test_tab1.tabular') as input_files: with get_input_files("test_tab1.tabular") as input_files: assert q2md.sniff(input_files[0]) is False def test_qiime2tabular_set_meta(): q2md = QIIME2Metadata() with get_input_files('qiime2.tsv') as input_files: with get_input_files("qiime2.tsv") as input_files: dataset = MockDataset(1) dataset.file_name = input_files[0] q2md.set_meta(dataset) # Show override of type inferrence on the second to last column: assert dataset.metadata.column_types == ['str', 'str', 'str', 'int'] assert dataset.metadata.column_types == ["str", "str", "str", "int"] # Tests for _strip_properties, which is rather complicated so worth testing Loading @@ -109,9 +124,10 @@ def test_qiime2tabular_set_meta(): # Note: Not all the expressions here are completely valid types they are just # representative examples def test_strip_properties_simple(): simple_expression = 'Taxonomy % Properties("SILVIA")' stripped_expression = 'Taxonomy' stripped_expression = "Taxonomy" reconstructed_expression = _strip_properties(simple_expression) Loading @@ -120,7 +136,7 @@ def test_strip_properties_simple(): def test_strip_properties_single(): single_expression = 'FeatureData[Taxonomy % Properties("SILVIA")]' stripped_expression = 'FeatureData[Taxonomy]' stripped_expression = "FeatureData[Taxonomy]" reconstructed_expression = _strip_properties(single_expression) Loading @@ -128,9 +144,8 @@ def test_strip_properties_single(): def test_strip_properties_double(): double_expression = ('FeatureData[Taxonomy % Properties("SILVIA"), ' 'DistanceMatrix % Axes("ASV", "ASV")]') stripped_expression = 'FeatureData[Taxonomy, DistanceMatrix]' double_expression = 'FeatureData[Taxonomy % Properties("SILVIA"), ' 'DistanceMatrix % Axes("ASV", "ASV")]' stripped_expression = "FeatureData[Taxonomy, DistanceMatrix]" reconstructed_expression = _strip_properties(double_expression) Loading @@ -138,9 +153,8 @@ def test_strip_properties_double(): def test_strip_properties_nested(): nested_expression = ('Tuple[FeatureData[Taxonomy % ' 'Properties("SILVIA")] % Axes("ASV", "ASV")]') stripped_expression = 'Tuple[FeatureData[Taxonomy]]' nested_expression = "Tuple[FeatureData[Taxonomy % " 'Properties("SILVIA")] % Axes("ASV", "ASV")]' stripped_expression = "Tuple[FeatureData[Taxonomy]]" reconstructed_expression = _strip_properties(nested_expression) Loading @@ -148,10 +162,11 @@ def test_strip_properties_nested(): def test_strip_properties_complex(): complex_expression = \ ('Tuple[FeatureData[Taxonomy % Properties("SILVA")] % Axis("ASV")' ', DistanceMatrix % Axes("ASV", "ASV")] % Unique') stripped_expression = 'Tuple[FeatureData[Taxonomy], DistanceMatrix]' complex_expression = ( 'Tuple[FeatureData[Taxonomy % Properties("SILVA")] % Axis("ASV")' ', DistanceMatrix % Axes("ASV", "ASV")] % Unique' ) stripped_expression = "Tuple[FeatureData[Taxonomy], DistanceMatrix]" reconstructed_expression = _strip_properties(complex_expression) Loading @@ -159,26 +174,22 @@ def test_strip_properties_complex(): def test_strip_properties_keeps_different_binop(): expression_with_different_binop = \ ('FeatureData[Taxonomy % Properties("SILVIA"), ' 'Taxonomy & Properties]') stripped_expression = \ 'FeatureData[Taxonomy, Taxonomy & Properties]' expression_with_different_binop = 'FeatureData[Taxonomy % Properties("SILVIA"), ' "Taxonomy & Properties]" stripped_expression = "FeatureData[Taxonomy, Taxonomy & Properties]" reconstructed_expression = \ _strip_properties(expression_with_different_binop) reconstructed_expression = _strip_properties(expression_with_different_binop) assert reconstructed_expression == stripped_expression def test_strip_properties_multiple_strings(): simple_expression = 'Taxonomy % Properties("SILVIA")' stripped_simple_expression = 'Taxonomy' stripped_simple_expression = "Taxonomy" reconstructed_simple_expression = _strip_properties(simple_expression) single_expression = 'FeatureData[Taxonomy % Properties("SILVIA")]' stripped_single_expression = 'FeatureData[Taxonomy]' stripped_single_expression = "FeatureData[Taxonomy]" reconstructed_single_expression = _strip_properties(single_expression) Loading Loading
lib/galaxy/datatypes/qiime2.py +68 −79 Original line number Diff line number Diff line import io import ast import html import io import uuid as _uuid import zipfile Loading @@ -8,16 +8,17 @@ import yaml from galaxy.datatypes.binary import CompressedZipArchive from galaxy.datatypes.metadata import MetadataElement from galaxy.datatypes.tabular import Tabular from galaxy.datatypes.sniff import build_sniff_from_prefix from galaxy.datatypes.tabular import Tabular class _QIIME2ResultBase(CompressedZipArchive): """Base class for QIIME2Artifact and QIIME2Visualization""" MetadataElement(name="semantic_type", readonly=True) MetadataElement(name="semantic_type_simple", readonly=True, visible=False) MetadataElement(name="uuid", readonly=True) MetadataElement(name="format", optional=True, no_value='', readonly=True) MetadataElement(name="format", optional=True, no_value="", readonly=True) MetadataElement(name="version", readonly=True) def set_meta(self, dataset, overwrite=True, **kwd): Loading @@ -26,36 +27,32 @@ class _QIIME2ResultBase(CompressedZipArchive): if value: setattr(dataset.metadata, key, value) dataset.metadata.semantic_type_simple = \ _strip_properties(dataset.metadata.semantic_type) dataset.metadata.semantic_type_simple = _strip_properties(dataset.metadata.semantic_type) def set_peek(self, dataset, is_multi_byte=False): if dataset.metadata.semantic_type == 'Visualization': dataset.blurb = 'QIIME 2 Visualization' if dataset.metadata.semantic_type == "Visualization": dataset.blurb = "QIIME 2 Visualization" else: dataset.blurb = 'QIIME 2 Artifact' dataset.blurb = "QIIME 2 Artifact" dataset.peek = '\n'.join(map(': '.join, self._peek(dataset))) dataset.peek = "\n".join(map(": ".join, self._peek(dataset))) def display_peek(self, dataset): def make_row(item): return ('<tr><th>%s</th><td>%s</td></td>' % tuple(html.escape(x) for x in item)) return "<tr><th>%s</th><td>%s</td></td>" % tuple(html.escape(x) for x in item) table = ['<table cellspacing="0" cellpadding="2">'] table += list(map(make_row, self._peek(dataset, simple=True))) table += ['</table>'] table += ["</table>"] return ''.join(table) return "".join(table) def _peek(self, dataset, simple=False): peek = [ ('Type', dataset.metadata.semantic_type), ('UUID', dataset.metadata.uuid)] peek = [("Type", dataset.metadata.semantic_type), ("UUID", dataset.metadata.uuid)] if not simple: if dataset.metadata.semantic_type != 'Visualization': peek.append(('Format', dataset.metadata.format)) peek.append(('Version', dataset.metadata.version)) if dataset.metadata.semantic_type != "Visualization": peek.append(("Format", dataset.metadata.format)) peek.append(("Version", dataset.metadata.version)) return peek def _sniff(self, filename): Loading @@ -73,7 +70,7 @@ class QIIME2Artifact(_QIIME2ResultBase): def sniff(self, filename): metadata = self._sniff(filename) return metadata and metadata['semantic_type'] != 'Visualization' return metadata and metadata["semantic_type"] != "Visualization" class QIIME2Visualization(_QIIME2ResultBase): Loading @@ -81,7 +78,7 @@ class QIIME2Visualization(_QIIME2ResultBase): def sniff(self, filename): metadata = self._sniff(filename) return metadata and metadata['semantic_type'] == 'Visualization' return metadata and metadata["semantic_type"] == "Visualization" @build_sniff_from_prefix Loading @@ -97,16 +94,17 @@ class QIIME2Metadata(Tabular): on the second line (after the header). This is the most typical location and interacts best with the current implementation of Tabular. """ file_ext = "qiime2.tabular" is_subclass = False _TYPES_DIRECTIVE = '#q2:types' _TYPES_DIRECTIVE = "#q2:types" _search_lines = 2 def get_column_names(self, first_line=None): if first_line is None: return None return first_line.strip().split('\t') return first_line.strip().split("\t") def set_meta(self, dataset, **kwargs): """ Loading @@ -124,30 +122,26 @@ class QIIME2Metadata(Tabular): if line is None: return q2_types = line.strip().split('\t') q2_types = line.strip().split("\t") # The first column (q2:types) is always the IDs q2_types[0] = 'index' q2_types[0] = "index" if len(q2_types) < dataset.metadata.columns: # this is probably malformed, but easy to fix q2_types.extend([''] * (dataset.metadata.columns - len(q2_types))) for idx, (q2_type, col_type) in enumerate( zip(q2_types, dataset.metadata.column_types)): if q2_type == '': if col_type in ('float', 'int'): q2_types[idx] = 'numeric' q2_types.extend([""] * (dataset.metadata.columns - len(q2_types))) for idx, (q2_type, col_type) in enumerate(zip(q2_types, dataset.metadata.column_types)): if q2_type == "": if col_type in ("float", "int"): q2_types[idx] = "numeric" else: q2_types[idx] = 'categorical' q2_types[idx] = "categorical" else: if (q2_type == 'categorical' and col_type in ('float', 'int', 'list')): dataset.metadata.column_types[idx] = 'str' if q2_type == "categorical" and col_type in ("float", "int", "list"): dataset.metadata.column_types[idx] = "str" def sniff_prefix(self, file_prefix): for _, line in zip(range(self._search_lines), file_prefix.line_iterator()): for _, line in zip(range(self._search_lines), file_prefix.line_iterator()): if line.startswith(self._TYPES_DIRECTIVE): return True Loading Loading @@ -184,38 +178,38 @@ def _strip_properties(expression): # https://docs.python.org/3.9/library/ast.html#ast.unparse class _PredicateRemover(ast.NodeVisitor): binops = { ast.Add: ' + ', ast.Sub: ' - ', ast.Mult: ' * ', ast.Div: ' / ', ast.FloorDiv: ' // ', ast.Pow: ' ** ', ast.LShift: ' << ', ast.RShift: ' >> ', ast.BitOr: ' | ', ast.BitXor: ' ^ ', ast.BitAnd: ' & ', ast.MatMult: ' @ ' ast.Add: " + ", ast.Sub: " - ", ast.Mult: " * ", ast.Div: " / ", ast.FloorDiv: " // ", ast.Pow: " ** ", ast.LShift: " << ", ast.RShift: " >> ", ast.BitOr: " | ", ast.BitXor: " ^ ", ast.BitAnd: " & ", ast.MatMult: " @ ", } def __init__(self): self.expression = '' self.expression = "" def visit_Name(self, node): self.expression += node.id def visit_Subscript(self, node): self.visit(node.value) self.expression += '[' self.expression += "[" self.visit(node.slice) self.expression += ']' self.expression += "]" def visit_Tuple(self, node): trailing_comma = '' trailing_comma = "" for n in node.elts: self.expression += trailing_comma self.visit(n) trailing_comma = ', ' trailing_comma = ", " def visit_BinOp(self, node): self.visit(node.left) Loading @@ -230,63 +224,58 @@ def _get_metadata_from_archive(archive): metadata_contents = _get_metadata_contents(archive, uuid) return { 'uuid': uuid, 'version': framework_version, 'semantic_type': metadata_contents['type'], 'format': metadata_contents['format'] or '' "uuid": uuid, "version": framework_version, "semantic_type": metadata_contents["type"], "format": metadata_contents["format"] or "", } def _get_metadata_contents(path, uuid): with _open_file_in_archive(path, 'metadata.yaml', uuid) as fh: with _open_file_in_archive(path, "metadata.yaml", uuid) as fh: return yaml.safe_load(fh.read()) def _get_uuid(path): roots = set() for relpath in _iter_zip_root(path): if not relpath.startswith('.'): if not relpath.startswith("."): roots.add(relpath) if len(roots) == 0: raise ValueError("Archive does not have a visible root directory.") if len(roots) > 1: raise ValueError("Archive has multiple root directories: %r" % roots) raise ValueError("Archive has multiple root directories: %r" % roots) uuid = roots.pop() if not _is_uuid4(uuid): raise ValueError( "Archive root directory name %r is not a valid version 4 " "UUID." % uuid) raise ValueError("Archive root directory name %r is not a valid version 4 " "UUID." % uuid) return uuid def _get_versions(path, uuid): try: with _open_file_in_archive(path, 'VERSION', uuid) as fh: header, version_line, framework_version_line, eof = \ fh.read().split('\n') if header.strip() != 'QIIME 2': with _open_file_in_archive(path, "VERSION", uuid) as fh: header, version_line, framework_version_line, eof = fh.read().split("\n") if header.strip() != "QIIME 2": raise Exception() # GOTO except Exception version = version_line.split(':')[1].strip() framework_version = framework_version_line.split(':')[1].strip() version = version_line.split(":")[1].strip() framework_version = framework_version_line.split(":")[1].strip() return version, framework_version except Exception: raise ValueError("Archive does not contain a correctly formatted" " VERSION file.") raise ValueError("Archive does not contain a correctly formatted" " VERSION file.") def _open_file_in_archive(zip_path, path, uuid): relpath = '/'.join([uuid, path]) with zipfile.ZipFile(zip_path, mode='r') as zf: relpath = "/".join([uuid, path]) with zipfile.ZipFile(zip_path, mode="r") as zf: return io.TextIOWrapper(zf.open(relpath)) def _iter_zip_root(path): seen = set() with zipfile.ZipFile(path, mode='r') as zf: with zipfile.ZipFile(path, mode="r") as zf: for name in zf.namelist(): parts = name.split('/') # zip is always / for seperators parts = name.split("/") # zip is always / for seperators if len(parts) > 0: result = parts[0] if result not in seen: Loading
test/unit/data/datatypes/test_qiime2.py +59 −48 Original line number Diff line number Diff line from galaxy.datatypes.qiime2 import (_strip_properties, QIIME2Artifact, QIIME2Visualization, QIIME2Metadata) from .util import MockDataset, get_input_files from galaxy.datatypes.qiime2 import ( _strip_properties, QIIME2Artifact, QIIME2Metadata, QIIME2Visualization, ) from .util import ( get_input_files, MockDataset, ) # Tests for QIIME2Artifact: def test_qza_sniff(): qza = QIIME2Artifact() with get_input_files('qiime2.qza') as input_files: with get_input_files("qiime2.qza") as input_files: assert qza.sniff(input_files[0]) is True def test_qza_set_meta(): qza = QIIME2Artifact() with get_input_files('qiime2.qza') as input_files: with get_input_files("qiime2.qza") as input_files: dataset = MockDataset(1) dataset.file_name = input_files[0] qza.set_meta(dataset) assert dataset.metadata.uuid == 'ba8c55e1-a2bc-47ea-beb2-b37b0b3b4032' assert dataset.metadata.version == '2022.2.1' assert dataset.metadata.format == 'SingleIntDirectoryFormat' assert dataset.metadata.semantic_type == 'SingleInt1' assert dataset.metadata.semantic_type_simple == 'SingleInt1' assert dataset.metadata.uuid == "ba8c55e1-a2bc-47ea-beb2-b37b0b3b4032" assert dataset.metadata.version == "2022.2.1" assert dataset.metadata.format == "SingleIntDirectoryFormat" assert dataset.metadata.semantic_type == "SingleInt1" assert dataset.metadata.semantic_type_simple == "SingleInt1" def test_qza_set_peek(): qza = QIIME2Artifact() with get_input_files('qiime2.qza') as input_files: with get_input_files("qiime2.qza") as input_files: dataset = MockDataset(1) dataset.file_name = input_files[0] qza.set_meta(dataset) qza.set_peek(dataset) assert dataset.peek == '''Type: SingleInt1 assert ( dataset.peek == """Type: SingleInt1 UUID: ba8c55e1-a2bc-47ea-beb2-b37b0b3b4032 Format: SingleIntDirectoryFormat Version: 2022.2.1''' Version: 2022.2.1""" ) # Tests for QIIME2Visualization: def test_qzv_sniff(): qzv = QIIME2Visualization() with get_input_files('qiime2.qzv') as input_files: with get_input_files("qiime2.qzv") as input_files: assert qzv.sniff(input_files[0]) is True def test_qzv_set_meta(): qzv = QIIME2Visualization() with get_input_files('qiime2.qzv') as input_files: with get_input_files("qiime2.qzv") as input_files: dataset = MockDataset(1) dataset.file_name = input_files[0] qzv.set_meta(dataset) assert dataset.metadata.uuid == '368ba1e7-3a7c-4dbc-98da-79f41aeece63' assert dataset.metadata.version == '2022.2.1' assert dataset.metadata.semantic_type == 'Visualization' assert dataset.metadata.semantic_type_simple == 'Visualization' assert dataset.metadata.uuid == "368ba1e7-3a7c-4dbc-98da-79f41aeece63" assert dataset.metadata.version == "2022.2.1" assert dataset.metadata.semantic_type == "Visualization" assert dataset.metadata.semantic_type_simple == "Visualization" def test_qzv_set_peek(): qzv = QIIME2Visualization() with get_input_files('qiime2.qzv') as input_files: with get_input_files("qiime2.qzv") as input_files: dataset = MockDataset(1) dataset.file_name = input_files[0] qzv.set_meta(dataset) qzv.set_peek(dataset) assert dataset.peek == '''Type: Visualization assert ( dataset.peek == """Type: Visualization UUID: 368ba1e7-3a7c-4dbc-98da-79f41aeece63 Version: 2022.2.1''' Version: 2022.2.1""" ) # Tets for QIIME2Metadata: def test_qiime2tabular_sniff(): q2md = QIIME2Metadata() with get_input_files('qiime2.tsv') as input_files: with get_input_files("qiime2.tsv") as input_files: assert q2md.sniff(input_files[0]) is True def test_qiime2tabular_sniff_false(): q2md = QIIME2Metadata() with get_input_files('test_tab1.tabular') as input_files: with get_input_files("test_tab1.tabular") as input_files: assert q2md.sniff(input_files[0]) is False def test_qiime2tabular_set_meta(): q2md = QIIME2Metadata() with get_input_files('qiime2.tsv') as input_files: with get_input_files("qiime2.tsv") as input_files: dataset = MockDataset(1) dataset.file_name = input_files[0] q2md.set_meta(dataset) # Show override of type inferrence on the second to last column: assert dataset.metadata.column_types == ['str', 'str', 'str', 'int'] assert dataset.metadata.column_types == ["str", "str", "str", "int"] # Tests for _strip_properties, which is rather complicated so worth testing Loading @@ -109,9 +124,10 @@ def test_qiime2tabular_set_meta(): # Note: Not all the expressions here are completely valid types they are just # representative examples def test_strip_properties_simple(): simple_expression = 'Taxonomy % Properties("SILVIA")' stripped_expression = 'Taxonomy' stripped_expression = "Taxonomy" reconstructed_expression = _strip_properties(simple_expression) Loading @@ -120,7 +136,7 @@ def test_strip_properties_simple(): def test_strip_properties_single(): single_expression = 'FeatureData[Taxonomy % Properties("SILVIA")]' stripped_expression = 'FeatureData[Taxonomy]' stripped_expression = "FeatureData[Taxonomy]" reconstructed_expression = _strip_properties(single_expression) Loading @@ -128,9 +144,8 @@ def test_strip_properties_single(): def test_strip_properties_double(): double_expression = ('FeatureData[Taxonomy % Properties("SILVIA"), ' 'DistanceMatrix % Axes("ASV", "ASV")]') stripped_expression = 'FeatureData[Taxonomy, DistanceMatrix]' double_expression = 'FeatureData[Taxonomy % Properties("SILVIA"), ' 'DistanceMatrix % Axes("ASV", "ASV")]' stripped_expression = "FeatureData[Taxonomy, DistanceMatrix]" reconstructed_expression = _strip_properties(double_expression) Loading @@ -138,9 +153,8 @@ def test_strip_properties_double(): def test_strip_properties_nested(): nested_expression = ('Tuple[FeatureData[Taxonomy % ' 'Properties("SILVIA")] % Axes("ASV", "ASV")]') stripped_expression = 'Tuple[FeatureData[Taxonomy]]' nested_expression = "Tuple[FeatureData[Taxonomy % " 'Properties("SILVIA")] % Axes("ASV", "ASV")]' stripped_expression = "Tuple[FeatureData[Taxonomy]]" reconstructed_expression = _strip_properties(nested_expression) Loading @@ -148,10 +162,11 @@ def test_strip_properties_nested(): def test_strip_properties_complex(): complex_expression = \ ('Tuple[FeatureData[Taxonomy % Properties("SILVA")] % Axis("ASV")' ', DistanceMatrix % Axes("ASV", "ASV")] % Unique') stripped_expression = 'Tuple[FeatureData[Taxonomy], DistanceMatrix]' complex_expression = ( 'Tuple[FeatureData[Taxonomy % Properties("SILVA")] % Axis("ASV")' ', DistanceMatrix % Axes("ASV", "ASV")] % Unique' ) stripped_expression = "Tuple[FeatureData[Taxonomy], DistanceMatrix]" reconstructed_expression = _strip_properties(complex_expression) Loading @@ -159,26 +174,22 @@ def test_strip_properties_complex(): def test_strip_properties_keeps_different_binop(): expression_with_different_binop = \ ('FeatureData[Taxonomy % Properties("SILVIA"), ' 'Taxonomy & Properties]') stripped_expression = \ 'FeatureData[Taxonomy, Taxonomy & Properties]' expression_with_different_binop = 'FeatureData[Taxonomy % Properties("SILVIA"), ' "Taxonomy & Properties]" stripped_expression = "FeatureData[Taxonomy, Taxonomy & Properties]" reconstructed_expression = \ _strip_properties(expression_with_different_binop) reconstructed_expression = _strip_properties(expression_with_different_binop) assert reconstructed_expression == stripped_expression def test_strip_properties_multiple_strings(): simple_expression = 'Taxonomy % Properties("SILVIA")' stripped_simple_expression = 'Taxonomy' stripped_simple_expression = "Taxonomy" reconstructed_simple_expression = _strip_properties(simple_expression) single_expression = 'FeatureData[Taxonomy % Properties("SILVIA")]' stripped_single_expression = 'FeatureData[Taxonomy]' stripped_single_expression = "FeatureData[Taxonomy]" reconstructed_single_expression = _strip_properties(single_expression) Loading