RainingComputers/deepslate

Deepslate

Write crash-resistant programs for financial applications, microservice orchestration, human-in-the-loop applications, and agentic workflows.

Deepslate is a durable execution interpreter for a minimal subset of Go and Python, and an alternative to Temporal and Azure Durable Functions. Unlike existing durable execution engines, deepslate does not use event replay. It compiles flow functions into custom bytecode, runs them on a custom control flow interpreter, and uses snapshots of the interpreter for crash recovery. This lets deepslate recover long-running workflows very quickly and use less storage space.
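
To make the contrast with event replay concrete, here is a toy sketch in plain Python. It is not deepslate's interpreter or bytecode: the instruction names (`sleep`, `incr`, `jump`), the `step` and `run` helpers, and the JSON snapshot format are all invented for illustration. It only shows the general idea that, if the interpreter state (program counter plus variables) is saved after each step, recovery needs just the latest snapshot, however long the flow has been running.

```python
import json

# Hypothetical bytecode for: while True: sleep(1.0); count = count + 1
PROGRAM = [
    ("sleep", 1.0),     # 0: a real engine would suspend here; the toy skips it
    ("incr", "count"),  # 1: count = count + 1
    ("jump", 0),        # 2: loop back to the top
]


def new_state() -> dict:
    """Fresh interpreter state: program counter plus variables."""
    return {"pc": 0, "vars": {"count": 0}}


def step(state: dict) -> None:
    """Execute exactly one instruction, mutating the state in place."""
    op, arg = PROGRAM[state["pc"]]
    if op == "incr":
        state["vars"][arg] += 1
        state["pc"] += 1
    elif op == "jump":
        state["pc"] = arg
    else:  # "sleep": nothing to compute in this toy
        state["pc"] += 1


def run(state: dict, steps: int) -> str:
    """Run some steps and return the snapshot taken after the last one."""
    snapshot = json.dumps(state)
    for _ in range(steps):
        step(state)
        snapshot = json.dumps(state)  # stand-in for a write to a database
    return snapshot


# Run for a while, then "crash" by discarding the in-memory state.
saved = run(new_state(), steps=7)

# Recovery loads the last snapshot and continues; no history is re-executed.
recovered = json.loads(saved)
run(recovered, steps=3)
print(recovered["vars"]["count"])
```

The size of `saved` depends only on the current state, not on how many steps have run, which is the intuition behind the storage claim above.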

This project is still being worked on and is not ready for public release.

Getting started

  • Install the golang toolchain

  • Create an empty project folder

    mkdir counter-example
    cd counter-example
    
  • Create and activate a virtual environment

    python3 -m venv .venv
    source .venv/bin/activate
    
  • Clone deepslate

    git clone git@github.com:RainingComputers/deepslate.git ../deepslate
    
  • Build and install deepslate

    make -C ../deepslate pypiwheel
    pip install ../deepslate/dist/*.whl
    
  • Install fastapi and uvicorn

    pip install fastapi uvicorn
    
  • Start postgres using docker

    docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres:18-alpine
    
  • Save the example flow and server as server.py

    import uvicorn
    from fastapi import FastAPI
    
    from deepslate import flow, sleep, new_postgres, spawn_workers, spawn_inspector
    
    DBURL = "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable"
    
    
    # durable flow that counts up forever, one tick per second,
    # can survive crashes and restarts
    @flow
    async def counter():
        count: int = 0
    
        while True:
            sleep(1.0)
            count = count + 1
    
    
    def create_app() -> FastAPI:
        # connect to postgres and start deepslate
        platform, _ = new_postgres(1, DBURL, "localhost:8765", 100, 100, 5)
        spawn_fn, _, _, _ = spawn_workers(4, platform, [counter()])
        spawn_inspector(platform, "localhost:8080")
    
        app = FastAPI()
        app.state.spawn = spawn_fn
    
        # spawn a new durable counter flow with the given name
        @app.post("/counter/{name}")
        def spawn_counter(name: str):
            id_, exists = app.state.spawn(name, "counter", {})
            return {"id": id_, "exists": exists}
    
        return app
    
    
    app = create_app()
    
    
    if __name__ == "__main__":
        uvicorn.run(app, host="0.0.0.0", port=8000, workers=1)
  • Run the example server

    python server.py
    
  • Spawn a flow

    curl -X POST http://localhost:8000/counter/my-counter
    
  • Visit the dashboard at localhost:8080 and inspect the my-counter isolate

  • Kill the server with Ctrl+C (leave postgres running)

    ^C
    
  • Restart the server

    python server.py
    
  • Visit the dashboard again at localhost:8080. The count continues from where it left off, having survived the crash, and the flow keeps running, sleeping and looping as before.
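
The curl step above can also be scripted. The following is a minimal sketch using only the Python standard library; the helper names `build_spawn_request` and `spawn_counter` are made up for this example, the URL is the one used in the curl command, and the reply shape (`id` and `exists`) is taken from `server.py`.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8000"  # where the example server.py listens


def build_spawn_request(name: str) -> urllib.request.Request:
    """Build the same request as `curl -X POST <BASE_URL>/counter/<name>`."""
    path = "/counter/" + urllib.parse.quote(name, safe="")
    return urllib.request.Request(BASE_URL + path, data=b"", method="POST")


def spawn_counter(name: str) -> dict:
    """Send the request and decode the JSON reply from the example server."""
    with urllib.request.urlopen(build_spawn_request(name), timeout=5) as resp:
        return json.load(resp)
```

With the example server running, calling `spawn_counter("my-counter")` should return a dict with the `id` and `exists` keys that `server.py` sends back. Presumably `exists` reports whether a flow with that name was already present, but that reading is an assumption.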

Flow definition language

Deepslate lets you write durable workflows in a restricted subset of the language inside @flow functions. A @flow function is meant to express only the critical glue logic that requires durability. Flow functions can call ordinary Python functions, which are not subject to these restrictions.

You can think of a @flow function as a durable goroutine that can survive application restarts. Deepslate provides Go-like concurrency constructs in Python: spawning, channels (Queue), wait groups (WaitGroup), mutexes (Lock), and a select statement.
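
For readers who know Python better than Go, the shape of these constructs is close to what the standard `asyncio` module offers. The sketch below uses only `asyncio`, not deepslate, and nothing in it is durable. It shows spawning, a queue used as a channel, a lock, and `asyncio.gather` standing in for a wait group (asyncio has no WaitGroup type). A select equivalent is left out to keep it short.

```python
import asyncio


async def producer(queue: asyncio.Queue) -> None:
    """Send three values over the channel."""
    for n in (1, 2, 3):
        await queue.put(n)


async def consumer(queue: asyncio.Queue, lock: asyncio.Lock, totals: list[int]) -> None:
    """Receive three values and record them under a mutex."""
    for _ in range(3):
        value = await queue.get()
        async with lock:  # exclusive section around the shared list
            totals.append(value * 10)


async def main() -> list[int]:
    queue: asyncio.Queue = asyncio.Queue()
    lock = asyncio.Lock()
    totals: list[int] = []
    # create_task spawns concurrent work, much like `go f()` in Go
    tasks = [
        asyncio.create_task(producer(queue)),
        asyncio.create_task(consumer(queue, lock, totals)),
    ]
    await asyncio.gather(*tasks)  # plays the role of a wait group here
    return totals


result = asyncio.run(main())
print(result)
```

The difference that matters is what happens on a restart: these asyncio tasks are lost when the process exits, whereas a @flow function is described above as surviving application restarts.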

  • Flow definition: an async def function marked @flow, with typed params and an optional return type

    from deepslate import flow
    
    @flow
    async def add(a: int, b: int) -> int:
        return a + b
  • Types: int, bool, str, float, bytes, list[T], dict[K, V], @dataclass types, the result type T | DSError, and sync primitives dpslio.Queue[T], dpslio.Lock, dpslio.WaitGroup

  • Variable declaration: every variable needs a type annotation; the initial value is optional, e.g. count: int = 0 or result: bytes | DSError

  • Assignment: rebind an already declared variable, e.g. count = count + 1

  • Operators: arithmetic + - * / %; comparison == != < > <= >=; boolean and, or, not; unary -

  • Literals: int, float, str, True or False, list [1, 2, 3], and dict {"a": 1}

  • if / elif / else: conditional branching, e.g. if x > 0: ... elif x == 0: ... else: ...

  • while: loop while a condition holds, including infinite loops, e.g. while True: sleep(1.0)

  • for: iterate over a list, or use enumerate for index and value, e.g. for i, num in enumerate(numbers): ...

  • match / case: match a value against cases, e.g. match status: case 200: ... case _: ...

  • return: return a value or exit the flow, e.g. return result or bare return

  • DSError and isinstance: deepslate does not support exceptions. Durable operations return the result type T | DSError; narrow it with isinstance on the success type and propagate a failure by returning the error

    from deepslate import flow, http_listen, DSError
    
    @flow
    async def handle():
        data: bytes | DSError
        data = http_listen("POST", "/numbers")
        if not isinstance(data, bytes):
            return data
  • sleep: sleep for the given number of seconds. Deepslate flows can sleep for days or even months and survive restarts

    from deepslate import flow, sleep
    
    @flow
    async def delay():
        sleep(1.0)
  • http_listen: suspend until an HTTP request hits the given route, returning its body

    from deepslate import flow, http_listen, DSError
    
    @flow
    async def listener():
        data: bytes | DSError
        data = http_listen("POST", "/numbers")
  • http_request: make a durable outbound HTTP call

    from deepslate import flow, http_request, DSError
    
    @flow
    async def fetch():
        resp: bytes | DSError
        resp = http_request("GET", "http://localhost:8000/health", {}, b"")
  • json_marshal and json_unmarshal: encode to and decode from JSON bytes. json_unmarshal takes the target type as a generic parameter

    from deepslate import flow, json_marshal, json_unmarshal, DSError
    
    @flow
    async def codec(data: bytes):
        nums: list[int] | DSError
        nums = json_unmarshal[list[int]](data)
        raw: bytes | DSError
        raw = json_marshal([1, 2, 3])
  • pure function: a deterministic function without side effects, marked @pure. It runs inline without a checkpoint, e.g. status = get_status(resp) where:

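    from dataclasses import dataclass
    from deepslate import pure
    
    @dataclass
    class HealthResponse:  # hypothetical type, added so the snippet is complete
        Status: str
    
    @pure
    def get_status(resp: HealthResponse) -> str:
        return resp.Status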
  • impure function: a function with side effects, marked @impure. It runs once and its result is checkpointed, e.g. insert_event(name) where:

    import psycopg2
    
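    from deepslate import flow, impure
    
    # same connection string as in the getting started example
    DBURL = "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable"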
    
    @impure
    def insert_event(name: str) -> None:
        conn = psycopg2.connect(DBURL)
        cur = conn.cursor()
        cur.execute("INSERT INTO events (name) VALUES (%s)", (name,))
        conn.commit()
        conn.close()
    
    
    @flow
    async def record(name: str):
        insert_event(name)
  • Flow call: call another @flow function directly. It runs durably and returns its value

    from deepslate import flow
    
    @flow
    async def adder(a: int, b: int) -> int:
        return a + b
    
    
    @flow
    async def caller():
        total: int = adder(3, 5)
  • create_task: spawn a child flow that runs concurrently

    from deepslate import flow, sleep, dpslio
    
    @flow
    async def child(n: int):
        sleep(1.0)
    
    
    @flow
    async def parent():
        dpslio.create_task(child(1))
  • Queue: a durable channel; put and get are awaited

    from deepslate import flow, dpslio
    
    @flow
    async def consumer(ch: dpslio.Queue[int]):
        value: int = await ch.get()
    
    
    @flow
    async def producer():
        ch: dpslio.Queue[int] = dpslio.Queue[int]()
        await ch.put(1)
        dpslio.create_task(consumer(ch))
  • WaitGroup: wait for N concurrent children to finish

    from deepslate import flow, sleep, dpslio
    
    @flow
    async def task(wg: dpslio.WaitGroup):
        sleep(1.0)
        await wg.done()
    
    
    @flow
    async def parent():
        wg: dpslio.WaitGroup = dpslio.WaitGroup()
        wg.add(2)
        dpslio.create_task(task(wg))
        dpslio.create_task(task(wg))
        await wg.wait()
  • Lock: a durable mutex for exclusive sections

    from deepslate import flow, dpslio
    
    @flow
    async def worker(mu: dpslio.Lock):
        await mu.acquire()
        count: int = 1
        await mu.release()
  • select: proceed with whichever channel is ready first

    from deepslate import flow, dpslio
    
    @flow
    async def race(ch1: dpslio.Queue[int], ch2: dpslio.Queue[int]):
        result: int = 0
        match dpslio.select:
            case _ if v := await ch1.get():
                result = v
            case _ if v := await ch2.get():
                result = v
