# --- src/imars3d/backend/util/functions.py (added) ---


def calculate_chunksize(num_elements: int, max_workers: Union[int, None] = None, scale_factor: int = 4) -> int:
    """Suggest a chunk size for dividing work among multiprocessing workers.

    Parameters:
    - num_elements: The total number of elements to process.
    - max_workers: The number of workers (processes) to use. Defaults to clamped max.
    - scale_factor: A factor to fine-tune chunk size (default 4).

    Returns:
    - int: Suggested chunk size (always at least 1).
    """
    # Resolve the effective worker count through the shared clamping helper.
    workers = clamp_max_workers(max_workers)
    # Aim for roughly `scale_factor` chunks per worker, never dropping below 1.
    batches = workers * scale_factor
    return max(1, num_elements // batches)


# --- tests/unit/backend/util/test_util.py (added) ---

# package imports
from imars3d.backend.util.functions import clamp_max_workers, to_time_str, calculate_chunksize

# third party imports
import pytest
from unittest.mock import patch

# standard imports
from datetime import datetime


@patch("multiprocessing.cpu_count", return_value=8)
def test_chunksize_with_small_number_of_elements(mock_cpu_count):
    # Fewer elements than worker slots: chunk size bottoms out at 1.
    assert calculate_chunksize(10, None) == 1


@patch("multiprocessing.cpu_count", return_value=8)
def test_chunksize_with_large_number_of_elements(mock_cpu_count):
    # 8 CPUs clamp to 6 workers; default scale factor is 4.
    expected = max(1, 10000 // (6 * 4))
    assert calculate_chunksize(10000, None) == expected


@patch("multiprocessing.cpu_count", return_value=4)
def test_chunksize_with_different_cpu_count(mock_cpu_count):
    # 4 CPUs clamp to 2 workers (cpu_count - 2); default scale factor is 4.
    expected = max(1, 10000 // (2 * 4))
    assert calculate_chunksize(10000, None) == expected


@patch("multiprocessing.cpu_count", return_value=8)
def test_chunksize_with_max_workers(mock_cpu_count):
    # Explicit max_workers overrides the CPU-derived default.
    expected = max(1, 10000 // (4 * 4))
    assert calculate_chunksize(10000, 4) == expected


@patch("multiprocessing.cpu_count", return_value=8)
def test_chunksize_with_custom_scale_factor(mock_cpu_count):
    # A smaller scale factor yields larger chunks for the same worker count.
    expected = max(1, 10000 // (6 * 2))
    assert calculate_chunksize(10000, None, scale_factor=2) == expected
def calculate_chunksize(num_elements: int, max_workers: Union[int, None] = None, scale_factor: int = 4) -> int:
    """Calculate an optimal chunk size for multiprocessing.

    Parameters:
    - num_elements: The total number of elements to process.
    - max_workers: The number of workers (processes) to use. Defaults to clamped max.
    - scale_factor: A factor to fine-tune chunk size (default 4).

    Returns:
    - int: Suggested chunk size.
    """
    # Number of workers after clamping against the available CPUs.
    worker_count = clamp_max_workers(max_workers)
    # Target roughly `scale_factor` chunks per worker; floor the result at 1
    # so tiny inputs still produce a usable chunk size.
    return max(1, num_elements // (worker_count * scale_factor))
# package imports
from imars3d.backend.util.functions import clamp_max_workers, to_time_str, calculate_chunksize

# third party imports
import pytest
from unittest.mock import patch

# standard imports
from datetime import datetime


@patch("multiprocessing.cpu_count", return_value=8)
def test_chunksize_with_small_number_of_elements(mock_cpu_count):
    num_elements = 10
    # With only 10 elements the floor of 1 kicks in.
    chunksize = calculate_chunksize(num_elements, None)
    assert chunksize == 1


@patch("multiprocessing.cpu_count", return_value=8)
def test_chunksize_with_large_number_of_elements(mock_cpu_count):
    num_elements = 10000
    chunksize = calculate_chunksize(num_elements, None)
    # 6 workers (8 CPUs clamped), scale factor 4
    expected_chunksize = max(1, num_elements // (6 * 4))
    assert chunksize == expected_chunksize


@patch("multiprocessing.cpu_count", return_value=4)
def test_chunksize_with_different_cpu_count(mock_cpu_count):
    num_elements = 10000
    chunksize = calculate_chunksize(num_elements, None)
    # 2 workers (cpu_count - 2), scale factor 4
    expected_chunksize = max(1, num_elements // (2 * 4))
    assert chunksize == expected_chunksize


@patch("multiprocessing.cpu_count", return_value=8)
def test_chunksize_with_max_workers(mock_cpu_count):
    num_elements = 10000
    chunksize = calculate_chunksize(num_elements, 4)
    # 4 workers manually set, scale factor 4
    expected_chunksize = max(1, num_elements // (4 * 4))
    assert chunksize == expected_chunksize


@patch("multiprocessing.cpu_count", return_value=8)
def test_chunksize_with_custom_scale_factor(mock_cpu_count):
    num_elements = 10000
    chunksize = calculate_chunksize(num_elements, None, scale_factor=2)
    # 6 workers, scale factor 2
    expected_chunksize = max(1, num_elements // (6 * 2))
    assert chunksize == expected_chunksize