Skip to main content

hydro_lang/sim/
flow.rs

1//! Entrypoint for compiling and running Hydro simulations.
2
3use std::cell::RefCell;
4use std::collections::{BTreeMap, HashMap, HashSet};
5use std::panic::RefUnwindSafe;
6use std::rc::Rc;
7
8use dfir_lang::graph::{DfirGraph, FlatGraphBuilder, FlatGraphBuilderOutput};
9use libloading::Library;
10use slotmap::{SecondaryMap, SparseSecondaryMap};
11
12use super::builder::SimBuilder;
13use super::compiled::{CompiledSim, CompiledSimInstance};
14use super::graph::{SimDeploy, SimExternal, SimNode, compile_sim, create_sim_graph_trybuild};
15use crate::compile::builder::StmtId;
16use crate::compile::ir::HydroRoot;
17use crate::location::LocationKey;
18use crate::location::dynamic::LocationId;
19use crate::prelude::Cluster;
20use crate::sim::graph::SimExternalPortRegistry;
21use crate::staging_util::Invariant;
22
/// A not-yet-compiled simulator for a Hydro program.
///
/// The builder-style methods on this type (`with_cluster_size`, `test_safety_only`, ...) adjust
/// the configuration fields below; `compiled()` consumes the flow and produces a `CompiledSim`.
pub struct SimFlow<'a> {
    /// Hydro IR roots of the program. `compiled()` runs network compilation and DFIR emission on
    /// each of them in order.
    pub(crate) ir: Vec<HydroRoot>,

    /// SimNode for each Process.
    pub(crate) processes: SparseSecondaryMap<LocationKey, SimNode>,
    /// SimNode for each Cluster.
    pub(crate) clusters: SparseSecondaryMap<LocationKey, SimNode>,
    /// SimExternal for each External.
    pub(crate) externals: SparseSecondaryMap<LocationKey, SimExternal>,

    /// Max size of each cluster, as set through `with_cluster_size`. `compiled()` asserts that
    /// every key in `clusters` has an entry here.
    pub(crate) cluster_max_sizes: SparseSecondaryMap<LocationKey, usize>,
    /// Handle to state handling `external`s' ports.
    pub(crate) externals_port_registry: Rc<RefCell<SimExternalPortRegistry>>,

    /// The program version each location belongs to (all `0` for a single-version flow). Every
    /// location has an entry.
    pub(crate) location_version: SecondaryMap<LocationKey, u32>,

    /// The correspondence-chain root key for each location: cross-version "same logical location"
    /// grouping (see [`Cluster::next_version`](crate::location::Cluster::next_version)). Every
    /// location has an entry (its own key unless it is a `next_version` successor).
    pub(crate) location_group: SecondaryMap<LocationKey, LocationKey>,

    /// When true, the simulator only tests safety properties (not liveness).
    pub(crate) test_safety_only: bool,

    /// When true, consistency assertions are skipped (treated as identity no-ops).
    /// When false (default), encountering a consistency assertion panics because
    /// validating consistency assertions is not yet supported in the simulator.
    pub(crate) skip_consistency_assertions: bool,

    /// Number of iterations to use for fuzzing, defaults to 8192. Passed through unchanged to the
    /// `CompiledSim` built by `compiled()`.
    pub(crate) unit_test_fuzz_iterations: usize,

    /// Zero-sized marker that carries the `'a` lifetime.
    // NOTE(review): presumably makes `'a` invariant, going by the `Invariant` type name — confirm
    // against `crate::staging_util`.
    pub(crate) _phantom: Invariant<'a>,
}
61
62impl<'a> SimFlow<'a> {
63    /// Sets the maximum size of the given cluster in the simulation.
64    pub fn with_cluster_size<C>(mut self, cluster: &Cluster<'a, C>, max_size: usize) -> Self {
65        self.cluster_max_sizes.insert(cluster.key, max_size);
66        self
67    }
68
69    /// Opts in to safety-only testing, which is required when using
70    /// [`lossy_delayed_forever`](crate::networking::NetworkingConfig::lossy_delayed_forever)
71    /// networking.
72    ///
73    /// The simulator models dropped messages as indefinitely delayed, which means
74    /// it only tests safety properties—not liveness—since messages may never arrive.
75    /// Calling this method acknowledges that the simulation will not verify that the
76    /// program eventually makes progress.
77    pub fn test_safety_only(mut self) -> Self {
78        self.test_safety_only = true;
79        self
80    }
81
82    /// Opts in to skipping consistency assertions. When enabled, `assert_is_consistent`
83    /// nodes are treated as identity no-ops in the simulator. When disabled (the default),
84    /// encountering a consistency assertion will panic because validating consistency
85    /// assertions is not yet supported in the simulator.
86    pub fn skip_consistency_assertions(mut self) -> Self {
87        self.skip_consistency_assertions = true;
88        self
89    }
90
91    /// Sets the number of fuzz iterations for this test. Overrides the
92    /// the default value of 8192
93    pub fn unit_test_fuzz_iterations(mut self, iterations: usize) -> Self {
94        self.unit_test_fuzz_iterations = iterations;
95        self
96    }
97
98    /// Executes the given closure with a single instance of the compiled simulation.
99    pub fn with_instance<T>(self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
100        self.compiled().with_instance(thunk)
101    }
102
103    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
104    /// closure will be repeatedly executed with instances of the Hydro program where the
105    /// batching boundaries, order of messages, and retries are varied.
106    ///
107    /// During development, you should run the test that invokes this function with the `cargo sim`
108    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
109    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
110    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
111    /// be executed, and if no reproducer is found a small number of random executions will be
112    /// performed.
113    pub fn fuzz(self, thunk: impl AsyncFn() + RefUnwindSafe) {
114        self.compiled().fuzz(thunk)
115    }
116
117    /// Exhaustively searches all possible executions of the simulation. The provided
118    /// closure will be repeatedly executed with instances of the Hydro program where the
119    /// batching boundaries, order of messages, and retries are varied.
120    ///
121    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
122    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
123    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
124    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
125    ///
126    /// Returns the number of distinct executions explored.
127    pub fn exhaustive(self, thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
128        self.compiled().exhaustive(thunk)
129    }
130
131    /// Compiles the simulation into a dynamically loadable library, and returns a handle to it.
132    pub fn compiled(mut self) -> CompiledSim {
133        use dfir_lang::graph::{eliminate_extra_unions_tees, partition_graph};
134
135        let is_multi_version = self.location_version.values().any(|&v| v > 0);
136
137        let mut sim_emit = SimBuilder {
138            process_graphs: BTreeMap::new(),
139            cluster_graphs: BTreeMap::new(),
140            process_tick_dfirs: BTreeMap::new(),
141            cluster_tick_dfirs: BTreeMap::new(),
142            extra_stmts_global: vec![],
143            extra_stmts_cluster: BTreeMap::new(),
144            next_hoff_id: crate::Counter::default(),
145            test_safety_only: self.test_safety_only,
146            skip_consistency_assertions: self.skip_consistency_assertions,
147            channel_tables: BTreeMap::new(),
148        };
149
150        // Ensure the default (0) external is always present.
151        self.externals.insert(
152            LocationKey::FIRST,
153            SimExternal {
154                shared_inner: self.externals_port_registry.clone(),
155            },
156        );
157
158        let mut seen_tees_instantiate: HashMap<_, _> = HashMap::new();
159        let mut seen_cluster_members = HashSet::new();
160        self.ir.iter_mut().for_each(|leaf| {
161            leaf.compile_network::<SimDeploy>(
162                &mut SparseSecondaryMap::new(),
163                &mut seen_tees_instantiate,
164                &mut seen_cluster_members,
165                &self.processes,
166                &self.clusters,
167                &self.externals,
168                &mut (),
169            );
170        });
171
172        if is_multi_version {
173            super::versioned_network::splice_versioned_networks(
174                &mut self.ir,
175                &self.location_group,
176                &self.location_version,
177            );
178        }
179
180        let mut seen_tees = HashMap::new();
181        let mut built_tees = HashMap::new();
182        let mut next_stmt_id = crate::Counter::<StmtId>::default();
183        let mut fold_hooked_idents = HashSet::new();
184        for leaf in &mut self.ir {
185            leaf.emit(
186                &mut sim_emit,
187                &mut seen_tees,
188                &mut built_tees,
189                &mut next_stmt_id,
190                &mut fold_hooked_idents,
191            );
192        }
193
194        fn build_graphs(
195            graphs: BTreeMap<LocationId, FlatGraphBuilder>,
196        ) -> BTreeMap<LocationId, DfirGraph> {
197            graphs
198                .into_iter()
199                .map(|(l, g)| {
200                    let FlatGraphBuilderOutput { mut flat_graph, .. } =
201                        g.build().expect("Failed to build DFIR flat graph.");
202                    eliminate_extra_unions_tees(&mut flat_graph);
203                    (
204                        l,
205                        partition_graph(flat_graph).expect("Failed to partition (cycle detected)."),
206                    )
207                })
208                .collect()
209        }
210
211        let process_graphs = build_graphs(sim_emit.process_graphs);
212        let cluster_graphs = build_graphs(sim_emit.cluster_graphs);
213        let process_tick_graphs = build_graphs(sim_emit.process_tick_dfirs);
214        let cluster_tick_graphs = build_graphs(sim_emit.cluster_tick_dfirs);
215
216        #[expect(
217            clippy::disallowed_methods,
218            reason = "nondeterministic iteration order, fine for checks"
219        )]
220        for c in self.clusters.keys() {
221            assert!(
222                self.cluster_max_sizes.contains_key(c),
223                "Cluster {:?} missing max size; call with_cluster_size() before compiled()",
224                c
225            );
226        }
227
228        let (cluster_max_sizes, cluster_member_ids) = self.cluster_sizing();
229
230        let (bin, trybuild) = create_sim_graph_trybuild(
231            process_graphs,
232            cluster_graphs,
233            cluster_max_sizes,
234            cluster_member_ids,
235            process_tick_graphs,
236            cluster_tick_graphs,
237            sim_emit.extra_stmts_global,
238            sim_emit.extra_stmts_cluster,
239        );
240
241        let out = compile_sim(bin, trybuild).unwrap();
242        let lib = unsafe { Library::new(&out).unwrap() };
243
244        CompiledSim {
245            _path: out,
246            lib,
247            externals_port_registry: self.externals_port_registry.take(),
248            unit_test_fuzz_iterations: self.unit_test_fuzz_iterations,
249        }
250    }
251
252    /// Computes each cluster's merged size and the global member-id slice it constructs.
253    ///
254    /// Corresponding clusters (a [`next_version`](crate::location::Cluster::next_version) chain)
255    /// share a group key; the merged size for a group is the sum of its per-version sizes, and each
256    /// version gets a contiguous member-id slice assigned in version order. A single-version
257    /// cluster is the degenerate case: its own group, one version, slice `0..size`.
258    fn cluster_sizing(
259        &self,
260    ) -> (
261        SparseSecondaryMap<LocationKey, usize>,
262        BTreeMap<LocationId, Vec<u32>>,
263    ) {
264        // Group corresponding clusters by their shared group key, recording each version's size.
265        let mut sizes_by_group: BTreeMap<LocationKey, BTreeMap<u32, usize>> = BTreeMap::new();
266        #[expect(
267            clippy::disallowed_methods,
268            reason = "each cluster key is unique; iteration order does not affect the result"
269        )]
270        for key in self.clusters.keys() {
271            let group = self.location_group[key];
272            let version = self.location_version[key];
273            let size = *self.cluster_max_sizes.get(key).unwrap_or_else(|| {
274                panic!(
275                    "cluster {key:?} missing max size; `compiled()` asserts every cluster has one \
276                     before calling `cluster_sizing`"
277                )
278            });
279            let prev = sizes_by_group
280                .entry(group)
281                .or_default()
282                .insert(version, size);
283            assert!(
284                prev.is_none(),
285                "multi-version simulation has two corresponding clusters at the same version; \
286                 each `next_version()` call must advance to a distinct version"
287            );
288        }
289
290        // Each cluster location gets the merged total size (so its membership lists the union) and
291        // its own contiguous slice of the global member-id range, assigned in version order.
292        let mut cluster_sizes: SparseSecondaryMap<LocationKey, usize> = SparseSecondaryMap::new();
293        let mut cluster_member_ids: BTreeMap<LocationId, Vec<u32>> = BTreeMap::new();
294        #[expect(
295            clippy::disallowed_methods,
296            reason = "each cluster key is unique; iteration order does not affect the result"
297        )]
298        for key in self.clusters.keys() {
299            let group = self.location_group[key];
300            let version = self.location_version[key];
301            let per_version = &sizes_by_group[&group];
302            let merged_total: usize = per_version.values().sum();
303            let offset: u32 = per_version.range(..version).map(|(_, &n)| n as u32).sum();
304            let size = *per_version
305                .get(&version)
306                .expect("every (group, version) was recorded by the first pass above")
307                as u32;
308            cluster_sizes.insert(key, merged_total);
309            cluster_member_ids.insert(LocationId::Cluster(key), (offset..offset + size).collect());
310        }
311
312        (cluster_sizes, cluster_member_ids)
313    }
314}