Writing High-Frequency Trajectories When an Event is Detected

In this tutorial, we will showcase how dupin’s online capabilities can be used to trigger HOOMD’s burst writer to dump high-resolution trajectory data only during specific events detected by dupin. This approach has significant implications for performance and storage. The main advantage is that it allows us to write to disk only the segments of the trajectory that are of interest, at an extremely high frequency, without dumping the entire trajectory.

To achieve this, we create a temporary buffer that stores trajectory frames in memory. When dupin triggers, this buffer is dumped onto the disk. Online detection requires some CPU cycles to run periodically, which can impact performance of the MD program depending on the order parameters used for detection. In practice, it is often sufficient to track very simple properties that are computed by the MD driver (in this case, HOOMD) anyway, such as pressure, total system energy, or volume (depending on whether we are running NVT/NVE or NPT ensembles).

The render function in the next (hidden) cell will render the snapshot at the detected change point using fresnel. You can find the source code for this function on github.

Here, we provide an example. First we melt a simple cubic crystal of Lennard-Jones particles, melt the crystal, compress the liquid to required density and equilibrate:

[2]:
# %%
import itertools
import warnings

import gsd
import gsd.hoomd
import hoomd
import hoomd.md

warnings.filterwarnings("ignore")

# make initial system
m = 9
N_particles = 4 * m**3
spacing = 1.3
K = math.ceil(N_particles ** (1 / 3))
L = K * spacing
x = np.linspace(-L / 2, L / 2, K, endpoint=False)
position = list(itertools.product(x, repeat=3))
N_B = N_particles // 20

frame = gsd.hoomd.Frame()
frame.particles.N = N_particles
frame.particles.position = position[0:N_particles]
frame.particles.typeid = [0] * N_particles
frame.configuration.box = [L, L, L, 0, 0, 0]
frame.particles.types = ["A", "B"]
typeid = np.asarray(frame.particles.typeid)
typeid[frame.particles.N // 2 : frame.particles.N // 2 + N_B] = 1
frame.particles.typeid = typeid

with gsd.hoomd.open(name="lattice.gsd", mode="w") as f:
    f.append(frame)
print("Initial lattice")
render(frame)

# melt the crystal

cpu = hoomd.device.CPU()
simulation = hoomd.Simulation(device=cpu, seed=667)
simulation.create_state_from_gsd(filename="lattice.gsd")

integrator = hoomd.md.Integrator(dt=0.005)
cell = hoomd.md.nlist.Cell(buffer=0.4)
lj = hoomd.md.pair.LJ(nlist=cell)
lj.params[("A", "A")] = dict(epsilon=1, sigma=1)
lj.params[("A", "B")] = dict(epsilon=1, sigma=1)
lj.params[("B", "B")] = dict(epsilon=1, sigma=1)
lj.r_cut[("A", "A")] = 2.5
lj.r_cut[("A", "B")] = 2.5
lj.r_cut[("B", "B")] = 2.5
integrator.forces.append(lj)

nvt = hoomd.md.methods.ConstantVolume(
    filter=hoomd.filter.All(),
    thermostat=hoomd.md.methods.thermostats.Bussi(kT=1.5),
)
integrator.methods.append(nvt)
simulation.operations.integrator = integrator
simulation.state.thermalize_particle_momenta(filter=hoomd.filter.All(), kT=1.5)
thermodynamic_properties = hoomd.md.compute.ThermodynamicQuantities(
    filter=hoomd.filter.All()
)
simulation.operations.computes.append(thermodynamic_properties)
simulation.run(10000)
hoomd.write.GSD.write(state=simulation.state, filename="random.gsd", mode="wb")
print("randomized configuration")
render(simulation.state.get_snapshot())

# compress the system
ramp = hoomd.variant.Ramp(A=0, B=1, t_start=simulation.timestep, t_ramp=20000)
steps = range(0, 40000, 20)
y = [ramp(step) for step in steps]
rho = simulation.state.N_particles / simulation.state.box.volume
initial_box = simulation.state.box
final_box = hoomd.Box.from_box(initial_box)  # make a copy of initial_box
final_rho = 0.8
final_box.volume = simulation.state.N_particles / final_rho
box_resize_trigger = hoomd.trigger.Periodic(10)
box_resize = hoomd.update.BoxResize(
    box1=initial_box, box2=final_box, variant=ramp, trigger=box_resize_trigger
)
simulation.operations.updaters.append(box_resize)
simulation.run(20001)
simulation.operations.updaters.remove(box_resize)
hoomd.write.GSD.write(
    state=simulation.state, filename="compressed.gsd", mode="wb"
)

print("Compressed configuration")
render(simulation.state.get_snapshot())

simulation.run(50000)
hoomd.write.GSD.write(
    state=simulation.state, filename="equilibrated.gsd", mode="wb"
)
print("Equilibrated system")
render(simulation.state.get_snapshot())
Initial lattice
randomized configuration
Compressed configuration
Equilibrated system
[2]:
../../_images/tutorials_02-HOOMD-burst-writer-with-online-detection_HOOMD-burst-with-online_2_1.png

Burst writer

Finally, we run the equilibrated LJ system for 25000 steps, then change the LJ potential of particle type 2 to be much stronger and shorter ranged. This should initiate a nucleation event of only part of the system. After 50000 steps we again change the LJ potential of particle type 2 to be the same as interactions of particle type 1. To detect these events we implement a custom Online Writer which dumps 2500 frames spaced 10 time steps apart each time dupin detects a transition. Each event will be written in a new file starting with burst and ending with .gsd . We monitor potential energy to detect possible events in this example. Keep in mind that this cell might take a while to run:

[3]:
import ruptures as rpt

import dupin as du

gsd_writer = hoomd.write.GSD(
    filename="trajectory.gsd", trigger=hoomd.trigger.Periodic(1000), mode="wb"
)
log = hoomd.logging.Logger()
log.add(simulation, quantities=["timestep", "walltime"])
log.add(lj, quantities=["energy", "energies"])
log.add(thermodynamic_properties)
gsd_writer.logger = log

simulation.operations.writers.append(gsd_writer)


class OnlineWriter(hoomd.custom.Action):
    def __init__(
        self,
        burst_trigger,  # how many time steps to wait before adding a frame to burst buffer
        filename_base,
        max_burst_size,  # maximum number of frames in burst buffer
        signal_aggregator,
        sweep_detector,
        potential,
        simulation: hoomd.Simulation,
        custom_writer_trigger,  # how often to check event detection
        skip_first_N=0,
    ):
        self.starting_simulation_time = simulation.timestep
        self.dupin_event_times = []
        self.custom_writer_trigger = custom_writer_trigger
        self.skip_first_N = skip_first_N
        self.counter = 0
        self.potential = potential
        self.trigger = burst_trigger
        self.filename_base = filename_base
        self.max_burst_size = max_burst_size
        self.simulation = simulation
        self.burst_event_reporter = []
        self.make_new_burst()
        self.transitioning = False
        self.transition_timestep = None
        self.signal_aggregator = signal_aggregator
        self.sweep_detector = sweep_detector
        self.burst_size_in_simulation_time = (
            self.max_burst_size * self.trigger.period
        )
        self.burst_size_in_signal_time = (
            self.burst_size_in_simulation_time / custom_writer_trigger.period
        )
        self.transitioning_transitions = []

    def act(self, timestep):
        data = self.potential.energies
        self.signal_aggregator.accumulate(data)
        current_simulation_time = timestep - self.starting_simulation_time
        # check if skip_first_N is satisfied, and burst buffer is full,
        if (
            self.counter > 0 or self.skip_first_N < current_simulation_time
        ) and len(self.burst_writer) == self.max_burst_size:
            check = self.sweep_detector.fit(
                self.signal_aggregator.to_dataframe()
            )
            new_check = []
            for detected_online_event in check:
                actual_detected_online_event = (
                    current_simulation_time / self.custom_writer_trigger.period
                    - len(self.signal_aggregator.signals)
                    + detected_online_event
                )
                # check if the event detected is within the burst buffer still
                if (
                    np.abs(
                        actual_detected_online_event
                        - current_simulation_time
                        / self.custom_writer_trigger.period
                    )
                    > self.burst_size_in_signal_time
                ):
                    continue
                isnew = True
                # check if it overlaps with previously detected events
                for event in self.dupin_event_times:
                    if (
                        np.abs(actual_detected_online_event - event)
                        < self.burst_size_in_signal_time
                    ):
                        isnew = False
                        break
                if isnew:
                    new_check.append(actual_detected_online_event)
            if len(new_check) > 0:
                self.dupin_event_times.append(new_check[-1])
                self.burst_event_reporter[-1][
                    "event_timesteps_simulation_time"
                ].append(
                    2 * current_simulation_time
                    - (new_check[-1] + len(self.signal_aggregator.signals))
                    * self.custom_writer_trigger.period
                )
                self.burst_event_reporter[-1][
                    "event_timesteps_signal_time"
                ].append(new_check[-1])

                if not self.transitioning:
                    self.transitioning = True
                self.transitioning_transitions.append(
                    timestep
                    - np.abs(
                        new_check[-1] * self.custom_writer_trigger.period
                        - current_simulation_time
                    )
                )
                self.transitioning_transitions.append(
                    self.starting_simulation_time
                    + new_check[-1] * self.custom_writer_trigger.period
                )
                self.transition_timestep = np.mean(
                    self.transitioning_transitions
                )

            if self.transitioning:
                buffer_end_when_centered_at_transition_timestep = (
                    self.transition_timestep
                    + self.burst_size_in_simulation_time * 0.5
                )
                if buffer_end_when_centered_at_transition_timestep <= timestep:
                    self.pop_burst(timestep)

    def pop_burst(self, timestep):
        self.transitioning_transitions = []
        self.burst_event_reporter[-1]["burst_end_timestep_simulation_time"] = (
            timestep
        )
        self.burst_event_reporter[-1]["burst_end_timestep_signal_time"] = len(
            self.signal_aggregator.signals
        )
        self.burst_event_reporter[-1][
            "burst_start_timestep_simulation_time"
        ] = timestep - self.burst_size_in_simulation_time
        self.burst_event_reporter[-1]["burst_start_timestep_signal_time"] = (
            len(self.signal_aggregator.signals) - self.burst_size_in_signal_time
        )
        # for HOOMD 5.0 or later burst writer can be selectively dumped for a subset of
        # frames if desired.
        self.burst_writer.dump()
        self.burst_writer.flush()
        self.simulation.operations.writers.remove(self.burst_writer)
        self.counter += 1
        self.make_new_burst()
        self.transitioning = False

    def make_new_burst(self):
        full_filename_base = self.filename_base + f"{self.counter}"
        full_filename = self.filename_base + f"{self.counter}.gsd"
        report = dict(
            name=full_filename_base,
            filename=full_filename,
            event_timesteps_simulation_time=[],
            event_timesteps_signal_time=[],
            burst_start_timestep_simulation_time=0,
            burst_end_timestep_simulation_time=0,
            burst_start_timestep_signal_time=0,
            burst_end_timestep_signal_time=0,
        )
        self.burst_event_reporter.append(report)
        log = hoomd.logging.Logger()
        log.add(self.simulation, quantities=["timestep", "walltime"])
        log.add(self.potential, quantities=["energy", "energies"])
        self.burst_writer = hoomd.write.Burst(
            trigger=self.trigger,
            filename=full_filename,
            max_burst_size=self.max_burst_size,
            write_at_start=True,
            mode="wb",
        )
        self.burst_writer.logger = log
        self.simulation.operations.writers.append(self.burst_writer)


def custom_dupin_generator_fn(data_du):
    return {"energy": data_du}


custom_writer_trigger = hoomd.trigger.Periodic(1000)

signal_aggregator = du.data.aggregate.SignalAggregator(
    du.data.base.CustomGenerator(custom_dupin_generator_fn).pipe(
        du.data.reduce.NthGreatest(
            (
                -10,
                -50,
            )
        )
    ),
)

sweep_detector = du.detect.SweepDetector(
    rpt.Dynp(custom_cost=du.detect.CostLinearFit()),
    max_change_points=3,
)

dupin_online_writer = hoomd.write.CustomWriter(
    action=OnlineWriter(
        hoomd.trigger.Periodic(10),
        "burst",
        2500,
        signal_aggregator,
        sweep_detector,
        lj,
        simulation,
        custom_writer_trigger,
        10000,
    ),
    trigger=custom_writer_trigger,
)
simulation.operations += dupin_online_writer


simulation.run(25000)
## Apply the new potential
integrator.forces[-1].params[("B", "B")] = dict(epsilon=2.5, sigma=0.5)
simulation.run(100000)

# apply old potential
integrator.forces[-1].params[("B", "B")] = dict(epsilon=1, sigma=1)
simulation.run(25000)
gsd_writer.flush()
print("events detected by online burst writer")
print(dupin_online_writer.action.burst_event_reporter)
events detected by online burst writer
[{'name': 'burst0', 'filename': 'burst0.gsd', 'event_timesteps_simulation_time': [6999.000000000007], 'event_timesteps_signal_time': [19.999], 'burst_start_timestep_simulation_time': 88000, 'burst_end_timestep_simulation_time': 113000, 'burst_start_timestep_signal_time': 8.0, 'burst_end_timestep_signal_time': 33}, {'name': 'burst1', 'filename': 'burst1.gsd', 'event_timesteps_simulation_time': [13999.0], 'event_timesteps_signal_time': [124.999], 'burst_start_timestep_simulation_time': 194000, 'burst_end_timestep_simulation_time': 219000, 'burst_start_timestep_signal_time': 114.0, 'burst_end_timestep_signal_time': 139}, {'name': 'burst2', 'filename': 'burst2.gsd', 'event_timesteps_simulation_time': [], 'event_timesteps_signal_time': [], 'burst_start_timestep_simulation_time': 0, 'burst_end_timestep_simulation_time': 0, 'burst_start_timestep_signal_time': 0, 'burst_end_timestep_signal_time': 0}]

(OPTIONAL) Render key events

Renders first snapshot, and for each event renders snapshot of first frame of burst, event point frame, and last burst frame, and last frame of the simulation. This block can take a couple of minutes to render, and it can be skipped.

[4]:
import gsd.hoomd

# render start, before, middle and after for each event detected and end of sim.
frames_to_render = [0]
for trans in dupin_online_writer.action.burst_event_reporter[:-1]:
    frames_to_render.append(int(trans["burst_start_timestep_signal_time"]))
    frames_to_render.append(int(round(trans["event_timesteps_signal_time"][0])))
    frames_to_render.append(int(trans["burst_end_timestep_signal_time"]))
frames_to_render.append(-1)
with gsd.hoomd.open("trajectory.gsd") as f:
    frames = []
    for ff in frames_to_render:
        frames.append(f[int(ff)])
    render_movie(frames)
../../_images/tutorials_02-HOOMD-burst-writer-with-online-detection_HOOMD-burst-with-online_6_0.png

Detected events and burst coverage

In the following plot we show how the burst writer covered the following parts of the trajectory based on the detected events:

[5]:
import gsd.hoomd
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D

# Example data structures
burst_ends = []
change_points = []
for trans in dupin_online_writer.action.burst_event_reporter[:-1]:
    burst_ends.append(
        (
            trans["burst_start_timestep_signal_time"],
            trans["burst_end_timestep_signal_time"],
        )
    )
    change_points.extend(trans["event_timesteps_signal_time"])


def custom_dupin_generator_fn(data_du):
    return {"energy": data_du}


custom_writer_trigger = hoomd.trigger.Periodic(1000)

signal_aggregator = du.data.aggregate.SignalAggregator(
    du.data.base.CustomGenerator(custom_dupin_generator_fn).pipe(
        du.data.reduce.NthGreatest(
            (
                -10,
                -50,
            )
        )
    )
)

energy = []
timesteps = []
with gsd.hoomd.open("trajectory.gsd") as f:
    for frame in f:
        energy.append(frame.log["particles/md/pair/LJ/energies"])
        signal_aggregator.accumulate(frame.log["particles/md/pair/LJ/energies"])
        timesteps.append(frame.configuration.step)

# Plot the data
data = signal_aggregator.to_dataframe().to_numpy()
lines = plt.plot(
    data,
    label=signal_aggregator.to_dataframe().columns.to_list(),
)

for change_point in change_points:
    plt.axvline(change_point, color="k", linestyle="--")

# Highlight burst regions
for burst_start, burst_end in burst_ends:
    plt.axvspan(burst_start, burst_end, color="red", alpha=0.3)

# Create custom legend handles
legend_elements = [
    Line2D([0], [0], color="k", linestyle="--", label="Change Point"),
    Line2D([0], [0], color="red", lw=4, alpha=0.3, label="Burst Region"),
]
legend_elements.extend([line for line in lines])

plt.xlabel("signal time")
plt.ylabel("Energy")

plt.title("Energy vs Time with Highlighted Burst Regions and Change Points")
plt.legend(handles=legend_elements)
plt.show()
../../_images/tutorials_02-HOOMD-burst-writer-with-online-detection_HOOMD-burst-with-online_8_0.png