Composing Compute Nodes into a Workflow¶
Analysis tasks often need more than a single compute step. Radial distribution functions require neighbour lists; cluster analysis feeds on per-frame descriptors; mean-squared displacements chain into autocorrelation functions. Doing this by hand — calling each node, threading outputs into inputs, checking that upstream ran before downstream — is tedious and easy to get wrong.
Workflow makes this mechanical. You register named compute nodes, declare
which parameters come from which upstream node (or from an external input),
and the workflow handles topological order and result caching. It adds zero
dependencies beyond Python's standard library.
A linear chain¶
The simplest workflow is a straight pipeline. Node a runs first; node b
takes a's output as its x parameter.
from molpy.compute import Workflow
from molpy.compute.base import Compute
class Square(Compute):
    """Return the square of a number."""

    def _compute(self, x):
        return x * x
class AddOne(Compute):
    """Add one."""

    def _compute(self, x):
        return x + 1
wf = Workflow()
wf.add("square", Square())
wf.add("add_one", AddOne(), inputs={"x": "square"})
results = wf.run(x=3)
print(results) # {'square': 9, 'add_one': 10}
wf.add() returns the node name, so the return value of one registration can be passed straight into the next call's inputs mapping.
External inputs¶
When a parameter name in inputs does not match any registered node, the
workflow treats it as an external input. You must supply it to run().
wf = Workflow()
wf.add("square", Square())
# The parameter name "x" does not match any node → external input
results = wf.run(x=5)
print(results) # {'square': 25}
If you forget an external input, run() raises WorkflowMissingInputError
before executing any node — no partial execution.
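The validate-then-run pattern behind that guarantee fits in a few lines of plain Python. This is an illustrative sketch, not molpy's implementation, and every name in it is invented:

```python
def run_all(order, required_externals, supplied, execute):
    """Check every external input before running a single node."""
    missing = required_externals - supplied.keys()
    if missing:
        # Fail up front: no node has executed yet, so there is
        # no partial state to clean up.
        raise KeyError(f"missing external inputs: {sorted(missing)}")
    return {name: execute(name) for name in order}

try:
    run_all(["square"], {"x"}, {}, execute=lambda name: None)
except KeyError as err:
    print(err)  # raised before anything ran
```

The cheap check comes first; the expensive loop runs only once the whole input set is known to be complete.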
Diamond reuse¶
When two downstream nodes share an upstream, the upstream runs exactly once. This is not a special code path — it falls out of the result cache naturally.
class Count(Compute):
    """Counts how many times it was called."""

    def __init__(self):
        super().__init__()
        self.call_count = 0

    def _compute(self, x):
        self.call_count += 1
        return x
wf = Workflow()
upstream = Count()
wf.add("upstream", upstream)
wf.add("branch_a", AddOne(), inputs={"x": "upstream"})
wf.add("branch_b", AddOne(), inputs={"x": "upstream"})
results = wf.run(x=1)
assert upstream.call_count == 1 # not 2
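The caching idea itself can be pictured with plain dictionaries. This is a sketch of the concept, not molpy's code, and all names are invented for illustration:

```python
calls = {"upstream": 0}

def upstream(x):
    calls["upstream"] += 1
    return x

def add_one(x):
    return x + 1

nodes = {"upstream": upstream, "branch_a": add_one, "branch_b": add_one}
# Each parameter names either a node (use its cached result) or an external input.
sources = {"upstream": {"x": "x"},
           "branch_a": {"x": "upstream"},
           "branch_b": {"x": "upstream"}}
external = {"x": 1}

results = {}  # the cache: one entry per node, filled in topological order
for name in ["upstream", "branch_a", "branch_b"]:
    kwargs = {param: results[src] if src in results else external[src]
              for param, src in sources[name].items()}
    results[name] = nodes[name](**kwargs)

print(results)  # {'upstream': 1, 'branch_a': 2, 'branch_b': 2}
print(calls)    # {'upstream': 1}  <- ran once despite two consumers
```

Both branches read the same cache entry, so the shared upstream is computed once no matter how many consumers it has.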
Real example: radial distribution function¶
A radial distribution function g(r) needs both a frame (for the box) and a neighbour list (for the pair distances). We express this as a two-node workflow.
import numpy as np
from molpy.compute import Workflow, NeighborList, RDF
import molpy
# Build a simple test frame — 10 atoms in a 10 Å cube
rng = np.random.default_rng(42)
xyz = rng.uniform(0.0, 10.0, size=(10, 3))
frame = molpy.Frame()
frame["atoms"] = {"x": xyz[:, 0], "y": xyz[:, 1], "z": xyz[:, 2]}
frame.box = molpy.Box.cubic(10.0)
wf = Workflow()
wf.add("nlist", NeighborList(cutoff=5.0))
wf.add("rdf", RDF(n_bins=100, r_max=10.0),
       inputs={"frames": "frame", "neighbors": "nlist"})
results = wf.run(frame=frame)
rdf_array = np.asarray(results["rdf"].rdf)
print(f"g(r) has {len(rdf_array)} bins, max value {rdf_array.max():.3f}")
NeighborList needs only the frame, which it receives as the workflow's single
external input. RDF needs both the original frame (for box dimensions) and the
neighbour list, so its inputs mapping references both "frame" (external) and
"nlist" (upstream node).
Introspection¶
You can inspect the workflow before running it.
wf.nodes # ['nlist', 'rdf'] — insertion order
wf.external_inputs # {'frame'} — all unregistered source names
wf.topological_order() # ['nlist', 'rdf'] — execution order
wf.predecessors("rdf") # {'nlist'} — node predecessors only (no externals)
predecessors() deliberately excludes external inputs — it describes the
internal DAG topology, not every dependency.
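The distinction is easy to see in miniature. The sketch below mirrors the RDF example above but is purely illustrative; molpy's internals may differ:

```python
# Each node's inputs mapping, as in the RDF example.
sources = {"nlist": {"frame": "frame"},
           "rdf": {"frames": "frame", "neighbors": "nlist"}}
registered = set(sources)

def predecessors(name):
    # Keep only sources that are registered nodes; everything else is external.
    return {src for src in sources[name].values() if src in registered}

def external_inputs():
    return {src for deps in sources.values()
            for src in deps.values() if src not in registered}

print(predecessors("rdf"))  # {'nlist'}
print(external_inputs())    # {'frame'}
```

The same inputs mapping answers both questions; the only difference is whether a source name is filtered against the set of registered nodes.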
Cycle detection¶
If adding a node would create a cycle, add() raises WorkflowCycleError
and rolls back — the workflow state is unchanged.
wf = Workflow()
wf.add("a", Square())
wf.add("b", Square(), inputs={"x": "a"})  # b depends on a: fine
wf.add("c", Square(), inputs={"x": "b"})  # a -> b -> c is still linear: fine
# An inputs mapping that pointed "a" back at "b" would close the loop
# a -> b -> a; that add() raises WorkflowCycleError and rolls back,
# leaving the workflow exactly as it was.
Cycle detection happens at registration time, not at execution time. You get immediate feedback.
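One way to get registration-time detection from the standard library is graphlib, whose prepare() refuses a cyclic graph. This is a sketch of the technique, not necessarily how molpy implements it:

```python
from graphlib import TopologicalSorter, CycleError

def would_cycle(edges, name, deps):
    """Trial-add `name` with `deps` on a copy; report whether it cycles."""
    trial = {n: set(d) for n, d in edges.items()}
    trial[name] = set(deps)
    try:
        TopologicalSorter(trial).prepare()  # raises CycleError on any cycle
        return False
    except CycleError:
        return True

edges = {"a": set(), "b": {"a"}}
print(would_cycle(edges, "c", {"b"}))  # False: a -> b -> c is linear
print(would_cycle(edges, "a", {"b"}))  # True: a -> b -> a is a loop
print(edges)  # unchanged either way: working on a copy is the rollback
```

Because the trial runs on a copy, a rejected registration leaves the original graph untouched, which is exactly the rollback guarantee described above.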
Immutability contract¶
The workflow never mutates registered compute nodes. Calling run() twice
with the same external inputs produces identical results, and node.dump()
(which serialises the node's configuration) returns the same dictionary before
and after.
When not to use Workflow¶
Workflow is serial and synchronous. It does not (yet) support parallel
execution, conditional nodes, or streaming. For those patterns, use
TopologicalSorter.get_ready() / .done() directly, or wait for a future
version.
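For reference, the get_ready()/done() loop looks like this with the standard library's graphlib. Each batch of ready nodes could be handed to a pool of workers; the node names here are illustrative, borrowing from the examples above:

```python
from graphlib import TopologicalSorter

# nlist feeds two independent analyses that could run side by side.
graph = {"nlist": set(), "rdf": {"nlist"}, "msd": {"nlist"}}
ts = TopologicalSorter(graph)
ts.prepare()

batches = []
while ts.is_active():
    ready = ts.get_ready()         # every node whose dependencies are done
    batches.append(sorted(ready))  # a real scheduler would run these in parallel
    for node in ready:
        ts.done(node)

print(batches)  # [['nlist'], ['msd', 'rdf']]
```

The second batch contains both analyses at once, which is precisely the parallelism opportunity a serial Workflow leaves on the table.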