Writing High-Frequency Trajectories When an Event is Detected¶
In this tutorial, we will showcase how dupin’s online capabilities can be used to trigger HOOMD’s burst writer to dump high-resolution trajectory data only during specific events detected by dupin. This approach has significant implications for performance and storage. The main advantage is that it allows us to write to disk only the segments of the trajectory that are of interest, at an extremely high frequency, without dumping the entire trajectory.
To achieve this, we create a temporary buffer that stores trajectory frames in memory. When dupin triggers, this buffer is dumped onto the disk. Online detection requires some CPU cycles to run periodically, which can impact performance of the MD program depending on the order parameters used for detection. In practice, it is often sufficient to track very simple properties that are computed by the MD driver (in this case, HOOMD) anyway, such as pressure, total system energy, or volume (depending on whether we are running NVT/NVE or NPT ensembles).
The render function in the next (hidden) cell will render the snapshot at the detected change point using fresnel. You can find the source code for this function on github.
Here, we provide an example. First we melt a simple cubic crystal of Lennard-Jones particles, melt the crystal, compress the liquid to required density and equilibrate:
[2]:
# %%
import itertools
import warnings
import gsd
import gsd.hoomd
import hoomd
import hoomd.md
warnings.filterwarnings("ignore")
# make initial system
m = 9
N_particles = 4 * m**3
spacing = 1.3
K = math.ceil(N_particles ** (1 / 3))
L = K * spacing
x = np.linspace(-L / 2, L / 2, K, endpoint=False)
position = list(itertools.product(x, repeat=3))
N_B = N_particles // 20
frame = gsd.hoomd.Frame()
frame.particles.N = N_particles
frame.particles.position = position[0:N_particles]
frame.particles.typeid = [0] * N_particles
frame.configuration.box = [L, L, L, 0, 0, 0]
frame.particles.types = ["A", "B"]
typeid = np.asarray(frame.particles.typeid)
typeid[frame.particles.N // 2 : frame.particles.N // 2 + N_B] = 1
frame.particles.typeid = typeid
with gsd.hoomd.open(name="lattice.gsd", mode="w") as f:
f.append(frame)
print("Initial lattice")
render(frame)
# melt the crystal
cpu = hoomd.device.CPU()
simulation = hoomd.Simulation(device=cpu, seed=667)
simulation.create_state_from_gsd(filename="lattice.gsd")
integrator = hoomd.md.Integrator(dt=0.005)
cell = hoomd.md.nlist.Cell(buffer=0.4)
lj = hoomd.md.pair.LJ(nlist=cell)
lj.params[("A", "A")] = dict(epsilon=1, sigma=1)
lj.params[("A", "B")] = dict(epsilon=1, sigma=1)
lj.params[("B", "B")] = dict(epsilon=1, sigma=1)
lj.r_cut[("A", "A")] = 2.5
lj.r_cut[("A", "B")] = 2.5
lj.r_cut[("B", "B")] = 2.5
integrator.forces.append(lj)
nvt = hoomd.md.methods.ConstantVolume(
filter=hoomd.filter.All(),
thermostat=hoomd.md.methods.thermostats.Bussi(kT=1.5),
)
integrator.methods.append(nvt)
simulation.operations.integrator = integrator
simulation.state.thermalize_particle_momenta(filter=hoomd.filter.All(), kT=1.5)
thermodynamic_properties = hoomd.md.compute.ThermodynamicQuantities(
filter=hoomd.filter.All()
)
simulation.operations.computes.append(thermodynamic_properties)
simulation.run(10000)
hoomd.write.GSD.write(state=simulation.state, filename="random.gsd", mode="wb")
print("randomized configuration")
render(simulation.state.get_snapshot())
# compress the system
ramp = hoomd.variant.Ramp(A=0, B=1, t_start=simulation.timestep, t_ramp=20000)
steps = range(0, 40000, 20)
y = [ramp(step) for step in steps]
rho = simulation.state.N_particles / simulation.state.box.volume
initial_box = simulation.state.box
final_box = hoomd.Box.from_box(initial_box) # make a copy of initial_box
final_rho = 0.8
final_box.volume = simulation.state.N_particles / final_rho
box_resize_trigger = hoomd.trigger.Periodic(10)
box_resize = hoomd.update.BoxResize(
box1=initial_box, box2=final_box, variant=ramp, trigger=box_resize_trigger
)
simulation.operations.updaters.append(box_resize)
simulation.run(20001)
simulation.operations.updaters.remove(box_resize)
hoomd.write.GSD.write(
state=simulation.state, filename="compressed.gsd", mode="wb"
)
print("Compressed configuration")
render(simulation.state.get_snapshot())
simulation.run(50000)
hoomd.write.GSD.write(
state=simulation.state, filename="equilibrated.gsd", mode="wb"
)
print("Equilibrated system")
render(simulation.state.get_snapshot())
Initial lattice
randomized configuration
Compressed configuration
Equilibrated system
[2]:
Burst writer¶
Finally, we run the equilibrated LJ system for 25000 steps, then change the LJ potential of particle type 2 to be much stronger and shorter ranged. This should initiate a nucleation event of only part of the system. After 50000 steps we again change the LJ potential of particle type 2 to be the same as interactions of particle type 1. To detect these events we implement a custom Online Writer which dumps 2500 frames spaced 10 time steps apart each time dupin detects a transition. Each event will be written in a new file starting with burst and ending with .gsd . We monitor potential energy to detect possible events in this example. Keep in mind that this cell might take a while to run:
[3]:
import ruptures as rpt
import dupin as du
gsd_writer = hoomd.write.GSD(
filename="trajectory.gsd", trigger=hoomd.trigger.Periodic(1000), mode="wb"
)
log = hoomd.logging.Logger()
log.add(simulation, quantities=["timestep", "walltime"])
log.add(lj, quantities=["energy", "energies"])
log.add(thermodynamic_properties)
gsd_writer.logger = log
simulation.operations.writers.append(gsd_writer)
class OnlineWriter(hoomd.custom.Action):
def __init__(
self,
burst_trigger, # how many time steps to wait before adding a frame to burst buffer
filename_base,
max_burst_size, # maximum number of frames in burst buffer
signal_aggregator,
sweep_detector,
potential,
simulation: hoomd.Simulation,
custom_writer_trigger, # how often to check event detection
skip_first_N=0,
):
self.starting_simulation_time = simulation.timestep
self.dupin_event_times = []
self.custom_writer_trigger = custom_writer_trigger
self.skip_first_N = skip_first_N
self.counter = 0
self.potential = potential
self.trigger = burst_trigger
self.filename_base = filename_base
self.max_burst_size = max_burst_size
self.simulation = simulation
self.burst_event_reporter = []
self.make_new_burst()
self.transitioning = False
self.transition_timestep = None
self.signal_aggregator = signal_aggregator
self.sweep_detector = sweep_detector
self.burst_size_in_simulation_time = (
self.max_burst_size * self.trigger.period
)
self.burst_size_in_signal_time = (
self.burst_size_in_simulation_time / custom_writer_trigger.period
)
self.transitioning_transitions = []
def act(self, timestep):
data = self.potential.energies
self.signal_aggregator.accumulate(data)
current_simulation_time = timestep - self.starting_simulation_time
# check if skip_first_N is satisfied, and burst buffer is full,
if (
self.counter > 0 or self.skip_first_N < current_simulation_time
) and len(self.burst_writer) == self.max_burst_size:
check = self.sweep_detector.fit(
self.signal_aggregator.to_dataframe()
)
new_check = []
for detected_online_event in check:
actual_detected_online_event = (
current_simulation_time / self.custom_writer_trigger.period
- len(self.signal_aggregator.signals)
+ detected_online_event
)
# check if the event detected is within the burst buffer still
if (
np.abs(
actual_detected_online_event
- current_simulation_time
/ self.custom_writer_trigger.period
)
> self.burst_size_in_signal_time
):
continue
isnew = True
# check if it overlaps with previously detected events
for event in self.dupin_event_times:
if (
np.abs(actual_detected_online_event - event)
< self.burst_size_in_signal_time
):
isnew = False
break
if isnew:
new_check.append(actual_detected_online_event)
if len(new_check) > 0:
self.dupin_event_times.append(new_check[-1])
self.burst_event_reporter[-1][
"event_timesteps_simulation_time"
].append(
2 * current_simulation_time
- (new_check[-1] + len(self.signal_aggregator.signals))
* self.custom_writer_trigger.period
)
self.burst_event_reporter[-1][
"event_timesteps_signal_time"
].append(new_check[-1])
if not self.transitioning:
self.transitioning = True
self.transitioning_transitions.append(
timestep
- np.abs(
new_check[-1] * self.custom_writer_trigger.period
- current_simulation_time
)
)
self.transitioning_transitions.append(
self.starting_simulation_time
+ new_check[-1] * self.custom_writer_trigger.period
)
self.transition_timestep = np.mean(
self.transitioning_transitions
)
if self.transitioning:
buffer_end_when_centered_at_transition_timestep = (
self.transition_timestep
+ self.burst_size_in_simulation_time * 0.5
)
if buffer_end_when_centered_at_transition_timestep <= timestep:
self.pop_burst(timestep)
def pop_burst(self, timestep):
self.transitioning_transitions = []
self.burst_event_reporter[-1]["burst_end_timestep_simulation_time"] = (
timestep
)
self.burst_event_reporter[-1]["burst_end_timestep_signal_time"] = len(
self.signal_aggregator.signals
)
self.burst_event_reporter[-1][
"burst_start_timestep_simulation_time"
] = timestep - self.burst_size_in_simulation_time
self.burst_event_reporter[-1]["burst_start_timestep_signal_time"] = (
len(self.signal_aggregator.signals) - self.burst_size_in_signal_time
)
# for HOOMD 5.0 or later burst writer can be selectively dumped for a subset of
# frames if desired.
self.burst_writer.dump()
self.burst_writer.flush()
self.simulation.operations.writers.remove(self.burst_writer)
self.counter += 1
self.make_new_burst()
self.transitioning = False
def make_new_burst(self):
full_filename_base = self.filename_base + f"{self.counter}"
full_filename = self.filename_base + f"{self.counter}.gsd"
report = dict(
name=full_filename_base,
filename=full_filename,
event_timesteps_simulation_time=[],
event_timesteps_signal_time=[],
burst_start_timestep_simulation_time=0,
burst_end_timestep_simulation_time=0,
burst_start_timestep_signal_time=0,
burst_end_timestep_signal_time=0,
)
self.burst_event_reporter.append(report)
log = hoomd.logging.Logger()
log.add(self.simulation, quantities=["timestep", "walltime"])
log.add(self.potential, quantities=["energy", "energies"])
self.burst_writer = hoomd.write.Burst(
trigger=self.trigger,
filename=full_filename,
max_burst_size=self.max_burst_size,
write_at_start=True,
mode="wb",
)
self.burst_writer.logger = log
self.simulation.operations.writers.append(self.burst_writer)
def custom_dupin_generator_fn(data_du):
return {"energy": data_du}
custom_writer_trigger = hoomd.trigger.Periodic(1000)
signal_aggregator = du.data.aggregate.SignalAggregator(
du.data.base.CustomGenerator(custom_dupin_generator_fn).pipe(
du.data.reduce.NthGreatest(
(
-10,
-50,
)
)
),
)
sweep_detector = du.detect.SweepDetector(
rpt.Dynp(custom_cost=du.detect.CostLinearFit()),
max_change_points=3,
)
dupin_online_writer = hoomd.write.CustomWriter(
action=OnlineWriter(
hoomd.trigger.Periodic(10),
"burst",
2500,
signal_aggregator,
sweep_detector,
lj,
simulation,
custom_writer_trigger,
10000,
),
trigger=custom_writer_trigger,
)
simulation.operations += dupin_online_writer
simulation.run(25000)
## Apply the new potential
integrator.forces[-1].params[("B", "B")] = dict(epsilon=2.5, sigma=0.5)
simulation.run(100000)
# apply old potential
integrator.forces[-1].params[("B", "B")] = dict(epsilon=1, sigma=1)
simulation.run(25000)
gsd_writer.flush()
print("events detected by online burst writer")
print(dupin_online_writer.action.burst_event_reporter)
events detected by online burst writer
[{'name': 'burst0', 'filename': 'burst0.gsd', 'event_timesteps_simulation_time': [6999.000000000007], 'event_timesteps_signal_time': [19.999], 'burst_start_timestep_simulation_time': 88000, 'burst_end_timestep_simulation_time': 113000, 'burst_start_timestep_signal_time': 8.0, 'burst_end_timestep_signal_time': 33}, {'name': 'burst1', 'filename': 'burst1.gsd', 'event_timesteps_simulation_time': [13999.0], 'event_timesteps_signal_time': [124.999], 'burst_start_timestep_simulation_time': 194000, 'burst_end_timestep_simulation_time': 219000, 'burst_start_timestep_signal_time': 114.0, 'burst_end_timestep_signal_time': 139}, {'name': 'burst2', 'filename': 'burst2.gsd', 'event_timesteps_simulation_time': [], 'event_timesteps_signal_time': [], 'burst_start_timestep_simulation_time': 0, 'burst_end_timestep_simulation_time': 0, 'burst_start_timestep_signal_time': 0, 'burst_end_timestep_signal_time': 0}]
(OPTIONAL) Render key events¶
Renders first snapshot, and for each event renders snapshot of first frame of burst, event point frame, and last burst frame, and last frame of the simulation. This block can take a couple of minutes to render, and it can be skipped.
[4]:
import gsd.hoomd
# render start, before, middle and after for each event detected and end of sim.
frames_to_render = [0]
for trans in dupin_online_writer.action.burst_event_reporter[:-1]:
frames_to_render.append(int(trans["burst_start_timestep_signal_time"]))
frames_to_render.append(int(round(trans["event_timesteps_signal_time"][0])))
frames_to_render.append(int(trans["burst_end_timestep_signal_time"]))
frames_to_render.append(-1)
with gsd.hoomd.open("trajectory.gsd") as f:
frames = []
for ff in frames_to_render:
frames.append(f[int(ff)])
render_movie(frames)
Detected events and burst coverage¶
In the following plot we show how the burst writer covered the following parts of the trajectory based on the detected events:
[5]:
import gsd.hoomd
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
# Example data structures
burst_ends = []
change_points = []
for trans in dupin_online_writer.action.burst_event_reporter[:-1]:
burst_ends.append(
(
trans["burst_start_timestep_signal_time"],
trans["burst_end_timestep_signal_time"],
)
)
change_points.extend(trans["event_timesteps_signal_time"])
def custom_dupin_generator_fn(data_du):
return {"energy": data_du}
custom_writer_trigger = hoomd.trigger.Periodic(1000)
signal_aggregator = du.data.aggregate.SignalAggregator(
du.data.base.CustomGenerator(custom_dupin_generator_fn).pipe(
du.data.reduce.NthGreatest(
(
-10,
-50,
)
)
)
)
energy = []
timesteps = []
with gsd.hoomd.open("trajectory.gsd") as f:
for frame in f:
energy.append(frame.log["particles/md/pair/LJ/energies"])
signal_aggregator.accumulate(frame.log["particles/md/pair/LJ/energies"])
timesteps.append(frame.configuration.step)
# Plot the data
data = signal_aggregator.to_dataframe().to_numpy()
lines = plt.plot(
data,
label=signal_aggregator.to_dataframe().columns.to_list(),
)
for change_point in change_points:
plt.axvline(change_point, color="k", linestyle="--")
# Highlight burst regions
for burst_start, burst_end in burst_ends:
plt.axvspan(burst_start, burst_end, color="red", alpha=0.3)
# Create custom legend handles
legend_elements = [
Line2D([0], [0], color="k", linestyle="--", label="Change Point"),
Line2D([0], [0], color="red", lw=4, alpha=0.3, label="Burst Region"),
]
legend_elements.extend([line for line in lines])
plt.xlabel("signal time")
plt.ylabel("Energy")
plt.title("Energy vs Time with Highlighted Burst Regions and Change Points")
plt.legend(handles=legend_elements)
plt.show()