# src/common/gen.py
"""Generate CSV files of random data, bounded by row count or by file size."""
import argparse
import csv
import os
import random
import re
import string

# Multipliers for the size suffixes accepted by convert_size_to_bytes.
_SIZE_UNITS = {'KB': 1024, 'MB': 1024 ** 2, 'GB': 1024 ** 3, 'TB': 1024 ** 4}

# A non-negative decimal number, optional whitespace, then an alphabetic unit.
# The unit is validated separately so the error can name the bad suffix.
_SIZE_PATTERN = re.compile(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]*)\s*")


def generate_gibberish(length=5):
    """Return ``length`` randomly chosen ASCII letters (mixed case)."""
    return ''.join(random.choices(string.ascii_letters, k=length))


def generate_data_by_type(column_type):
    """Return one random cell value, as a string, for ``column_type``.

    * ``'int'``   -> an integer in ``[0, 1000]``
    * ``'float'`` -> a number in ``[0, 1000]`` formatted with two decimals
    * anything else (including ``'string'``) -> 3 to 10 random letters
    """
    if column_type == 'int':
        return str(random.randint(0, 1000))
    if column_type == 'float':
        return f"{random.uniform(0, 1000):.2f}"
    # 'string' and every unrecognised type share the same fallback.
    return generate_gibberish(random.randint(3, 10))


def convert_size_to_bytes(size_str):
    """Convert a human-readable file size (e.g. ``512MB``, ``1.5 gb``) to bytes.

    The unit is case-insensitive and may be separated from the number by
    whitespace. Fractional values are supported and truncated to whole bytes.

    Raises:
        ValueError: if the text is not "<number><unit>" or the unit is not
            one of KB, MB, GB, TB.
    """
    match = _SIZE_PATTERN.fullmatch(size_str)
    if match is None:
        raise ValueError(
            f"Invalid size: {size_str!r}. Expected a number followed by KB, MB, GB, or TB."
        )
    number, unit = match.group(1), match.group(2).upper()
    if unit not in _SIZE_UNITS:
        raise ValueError(f"Invalid size unit: {unit}. Use KB, MB, GB, or TB.")
    # Keep integer input exact; only go through float when a fraction is given.
    value = float(number) if '.' in number else int(number)
    return int(value * _SIZE_UNITS[unit])


def generate_csv(file_path, n_columns=5, target_file_size=None, target_row_count=None, column_types=None):
    """Write a CSV of random data until a row count or a byte size is reached.

    Args:
        file_path: Destination path; an existing file is overwritten.
        n_columns: Number of columns. Header names are random 5-letter strings.
        target_file_size: Stop once at least this many bytes were written.
        target_row_count: Stop once this many data rows were written.
        column_types: Optional mapping to ``'int'``, ``'float'`` or
            ``'string'``, keyed by header name or by zero-based column index
            (header names are random, so the index is the practical key).
            Unlisted columns default to ``'string'``.

    Exactly one of ``target_file_size`` / ``target_row_count`` must be given;
    ``None`` and ``0`` both count as "not given". At least one data row is
    always written.

    Raises:
        ValueError: if both targets, or neither target, are specified.
    """
    if target_file_size and target_row_count:
        raise ValueError("Please specify either target file size or target row count, not both.")
    if not target_file_size and not target_row_count:
        # Without a stop condition the write loop below would never end.
        raise ValueError("Please specify a target file size or a target row count.")

    if column_types is None:
        column_types = {}

    # Generate gibberish column names and resolve each column's type once.
    columns = [generate_gibberish() for _ in range(n_columns)]
    resolved_types = [
        column_types.get(name, column_types.get(index, 'string'))
        for index, name in enumerate(columns)
    ]

    with open(file_path, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        # writerow() returns what file.write() returns: the character count.
        # All generated content is ASCII, so characters == bytes in UTF-8.
        # Counting here avoids stat()-ing a file whose buffer is not flushed.
        bytes_written = writer.writerow(columns)  # header
        row_count = 0

        while True:
            row = [generate_data_by_type(column_type) for column_type in resolved_types]
            bytes_written += writer.writerow(row)
            row_count += 1

            if target_row_count and row_count >= target_row_count:
                break
            if target_file_size and bytes_written >= target_file_size:
                break

    print(f"CSV file generated at {file_path} with {row_count} rows.")


def main(argv=None):
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Invalid arguments terminate via ``parser.error`` (usage message on
    stderr, non-zero exit status).
    """
    parser = argparse.ArgumentParser(description="Generate a large CSV file with random data.")
    parser.add_argument('-c', '--col', type=int, required=True, help="Number of columns in the CSV.")
    parser.add_argument('-o', '--output', default='output.csv',
                        help="Path of the CSV file to write (default: output.csv).")

    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-r', '--row', type=int, help="Number of rows in the CSV.")
    group.add_argument('-s', '--size', type=str, help="Target file size (e.g., 512MB or 1GB).")

    args = parser.parse_args(argv)

    if args.col < 1:
        parser.error("--col must be at least 1.")
    if args.row is not None and args.row < 1:
        parser.error("--row must be at least 1.")

    # Parse size if provided
    target_file_size = None
    if args.size is not None:
        try:
            target_file_size = convert_size_to_bytes(args.size)
        except ValueError as e:
            parser.error(str(e))
        if target_file_size < 1:
            parser.error("--size must be greater than zero.")

    # Generate the CSV
    generate_csv(
        file_path=args.output,
        n_columns=args.col,
        target_row_count=args.row,
        target_file_size=target_file_size,
        column_types={}  # You can define the column types here if needed
    )


if __name__ == "__main__":
    main()
# ---------------------------------------------------------------------------
# tests/test_common/test_env.py
# Tests for the environment helpers in common.env.
# ---------------------------------------------------------------------------
import os

import pytest

from common.env import boolify, check_environment

# Every environment variable the functions in this section may set; all of
# them are removed again by teardown_function.
_MANAGED_VARS = ("TEST_VAR", "BOOL_VAR", "INT_VAR")


def test_boolify_true():
    """Assert that boolify maps each listed truthy spelling to True."""
    true_val = [1, "1", "TRUE", "True", "true", 't', 'T', True]
    for val in true_val:
        assert boolify(val) is True, f"Expected true {val} but did not get it."


def test_boolify_false():
    """Assert that boolify maps each listed falsy spelling to False."""
    false_val = [0, "0", "FALSE", "False", "false", 'f', 'F', False]
    for val in false_val:
        assert boolify(val) is False, f"Expected False {val} but did not get it."


def test_boolify_raises_typeerror():
    """Assert that values with no boolean reading raise TypeError."""
    non_bool_values = ["maybe", 2, [], (), 0.5]
    for val in non_bool_values:
        with pytest.raises(TypeError, match="unable to evaluate expected boolean"):
            boolify(val)


def test_check_environment_existing_variables():
    """Assert that a set variable wins over the supplied default."""
    os.environ["TEST_VAR"] = "test_value"
    assert (
        check_environment("TEST_VAR", default="default_value") == "test_value"
    ), "Something really bad happened"


def test_check_environment_non_existing_with_default():
    """Assert that the default is returned when the variable is unset."""
    os.environ.pop("NON_EXISTING_VAR", None)
    assert (
        check_environment("NON_EXISTING_VAR", default="default_value")
        == "default_value"
    ), "Default value is broken"


def test_check_environment_type_conversion_to_bool():
    """Assert that a boolean default makes the string value come back as a bool."""
    os.environ["BOOL_VAR"] = "True"
    assert (
        check_environment("BOOL_VAR", default=False) is True
    ), "Failed to boolify and check_environment"


# TODO - checks for conversion to int, etc
# NOTE(review): this function has no ``test_`` prefix, so pytest's default
# discovery does not collect it. Presumably it is parked until int conversion
# is supported -- confirm before renaming it.
def check_environment_type_conversion_to_int():
    """Assert that an int default makes the string value come back as an int."""
    os.environ["INT_VAR"] = "0"
    assert (
        check_environment("INT_VAR", default=1) == 0
    ), "Failed to cast to int and check_environment"


def teardown_function(function):
    """Remove every variable set by the functions above after each test."""
    for var in _MANAGED_VARS:
        os.environ.pop(var, None)


# ---------------------------------------------------------------------------
# tests/test_common/test_file_operations.py
# Tests for File Operations
# ---------------------------------------------------------------------------
from pathlib import Path
import json
import pytest
import os

from common.file_operations import (
    path_exists,
    create_directory,
    delete_directory,
    tree,
    delete_file,
    read_file,
    append_file,
    write_file,
)


@pytest.fixture
def setup_file(tmp_path):
    """Create ``sub/test.txt`` containing "content" and return its path."""
    d = tmp_path / "sub"
    d.mkdir()
    p = d / "test.txt"
    p.write_text("content")
    return p


@pytest.fixture
def setup_directory(tmp_path):
    """Create an empty ``subdir`` directory and return its path."""
    d = tmp_path / "subdir"
    d.mkdir()
    return d


@pytest.fixture
def setup_populated_directory(tmp_path):
    """Create ``subdir`` holding one sub-directory and one file; return it."""
    d = tmp_path / "subdir"
    d.mkdir()
    p = d / "subsubdir"
    p.mkdir()
    f = d / "myfile.txt"
    f.write_text("foo")
    return d


def test_path_exists(setup_file):
    """Test that the path exists function works."""
    assert path_exists(setup_file) is True
    assert path_exists("nonexistent_file") is False


def test_create_directory(tmp_path, setup_directory):
    """Test that you can create a directory."""
    assert (
        create_directory(str(setup_directory))
        == f"{str(setup_directory)} already exists."
    )
    new_dir = tmp_path / "newdir"
    assert create_directory(str(new_dir)) is True


def test_delete_directory(setup_directory):
    """Test that deleting a directory works as expected."""
    assert (
        delete_directory(str(setup_directory))
        == f"{str(setup_directory)} deleted successfully."
    )
    assert delete_directory("nodir") == "nodir does not exist."


def test_tree(setup_populated_directory, setup_file, capsys):
    """Test that the tree command works as expected."""
    # Nothing else may write to stdout before readouterr(): the assertions
    # below must be satisfied by tree()'s own output only.
    tree(str(setup_populated_directory))
    captured = capsys.readouterr()
    assert "Directory" in captured.out
    assert "File" in captured.out


def test_read_file(setup_file):
    """Test reading an existing file."""
    assert read_file(str(setup_file)) == "content"
    assert read_file("nonexistent") is None


def test_append_file(setup_file):
    """Test that appending to a file works."""
    content_to_append = " is my favorite content"
    append_file(str(setup_file), content_to_append)
    assert setup_file.read_text() == "content is my favorite content"


def test_write_file(tmp_path):
    """Test that writing a file works."""
    file_path = tmp_path / "write_test.txt"
    write_file(str(file_path), "new content")
    assert file_path.read_text() == "new content"

    json_path = tmp_path / "path.json"
    dictionary = {"key": "value"}
    dict_str = json.dumps(dictionary)
    json_path.write_text(dict_str)
    assert json_path.read_text() == dict_str

    # TODO @Bhaskar, fix this so that the write_file method works here
    # dict_path = tmp_path / "dict.json"
    # write_file(str(dict_path), dictionary)
    # assert dict_path.read_text() == dict_str
    # write_file('new_dict', dictionary | dictionary2)


def test_delete_file(setup_file):
    """Test deletion of an existing file."""
    assert delete_file(setup_file) == f"File: {str(setup_file)} deleted successfully."
    assert delete_file("nonexistent") == "Error in deleting the file: nonexistent."
# src/common/gen.py
"""Generate CSV files of random data, bounded by row count or by file size."""
import argparse
import csv
import os
import random
import re
import string

# Multipliers for the size suffixes accepted by convert_size_to_bytes.
_SIZE_UNITS = {'KB': 1024, 'MB': 1024 ** 2, 'GB': 1024 ** 3, 'TB': 1024 ** 4}

# A non-negative decimal number, optional whitespace, then an alphabetic unit.
# The unit is validated separately so the error can name the bad suffix.
_SIZE_PATTERN = re.compile(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]*)\s*")


def generate_gibberish(length=5):
    """Return ``length`` randomly chosen ASCII letters (mixed case)."""
    return ''.join(random.choices(string.ascii_letters, k=length))


def generate_data_by_type(column_type):
    """Return one random cell value, as a string, for ``column_type``.

    * ``'int'``   -> an integer in ``[0, 1000]``
    * ``'float'`` -> a number in ``[0, 1000]`` formatted with two decimals
    * anything else (including ``'string'``) -> 3 to 10 random letters
    """
    if column_type == 'int':
        return str(random.randint(0, 1000))
    if column_type == 'float':
        return f"{random.uniform(0, 1000):.2f}"
    # 'string' and every unrecognised type share the same fallback.
    return generate_gibberish(random.randint(3, 10))


def convert_size_to_bytes(size_str):
    """Convert a human-readable file size (e.g. ``512MB``, ``1.5 gb``) to bytes.

    The unit is case-insensitive and may be separated from the number by
    whitespace. Fractional values are supported and truncated to whole bytes.

    Raises:
        ValueError: if the text is not "<number><unit>" or the unit is not
            one of KB, MB, GB, TB.
    """
    match = _SIZE_PATTERN.fullmatch(size_str)
    if match is None:
        raise ValueError(
            f"Invalid size: {size_str!r}. Expected a number followed by KB, MB, GB, or TB."
        )
    number, unit = match.group(1), match.group(2).upper()
    if unit not in _SIZE_UNITS:
        raise ValueError(f"Invalid size unit: {unit}. Use KB, MB, GB, or TB.")
    # Keep integer input exact; only go through float when a fraction is given.
    value = float(number) if '.' in number else int(number)
    return int(value * _SIZE_UNITS[unit])


def generate_csv(file_path, n_columns=5, target_file_size=None, target_row_count=None, column_types=None):
    """Write a CSV of random data until a row count or a byte size is reached.

    Args:
        file_path: Destination path; an existing file is overwritten.
        n_columns: Number of columns. Header names are random 5-letter strings.
        target_file_size: Stop once at least this many bytes were written.
        target_row_count: Stop once this many data rows were written.
        column_types: Optional mapping to ``'int'``, ``'float'`` or
            ``'string'``, keyed by header name or by zero-based column index
            (header names are random, so the index is the practical key).
            Unlisted columns default to ``'string'``.

    Exactly one of ``target_file_size`` / ``target_row_count`` must be given;
    ``None`` and ``0`` both count as "not given". At least one data row is
    always written.

    Raises:
        ValueError: if both targets, or neither target, are specified.
    """
    if target_file_size and target_row_count:
        raise ValueError("Please specify either target file size or target row count, not both.")
    if not target_file_size and not target_row_count:
        # Without a stop condition the write loop below would never end.
        raise ValueError("Please specify a target file size or a target row count.")

    if column_types is None:
        column_types = {}

    # Generate gibberish column names and resolve each column's type once.
    columns = [generate_gibberish() for _ in range(n_columns)]
    resolved_types = [
        column_types.get(name, column_types.get(index, 'string'))
        for index, name in enumerate(columns)
    ]

    with open(file_path, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        # writerow() returns what file.write() returns: the character count.
        # All generated content is ASCII, so characters == bytes in UTF-8.
        # Counting here avoids stat()-ing a file whose buffer is not flushed.
        bytes_written = writer.writerow(columns)  # header
        row_count = 0

        while True:
            row = [generate_data_by_type(column_type) for column_type in resolved_types]
            bytes_written += writer.writerow(row)
            row_count += 1

            if target_row_count and row_count >= target_row_count:
                break
            if target_file_size and bytes_written >= target_file_size:
                break

    print(f"CSV file generated at {file_path} with {row_count} rows.")


def main(argv=None):
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Invalid arguments terminate via ``parser.error`` (usage message on
    stderr, non-zero exit status).
    """
    parser = argparse.ArgumentParser(description="Generate a large CSV file with random data.")
    parser.add_argument('-c', '--col', type=int, required=True, help="Number of columns in the CSV.")
    parser.add_argument('-o', '--output', default='output.csv',
                        help="Path of the CSV file to write (default: output.csv).")

    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-r', '--row', type=int, help="Number of rows in the CSV.")
    group.add_argument('-s', '--size', type=str, help="Target file size (e.g., 512MB or 1GB).")

    args = parser.parse_args(argv)

    if args.col < 1:
        parser.error("--col must be at least 1.")
    if args.row is not None and args.row < 1:
        parser.error("--row must be at least 1.")

    # Parse size if provided
    target_file_size = None
    if args.size is not None:
        try:
            target_file_size = convert_size_to_bytes(args.size)
        except ValueError as e:
            parser.error(str(e))
        if target_file_size < 1:
            parser.error("--size must be greater than zero.")

    # Generate the CSV
    generate_csv(
        file_path=args.output,
        n_columns=args.col,
        target_row_count=args.row,
        target_file_size=target_file_size,
        column_types={}  # You can define the column types here if needed
    )


if __name__ == "__main__":
    main()
# tests/test_common/test_env.py
"""Tests for the environment helpers in ``common.env``."""
import os

import pytest

from common.env import boolify, check_environment

# Every environment variable the functions in this module may set; all of
# them are removed again by teardown_function.
_MANAGED_VARS = ("TEST_VAR", "BOOL_VAR", "INT_VAR")


def test_boolify_true():
    """Assert that boolify maps each listed truthy spelling to True."""
    true_val = [1, "1", "TRUE", "True", "true", 't', 'T', True]
    for val in true_val:
        assert boolify(val) is True, f"Expected true {val} but did not get it."


def test_boolify_false():
    """Assert that boolify maps each listed falsy spelling to False."""
    false_val = [0, "0", "FALSE", "False", "false", 'f', 'F', False]
    for val in false_val:
        assert boolify(val) is False, f"Expected False {val} but did not get it."


def test_boolify_raises_typeerror():
    """Assert that values with no boolean reading raise TypeError."""
    non_bool_values = ["maybe", 2, [], (), 0.5]
    for val in non_bool_values:
        with pytest.raises(TypeError, match="unable to evaluate expected boolean"):
            boolify(val)


def test_check_environment_existing_variables():
    """Assert that a set variable wins over the supplied default."""
    os.environ["TEST_VAR"] = "test_value"
    assert (
        check_environment("TEST_VAR", default="default_value") == "test_value"
    ), "Something really bad happened"


def test_check_environment_non_existing_with_default():
    """Assert that the default is returned when the variable is unset."""
    os.environ.pop("NON_EXISTING_VAR", None)
    assert (
        check_environment("NON_EXISTING_VAR", default="default_value")
        == "default_value"
    ), "Default value is broken"


def test_check_environment_type_conversion_to_bool():
    """Assert that a boolean default makes the string value come back as a bool."""
    os.environ["BOOL_VAR"] = "True"
    assert (
        check_environment("BOOL_VAR", default=False) is True
    ), "Failed to boolify and check_environment"


# TODO - checks for conversion to int, etc
# NOTE(review): this function has no ``test_`` prefix, so pytest's default
# discovery does not collect it. Presumably it is parked until int conversion
# is supported -- confirm before renaming it.
def check_environment_type_conversion_to_int():
    """Assert that an int default makes the string value come back as an int."""
    os.environ["INT_VAR"] = "0"
    assert (
        check_environment("INT_VAR", default=1) == 0
    ), "Failed to cast to int and check_environment"


def teardown_function(function):
    """Remove every variable set by the functions above after each test."""
    for var in _MANAGED_VARS:
        os.environ.pop(var, None)
# tests/test_common/test_file_operations.py
"""Tests for File Operations"""
from pathlib import Path
import json
import pytest
import os

from common.file_operations import (
    path_exists,
    create_directory,
    delete_directory,
    tree,
    delete_file,
    read_file,
    append_file,
    write_file,
)


@pytest.fixture
def setup_file(tmp_path):
    """Create ``sub/test.txt`` containing "content" and return its path."""
    d = tmp_path / "sub"
    d.mkdir()
    p = d / "test.txt"
    p.write_text("content")
    return p


@pytest.fixture
def setup_directory(tmp_path):
    """Create an empty ``subdir`` directory and return its path."""
    d = tmp_path / "subdir"
    d.mkdir()
    return d


@pytest.fixture
def setup_populated_directory(tmp_path):
    """Create ``subdir`` holding one sub-directory and one file; return it."""
    d = tmp_path / "subdir"
    d.mkdir()
    p = d / "subsubdir"
    p.mkdir()
    f = d / "myfile.txt"
    f.write_text("foo")
    return d


def test_path_exists(setup_file):
    """Test that the path exists function works."""
    assert path_exists(setup_file) is True
    assert path_exists("nonexistent_file") is False


def test_create_directory(tmp_path, setup_directory):
    """Test that you can create a directory."""
    assert (
        create_directory(str(setup_directory))
        == f"{str(setup_directory)} already exists."
    )
    new_dir = tmp_path / "newdir"
    assert create_directory(str(new_dir)) is True


def test_delete_directory(setup_directory):
    """Test that deleting a directory works as expected."""
    assert (
        delete_directory(str(setup_directory))
        == f"{str(setup_directory)} deleted successfully."
    )
    assert delete_directory("nodir") == "nodir does not exist."


def test_tree(setup_populated_directory, setup_file, capsys):
    """Test that the tree command works as expected."""
    # Nothing else may write to stdout before readouterr(): the assertions
    # below must be satisfied by tree()'s own output only.
    tree(str(setup_populated_directory))
    captured = capsys.readouterr()
    assert "Directory" in captured.out
    assert "File" in captured.out


def test_read_file(setup_file):
    """Test reading an existing file."""
    assert read_file(str(setup_file)) == "content"
    assert read_file("nonexistent") is None


def test_append_file(setup_file):
    """Test that appending to a file works."""
    content_to_append = " is my favorite content"
    append_file(str(setup_file), content_to_append)
    assert setup_file.read_text() == "content is my favorite content"


def test_write_file(tmp_path):
    """Test that writing a file works."""
    file_path = tmp_path / "write_test.txt"
    write_file(str(file_path), "new content")
    assert file_path.read_text() == "new content"

    json_path = tmp_path / "path.json"
    dictionary = {"key": "value"}
    dict_str = json.dumps(dictionary)
    json_path.write_text(dict_str)
    assert json_path.read_text() == dict_str

    # TODO @Bhaskar, fix this so that the write_file method works here
    # dict_path = tmp_path / "dict.json"
    # write_file(str(dict_path), dictionary)
    # assert dict_path.read_text() == dict_str
    # write_file('new_dict', dictionary | dictionary2)


def test_delete_file(setup_file):
    """Test deletion of an existing file."""
    assert delete_file(setup_file) == f"File: {str(setup_file)} deleted successfully."
    assert delete_file("nonexistent") == "Error in deleting the file: nonexistent."