From c6c29451137cc98740789346c9001ba8b8cbe30a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Mon, 9 Feb 2026 19:48:43 +0100 Subject: [PATCH 1/6] Fix: Implement one _refresh_memory_dict() function --- src/executorlib/task_scheduler/file/shared.py | 51 +++++++++---------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index c65409ca..f4caddde 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -90,15 +90,10 @@ def execute_tasks_h5( if task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"]: if task_dict["wait"] and wait: while len(memory_dict) > 0: - memory_dict = { - key: _check_task_output( - task_key=key, - future_obj=value, - cache_directory=cache_dir_dict[key], - ) - for key, value in memory_dict.items() - if not value.done() - } + memory_dict = _refresh_memory_dict( + memory_dict=memory_dict, + cache_dir_dict=cache_dir_dict, + ) if not task_dict["cancel_futures"] and wait: if ( terminate_function is not None @@ -114,15 +109,10 @@ def execute_tasks_h5( backend=backend, ) else: - memory_dict = { - key: _check_task_output( - task_key=key, - future_obj=value, - cache_directory=cache_dir_dict[key], - ) - for key, value in memory_dict.items() - if not value.done() - } + memory_dict = _refresh_memory_dict( + memory_dict=memory_dict, + cache_dir_dict=cache_dir_dict, + ) for value in memory_dict.values(): if not value.done(): value.cancel() @@ -193,15 +183,10 @@ def execute_tasks_h5( cache_dir_dict[task_key] = cache_directory future_queue.task_done() else: - memory_dict = { - key: _check_task_output( - task_key=key, - future_obj=value, - cache_directory=cache_dir_dict[key], - ) - for key, value in memory_dict.items() - if not value.done() - } + memory_dict = _refresh_memory_dict( + memory_dict=memory_dict, + cache_dir_dict=cache_dir_dict, + ) def _check_task_output( @@ -275,3 +260,15 @@ def _convert_args_and_kwargs( else: task_kwargs[key] = arg return task_args, task_kwargs, future_wait_key_lst + + +def _refresh_memory_dict(memory_dict: dict, cache_dir_dict: dict) -> dict: + return { + key: _check_task_output( + task_key=key, + future_obj=value, + cache_directory=cache_dir_dict[key], + ) + for key, value in memory_dict.items() + if not value.done() + } From b0255c1ba0a14b7e581966370a2fda42dde01dcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Mon, 9 Feb 2026 20:30:52 +0100 Subject: [PATCH 2/6] Add _cancel_processes() --- src/executorlib/task_scheduler/file/shared.py | 59 +++++++++++++++---- 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index f4caddde..668b412b 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -95,19 +95,12 @@ def execute_tasks_h5( cache_dir_dict=cache_dir_dict, ) if not task_dict["cancel_futures"] and wait: - if ( - terminate_function is not None - and terminate_function == terminate_subprocess - ): - for task in process_dict.values(): - terminate_function(task=task) - elif terminate_function is not None: - for queue_id in process_dict.values(): - terminate_function( - queue_id=queue_id, - config_directory=pysqa_config_directory, - backend=backend, - ) + _cancel_processes( + terminate_function=terminate_function, + process_dict=process_dict, + pysqa_config_directory=pysqa_config_directory, + backend=backend, + ) else: memory_dict = _refresh_memory_dict( memory_dict=memory_dict, @@ -263,6 +256,16 @@ def _convert_args_and_kwargs( def _refresh_memory_dict(memory_dict: dict, cache_dir_dict: dict) -> dict: + """ + Refresh memory dictionary + + Args: + memory_dict (dict): dictionary with task keys and future objects + cache_dir_dict (dict): dictionary with task keys and cache directories + + Returns: + dict: Updated memory dictionary + """ return { key: _check_task_output( task_key=key, @@ -272,3 +275,33 @@ def _refresh_memory_dict(memory_dict: dict, cache_dir_dict: dict) -> dict: for key, value in memory_dict.items() if not value.done() } + + +def _cancel_processes( + terminate_function: callable, + process_dict: dict, + pysqa_config_directory: str, + backend: str +): + """ + Cancel processes + + Args: + terminate_function (callable): The function to terminate the tasks. + process_dict (dict): dictionary with task keys and process reference. + pysqa_config_directory (str): path to the pysqa config directory (only for pysqa based backend). + backend (str): name of the backend used to spawn tasks. + """ + if ( + terminate_function is not None + and terminate_function == terminate_subprocess + ): + for task in process_dict.values(): + terminate_function(task=task) + elif terminate_function is not None: + for queue_id in process_dict.values(): + terminate_function( + queue_id=queue_id, + config_directory=pysqa_config_directory, + backend=backend, + ) From 7910aca82c66758485f7309c04fed98b43564e0e Mon Sep 17 00:00:00 2001 From: pyiron-runner Date: Mon, 9 Feb 2026 19:33:27 +0000 Subject: [PATCH 3/6] Format black --- src/executorlib/task_scheduler/file/shared.py | 31 +++++++++---------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 668b412b..9145fa6a 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -91,19 +91,19 @@ def execute_tasks_h5( if task_dict["wait"] and wait: while len(memory_dict) > 0: memory_dict = _refresh_memory_dict( - memory_dict=memory_dict, + memory_dict=memory_dict, cache_dir_dict=cache_dir_dict, ) if not task_dict["cancel_futures"] and wait: _cancel_processes( - terminate_function=terminate_function, - process_dict=process_dict, - pysqa_config_directory=pysqa_config_directory, + terminate_function=terminate_function, + process_dict=process_dict, + pysqa_config_directory=pysqa_config_directory, backend=backend, ) else: memory_dict = _refresh_memory_dict( - memory_dict=memory_dict, + memory_dict=memory_dict, cache_dir_dict=cache_dir_dict, ) for value in memory_dict.values(): @@ -177,7 +177,7 @@ def execute_tasks_h5( future_queue.task_done() else: memory_dict = _refresh_memory_dict( - memory_dict=memory_dict, + memory_dict=memory_dict, cache_dir_dict=cache_dir_dict, ) @@ -257,8 +257,8 @@ def _convert_args_and_kwargs( def _refresh_memory_dict(memory_dict: dict, cache_dir_dict: dict) -> dict: """ - Refresh memory dictionary - + Refresh memory dictionary + Args: memory_dict (dict): dictionary with task keys and future objects cache_dir_dict (dict): dictionary with task keys and cache directories @@ -278,24 +278,21 @@ def _refresh_memory_dict(memory_dict: dict, cache_dir_dict: dict) -> dict: def _cancel_processes( - terminate_function: callable, - process_dict: dict, - pysqa_config_directory: str, - backend: str + terminate_function: callable, + process_dict: dict, + pysqa_config_directory: str, + backend: str, ): """ Cancel processes - + Args: terminate_function (callable): The function to terminate the tasks. process_dict (dict): dictionary with task keys and process reference. pysqa_config_directory (str): path to the pysqa config directory (only for pysqa based backend). backend (str): name of the backend used to spawn tasks. """ - if ( - terminate_function is not None - and terminate_function == terminate_subprocess - ): + if terminate_function is not None and terminate_function == terminate_subprocess: for task in process_dict.values(): terminate_function(task=task) elif terminate_function is not None: From 0502bc7e5443550fc0beeb35b6121643bbf49ac2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Mon, 9 Feb 2026 20:37:26 +0100 Subject: [PATCH 4/6] mypy fixes --- src/executorlib/task_scheduler/file/shared.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 9145fa6a..35572681 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -278,10 +278,10 @@ def _refresh_memory_dict(memory_dict: dict, cache_dir_dict: dict) -> dict: def _cancel_processes( - terminate_function: callable, + terminate_function: Callable, process_dict: dict, - pysqa_config_directory: str, - backend: str, + pysqa_config_directory: Optional[str] = None, + backend: Optional[str] = None, ): """ Cancel processes @@ -295,7 +295,7 @@ def _cancel_processes( if terminate_function is not None and terminate_function == terminate_subprocess: for task in process_dict.values(): terminate_function(task=task) - elif terminate_function is not None: + elif terminate_function is not None and backend is not None and pysqa_config_directory is not None: for queue_id in process_dict.values(): terminate_function( queue_id=queue_id, From 866e3f61ff841e288f36b610815ef1713fa03c67 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 9 Feb 2026 19:37:44 +0000 Subject: [PATCH 5/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/executorlib/task_scheduler/file/shared.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 35572681..e00b0f65 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -295,7 +295,11 @@ def _cancel_processes( if terminate_function is not None and terminate_function == terminate_subprocess: for task in process_dict.values(): terminate_function(task=task) - elif terminate_function is not None and backend is not None and pysqa_config_directory is not None: + elif ( + terminate_function is not None + and backend is not None + and pysqa_config_directory is not None + ): for queue_id in process_dict.values(): terminate_function( queue_id=queue_id, From 3f5455930e368036e59f50e7bd14c4241b7d7e2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Mon, 9 Feb 2026 20:40:06 +0100 Subject: [PATCH 6/6] more fixes --- src/executorlib/task_scheduler/file/shared.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 35572681..edc528f8 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -278,8 +278,8 @@ def _refresh_memory_dict(memory_dict: dict, cache_dir_dict: dict) -> dict: def _cancel_processes( - terminate_function: Callable, process_dict: dict, + terminate_function: Optional[Callable] = None, pysqa_config_directory: Optional[str] = None, backend: Optional[str] = None, ): @@ -287,8 +287,8 @@ def _cancel_processes( Cancel processes Args: - terminate_function (callable): The function to terminate the tasks. process_dict (dict): dictionary with task keys and process reference. + terminate_function (callable): The function to terminate the tasks. pysqa_config_directory (str): path to the pysqa config directory (only for pysqa based backend). backend (str): name of the backend used to spawn tasks. """