Commit ccf6bca0 authored by mvdbeek's avatar mvdbeek Committed by John Chilton
Browse files

Allow ``decompress="true"`` in test comparison methods

This means you can provide a short fragment of expected content
(compressed or uncompressed) in your test data and compare it with the
produced output. Previously, this was only supported by the default
`files_diff` comparison method.
parent 70570eea
Loading
Loading
Loading
Loading
+23 −21
Original line number Diff line number Diff line
@@ -224,8 +224,17 @@ def files_delta(file1, file2, attributes=None):
        raise AssertionError('Files %s=%db but %s=%db - compare by size (delta_frac=%s) failed' % (file1, s1, file2, s2, delta_frac))


def get_compressed_formats(attributes):
    """Translate the ``decompress`` test attribute into a ``compressed_formats`` value.

    :param attributes: mapping of comparison attributes, or ``None``/empty
        when no attributes were supplied.
    :returns: ``None`` when ``decompress`` is set to a truthy value, otherwise
        a new empty list. The result is meant to be passed as the
        ``compressed_formats`` argument when opening the files to compare.
    """
    if attributes and attributes.get("decompress"):
        # None means all compressed formats are allowed
        return None
    # decompress missing or falsy: no compressed formats are allowed
    return []


def files_diff(file1, file2, attributes=None):
    """Check the contents of 2 files for differences."""
    attributes = attributes or {}

    def get_lines_diff(diff):
        count = 0
        for line in diff:
@@ -234,14 +243,7 @@ def files_diff(file1, file2, attributes=None):
        return count

    if not filecmp.cmp(file1, file2, shallow=False):
        if attributes is None:
            attributes = {}
        decompress = attributes.get("decompress", None)
        if decompress:
            # None means all compressed formats are allowed
            compressed_formats = None
        else:
            compressed_formats = []
        compressed_formats = get_compressed_formats(attributes)
        is_pdf = False
        try:
            with get_fileobj(file2, compressed_formats=compressed_formats) as fh:
@@ -309,12 +311,14 @@ def files_diff(file1, file2, attributes=None):

def files_re_match(file1, file2, attributes=None):
    """Check the contents of 2 files for differences using re.match."""
    attributes = attributes or {}
    join_char = ''
    to_strip = os.linesep
    compressed_formats = get_compressed_formats(attributes)
    try:
        with open(file2, encoding='utf-8') as fh:
        with get_fileobj(file2, compressed_formats=compressed_formats) as fh:
            history_data = fh.readlines()
        with open(file1, encoding='utf-8') as fh:
        with get_fileobj(file1, compressed_formats=compressed_formats) as fh:
            local_file = fh.readlines()
    except UnicodeDecodeError:
        join_char = b''
@@ -324,8 +328,6 @@ def files_re_match(file1, file2, attributes=None):
        with open(file1, 'rb') as fh:
            local_file = fh.readlines()
    assert len(local_file) == len(history_data), 'Data File and Regular Expression File contain a different number of lines (%d != %d)\nHistory Data (first 40 lines):\n%s' % (len(local_file), len(history_data), join_char.join(history_data[:40]))
    if attributes is None:
        attributes = {}
    if attributes.get('sort', False):
        history_data.sort()
    lines_diff = int(attributes.get('lines_diff', 0))
@@ -343,11 +345,13 @@ def files_re_match(file1, file2, attributes=None):

def files_re_match_multiline(file1, file2, attributes=None):
    """Check the contents of 2 files for differences using re.match in multiline mode."""
    attributes = attributes or {}
    join_char = ''
    compressed_formats = get_compressed_formats(attributes)
    try:
        with open(file2, encoding='utf-8') as fh:
        with get_fileobj(file2, compressed_formats=compressed_formats) as fh:
            history_data = fh.readlines()
        with open(file1, encoding='utf-8') as fh:
        with get_fileobj(file1, compressed_formats=compressed_formats) as fh:
            local_file = fh.read()
    except UnicodeDecodeError:
        join_char = b''
@@ -355,8 +359,6 @@ def files_re_match_multiline(file1, file2, attributes=None):
            history_data = fh.readlines()
        with open(file1, 'rb') as fh:
            local_file = fh.read()
    if attributes is None:
        attributes = {}
    if attributes.get('sort', False):
        history_data.sort()
    history_data = join_char.join(history_data)
@@ -367,11 +369,13 @@ def files_re_match_multiline(file1, file2, attributes=None):
def files_contains(file1, file2, attributes=None):
    """Check the contents of file2 for substrings found in file1, on a per-line basis."""
    # TODO: allow forcing ordering of contains
    attributes = attributes or {}
    to_strip = os.linesep
    compressed_formats = get_compressed_formats(attributes)
    try:
        with open(file2, encoding='utf-8') as fh:
        with get_fileobj(file2, compressed_formats=compressed_formats) as fh:
            history_data = fh.read()
        with open(file1, encoding='utf-8') as fh:
        with get_fileobj(file1, compressed_formats=compressed_formats) as fh:
            local_file = fh.readlines()
    except UnicodeDecodeError:
        to_strip = os.linesep.encode('utf-8')
@@ -379,8 +383,6 @@ def files_contains(file1, file2, attributes=None):
            history_data = fh.read()
        with open(file1, 'rb') as fh:
            local_file = fh.readlines()
    if attributes is None:
        attributes = {}
    lines_diff = int(attributes.get('lines_diff', 0))
    line_diff_count = 0
    for contains in local_file:
@@ -388,4 +390,4 @@ def files_contains(file1, file2, attributes=None):
        if contains not in history_data:
            line_diff_count += 1
        if line_diff_count > lines_diff:
            raise AssertionError("Failed to find '%s' in history data. (lines_diff=%i)" % (contains, lines_diff))
            raise AssertionError(f"Failed to find '{contains}' in history data. (lines_diff={lines_diff}).")
+7 −3
Original line number Diff line number Diff line
import collections
import gzip
import tempfile

import pytest
@@ -22,15 +23,17 @@ TestFile = collections.namedtuple('TestFile', 'value path')

def test_file_list():
    files = []
    for b, ext in [(F1, '.txt'), (F2, '.txt'), (F3, '.pdf'), (F4, '.txt'), (MULTILINE_MATCH, '.txt')]:
    for b, ext in [(F1, '.txt'), (F2, '.txt'), (F3, '.pdf'), (F4, '.txt'), (MULTILINE_MATCH, '.txt'), (F1, '.txt.gz')]:
        with tempfile.NamedTemporaryFile(mode='wb', suffix=ext, delete=False) as out:
            if ext == '.txt.gz':
                b = gzip.compress(b)
            out.write(b)
        files.append(TestFile(b, out.name))
    return files


def generate_tests(multiline=False):
    f1, f2, f3, f4, multiline_match = test_file_list()
    f1, f2, f3, f4, multiline_match, f5 = test_file_list()
    if multiline:
        tests = [(multiline_match, f1, {'lines_diff': 0, 'sort': True}, None)]
    else:
@@ -39,12 +42,13 @@ def generate_tests(multiline=False):
        (f1, f2, {'lines_diff': 0, 'sort': True}, AssertionError),
        (f1, f3, None, AssertionError),
        (f1, f4, None, None),
        (f1, f5, {'decompress': True}, None),
    ])
    return tests


def generate_tests_sim_size():
    f1, f2, f3, f4, multiline_match = test_file_list()
    f1, f2, f3, f4, multiline_match, f5 = test_file_list()
    # tests for equal files
    tests = [(f1, f1, None, None),  # pass default values
             (f1, f1, {'delta': 0}, None),  # pass for values that should always pass