HPyX Quickstart
This notebook walks through HPyX v1's public API end-to-end. It's a runnable companion to the Usage Guide and demonstrates the same examples in a hands-on environment.
You'll learn how to:
- Submit work to HPX and retrieve results via hpyx.Future.
- Compose futures with when_all, when_any, and dataflow.
- Use HPXExecutor as a drop-in concurrent.futures.Executor.
- Integrate HPyX with asyncio (await, wrap_future, run_in_executor).
- Drive a dask graph through HPyX as the scheduler.
- See real concurrency on free-threaded Python 3.13t.
Prerequisites. Use the notebook pixi environment. It pairs HPyX with JupyterLab on regular (non-free-threaded) Python 3.13 because JupyterLab's transitive CFFI dependency does not yet support free-threading. From the repo root:
pixi run -e notebook lab               # launch JupyterLab on this notebook
pixi run -e notebook execute-notebook  # run all cells in place (CI-friendly)
The final scaling cell only shows true parallelism on python-freethreading; under regular Python 3.13 the GIL serializes pure-Python callables, so the speedup is modest. To see the full benefit on free-threaded Python, run that cell separately via pixi run -e test-py313t python -c '...'.
1. Set up the runtime
HPyX auto-initializes the HPX runtime on first use. Calling hpyx.init(os_threads=...) upfront is optional but useful when you want to pin thread count for reproducible demos.
import hpyx
hpyx.init(os_threads=4)
print(f"HPyX version : {hpyx.__version__}")
print(f"Runtime up : {hpyx.is_running()}")
print(f"Worker count : {hpyx.debug.get_num_worker_threads()}")
HPyX version : 2025.8.29.dev33
Runtime up : True
Worker count : 4
2. Submit a single task
hpyx.async_(fn, *args, **kwargs) schedules fn on an HPX worker thread and returns a hpyx.Future. The future implements the full concurrent.futures.Future protocol plus HPX-native .then and asyncio await support.
def square(x):
    return x * x
fut = hpyx.async_(square, 12)
print("future repr:", fut)
print("result :", fut.result())
print("done :", fut.done())
future repr: <hpyx.Future state=pending>
result : 144
done : True
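Because the text describes hpyx.Future as following the concurrent.futures.Future protocol, the same calls can be rehearsed without HPX installed. The sketch below uses the standard library's ThreadPoolExecutor purely as a stand-in (an assumption made for illustration; it says nothing about how HPyX schedules work) and also exercises add_done_callback, which is part of that protocol.

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

callback_log = []

# Stand-in pool: stdlib threads, not HPX workers.
with ThreadPoolExecutor(max_workers=2) as pool:
    fut = pool.submit(square, 12)
    # The callback runs once the future completes (or at once if it already has).
    fut.add_done_callback(lambda f: callback_log.append(f.result()))
    value = fut.result()  # blocks until the task has finished

# Leaving the with-block joins the worker, so the callback has run by now.
print("result   :", value)
print("done     :", fut.done())
print("callback :", callback_log)
```

Whether hpyx.Future runs callbacks on a worker thread or on the caller is not stated here, so treat the callback's thread as unspecified.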
Verify that the callable actually ran on an HPX worker (not the calling thread):
import threading
main_tid = threading.get_ident()
def report_thread():
    return {
        "hpx_worker_id": hpyx.debug.get_worker_thread_id(),
        "py_thread_id": threading.get_ident(),
    }
info = hpyx.async_(report_thread).result()
print("main thread id :", main_tid)
print("task ran on :", info)
assert info["hpx_worker_id"] >= 0 # -1 means "not on an HPX thread"
main thread id : 8406835776
task ran on : {'hpx_worker_id': 2, 'py_thread_id': 6167326720}
Exceptions propagate through result() cleanly:
def boom():
    raise ValueError("upstream-error")
fut = hpyx.async_(boom)
try:
    fut.result()
except ValueError as e:
    print("caught:", repr(e))
caught: ValueError('upstream-error')
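The concurrent.futures.Future protocol also provides exception(), which returns the stored error instead of re-raising it. This is handy when a failure should be logged rather than handled with try/except. The sketch uses a stdlib ThreadPoolExecutor as a stand-in; that hpyx.Future behaves the same way is an assumption based on the protocol claim above, not something verified here.

```python
from concurrent.futures import ThreadPoolExecutor

def boom():
    raise ValueError("upstream-error")

# Stand-in pool: stdlib threads, not HPX workers.
with ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(boom)
    # exception() waits for completion and returns the error without raising it.
    err = fut.exception()

print("type    :", type(err).__name__)
print("message :", err)
```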
3. Compose futures with combinators
HPyX exposes four composition primitives at the package root:
| Function | Result |
|---|---|
| `when_all(*futs)` | Future resolving to a tuple of all input results |
| `when_any(*futs)` | Future resolving to `(index, [Future, ...])` when any one completes |
| `dataflow(fn, *futs, **kwargs)` | Calls `fn(*resolved_values, **kwargs)` once all inputs complete |
| `shared_future(f)` / `ready_future(value)` | Sharable view / pre-completed future |
f1 = hpyx.async_(lambda: 1)
f2 = hpyx.async_(lambda: 2)
f3 = hpyx.async_(lambda: 3)
tuple_result = hpyx.when_all(f1, f2, f3).result()
print("when_all :", tuple_result)
when_all : (1, 2, 3)
import time
def slow():
    time.sleep(0.2)
    return "slow"
f_slow = hpyx.async_(slow)
f_fast = hpyx.async_(lambda: "fast")
idx, futures_list = hpyx.when_any(f_slow, f_fast).result()
print(f"first to finish was index {idx} → {futures_list[idx].result()!r}")
first to finish was index 1 → 'fast'
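For comparison, the closest standard-library idiom to "first one wins" is concurrent.futures.wait with FIRST_COMPLETED. The sketch below is a stdlib analogue, not HPyX's implementation. It holds the slow task back with a threading.Event so the outcome does not depend on timing.

```python
import threading
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

release = threading.Event()

def slow():
    release.wait()  # held back until the main thread releases it
    return "slow"

with ThreadPoolExecutor(max_workers=2) as pool:
    futs = [pool.submit(slow), pool.submit(lambda: "fast")]
    # Returns as soon as at least one future has completed.
    done, pending = wait(futs, return_when=FIRST_COMPLETED)
    first_idx = next(i for i, f in enumerate(futs) if f in done)
    first_value = futs[first_idx].result()
    release.set()  # let the slow task finish so the pool can shut down

print(f"first to finish was index {first_idx} -> {first_value!r}")
```

Unlike when_any as described in the table, wait returns sets of done and pending futures rather than an index, so the index is recovered by a scan.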
def combine(a, b, c, *, scale=1):
    return (a + b + c) * scale
result = hpyx.dataflow(combine, f1, f2, f3, scale=10)
print("dataflow :", result.result())
dataflow : 60
Exception short-circuit. If any input future raises, when_all and dataflow propagate the first failure unchanged — the downstream callable in dataflow is not invoked.
def raise_(): raise RuntimeError("upstream-fail")
f_ok = hpyx.async_(lambda: 1)
f_bad = hpyx.async_(raise_)
try:
    hpyx.dataflow(lambda a, b: a + b, f_ok, f_bad).result()
except RuntimeError as e:
    print("propagated:", repr(e))
propagated: RuntimeError('upstream-fail')
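The short-circuit rule can be pictured with a small stdlib helper. dataflow_sketch below is a hypothetical name introduced for illustration; it is not how HPyX implements dataflow. It simply blocks a pool thread on the inputs, which a real dataflow would presumably avoid.

```python
from concurrent.futures import ThreadPoolExecutor

def dataflow_sketch(pool, fn, *futs, **kwargs):
    """Hypothetical helper: call fn on the inputs' results once all are ready."""
    def run():
        # result() re-raises the error of the first failing input (in argument
        # order), so fn is never reached when an input has failed.
        values = [f.result() for f in futs]
        return fn(*values, **kwargs)
    return pool.submit(run)

calls = []  # records every invocation of the downstream callable

def add(a, b):
    calls.append((a, b))
    return a + b

def fail():
    raise RuntimeError("upstream-fail")

with ThreadPoolExecutor(max_workers=4) as pool:
    ok = dataflow_sketch(pool, add, pool.submit(lambda: 1), pool.submit(lambda: 2))
    bad = dataflow_sketch(pool, add, pool.submit(lambda: 1), pool.submit(fail))
    ok_value = ok.result()
    bad_error = bad.exception()

print("ok        :", ok_value)
print("bad       :", repr(bad_error))
print("add calls :", calls)
```

The calls list ends up with a single entry, which confirms that the downstream callable ran only for the healthy pair of inputs.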
4. Use HPXExecutor as a concurrent.futures.Executor
hpyx.HPXExecutor is a real subclass of concurrent.futures.Executor. Code that already targets the standard library (submit, map, shutdown, the context manager) Just Works.
import concurrent.futures
with hpyx.HPXExecutor() as ex:
    print("is Executor :", isinstance(ex, concurrent.futures.Executor))
    fut = ex.submit(pow, 2, 10)
    print("submit pow :", fut.result())
    print("map squares :", list(ex.map(lambda x: x * x, range(8))))
is Executor : True
submit pow : 1024
map squares : [0, 1, 4, 9, 16, 25, 36, 49]
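One practical consequence of the subclass relationship is that library code can depend on the Executor interface alone and let the caller choose the backend. The sketch below shows that pattern with a stdlib ThreadPoolExecutor; sum_of_squares is a hypothetical function introduced for illustration.

```python
from concurrent.futures import Executor, ThreadPoolExecutor

def sum_of_squares(executor: Executor, n: int) -> int:
    """Depends only on the Executor interface, not on a concrete pool type."""
    # map() with two iterables calls pow(i, 2) for each i in range(n).
    return sum(executor.map(pow, range(n), [2] * n))

with ThreadPoolExecutor(max_workers=4) as pool:
    total = sum_of_squares(pool, 8)

print("sum of squares:", total)
```

Passing hpyx.HPXExecutor() in place of the thread pool should require no change to sum_of_squares, assuming the executor behaves as described in this section.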
Per-handle shutdown does not affect other live handles or stop the runtime — atexit owns process-level teardown because HPX cannot restart in-process.
ex_a = hpyx.HPXExecutor()
ex_b = hpyx.HPXExecutor()
ex_a.shutdown()
print("ex_b still works:", ex_b.submit(lambda: "ok").result())
ex_b.shutdown()
ex_b still works: ok
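The standard library's pools follow the same per-handle model, which makes it a convenient reference point. The sketch below uses two independent ThreadPoolExecutor instances. Note that the RuntimeError on submit-after-shutdown is stdlib behavior; whether HPXExecutor rejects late submissions the same way is not stated in this notebook.

```python
from concurrent.futures import ThreadPoolExecutor

pool_a = ThreadPoolExecutor(max_workers=1)
pool_b = ThreadPoolExecutor(max_workers=1)
pool_a.shutdown()

try:
    pool_a.submit(lambda: "late")
    rejected = False
except RuntimeError:
    rejected = True  # stdlib pools refuse new work after shutdown

# The second handle is unaffected by the first one's shutdown.
still_ok = pool_b.submit(lambda: "ok").result()
pool_b.shutdown()

print("pool_a rejected late work :", rejected)
print("pool_b still works        :", still_ok)
```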
5. asyncio integration
hpyx.Future is awaitable. Three patterns are equivalent for asyncio code:
- Direct await fut: uses Future.__await__ and posts back via loop.call_soon_threadsafe.
- asyncio.wrap_future(fut): works because hpyx.Future is a real concurrent.futures.Future subclass.
- loop.run_in_executor(HPXExecutor(), fn, ...): the stdlib pattern, with no HPyX-specific syntax.
In addition, hpyx.aio.await_all and hpyx.aio.await_any are async-friendly wrappers around the combinators.
import asyncio
# Modern Jupyter supports top-level `await` directly, so we don't need
# `asyncio.run(...)` here — the cell runs inside the kernel's existing loop.
direct = await hpyx.async_(lambda: "direct await")
wrapped = await asyncio.wrap_future(hpyx.async_(lambda: "wrap_future"))
loop = asyncio.get_running_loop()
with hpyx.HPXExecutor() as ex:
    run_in_exec = await loop.run_in_executor(ex, pow, 2, 10)
gathered = await hpyx.aio.await_all(
    hpyx.async_(lambda: 1),
    hpyx.async_(lambda: 2),
    hpyx.async_(lambda: 3),
)
direct, wrapped, run_in_exec, gathered
('direct await', 'wrap_future', 1024, (1, 2, 3))
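Outside a notebook there is no running loop to await into, so the same patterns need an entry point. The sketch below is a script-style version of the two stdlib patterns, using a ThreadPoolExecutor as a stand-in for HPXExecutor. The direct await pattern is omitted because a plain concurrent.futures.Future is not awaitable.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Adapt a concurrent.futures.Future into an asyncio awaitable.
        wrapped = await asyncio.wrap_future(pool.submit(lambda: "wrap_future"))
        # Let the loop submit the call to the executor and await the result.
        run_in_exec = await loop.run_in_executor(pool, pow, 2, 10)
    return wrapped, run_in_exec

# In a script, asyncio.run creates the loop; in Jupyter, use `await main()`.
results = asyncio.run(main())
print(results)
```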
The bridge does not block the event loop. The next cell runs an asyncio.sleep counter task in parallel with a 100ms HPX time.sleep — the counter still ticks while HPX is busy because the event loop is free.
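iterations = 0
async def counter():
    global iterations
    while iterations < 100:
        iterations += 1
        await asyncio.sleep(0.001)
def slow():
    time.sleep(0.1)
    return "hpx-done"
counter_task = asyncio.create_task(counter())
result = await hpyx.async_(slow)
# Snapshot before awaiting the counter: the final count always reaches 100,
# so only this mid-flight value shows whether the loop kept running.
ticks_during_hpx = iterations
assert ticks_during_hpx > 0, "event loop made no progress during the HPX call"
await counter_task
print(f"hpx result : {result}")
print(f"loop ticks : {iterations} (final count after the counter task finished)")
hpx result : hpx-done
loop ticks : 100 (final count after the counter task finished)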
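To make the contrast concrete, the following stdlib-only sketch measures how many times a counter task ticks during a 100 ms sleep, once with the sleep running directly on the loop thread and once handed to the default thread-pool executor. ticks_during is a hypothetical helper; the sketch illustrates the general asyncio behavior, not HPyX's bridge.

```python
import asyncio
import time

async def ticks_during(block_loop: bool) -> int:
    ticks = 0
    stop = False

    async def counter():
        nonlocal ticks
        while not stop:
            ticks += 1
            await asyncio.sleep(0.001)

    task = asyncio.create_task(counter())
    if block_loop:
        time.sleep(0.1)  # runs on the loop thread, so no other task can run
    else:
        # The default executor (a thread pool) keeps the loop thread free.
        await asyncio.get_running_loop().run_in_executor(None, time.sleep, 0.1)
    observed = ticks  # snapshot taken before the counter is told to stop
    stop = True
    await task
    return observed

async def main():
    return await ticks_during(True), await ticks_during(False)

blocked_ticks, free_ticks = asyncio.run(main())
print("ticks while the loop was blocked :", blocked_ticks)
print("ticks while the loop was free    :", free_ticks)
```

The snapshot is taken immediately after the sleep returns, which is what distinguishes the two cases; a count read after the counter has been awaited to completion would not.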
6. Use HPXExecutor as a dask scheduler
Because HPXExecutor is a concurrent.futures.Executor, dask accepts it directly as a scheduler — no HPyX-side adapter required. This was the integration that motivated the v1 executor rewrite.
import dask.array as da
import numpy as np
with hpyx.HPXExecutor() as ex:
    x = da.arange(1_000_000, chunks=50_000)
    total = x.sum().compute(scheduler=ex)
expected = np.arange(1_000_000).sum()
print(f"hpx + dask sum : {int(total)}")
print(f"expected : {int(expected)}")
assert int(total) == int(expected)
hpx + dask sum : 499999500000
expected : 499999500000
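As a rough mental model of what the chunked sum asks of the executor, the sketch below splits the same range into 50,000-element chunks, sums each chunk as a separate task, and combines the partial results. It uses only the standard library and is a simplification; dask's actual task graph and reduction tree are more elaborate.

```python
from concurrent.futures import ThreadPoolExecutor

N, CHUNK = 1_000_000, 50_000

def chunk_sum(start):
    # Sum of one contiguous chunk [start, start + CHUNK), clipped at N.
    return sum(range(start, min(start + CHUNK, N)))

# Stand-in pool: any concurrent.futures.Executor would fit here.
with ThreadPoolExecutor(max_workers=4) as pool:
    partials = list(pool.map(chunk_sum, range(0, N, CHUNK)))

total = sum(partials)
print("chunks :", len(partials))
print("total  :", total)
```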
rng = np.random.default_rng(0)
a_np = rng.random((128, 128))
b_np = rng.random((128, 128))
a = da.from_array(a_np, chunks=(32, 32))
b = da.from_array(b_np, chunks=(32, 32))
with hpyx.HPXExecutor() as ex:
    c = (a @ b).compute(scheduler=ex)
np.testing.assert_allclose(c, a_np @ b_np, rtol=1e-10)
print("chunked matmul matches numpy reference ✓")
chunked matmul matches numpy reference ✓
7. See free-threaded Python 3.13t actually parallelize
On free-threaded Python the GIL is gone, so HPX's worker pool can run pure-Python callables truly in parallel. Submit N × time.sleep(0.1) and watch the wall-clock collapse from N × 0.1s (serial) to ~ceil(N / os_threads) × 0.1s (parallel).
N = 20
start = time.perf_counter()
with hpyx.HPXExecutor() as ex:
    futs = [ex.submit(time.sleep, 0.1) for _ in range(N)]
    for f in futs:
        f.result()
elapsed = time.perf_counter() - start
serial = N * 0.1
print(f"{N} x 0.1s sleeps via HPXExecutor (os_threads=4)")
print(f" elapsed : {elapsed:.2f}s")
print(f" serial : {serial:.2f}s (lower-bound serial)")
print(f" speedup : {serial / elapsed:.1f}x")
20 x 0.1s sleeps via HPXExecutor (os_threads=4)
 elapsed : 0.52s
 serial : 2.00s (lower-bound serial)
 speedup : 3.8x
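The measured figures can be checked against the batch estimate stated in this section, ceil(N / os_threads) × task time. The helper below is a hypothetical, idealized model that ignores scheduling overhead.

```python
import math

def expected_wall_clock(n_tasks, workers, task_seconds):
    """Idealized estimate: tasks run in full batches of `workers`, no overhead."""
    return math.ceil(n_tasks / workers) * task_seconds

parallel = expected_wall_clock(20, 4, 0.1)
serial = 20 * 0.1
ideal_speedup = serial / parallel

print(f"parallel estimate : {parallel:.2f}s")
print(f"ideal speedup     : {ideal_speedup:.1f}x")
```

With 20 tasks on 4 workers the model gives five batches, so the measured 0.52 s and 3.8x sit just below the ideal, with the gap attributable to submission and scheduling overhead.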
Where to next?
- Usage Guide: full reference for every API surface in this notebook, plus error handling, performance considerations, and best practices.
- Architecture Decisions: the why behind the design (why launch::async, why concurrent.futures.Future inheritance, why per-handle shutdown, etc.).
- API Reference: auto-generated module docs.
- User Guides: audience-specific deep dives on Scientific Python, concurrent.futures, HPX Native, Dask integration, asyncio, Diagnostics, and Free-Threaded Python 3.13t.