From 96a159965c9e16d098e8467e4f788a8ad6167d97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Mon, 9 Feb 2026 21:17:49 +0100 Subject: [PATCH 1/2] Feature: Cancel processes of cancelled futures --- src/executorlib/task_scheduler/file/shared.py | 40 ++++++++++++++++++- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 20c0a55a..9fb0227c 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -93,11 +93,15 @@ def execute_tasks_h5( memory_dict = _refresh_memory_dict( memory_dict=memory_dict, cache_dir_dict=cache_dir_dict, + process_dict=process_dict, + terminate_function=terminate_function, + pysqa_config_directory=pysqa_config_directory, + backend=backend, ) if not task_dict["cancel_futures"] and wait: _cancel_processes( - terminate_function=terminate_function, process_dict=process_dict, + terminate_function=terminate_function, pysqa_config_directory=pysqa_config_directory, backend=backend, ) @@ -105,6 +109,10 @@ def execute_tasks_h5( memory_dict = _refresh_memory_dict( memory_dict=memory_dict, cache_dir_dict=cache_dir_dict, + process_dict=process_dict, + terminate_function=terminate_function, + pysqa_config_directory=pysqa_config_directory, + backend=backend, ) for value in memory_dict.values(): if not value.done(): @@ -179,6 +187,10 @@ def execute_tasks_h5( memory_dict = _refresh_memory_dict( memory_dict=memory_dict, cache_dir_dict=cache_dir_dict, + process_dict=process_dict, + terminate_function=terminate_function, + pysqa_config_directory=pysqa_config_directory, + backend=backend, ) @@ -255,17 +267,41 @@ def _convert_args_and_kwargs( return task_args, task_kwargs, future_wait_key_lst -def _refresh_memory_dict(memory_dict: dict, cache_dir_dict: dict) -> dict: +def _refresh_memory_dict( + memory_dict: dict, + cache_dir_dict: dict, + process_dict: dict, + terminate_function: Optional[Callable] = None, + pysqa_config_directory: Optional[str] = None, + backend: Optional[str] = None, +) -> dict: """ Refresh memory dictionary Args: memory_dict (dict): dictionary with task keys and future objects cache_dir_dict (dict): dictionary with task keys and cache directories + process_dict (dict): dictionary with task keys and process reference. + terminate_function (callable): The function to terminate the tasks. + pysqa_config_directory (str): path to the pysqa config directory (only for pysqa based backend). + backend (str): name of the backend used to spawn tasks. Returns: dict: Updated memory dictionary """ + cancelled_lst = [ + key for key, value in memory_dict.items() + if value.done() and value.cancelled() + ] + _cancel_processes( + process_dict={ + k:v for k, v in process_dict.items() + if k in cancelled_lst + }, + terminate_function=terminate_function, + pysqa_config_directory=pysqa_config_directory, + backend=backend, + ) return { key: _check_task_output( task_key=key, From cd044a2ae95482ea80b375ad15f5190eda0d45c6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 9 Feb 2026 20:18:35 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/executorlib/task_scheduler/file/shared.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 9fb0227c..3f6aeeb6 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -268,7 +268,7 @@ def _convert_args_and_kwargs( def _refresh_memory_dict( - memory_dict: dict, + memory_dict: dict, cache_dir_dict: dict, process_dict: dict, terminate_function: Optional[Callable] = None, @@ -290,14 +290,10 @@ def _refresh_memory_dict( dict: Updated memory dictionary """ cancelled_lst = [ - key for key, value in memory_dict.items() - if value.done() and value.cancelled() + key for key, value in memory_dict.items() if value.done() and value.cancelled() ] _cancel_processes( - process_dict={ - k:v for k, v in process_dict.items() - if k in cancelled_lst - }, + process_dict={k: v for k, v in process_dict.items() if k in cancelled_lst}, terminate_function=terminate_function, pysqa_config_directory=pysqa_config_directory, backend=backend,