Compute

The compute module contains algorithms for calculating various molecular properties.

MCD (Mean Displacement Correlation)

Mean Displacement Correlation (MCD) computation for diffusion analysis.

This module implements the MCD method for computing self and distinct displacement correlations (MSD) from molecular dynamics trajectories.

Adapted from the tame library (https://github.com/Roy-Kid/tame).

MCDCompute

MCDCompute(tags, max_dt, dt, center_of_mass=None)

Bases: Compute['Trajectory', MCDResult]

Compute Mean Displacement Correlations (MCD) for diffusion analysis.

This class implements the MCD method, which computes time-correlation functions of particle displacements. It returns raw MSD values without fitting. It supports the following modes (a minimal sketch follows the list):

  • Self diffusion: MSD_i = <(r_i(t+dt) - r_i(t))²>
  • Distinct diffusion: <(r_i(t+dt) - r_i(t)) · (r_j(t+dt) - r_j(t))>
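
The two quantities above can be evaluated directly with NumPy. The sketch below is illustrative only (it is not MCDCompute's internal rolling-cache implementation) and assumes an unwrapped coordinate array r of shape (n_frames, n_atoms, 3) for a single atom type; the per-pair normalization of the distinct term is a choice made for this sketch:

import numpy as np

def self_and_distinct_at_lag(r, lag):
    """Evaluate the self and distinct correlations at one time lag (lag >= 1). Sketch only."""
    dr = r[lag:] - r[:-lag]                     # displacements over the lag, (n_origins, n_atoms, 3)
    self_msd = np.mean(np.sum(dr**2, axis=-1))  # <(r_i(t+dt) - r_i(t))^2> over atoms and time origins
    dots = np.einsum("tid,tjd->tij", dr, dr)    # all pairwise displacement dot products
    n_atoms = dr.shape[1]
    off_diag = dots.sum(axis=(1, 2)) - np.trace(dots, axis1=1, axis2=2)  # keep only i != j terms
    distinct = np.mean(off_diag) / (n_atoms * (n_atoms - 1))             # average per distinct pair
    return self_msd, distinct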

Parameters:

  tags (list[str], required)
      List of atom type specifications. Each tag can be:
      • A single integer (e.g., "3"): self-diffusion MSD of type 3
      • Two comma-separated integers (e.g., "3,4"): distinct diffusion correlation between types 3 and 4
  max_dt (float, required)
      Maximum time lag in ps
  dt (float, required)
      Timestep in ps
  center_of_mass (dict[int, float] | None, default: None)
      Optional dict mapping element types to masses for center-of-mass (COM) removal.
      Format: {element_type: mass}, e.g., {1: 1.008, 6: 12.011}

Examples:

>>> from molpy.io import read_h5_trajectory
>>> traj = read_h5_trajectory("trajectory.h5")
>>>
>>> # Compute self-diffusion MSD of atom type 3
>>> mcd = MCDCompute(tags=["3"], max_dt=30.0, dt=0.01)
>>> result = mcd(traj)
>>> print(result.correlations["3"])  # MSD values at each time lag
>>>
>>> # Compute distinct diffusion between types 3 and 4
>>> mcd = MCDCompute(tags=["3,4"], max_dt=30.0, dt=0.01)
>>> result = mcd(traj)
>>> print(result.correlations["3,4"])  # Correlation values
Source code in src/molpy/compute/mcd.py
def __init__(
    self,
    tags: list[str],
    max_dt: float,
    dt: float,
    center_of_mass: dict[int, float] | None = None,
):
    self.tags = tags
    self.max_dt = max_dt
    self.dt = dt
    self.center_of_mass = center_of_mass
    self.n_cache = int(max_dt / dt)

compute

compute(input)

Compute MCD from trajectory.

Parameters:

  input ('Trajectory', required)
      Input trajectory object

Returns:

  MCDResult
      MCDResult containing the correlation functions (MSD values)

Source code in src/molpy/compute/mcd.py
def compute(self, input: "Trajectory") -> MCDResult:
    """Compute MCD from trajectory.

    Args:
        input: Input trajectory object

    Returns:
        MCDResult containing correlation functions (MSD values)
    """
    trajectory = input
    # Extract data from trajectory
    coords_list = []
    elems_list = []
    box_list = []

    for frame in trajectory:
        if "atoms" not in frame:
            raise ValueError("Frame must contain 'atoms' block")

        atoms = frame["atoms"]
        if "x" not in atoms or "y" not in atoms or "z" not in atoms:
            raise ValueError("Atoms block must contain x, y, z coordinates")
        if "type" not in atoms:
            raise ValueError("Atoms block must contain 'type' field")

        coords = np.column_stack([atoms["x"], atoms["y"], atoms["z"]])
        coords_list.append(coords)
        elems_list.append(atoms["type"])

        # Get box from metadata
        if "box" in frame.metadata:
            box = frame.metadata["box"]
            box_list.append(box)
        else:
            raise ValueError("Frame must contain box information in metadata")

    coords_traj = np.array(coords_list)  # (n_frames, n_atoms, 3)
    elems_traj = np.array(elems_list)  # (n_frames, n_atoms)

    # Unwrap coordinates using Box.diff_dr() directly
    n_frames, n_atoms, n_dim = coords_traj.shape
    coords_unwrapped = np.zeros_like(coords_traj)
    coords_unwrapped[0] = coords_traj[0].copy()

    for i in range(1, n_frames):
        box = box_list[i]
        # Compute displacement and apply minimum image convention
        displacement = coords_traj[i] - coords_traj[i - 1]
        displacement_mic = box.diff_dr(displacement)
        coords_unwrapped[i] = coords_unwrapped[i - 1] + displacement_mic

    # Apply center of mass correction if requested
    if self.center_of_mass is not None:
        coords_unwrapped = self._remove_com(coords_unwrapped, elems_traj[0])

    # Compute correlations for each tag
    correlations = self._compute_correlations(coords_unwrapped, elems_traj[0])

    # Create time array
    time_array = np.arange(self.n_cache, dtype=np.float64) * self.dt

    return MCDResult(
        time=time_array,
        correlations=correlations,
    )
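
The frame-by-frame unwrapping in compute above delegates the minimum-image convention to Box.diff_dr. For an orthorhombic box the same step can be written as the standalone sketch below; box_lengths is an assumed (3,) array of box edge lengths, not the molpy Box object:

import numpy as np

def unwrap_orthorhombic(coords, box_lengths):
    """Unwrap wrapped coordinates of shape (n_frames, n_atoms, 3). Sketch only."""
    unwrapped = np.empty_like(coords)
    unwrapped[0] = coords[0]
    for i in range(1, len(coords)):
        d = coords[i] - coords[i - 1]
        d -= box_lengths * np.round(d / box_lengths)  # minimum-image convention
        unwrapped[i] = unwrapped[i - 1] + d
    return unwrapped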

PMSD (Polarization Mean Square Displacement)

Polarization Mean Square Displacement (PMSD) computation.

This module implements the PMSD method for computing polarization fluctuations in ionic systems from molecular dynamics trajectories.

Adapted from the tame library (https://github.com/Roy-Kid/tame).

PMSDCompute

PMSDCompute(cation_type, anion_type, max_dt, dt)

Bases: Compute['Trajectory', PMSDResult]

Compute Polarization Mean Square Displacement for ionic systems.

This class computes the PMSD, which measures polarization fluctuations in ionic systems. The polarization is defined as:

P(t) = Σ_cations r_i(t) - Σ_anions r_j(t)

And the PMSD is:

PMSD(dt) = <(P(t+dt) - P(t))²>_t
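
The definition above translates directly into NumPy. The following is a minimal sketch of the formula (not PMSDCompute's internal rolling-cache code); P is the polarization time series with shape (n_frames, 3):

import numpy as np

def pmsd_from_polarization(P, max_lag, dt):
    """Evaluate PMSD(dt) = <(P(t+dt) - P(t))^2>_t for lags 0..max_lag-1. Sketch only."""
    pmsd = np.zeros(max_lag)
    for k in range(1, max_lag):
        dP = P[k:] - P[:-k]                   # P(t+dt) - P(t) for every time origin t
        pmsd[k] = np.mean(np.sum(dP**2, axis=-1))
    return np.arange(max_lag) * dt, pmsd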

Parameters:

  cation_type (int, required)
      Atom type index for cations
  anion_type (int, required)
      Atom type index for anions
  max_dt (float, required)
      Maximum time lag in ps
  dt (float, required)
      Timestep in ps

Examples:

>>> from molpy.io import read_h5_trajectory
>>> traj = read_h5_trajectory("ionic_liquid.h5")
>>>
>>> # Compute PMSD for Li+ (type 1) and PF6- (type 2)
>>> pmsd = PMSDCompute(cation_type=1, anion_type=2, max_dt=30.0, dt=0.01)
>>> result = pmsd(traj)
>>>
>>> # Plot results
>>> import matplotlib.pyplot as plt
>>> plt.plot(result.time, result.pmsd)
>>> plt.xlabel("Time lag (ps)")
>>> plt.ylabel("PMSD (Ų)")
>>> plt.show()
Source code in src/molpy/compute/pmsd.py
def __init__(
    self,
    cation_type: int,
    anion_type: int,
    max_dt: float,
    dt: float,
):
    self.cation_type = cation_type
    self.anion_type = anion_type
    self.max_dt = max_dt
    self.dt = dt
    self.n_cache = int(max_dt / dt)

compute

compute(trajectory)

Compute PMSD from trajectory.

Parameters:

  trajectory ('Trajectory', required)
      Input trajectory object

Returns:

  PMSDResult
      PMSDResult containing time points and PMSD values

Source code in src/molpy/compute/pmsd.py
def compute(self, trajectory: "Trajectory") -> PMSDResult:
    """Compute PMSD from trajectory.

    Args:
        trajectory: Input trajectory object

    Returns:
        PMSDResult containing time points and PMSD values
    """
    # Extract data from trajectory
    coords_list = []
    elems_list = []
    box_list = []

    for frame in trajectory:
        if "atoms" not in frame:
            raise ValueError("Frame must contain 'atoms' block")

        atoms = frame["atoms"]
        if "x" not in atoms or "y" not in atoms or "z" not in atoms:
            raise ValueError("Atoms block must contain x, y, z coordinates")
        if "type" not in atoms:
            raise ValueError("Atoms block must contain 'type' field")

        coords = np.column_stack([atoms["x"], atoms["y"], atoms["z"]])
        coords_list.append(coords)
        elems_list.append(atoms["type"])

        # Get box from metadata
        if "box" in frame.metadata:
            box = frame.metadata["box"]
            box_list.append(box)
        else:
            raise ValueError("Frame must contain box information in metadata")

    coords_traj = np.array(coords_list)  # (n_frames, n_atoms, 3)
    elems_traj = np.array(elems_list)  # (n_frames, n_atoms)

    # Get masks for cations and anions (use first frame types)
    cation_mask = elems_traj[0] == self.cation_type
    anion_mask = elems_traj[0] == self.anion_type

    # Extract coordinates for cations and anions
    coords_cations = coords_traj[:, cation_mask, :]
    coords_anions = coords_traj[:, anion_mask, :]

    # Unwrap coordinates using Box.diff_dr() directly
    n_frames = coords_traj.shape[0]

    # Unwrap cations
    coords_cations_unwrapped = np.zeros_like(coords_cations)
    coords_cations_unwrapped[0] = coords_cations[0].copy()
    for i in range(1, n_frames):
        displacement = coords_cations[i] - coords_cations[i - 1]
        displacement_mic = box_list[i].diff_dr(displacement)
        coords_cations_unwrapped[i] = (
            coords_cations_unwrapped[i - 1] + displacement_mic
        )

    # Unwrap anions
    coords_anions_unwrapped = np.zeros_like(coords_anions)
    coords_anions_unwrapped[0] = coords_anions[0].copy()
    for i in range(1, n_frames):
        displacement = coords_anions[i] - coords_anions[i - 1]
        displacement_mic = box_list[i].diff_dr(displacement)
        coords_anions_unwrapped[i] = (
            coords_anions_unwrapped[i - 1] + displacement_mic
        )

    # Compute total polarization at each frame
    # P(t) = Σ r_cation - Σ r_anion
    polarization = np.sum(coords_cations_unwrapped, axis=1) - np.sum(
        coords_anions_unwrapped, axis=1
    )  # (n_frames, 3)

    # Compute PMSD using time cache
    pmsd = self._compute_pmsd(polarization)

    time_array = np.arange(self.n_cache) * self.dt

    return PMSDResult(
        time=time_array,
        pmsd=pmsd,
    )

RDKit Integration

RDKit-based compute operations for RDKitAdapter.

This module provides compute nodes that operate on RDKitAdapter instances to perform RDKit-based molecular operations like 3D coordinate generation and geometry optimization.

Generate3D dataclass

Generate3D(add_hydrogens=True, sanitize=True, embed=True, optimize=True, max_embed_attempts=10, embed_random_seed=0, max_opt_iters=200, forcefield='UFF', update_internal=True)

Bases: Compute[RDKitAdapter, RDKitAdapter]

RDKit-based 3D generation pipeline for RDKitAdapter.

This compute node performs a configurable sequence of RDKit operations on an RDKitAdapter:

  • Add explicit hydrogens (optional)
  • Sanitize the molecule (optional)
  • Generate 3D coordinates via embedding (optional)
  • Optimize the geometry with a force field (optional)

The node mutates the passed RDKitAdapter, updating both the external Chem.Mol and optionally the internal Atomistic structure.
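
For orientation, the same sequence of steps can be reproduced with plain RDKit calls. This is only a sketch of the pipeline stages (it omits the MP_ID bookkeeping and adapter syncing that Generate3D performs); the SMILES input is an arbitrary example:

from rdkit import Chem
from rdkit.Chem import AllChem

mol = Chem.MolFromSmiles("CCO")                 # example molecule
mol = Chem.AddHs(mol)                           # add explicit hydrogens
Chem.SanitizeMol(mol)                           # sanitize
params = AllChem.ETKDGv3()
params.randomSeed = 0
if AllChem.EmbedMolecule(mol, params) == -1:    # embed 3D coordinates
    raise RuntimeError("embedding failed")
AllChem.UFFOptimizeMolecule(mol, maxIters=200)  # optimize geometry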

Attributes:

  add_hydrogens (bool)
      Whether to add explicit hydrogens before embedding
  sanitize (bool)
      Whether to sanitize the molecule
  embed (bool)
      Whether to perform 3D coordinate embedding
  optimize (bool)
      Whether to optimize the geometry after embedding
  max_embed_attempts (int)
      Maximum number of embedding attempts
  embed_random_seed (int | None)
      Random seed for embedding (None for a random seed)
  max_opt_iters (int)
      Maximum number of optimization iterations
  forcefield (str)
      Force field to use ("UFF" or "MMFF94")
  update_internal (bool)
      Whether to sync the internal structure after modifications

Examples:

>>> from molpy.adapter.rdkit import RDKitAdapter
>>> from molpy.compute.rdkit import Generate3D
>>> from molpy import Atomistic
>>>
>>> # Create adapter from Atomistic
>>> atomistic = Atomistic()
>>> # ... add atoms and bonds ...
>>> adapter = RDKitAdapter(internal=atomistic)
>>>
>>> # Generate 3D coordinates
>>> op = Generate3D(
...     add_hydrogens=True,
...     sanitize=True,
...     embed=True,
...     optimize=True
... )
>>> adapter = op(adapter)  # Mutates adapter
>>>
>>> # Access updated structure
>>> updated_atomistic = adapter.get_internal()

compute

compute(input)

Execute the 3D generation pipeline.

Parameters:

  input (RDKitAdapter, required)
      RDKitAdapter to process

Returns:

  RDKitAdapter
      The same RDKitAdapter instance (mutated)

Raises:

  ValueError
      If the adapter has no external representation and sync fails
  RuntimeError
      If embedding or optimization fails

Source code in src/molpy/compute/rdkit.py
def compute(self, input: RDKitAdapter) -> RDKitAdapter:
    """Execute the 3D generation pipeline.

    Args:
        input: RDKitAdapter to process

    Returns:
        The same RDKitAdapter instance (mutated)

    Raises:
        ValueError: If adapter has no external representation and sync fails
        RuntimeError: If embedding or optimization fails
    """
    if not input.has_external():
        input.sync_to_external()

    mol = input.get_external()
    mol = Chem.Mol(mol)

    if self.add_hydrogens:
        mol.UpdatePropertyCache(strict=False)
        # Store original atom count to identify newly added atoms
        original_count = mol.GetNumAtoms()
        mol = Chem.AddHs(mol, addCoords=True)
        # Assign MP_ID to newly added hydrogen atoms
        # Use negative indices to avoid conflicts with existing atoms
        for idx in range(original_count, mol.GetNumAtoms()):
            rd_atom = mol.GetAtomWithIdx(idx)
            if not rd_atom.HasProp(MP_ID):
                # Use negative index as temporary id for new atoms
                rd_atom.SetIntProp(MP_ID, -(idx - original_count + 1))

    if self.sanitize:
        try:
            Chem.SanitizeMol(mol)
        except Exception as e:
            raise RuntimeError(
                f"Sanitization failed: {e}. "
                "The molecule may have invalid valency or other issues."
            ) from e

    if self.embed:
        if mol.GetNumAtoms() == 0:
            raise ValueError("Cannot embed 3D coordinates for empty molecule")

        params = AllChem.ETKDGv3()  # type: ignore[attr-defined]
        if self.embed_random_seed is not None:
            params.randomSeed = int(self.embed_random_seed)
        params.useRandomCoords = True

        embed_result = AllChem.EmbedMolecule(mol, params)  # type: ignore[attr-defined]

        attempts = 1
        while embed_result == -1 and attempts < self.max_embed_attempts:
            params.useRandomCoords = True
            if self.embed_random_seed is not None:
                params.randomSeed = int(self.embed_random_seed) + attempts
            embed_result = AllChem.EmbedMolecule(mol, params)  # type: ignore[attr-defined]
            attempts += 1

        if embed_result == -1:
            raise RuntimeError(
                f"3D embedding failed after {self.max_embed_attempts} attempts. "
                "The molecule may be too large or have structural issues."
            )

    if self.optimize:
        # Use OptimizeGeometry recipe for optimization
        optimizer = OptimizeGeometry(
            max_opt_iters=self.max_opt_iters,
            forcefield=self.forcefield,
            update_internal=False,  # We'll update internal at the end
            raise_on_failure=False,  # Don't raise, just warn
        )
        input.set_external(mol)
        input = optimizer(input)
        mol = input.get_external()

    input.set_external(mol)

    if self.update_internal:
        input.sync_to_internal()

    return input

OptimizeGeometry dataclass

OptimizeGeometry(max_opt_iters=200, forcefield='UFF', update_internal=True, raise_on_failure=False)

Bases: Compute[RDKitAdapter, RDKitAdapter]

RDKit-based geometry optimization for RDKitAdapter.

This compute node optimizes the geometry of a molecule using RDKit's force field methods (UFF or MMFF94).
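
In plain RDKit terms, the force-field step behaves as in the sketch below: both optimizers typically return 0 on convergence and 1 when the iteration limit is reached first. This is a standalone illustration, not the OptimizeGeometry code itself, and the molecule is an arbitrary example:

from rdkit import Chem
from rdkit.Chem import AllChem

mol = Chem.AddHs(Chem.MolFromSmiles("c1ccccc1O"))       # example molecule
AllChem.EmbedMolecule(mol, AllChem.ETKDGv3())
rc = AllChem.UFFOptimizeMolecule(mol, maxIters=200)     # UFF
# rc = AllChem.MMFFOptimizeMolecule(mol, maxIters=200)  # or MMFF94, if parameters are available
if rc != 0:
    print("optimization did not converge within maxIters")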

Attributes:

  max_opt_iters (int)
      Maximum number of optimization iterations
  forcefield (str)
      Force field to use ("UFF" or "MMFF94")
  update_internal (bool)
      Whether to sync the internal structure after optimization
  raise_on_failure (bool)
      Whether to raise an exception on optimization failure (default: False).
      If False, a warning is issued and optimization continues.

Examples:

>>> from molpy.adapter.rdkit import RDKitAdapter
>>> from molpy.compute.rdkit import OptimizeGeometry
>>>
>>> adapter = RDKitAdapter(internal=atomistic)
>>> optimizer = OptimizeGeometry(forcefield="UFF", max_opt_iters=200)
>>> adapter = optimizer(adapter)

compute

compute(input)

Execute geometry optimization.

Parameters:

  input (RDKitAdapter, required)
      RDKitAdapter to process

Returns:

  RDKitAdapter
      The same RDKitAdapter instance (mutated)

Raises:

  ValueError
      If no conformer is found
  RuntimeError
      If optimization fails and raise_on_failure=True

Source code in src/molpy/compute/rdkit.py
def compute(self, input: RDKitAdapter) -> RDKitAdapter:
    """Execute geometry optimization.

    Args:
        input: RDKitAdapter to process

    Returns:
        The same RDKitAdapter instance (mutated)

    Raises:
        ValueError: If no conformer found
        RuntimeError: If optimization fails and raise_on_failure=True
    """
    if not input.has_external():
        input.sync_to_external()

    original_mol = input.get_external()
    # Create a copy to avoid modifying the original
    # Chem.Mol() creates a deep copy, but we need to ensure conformer is copied
    mol = Chem.Mol(original_mol)

    # Explicitly copy conformer if it wasn't copied automatically
    if mol.GetNumConformers() == 0 and original_mol.GetNumConformers() > 0:
        original_conf = original_mol.GetConformer()
        new_conf = Chem.Conformer(mol.GetNumAtoms())
        for i in range(mol.GetNumAtoms()):
            pos = original_conf.GetAtomPosition(i)
            new_conf.SetAtomPosition(i, pos)
        mol.AddConformer(new_conf, assignId=True)

    # Sanitize molecule to ensure proper atom types and hybridization
    # This is required for UFF/MMFF optimization
    try:
        Chem.SanitizeMol(mol)
    except Exception as e:
        # If sanitization fails, try with less strict settings
        try:
            mol.UpdatePropertyCache(strict=False)
            # Try to fix common issues
            for atom in mol.GetAtoms():
                if atom.GetHybridization() == Chem.HybridizationType.UNSPECIFIED:
                    # Try to infer hybridization from connectivity
                    num_neighbors = atom.GetDegree()
                    if num_neighbors == 0:
                        atom.SetHybridization(Chem.HybridizationType.S)
                    elif num_neighbors == 1:
                        atom.SetHybridization(Chem.HybridizationType.SP)
                    elif num_neighbors == 2:
                        atom.SetHybridization(Chem.HybridizationType.SP2)
                    elif num_neighbors == 3:
                        atom.SetHybridization(Chem.HybridizationType.SP3)
                    elif num_neighbors == 4:
                        atom.SetHybridization(Chem.HybridizationType.SP3D)
            mol.UpdatePropertyCache(strict=False)
        except Exception as e2:
            raise RuntimeError(
                f"Failed to prepare molecule for optimization: {e}. "
                f"Fallback also failed: {e2}. "
                "The molecule may have structural issues."
            ) from e

    # Update property cache before optimization (required by RDKit)
    mol.UpdatePropertyCache(strict=False)

    if mol.GetNumConformers() == 0:
        raise ValueError(
            "Cannot optimize geometry: no conformer found. "
            "Enable embedding or provide a molecule with coordinates."
        )

    if self.forcefield == "UFF":
        # Store coordinates before optimization to check if they changed
        conf_before = mol.GetConformer() if mol.GetNumConformers() > 0 else None
        coords_before = None
        if conf_before is not None:
            coords_before = [
                conf_before.GetAtomPosition(i) for i in range(mol.GetNumAtoms())
            ]

        opt_result = AllChem.UFFOptimizeMolecule(  # type: ignore[attr-defined]
            mol, maxIters=int(self.max_opt_iters)
        )

        # Check if coordinates actually changed
        coords_changed = False
        if conf_before is not None and mol.GetNumConformers() > 0:
            conf_after = mol.GetConformer()
            if coords_before is not None:
                for i in range(mol.GetNumAtoms()):
                    pos_before = coords_before[i]
                    pos_after = conf_after.GetAtomPosition(i)
                    # Use a more lenient threshold (1e-5) to detect coordinate changes
                    # This accounts for numerical precision and ensures we catch real optimizations
                    if (
                        abs(pos_before.x - pos_after.x) > 1e-5
                        or abs(pos_before.y - pos_after.y) > 1e-5
                        or abs(pos_before.z - pos_after.z) > 1e-5
                    ):
                        coords_changed = True
                        break

        if opt_result != 0:
            msg = (
                f"UFF optimization returned code {opt_result}. "
                f"Code 1 typically means convergence not reached within {self.max_opt_iters} iterations. "
                "The structure may still be improved."
            )
            if self.raise_on_failure:
                raise RuntimeError(msg)
            else:
                warnings.warn(msg, UserWarning)
        elif not coords_changed:
            # Optimization returned success (0) but coordinates didn't change
            # This can happen if the structure is already optimized
            # However, if there are long bonds, this might indicate an issue
            # Check if there are unusually long bonds that should have been optimized
            max_bond_length = 0.0
            if mol.GetNumConformers() > 0:
                conf = mol.GetConformer()
                for bond in mol.GetBonds():
                    begin_idx = bond.GetBeginAtomIdx()
                    end_idx = bond.GetEndAtomIdx()
                    pos1 = conf.GetAtomPosition(begin_idx)
                    pos2 = conf.GetAtomPosition(end_idx)
                    bond_length = (
                        (pos1.x - pos2.x) ** 2
                        + (pos1.y - pos2.y) ** 2
                        + (pos1.z - pos2.z) ** 2
                    ) ** 0.5
                    max_bond_length = max(max_bond_length, bond_length)

            if (
                max_bond_length > 2.0
            ):  # Typical bond length is ~1.5 Å, >2.0 is suspicious
                warnings.warn(
                    f"UFF optimization completed but coordinates did not change, "
                    f"despite long bonds detected (max bond length: {max_bond_length:.3f} Å). "
                    f"This may indicate the optimization did not work properly. "
                    f"Consider using MMFF94 force field or increasing max_opt_iters.",
                    UserWarning,
                )
            else:
                warnings.warn(
                    "UFF optimization completed but coordinates did not change. "
                    "The structure may already be optimized or at a local minimum. "
                    "For ring/cyclic structures, this often indicates the geometry is already optimal.",
                    UserWarning,
                )
    elif self.forcefield == "MMFF94":
        try:
            opt_result = AllChem.MMFFOptimizeMolecule(  # type: ignore[attr-defined]
                mol, maxIters=int(self.max_opt_iters)
            )
            if opt_result != 0:
                msg = (
                    f"MMFF94 optimization returned code {opt_result}. "
                    f"Code 1 typically means convergence not reached within {self.max_opt_iters} iterations."
                )
                if self.raise_on_failure:
                    raise RuntimeError(msg)
                else:
                    warnings.warn(msg, UserWarning)
        except Exception as e:
            msg = (
                f"MMFF94 optimization failed: {e}. "
                "MMFF parameters may not be available for this molecule."
            )
            if self.raise_on_failure:
                raise RuntimeError(msg) from e
            else:
                warnings.warn(msg, UserWarning)
    else:
        raise ValueError(
            f"Unknown force field: {self.forcefield}. Use 'UFF' or 'MMFF94'."
        )

    input.set_external(mol)

    if self.update_internal:
        # For geometry optimization, only update coordinates, preserve topology
        # This preserves bond types and other topology properties
        input.sync_to_internal(update_topology=False)

    return input

Result

Result classes for compute operations.

This module defines result types returned by compute operations.

MCDResult dataclass

MCDResult(meta=dict(), time=np.array([]), correlations=dict())

Bases: TimeSeriesResult

Results from Mean Displacement Correlation calculation.

Attributes:

  time (NDArray[float64])
      Time lag values (in ps)
  correlations (dict[str, NDArray[float64]])
      Dictionary mapping tag names to correlation function arrays (MSD values).
      Each array has shape (n_time_lags,).

PMSDResult dataclass

PMSDResult(meta=dict(), time=np.array([]), pmsd=np.array([]))

Bases: TimeSeriesResult

Results from Polarization Mean Square Displacement calculation.

Attributes:

  time (NDArray[float64])
      Time lag values (in ps)
  pmsd (NDArray[float64])
      Polarization MSD values at each time lag, shape (n_time_lags,)

Result dataclass

Result(meta=dict())

Base class for computation results.

Subclasses should define specific fields for their result data.
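
A hypothetical subclass illustrating the pattern; the class name, its fields, and the import path molpy.compute.result are assumptions made for this sketch, not part of the library:

from dataclasses import dataclass, field

import numpy as np
from numpy.typing import NDArray

from molpy.compute.result import Result  # assumed import path

@dataclass
class RDFResult(Result):
    """Example-only result type; field names are illustrative."""
    r: NDArray[np.float64] = field(default_factory=lambda: np.array([]))
    g_r: NDArray[np.float64] = field(default_factory=lambda: np.array([]))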

to_dict

to_dict()

Convert result to dictionary representation.

Source code in src/molpy/compute/result.py
def to_dict(self) -> dict[str, Any]:
    """Convert result to dictionary representation."""
    return {k: v for k, v in self.__dict__.items()}

TimeSeriesResult dataclass

TimeSeriesResult(meta=dict(), time=np.array([]))

Bases: Result

Base class for time-series analysis results.

Attributes:

  time (NDArray[float64])
      Time points for the analysis (in ps or frames)

Time Series

Time-series analysis operations for trajectory data.

This module provides utilities for computing time-correlation functions, mean squared displacements, and other time-series statistics commonly used in molecular dynamics trajectory analysis.

Adapted from the tame library (https://github.com/Roy-Kid/tame).

TimeAverage

TimeAverage(shape, dtype=np.float64, dropnan='partial')

Compute running time average with NaN handling.

This class accumulates data over time and computes the average, with options for handling NaN values.
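
With dropnan='partial' the accumulator keeps per-element sums and counts that simply skip NaN entries. A minimal NumPy sketch of that rule:

import numpy as np

frames = [np.array([1.0, 2.0, np.nan, 4.0]), np.array([2.0, 3.0, 3.0, 5.0])]
cumsum = np.zeros(4)
count = np.zeros(4)
for x in frames:
    cumsum += np.nan_to_num(x, nan=0.0)   # NaN entries contribute nothing to the sum
    count += ~np.isnan(x)                 # ...and are not counted
print(cumsum / count)                     # [1.5 2.5 3.  4.5]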

Parameters:

  shape (tuple[int, ...], required)
      Shape of the data arrays to average
  dtype (dtype, default: np.float64)
      Data type for the accumulated arrays
  dropnan (Literal['none', 'partial', 'all'], default: 'partial')
      How to handle NaN values:
      • 'none': include NaN values in the average (the result may be NaN)
      • 'partial': ignore individual NaN entries
      • 'all': skip the entire frame if any NaN is present

Examples:

>>> avg = TimeAverage(shape=(4,), dropnan='partial')
>>> avg.update(np.array([1.0, 2.0, np.nan, 4.0]))
>>> avg.update(np.array([2.0, 3.0, 3.0, 5.0]))
>>> result = avg.get()  # [1.5, 2.5, 3.0, 4.5]
Source code in src/molpy/compute/time_series.py
def __init__(
    self,
    shape: tuple[int, ...],
    dtype: np.dtype = np.float64,
    dropnan: Literal["none", "partial", "all"] = "partial",
):
    self.shape = shape
    self.dtype = dtype
    self.dropnan = dropnan
    self.cumsum = np.zeros(shape, dtype=dtype)
    self.count = np.zeros(shape, dtype=dtype) if dropnan == "partial" else 0
    self._n_frames = 0

get

get()

Get current time-averaged value.

Returns:

  NDArray
      Time-averaged data array

Source code in src/molpy/compute/time_series.py
def get(self) -> NDArray:
    """Get current time-averaged value.

    Returns:
        Time-averaged data array
    """
    if isinstance(self.count, np.ndarray):
        # Avoid division by zero
        with np.errstate(divide="ignore", invalid="ignore"):
            result = self.cumsum / self.count
            result[self.count == 0] = np.nan
        return result
    else:
        if self.count == 0:
            return np.full(self.shape, np.nan, dtype=self.dtype)
        return self.cumsum / self.count

reset

reset()

Reset accumulator to initial state.

Source code in src/molpy/compute/time_series.py
def reset(self) -> None:
    """Reset accumulator to initial state."""
    self.cumsum.fill(0)
    if isinstance(self.count, np.ndarray):
        self.count.fill(0)
    else:
        self.count = 0
    self._n_frames = 0

update

update(new_data)

Add new data to running average.

Parameters:

  new_data (NDArray, required)
      New data array to include in the average

Source code in src/molpy/compute/time_series.py
def update(self, new_data: NDArray) -> None:
    """Add new data to running average.

    Args:
        new_data: New data array to include in average
    """
    if new_data.shape != self.shape:
        raise ValueError(
            f"Data shape {new_data.shape} doesn't match expected shape {self.shape}"
        )

    nan_mask = np.isnan(new_data)

    if self.dropnan == "all":
        # Skip entire frame if any NaN present
        if nan_mask.any():
            return
        self.cumsum += new_data
        self.count += 1
    elif self.dropnan == "partial":
        # Ignore individual NaN entries
        self.cumsum += np.nan_to_num(new_data, nan=0.0)
        self.count += (~nan_mask).astype(self.dtype)
    else:  # dropnan == "none"
        # Include NaN values (may propagate NaN)
        self.cumsum += new_data
        self.count += 1

    self._n_frames += 1

TimeCache

TimeCache(cache_size, shape, dtype=np.float64, default_val=np.nan)

Cache previous N frames of trajectory data for correlation calculations.

This class maintains a rolling buffer of the most recent N frames of data, which is essential for computing time-correlation functions like MSD and ACF.
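
The rolling buffer works by prepending the newest frame and dropping the oldest, so cache[k] always holds the data from k frames ago. A minimal sketch of that shift:

import numpy as np

cache = np.full((3, 2), np.nan)   # cache_size=3, per-frame data shape (2,)
for step in range(5):
    new = np.full(2, float(step))
    cache = np.concatenate([new[None], cache], axis=0)[:-1]   # prepend newest, drop oldest
print(cache[:, 0])                # [4. 3. 2.] -> current frame, 1 frame ago, 2 frames ago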

Parameters:

  cache_size (int, required)
      Number of frames to cache (maximum time lag)
  shape (tuple[int, ...], required)
      Shape of the data arrays to cache (e.g., (n_atoms, 3) for coordinates)
  dtype (dtype, default: np.float64)
      Data type for the cached arrays
  default_val (float, default: np.nan)
      Default value used to fill the cache initially

Examples:

>>> cache = TimeCache(cache_size=100, shape=(10, 3))
>>> coords = np.random.randn(10, 3)
>>> cache.update(coords)
>>> cached_data = cache.get()  # Shape: (100, 10, 3)
Source code in src/molpy/compute/time_series.py
def __init__(
    self,
    cache_size: int,
    shape: tuple[int, ...],
    dtype: np.dtype = np.float64,
    default_val: float = np.nan,
):
    self.cache_size = cache_size
    self.shape = shape
    self.dtype = dtype
    # Initialize cache with default values
    self.cache = np.full((cache_size, *shape), default_val, dtype=dtype)
    self._count = 0

get

get()

Get cached data array.

Returns:

  NDArray
      Cached data with shape (cache_size, *data_shape)

Source code in src/molpy/compute/time_series.py
def get(self) -> NDArray:
    """Get cached data array.

    Returns:
        Cached data with shape (cache_size, *data_shape)
    """
    return self.cache

reset

reset()

Reset cache to initial state.

Source code in src/molpy/compute/time_series.py
def reset(self) -> None:
    """Reset cache to initial state."""
    self.cache.fill(np.nan)
    self._count = 0

update

update(new_data)

Add new frame to cache, shifting older frames.

Parameters:

  new_data (NDArray, required)
      New data array to add (shape must match self.shape)

Source code in src/molpy/compute/time_series.py
def update(self, new_data: NDArray) -> None:
    """Add new frame to cache, shifting older frames.

    Args:
        new_data: New data array to add (shape must match self.shape)
    """
    if new_data.shape != self.shape:
        raise ValueError(
            f"Data shape {new_data.shape} doesn't match cache shape {self.shape}"
        )

    # Shift cache and add new data at front
    new_val = np.expand_dims(new_data, 0)
    self.cache = np.concatenate([new_val, self.cache], axis=0)[:-1]
    self._count += 1

compute_acf

compute_acf(data, cache_size, dropnan='partial')

Compute autocorrelation function over trajectory.

Calculates: <v_i(0) · v_i(dt)>_{i,t}

The particle dimension is averaged, and the time dimension is accumulated using a rolling cache to compute correlations at different time lags.

Parameters:

  data (NDArray, required)
      Trajectory data with shape (n_frames, n_particles, n_dim)
  cache_size (int, required)
      Maximum time lag (dt) to compute, in frames
  dropnan (Literal['none', 'partial', 'all'], default: 'partial')
      How to handle NaN values in averaging

Returns:

  NDArray
      ACF array with shape (cache_size,) containing the ACF at each time lag

Examples:

>>> # Velocity autocorrelation
>>> n_frames, n_particles = 1000, 100
>>> velocities = np.random.randn(n_frames, n_particles, 3)
>>> acf = compute_acf(velocities, cache_size=100)
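
As a cross-check, the same quantity can be computed with an explicit double loop over frames and lags. This reference sketch follows the definition above and should agree with compute_acf for the lags that have data; it is not the rolling-cache implementation shown below:

import numpy as np

def acf_direct(data, cache_size):
    """Direct-loop reference for <v_i(0) · v_i(dt)>_{i,t}. Sketch, O(n_frames * cache_size)."""
    n_frames = data.shape[0]
    acf = np.zeros(cache_size)
    counts = np.zeros(cache_size)
    for t in range(n_frames):
        for lag in range(min(cache_size, t + 1)):
            acf[lag] += np.mean(np.sum(data[t] * data[t - lag], axis=-1))  # average over particles
            counts[lag] += 1
    return acf / counts
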
Source code in src/molpy/compute/time_series.py
def compute_acf(
    data: NDArray,
    cache_size: int,
    dropnan: Literal["none", "partial", "all"] = "partial",
) -> NDArray:
    """Compute autocorrelation function over trajectory.

    Calculates: <v_i(0) · v_i(dt)>_{i,t}

    The particle dimension is averaged, and the time dimension is accumulated
    using a rolling cache to compute correlations at different time lags.

    Args:
        data: Trajectory data with shape (n_frames, n_particles, n_dim)
        cache_size: Maximum time lag (dt) to compute, in frames
        dropnan: How to handle NaN values in averaging

    Returns:
        ACF array with shape (cache_size,) containing ACF at each time lag

    Examples:
        >>> # Velocity autocorrelation
        >>> n_frames, n_particles = 1000, 100
        >>> velocities = np.random.randn(n_frames, n_particles, 3)
        >>> acf = compute_acf(velocities, cache_size=100)
    """
    n_frames, n_particles, n_dim = data.shape

    # Initialize cache and accumulator
    cache = TimeCache(cache_size, shape=(n_particles, n_dim))
    avg = TimeAverage(shape=(cache_size,), dropnan=dropnan)

    # Iterate through trajectory
    for frame_idx in range(n_frames):
        current_data = data[frame_idx]
        cache.update(current_data)

        # Compute dot product with all cached frames
        cached_data = cache.get()  # (cache_size, n_particles, n_dim)
        dot_product = cached_data * current_data[np.newaxis, :, :]

        # Sum over dimensions and average over particles
        acf_frame = np.mean(np.sum(dot_product, axis=2), axis=1)  # (cache_size,)

        # Accumulate time average
        avg.update(acf_frame)

    return avg.get()

compute_msd

compute_msd(data, cache_size, dropnan='partial')

Compute mean squared displacement over trajectory.

Calculates: <(r_i(t+dt) - r_i(t))^2>_{i,t}

The particle dimension is averaged, and the time dimension is accumulated using a rolling cache to compute correlations at different time lags.

Parameters:

  data (NDArray, required)
      Trajectory data with shape (n_frames, n_particles, n_dim)
  cache_size (int, required)
      Maximum time lag (dt) to compute, in frames
  dropnan (Literal['none', 'partial', 'all'], default: 'partial')
      How to handle NaN values in averaging

Returns:

  NDArray
      MSD array with shape (cache_size,) containing the MSD at each time lag

Examples:

>>> # Simple 1D random walk
>>> n_frames, n_particles = 1000, 100
>>> positions = np.cumsum(np.random.randn(n_frames, n_particles, 1), axis=0)
>>> msd = compute_msd(positions, cache_size=100)
>>> # MSD should grow linearly with time for random walk
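
Continuing the random-walk example, the linear growth can be checked with a quick fit: for unit-variance 1D steps the expected slope is roughly 1 per frame. This snippet assumes compute_msd is importable from molpy.compute.time_series (an assumed import path):

import numpy as np
from molpy.compute.time_series import compute_msd  # assumed import path

n_frames, n_particles = 1000, 100
positions = np.cumsum(np.random.randn(n_frames, n_particles, 1), axis=0)
msd = compute_msd(positions, cache_size=100)
lags = np.arange(100)
slope = np.polyfit(lags[1:], msd[1:], 1)[0]
print(f"fitted MSD slope per frame: {slope:.2f} (expected about 1)")
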
Source code in src/molpy/compute/time_series.py
def compute_msd(
    data: NDArray,
    cache_size: int,
    dropnan: Literal["none", "partial", "all"] = "partial",
) -> NDArray:
    """Compute mean squared displacement over trajectory.

    Calculates: <(r_i(t+dt) - r_i(t))^2>_{i,t}

    The particle dimension is averaged, and the time dimension is accumulated
    using a rolling cache to compute correlations at different time lags.

    Args:
        data: Trajectory data with shape (n_frames, n_particles, n_dim)
        cache_size: Maximum time lag (dt) to compute, in frames
        dropnan: How to handle NaN values in averaging

    Returns:
        MSD array with shape (cache_size,) containing MSD at each time lag

    Examples:
        >>> # Simple 1D random walk
        >>> n_frames, n_particles = 1000, 100
        >>> positions = np.cumsum(np.random.randn(n_frames, n_particles, 1), axis=0)
        >>> msd = compute_msd(positions, cache_size=100)
        >>> # MSD should grow linearly with time for random walk
    """
    n_frames, n_particles, n_dim = data.shape

    # Initialize cache and accumulator
    cache = TimeCache(cache_size, shape=(n_particles, n_dim))
    avg = TimeAverage(shape=(cache_size,), dropnan=dropnan)

    # Iterate through trajectory
    for frame_idx in range(n_frames):
        current_data = data[frame_idx]
        cache.update(current_data)

        # Compute displacement from current frame to all cached frames
        cached_data = cache.get()  # (cache_size, n_particles, n_dim)
        displacement = cached_data - current_data[np.newaxis, :, :]

        # Compute squared displacement and average over particles
        sq_displacement = np.sum(displacement**2, axis=2)  # (cache_size, n_particles)
        msd_frame = np.mean(sq_displacement, axis=1)  # (cache_size,)

        # Accumulate time average
        avg.update(msd_frame)

    return avg.get()