From fc20627af2f74968cf2ac6077640babe0b2c8185 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 22 Dec 2022 16:01:28 +0100 Subject: [PATCH 01/14] Add local subprocess cluster --- distributed/deploy/subprocess.py | 114 ++++++++++++++++++++ distributed/deploy/tests/test_subprocess.py | 33 ++++++ 2 files changed, 147 insertions(+) create mode 100644 distributed/deploy/subprocess.py create mode 100644 distributed/deploy/tests/test_subprocess.py diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py new file mode 100644 index 00000000000..c73e02f42f1 --- /dev/null +++ b/distributed/deploy/subprocess.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import asyncio +import copy +import json +import logging +import math +from typing import Any + +from dask.system import CPU_COUNT + +from distributed import Scheduler +from distributed.deploy.spec import ProcessInterface, SpecCluster +from distributed.deploy.utils import nprocesses_nthreads +from distributed.worker_memory import parse_memory_limit + +logger = logging.getLogger(__name__) + + +class SubprocessWorker(ProcessInterface): + scheduler: str + worker_class: str + worker_options: dict + name: str | None + process: asyncio.subprocess.Process | None + + def __init__( + self, + scheduler: str, + worker_class: str = "distributed.Nanny", + name: str | None = None, + worker_options: dict | None = None, + **kwargs: Any, + ) -> None: + self.scheduler = scheduler + self.worker_class = worker_class + self.name = name + self.worker_options = copy.copy(worker_options or {}) + self.process = None + logger.info(kwargs) + super().__init__(**kwargs) + + async def start(self) -> None: + self.process = await asyncio.create_subprocess_exec( + "dask", + "spec", + self.scheduler, + "--spec", + json.dumps( + {0: {"cls": self.worker_class, "opts": {**self.worker_options}}} + ), + ) + await super().start() + + async def close(self) -> None: + if self.process: + self.process.kill() + await self.process.communicate() + await super().close() + + +def SubprocessCluster( + host: str | None = None, + scheduler_port: int = 0, + scheduler_options: dict | None = None, + dashboard_address: str = ":8787", + worker_class: str = "distributed.Nanny", + n_workers: int | None = None, + threads_per_worker: int | None = None, + worker_dashboard_address: str | None = None, + worker_options: dict | None = None, + **kwargs: Any, +) -> SpecCluster: + if not host: + host = "127.0.0.1" + worker_options = worker_options or {} + scheduler_options = scheduler_options or {} + if n_workers is None and threads_per_worker is None: + n_workers, threads_per_worker = nprocesses_nthreads() + if n_workers is None and threads_per_worker is not None: + n_workers = max(1, CPU_COUNT // threads_per_worker) + if n_workers and threads_per_worker is None: + # Overcommit threads per worker, rather than undercommit + threads_per_worker = max(1, int(math.ceil(CPU_COUNT / n_workers))) + if n_workers and "memory_limit" not in worker_options: + worker_options["memory_limit"] = parse_memory_limit( + "auto", 1, n_workers, logger=logger + ) + assert n_workers + + scheduler_options.update( + { + "host": host, + "port": scheduler_port, + "dashboard": dashboard_address is not None, + "dashboard_address": dashboard_address, + } + ) + worker_options.update( + { + "host": host, + "nthreads": threads_per_worker, + "dashboard": worker_dashboard_address is not None, + "dashboard_address": worker_dashboard_address, + } + ) + + scheduler = {"cls": Scheduler, "options": scheduler_options} + worker = { + "cls": SubprocessWorker, + "options": {"worker_class": worker_class, "worker_options": worker_options}, + } + workers = {i: worker for i in range(n_workers)} + return SpecCluster(workers, scheduler, name="SubprocessCluster", **kwargs) diff --git a/distributed/deploy/tests/test_subprocess.py b/distributed/deploy/tests/test_subprocess.py new file mode 100644 index 00000000000..e70fc43c1a6 --- /dev/null +++ b/distributed/deploy/tests/test_subprocess.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from distributed import Client +from distributed.deploy.subprocess import SubprocessCluster +from distributed.utils_test import gen_test + + +@gen_test() +async def test_basic(): + async with SubprocessCluster( + asynchronous=True, + dashboard_address=":0", + scheduler_options={"idle_timeout": "5s"}, + worker_options={"death_timeout": "5s"}, + ) as cluster: + async with Client(cluster, asynchronous=True) as client: + result = await client.submit(lambda x: x + 1, 10) + assert result == 11 + assert not cluster._supports_scaling + assert "Subprocess" in repr(cluster) + + +@gen_test() +async def test_n_workers(): + async with SubprocessCluster( + asynchronous=True, dashboard_address=":0", n_workers=2 + ) as cluster: + async with Client(cluster, asynchronous=True) as client: + assert len(cluster.workers) == 2 + result = await client.submit(lambda x: x + 1, 10) + assert result == 11 + assert not cluster._supports_scaling + assert "Subprocess" in repr(cluster) From c7389dec99122a86aca8315812ae88765c38e576 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 22 Dec 2022 16:23:35 +0100 Subject: [PATCH 02/14] Silence logs --- distributed/deploy/subprocess.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index c73e02f42f1..7e39b40bb08 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -53,9 +53,10 @@ async def start(self) -> None: await super().start() async def close(self) -> None: - if self.process: + if self.process and self.process.returncode is None: self.process.kill() - await self.process.communicate() + await self.process.wait() + self.process = None await super().close() @@ -69,12 +70,14 @@ def SubprocessCluster( threads_per_worker: int | None = None, worker_dashboard_address: str | None = None, worker_options: dict | None = None, + silence_logs: int = logging.WARN, **kwargs: Any, ) -> SpecCluster: if not host: host = "127.0.0.1" worker_options = worker_options or {} scheduler_options = scheduler_options or {} + if n_workers is None and threads_per_worker is None: n_workers, threads_per_worker = nprocesses_nthreads() if n_workers is None and threads_per_worker is not None: @@ -102,6 +105,7 @@ def SubprocessCluster( "nthreads": threads_per_worker, "dashboard": worker_dashboard_address is not None, "dashboard_address": worker_dashboard_address, + "silence_logs": silence_logs, } ) @@ -111,4 +115,10 @@ def SubprocessCluster( "options": {"worker_class": worker_class, "worker_options": worker_options}, } workers = {i: worker for i in range(n_workers)} - return SpecCluster(workers, scheduler, name="SubprocessCluster", **kwargs) + return SpecCluster( + workers, + scheduler, + name="SubprocessCluster", + silence_logs=silence_logs, + **kwargs, + ) From 8290bbea0df4424f343b80de3647479ac0a7a96b Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 23 Dec 2022 12:25:07 +0100 Subject: [PATCH 03/14] Allow scaling --- distributed/deploy/subprocess.py | 7 ++++--- distributed/deploy/tests/test_subprocess.py | 23 +++++++++++++++++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index 7e39b40bb08..3e9e76a6cef 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -89,7 +89,7 @@ def SubprocessCluster( worker_options["memory_limit"] = parse_memory_limit( "auto", 1, n_workers, logger=logger ) - assert n_workers + assert n_workers is not None scheduler_options.update( { @@ -116,8 +116,9 @@ def SubprocessCluster( } workers = {i: worker for i in range(n_workers)} return SpecCluster( - workers, - scheduler, + workers=workers, + scheduler=scheduler, + worker=worker, name="SubprocessCluster", silence_logs=silence_logs, **kwargs, diff --git a/distributed/deploy/tests/test_subprocess.py b/distributed/deploy/tests/test_subprocess.py index e70fc43c1a6..cc4ea8f3c6a 100644 --- a/distributed/deploy/tests/test_subprocess.py +++ b/distributed/deploy/tests/test_subprocess.py @@ -31,3 +31,26 @@ async def test_n_workers(): assert result == 11 assert not cluster._supports_scaling assert "Subprocess" in repr(cluster) + + +@gen_test() +async def test_scale_up_and_down(): + async with SubprocessCluster( + n_workers=0, + silence_logs=False, + dashboard_address=":0", + asynchronous=True, + ) as cluster: + async with Client(cluster, asynchronous=True) as c: + + assert not cluster.workers + + cluster.scale(2) + await c.wait_for_workers(2) + assert len(cluster.workers) == 2 + assert len(cluster.scheduler.workers) == 2 + + cluster.scale(1) + await cluster + + assert len(cluster.workers) == 1 From 4b0bc689c7b8531251f609676d7b0279d529af83 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 23 Dec 2022 12:25:36 +0100 Subject: [PATCH 04/14] Minor --- distributed/deploy/tests/test_subprocess.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/deploy/tests/test_subprocess.py b/distributed/deploy/tests/test_subprocess.py index cc4ea8f3c6a..6f5d6682ac7 100644 --- a/distributed/deploy/tests/test_subprocess.py +++ b/distributed/deploy/tests/test_subprocess.py @@ -16,7 +16,7 @@ async def test_basic(): async with Client(cluster, asynchronous=True) as client: result = await client.submit(lambda x: x + 1, 10) assert result == 11 - assert not cluster._supports_scaling + assert cluster._supports_scaling assert "Subprocess" in repr(cluster) @@ -29,7 +29,7 @@ async def test_n_workers(): assert len(cluster.workers) == 2 result = await client.submit(lambda x: x + 1, 10) assert result == 11 - assert not cluster._supports_scaling + assert cluster._supports_scaling assert "Subprocess" in repr(cluster) From cd5094fe4a89e58af5e11ad8dfc922ecbb088b8a Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 23 Dec 2022 12:51:18 +0100 Subject: [PATCH 05/14] Do not support Windows --- distributed/deploy/subprocess.py | 7 +++++++ distributed/deploy/tests/test_subprocess.py | 17 ++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index 3e9e76a6cef..c8c6f999132 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -10,6 +10,7 @@ from dask.system import CPU_COUNT from distributed import Scheduler +from distributed.compatibility import WINDOWS from distributed.deploy.spec import ProcessInterface, SpecCluster from distributed.deploy.utils import nprocesses_nthreads from distributed.worker_memory import parse_memory_limit @@ -32,6 +33,9 @@ def __init__( worker_options: dict | None = None, **kwargs: Any, ) -> None: + if WINDOWS: + # FIXME: distributed#7434 + raise RuntimeError("SubprocessWorker does not support Windows.") self.scheduler = scheduler self.worker_class = worker_class self.name = name @@ -73,6 +77,9 @@ def SubprocessCluster( silence_logs: int = logging.WARN, **kwargs: Any, ) -> SpecCluster: + if WINDOWS: + # FIXME: distributed#7434 + raise RuntimeError("SubprocessCluster does not support Windows.") if not host: host = "127.0.0.1" worker_options = worker_options or {} diff --git a/distributed/deploy/tests/test_subprocess.py b/distributed/deploy/tests/test_subprocess.py index 6f5d6682ac7..91b1bf30139 100644 --- a/distributed/deploy/tests/test_subprocess.py +++ b/distributed/deploy/tests/test_subprocess.py @@ -1,10 +1,14 @@ from __future__ import annotations +import pytest + from distributed import Client -from distributed.deploy.subprocess import SubprocessCluster +from distributed.compatibility import WINDOWS +from distributed.deploy.subprocess import SubprocessCluster, SubprocessWorker from distributed.utils_test import gen_test +@pytest.mark.skipif(WINDOWS, reason="distributed#7434") @gen_test() async def test_basic(): async with SubprocessCluster( @@ -20,6 +24,7 @@ async def test_basic(): assert "Subprocess" in repr(cluster) +@pytest.mark.skipif(WINDOWS, reason="distributed#7434") @gen_test() async def test_n_workers(): async with SubprocessCluster( @@ -33,6 +38,7 @@ async def test_n_workers(): assert "Subprocess" in repr(cluster) +@pytest.mark.skipif(WINDOWS, reason="distributed#7434") @gen_test() async def test_scale_up_and_down(): async with SubprocessCluster( @@ -54,3 +60,12 @@ async def test_scale_up_and_down(): await cluster assert len(cluster.workers) == 1 + + +@pytest.mark.skipif(not WINDOWS, reason="Windows-specific error testing") +def test_raise_on_windows(): + with pytest.raises(RuntimeError, match="not support Windows"): + SubprocessCluster() + + with pytest.raises(RuntimeError, match="not support Windows"): + SubprocessWorker(scheduler="tcp://127.0.0.1:8786") From 22030d979fd81f5d3d9e77d058ed184fb4ae20bb Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 23 Dec 2022 12:51:58 +0100 Subject: [PATCH 06/14] minor --- distributed/deploy/tests/test_subprocess.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/distributed/deploy/tests/test_subprocess.py b/distributed/deploy/tests/test_subprocess.py index 91b1bf30139..86034219eb7 100644 --- a/distributed/deploy/tests/test_subprocess.py +++ b/distributed/deploy/tests/test_subprocess.py @@ -62,7 +62,9 @@ async def test_scale_up_and_down(): assert len(cluster.workers) == 1 -@pytest.mark.skipif(not WINDOWS, reason="Windows-specific error testing") +@pytest.mark.skipif( + not WINDOWS, reason="Windows-specific error testing (distributed#7434)" +) def test_raise_on_windows(): with pytest.raises(RuntimeError, match="not support Windows"): SubprocessCluster() From 41b71d6fa59315660d39659291071079e7b1f058 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 23 Dec 2022 13:55:29 +0100 Subject: [PATCH 07/14] Add docstrings --- distributed/deploy/subprocess.py | 47 ++++++++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index c8c6f999132..02c9dab7bd1 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -72,11 +72,54 @@ def SubprocessCluster( worker_class: str = "distributed.Nanny", n_workers: int | None = None, threads_per_worker: int | None = None, - worker_dashboard_address: str | None = None, worker_options: dict | None = None, silence_logs: int = logging.WARN, **kwargs: Any, ) -> SpecCluster: + """Create in-process scheduler and workers in dedicated subprocesses + + This creates a "cluster" of a scheduler running in the current process and + workers running in dedicated subprocesses. + + Parameters + ---------- + host: + Host address on which the scheduler will listen, defaults to localhost + scheduler_port: + Port fo the scheduler, defaults to 0 to choose a random port + scheduler_options: + Keywords to pass on to scheduler + dashboard_address: + Address on which to listen for the Bokeh diagnostics server like + 'localhost:8787' or '0.0.0.0:8787', defaults to ':8787' + + Set to ``None`` to disable the dashboard. + Use ':0' for a random port. + worker_class: + Worker class to instantiate workers from, defaults to 'distributed.Nanny' + n_workers: + Number of workers to start + threads: + Number of threads per each worker + worker_options: + Keywords to pass on to the ``Worker`` class constructor + silence_logs: + Level of logs to print out to stdout, defaults to ``logging.WARN`` + + Use a falsy value like False or None to disable log silencing. + + Examples + -------- + >>> cluster = SubprocessCluster() # Create a subprocess cluster #doctest: +SKIP + >>> cluster # doctest: +SKIP + SubprocessCluster(SubprocessCluster, 'tcp://127.0.0.1:61207', workers=5, threads=10, memory=16.00 GiB) + + >>> c = Client(cluster) # connect to subprocess cluster # doctest: +SKIP + + Scale the cluster to three workers + + >>> cluster.scale(3) # doctest: +SKIP + """ if WINDOWS: # FIXME: distributed#7434 raise RuntimeError("SubprocessCluster does not support Windows.") @@ -110,8 +153,6 @@ def SubprocessCluster( { "host": host, "nthreads": threads_per_worker, - "dashboard": worker_dashboard_address is not None, - "dashboard_address": worker_dashboard_address, "silence_logs": silence_logs, } ) From f4caedd7974dd0e43d0654607cbc245990bf5792 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 23 Dec 2022 13:56:00 +0100 Subject: [PATCH 08/14] Minor --- distributed/deploy/subprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index 02c9dab7bd1..e551e532c91 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -76,7 +76,7 @@ def SubprocessCluster( silence_logs: int = logging.WARN, **kwargs: Any, ) -> SpecCluster: - """Create in-process scheduler and workers in dedicated subprocesses + """Create in-process scheduler and workers running in dedicated subprocesses This creates a "cluster" of a scheduler running in the current process and workers running in dedicated subprocesses. From 8a8c31daca60df3bf9f2f0d0ef9b70481758aff8 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 23 Dec 2022 13:57:12 +0100 Subject: [PATCH 09/14] Rename options to kwargs --- distributed/deploy/subprocess.py | 24 ++++++++++----------- distributed/deploy/tests/test_subprocess.py | 4 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index e551e532c91..7d52ea34756 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -67,12 +67,12 @@ async def close(self) -> None: def SubprocessCluster( host: str | None = None, scheduler_port: int = 0, - scheduler_options: dict | None = None, + scheduler_kwargs: dict | None = None, dashboard_address: str = ":8787", worker_class: str = "distributed.Nanny", n_workers: int | None = None, threads_per_worker: int | None = None, - worker_options: dict | None = None, + worker_kwargs: dict | None = None, silence_logs: int = logging.WARN, **kwargs: Any, ) -> SpecCluster: @@ -87,7 +87,7 @@ def SubprocessCluster( Host address on which the scheduler will listen, defaults to localhost scheduler_port: Port fo the scheduler, defaults to 0 to choose a random port - scheduler_options: + scheduler_kwargs: Keywords to pass on to scheduler dashboard_address: Address on which to listen for the Bokeh diagnostics server like @@ -101,7 +101,7 @@ def SubprocessCluster( Number of workers to start threads: Number of threads per each worker - worker_options: + worker_kwargs: Keywords to pass on to the ``Worker`` class constructor silence_logs: Level of logs to print out to stdout, defaults to ``logging.WARN`` @@ -125,8 +125,8 @@ def SubprocessCluster( raise RuntimeError("SubprocessCluster does not support Windows.") if not host: host = "127.0.0.1" - worker_options = worker_options or {} - scheduler_options = scheduler_options or {} + worker_kwargs = worker_kwargs or {} + scheduler_kwargs = scheduler_kwargs or {} if n_workers is None and threads_per_worker is None: n_workers, threads_per_worker = nprocesses_nthreads() @@ -135,13 +135,13 @@ def SubprocessCluster( if n_workers and threads_per_worker is None: # Overcommit threads per worker, rather than undercommit threads_per_worker = max(1, int(math.ceil(CPU_COUNT / n_workers))) - if n_workers and "memory_limit" not in worker_options: - worker_options["memory_limit"] = parse_memory_limit( + if n_workers and "memory_limit" not in worker_kwargs: + worker_kwargs["memory_limit"] = parse_memory_limit( "auto", 1, n_workers, logger=logger ) assert n_workers is not None - scheduler_options.update( + scheduler_kwargs.update( { "host": host, "port": scheduler_port, @@ -149,7 +149,7 @@ def SubprocessCluster( "dashboard_address": dashboard_address, } ) - worker_options.update( + worker_kwargs.update( { "host": host, "nthreads": threads_per_worker, @@ -157,10 +157,10 @@ def SubprocessCluster( } ) - scheduler = {"cls": Scheduler, "options": scheduler_options} + scheduler = {"cls": Scheduler, "options": scheduler_kwargs} worker = { "cls": SubprocessWorker, - "options": {"worker_class": worker_class, "worker_options": worker_options}, + "options": {"worker_class": worker_class, "worker_options": worker_kwargs}, } workers = {i: worker for i in range(n_workers)} return SpecCluster( diff --git a/distributed/deploy/tests/test_subprocess.py b/distributed/deploy/tests/test_subprocess.py index 86034219eb7..f15b1ac1adb 100644 --- a/distributed/deploy/tests/test_subprocess.py +++ b/distributed/deploy/tests/test_subprocess.py @@ -14,8 +14,8 @@ async def test_basic(): async with SubprocessCluster( asynchronous=True, dashboard_address=":0", - scheduler_options={"idle_timeout": "5s"}, - worker_options={"death_timeout": "5s"}, + scheduler_kwargs={"idle_timeout": "5s"}, + worker_kwargs={"death_timeout": "5s"}, ) as cluster: async with Client(cluster, asynchronous=True) as client: result = await client.submit(lambda x: x + 1, 10) From 23edf756faf7d1adaa1a39754d02ffef5aee3908 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 23 Dec 2022 13:59:21 +0100 Subject: [PATCH 10/14] Experimental warning --- distributed/deploy/subprocess.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index 7d52ea34756..c3d15eb202e 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -81,6 +81,10 @@ def SubprocessCluster( This creates a "cluster" of a scheduler running in the current process and workers running in dedicated subprocesses. + .. warning:: + + This function is experimental + Parameters ---------- host: From b1adc3b2f410552a91812678c57e85814242d46c Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 23 Dec 2022 14:02:01 +0100 Subject: [PATCH 11/14] Export SubprocessCluster --- distributed/__init__.py | 9 ++++++++- distributed/deploy/__init__.py | 1 + distributed/deploy/subprocess.py | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/distributed/__init__.py b/distributed/__init__.py index 279796ff97b..d8e20f367ff 100644 --- a/distributed/__init__.py +++ b/distributed/__init__.py @@ -27,7 +27,13 @@ wait, ) from distributed.core import Status, connect, rpc -from distributed.deploy import Adaptive, LocalCluster, SpecCluster, SSHCluster +from distributed.deploy import ( + Adaptive, + LocalCluster, + SpecCluster, + SSHCluster, + SubprocessCluster, +) from distributed.diagnostics.plugin import ( CondaInstall, Environ, @@ -134,6 +140,7 @@ def _(): "SpecCluster", "Status", "Sub", + "SubprocessCluster", "TimeoutError", "UploadDirectory", "UploadFile", diff --git a/distributed/deploy/__init__.py b/distributed/deploy/__init__.py index baba6b077af..6c3996606f1 100644 --- a/distributed/deploy/__init__.py +++ b/distributed/deploy/__init__.py @@ -7,6 +7,7 @@ from distributed.deploy.local import LocalCluster from distributed.deploy.spec import ProcessInterface, SpecCluster from distributed.deploy.ssh import SSHCluster +from distributed.deploy.subprocess import SubprocessCluster with suppress(ImportError): from distributed.deploy.ssh import SSHCluster diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index c3d15eb202e..431f43f6753 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -9,10 +9,10 @@ from dask.system import CPU_COUNT -from distributed import Scheduler from distributed.compatibility import WINDOWS from distributed.deploy.spec import ProcessInterface, SpecCluster from distributed.deploy.utils import nprocesses_nthreads +from distributed.scheduler import Scheduler from distributed.worker_memory import parse_memory_limit logger = logging.getLogger(__name__) From 72d8c3203caa84029daa007ab10027ffa1dd4546 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 23 Dec 2022 14:11:01 +0100 Subject: [PATCH 12/14] Docstrings --- distributed/deploy/subprocess.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index 431f43f6753..30118a366d5 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -19,9 +19,23 @@ class SubprocessWorker(ProcessInterface): + """A local Dask worker running in a dedicated subprocess + + Parameters + ---------- + scheduler: + Address of the scheduler + worker_class: + Python class to use to create the worker, defaults to 'distributed.Nanny' + name: + Name of the worker + worker_kwargs: + Keywords to pass on to the ``Worker`` class constructor + """ + scheduler: str worker_class: str - worker_options: dict + worker_kwargs: dict name: str | None process: asyncio.subprocess.Process | None @@ -30,8 +44,7 @@ def __init__( scheduler: str, worker_class: str = "distributed.Nanny", name: str | None = None, - worker_options: dict | None = None, - **kwargs: Any, + worker_kwargs: dict | None = None, ) -> None: if WINDOWS: # FIXME: distributed#7434 @@ -39,10 +52,9 @@ def __init__( self.scheduler = scheduler self.worker_class = worker_class self.name = name - self.worker_options = copy.copy(worker_options or {}) + self.worker_kwargs = copy.copy(worker_kwargs or {}) self.process = None - logger.info(kwargs) - super().__init__(**kwargs) + super().__init__() async def start(self) -> None: self.process = await asyncio.create_subprocess_exec( @@ -50,9 +62,7 @@ async def start(self) -> None: "spec", self.scheduler, "--spec", - json.dumps( - {0: {"cls": self.worker_class, "opts": {**self.worker_options}}} - ), + json.dumps({0: {"cls": self.worker_class, "opts": {**self.worker_kwargs}}}), ) await super().start() @@ -164,7 +174,7 @@ def SubprocessCluster( scheduler = {"cls": Scheduler, "options": scheduler_kwargs} worker = { "cls": SubprocessWorker, - "options": {"worker_class": worker_class, "worker_options": worker_kwargs}, + "options": {"worker_class": worker_class, "worker_kwargs": worker_kwargs}, } workers = {i: worker for i in range(n_workers)} return SpecCluster( From 6e284430767a902088023e85bc7e03bf441aa640 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 23 Dec 2022 14:18:37 +0100 Subject: [PATCH 13/14] kwarg precedence --- distributed/deploy/subprocess.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index 30118a366d5..b58f33a9e08 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -7,6 +7,8 @@ import math from typing import Any +import toolz + from dask.system import CPU_COUNT from distributed.compatibility import WINDOWS @@ -155,20 +157,22 @@ def SubprocessCluster( ) assert n_workers is not None - scheduler_kwargs.update( + scheduler_kwargs = toolz.merge( { "host": host, "port": scheduler_port, "dashboard": dashboard_address is not None, "dashboard_address": dashboard_address, - } + }, + scheduler_kwargs, ) - worker_kwargs.update( + worker_kwargs = toolz.merge( { "host": host, "nthreads": threads_per_worker, "silence_logs": silence_logs, - } + }, + worker_kwargs, ) scheduler = {"cls": Scheduler, "options": scheduler_kwargs} From 3a2d42d1da61f74b9e61c4b493d393ca0f21cd36 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 23 Dec 2022 14:29:07 +0100 Subject: [PATCH 14/14] Kill all subprocesses --- distributed/deploy/subprocess.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index b58f33a9e08..0b4323ba107 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -7,6 +7,7 @@ import math from typing import Any +import psutil import toolz from dask.system import CPU_COUNT @@ -70,6 +71,8 @@ async def start(self) -> None: async def close(self) -> None: if self.process and self.process.returncode is None: + for child in psutil.Process(self.process.pid).children(recursive=True): + child.kill() self.process.kill() await self.process.wait() self.process = None