Skip to main content

hydro_lang/sim/
compiled.rs

1//! Interfaces for compiled Hydro simulators and concrete simulation instances.
2
3use core::{fmt, panic};
4use std::cell::{Cell, RefCell};
5use std::collections::{HashMap, VecDeque};
6use std::fmt::Debug;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::{Pin, pin};
10use std::rc::Rc;
11use std::task::ready;
12
13use bytes::Bytes;
14use colored::Colorize;
15use dfir_rs::scheduled::context::DfirErased;
16use futures::{Stream, StreamExt};
17use libloading::Library;
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use tempfile::TempPath;
21use tokio::sync::mpsc::UnboundedSender;
22use tokio::sync::{Mutex, Notify};
23use tokio_stream::wrappers::UnboundedReceiverStream;
24
25use super::runtime::{Hooks, InlineHooks};
26use super::{SimClusterReceiver, SimClusterSender, SimReceiver, SimSender};
27use crate::compile::builder::ExternalPortId;
28use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
29use crate::location::dynamic::LocationId;
30use crate::sim::graph::{SimExternalPort, SimExternalPortRegistry};
31use crate::sim::runtime::SimHook;
32
33struct QuiescenceState {
34    /// Set to true when the scheduler reaches quiescence; reset to false when new input is sent.
35    quiescent: Cell<bool>,
36    /// Notified when the scheduler reaches quiescence (wakes receivers waiting for data).
37    quiescence_notify: Notify,
38    /// Notified when new input is sent, signaling the scheduler to resume.
39    resume_notify: Notify,
40}
41
42impl QuiescenceState {
43    /// Signal that new input has been sent, waking the scheduler if it was quiescent.
44    fn resume(&self) {
45        self.quiescent.set(false);
46        self.resume_notify.notify_waiters();
47    }
48
49    /// Whether the scheduler is currently quiescent (no more progress possible without input).
50    fn is_quiescent(&self) -> bool {
51        self.quiescent.get()
52    }
53
54    /// Returns a future that completes when the scheduler next reaches quiescence.
55    fn notified(&self) -> tokio::sync::futures::Notified<'_> {
56        self.quiescence_notify.notified()
57    }
58
59    /// Enter quiescence and wait for new input before continuing.
60    async fn wait_for_resume(&self) {
61        self.quiescent.set(true);
62        self.quiescence_notify.notify_waiters();
63        self.resume_notify.notified().await;
64        self.quiescent.set(false);
65    }
66}
67
/// Channel endpoints and scheduler state for one running simulation instance. The simulation's
/// sender/receiver handles (e.g. [`SimReceiver`]) look these up through the
/// `CURRENT_SIM_CONNECTIONS` task-local.
struct SimConnections {
    /// Senders for each non-cluster external input port.
    input_senders: HashMap<SimExternalPort, Rc<UnboundedSender<Bytes>>>,
    /// Receivers for each non-cluster external output port. They sit behind an async `Mutex`
    /// so that one reader at a time consumes a given stream.
    output_receivers: HashMap<SimExternalPort, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
    /// Same as `input_senders`, but for cluster ports: one sender per cluster member id.
    cluster_input_senders: HashMap<SimExternalPort, HashMap<u32, Rc<UnboundedSender<Bytes>>>>,
    /// Same as `output_receivers`, but for cluster ports: one receiver per cluster member id.
    cluster_output_receivers:
        HashMap<SimExternalPort, HashMap<u32, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>>,
    /// Maps each registered port's user-facing [`ExternalPortId`] to its simulator port.
    external_registered: HashMap<ExternalPortId, SimExternalPort>,
    /// Quiescence tracking shared with the running scheduler.
    quiescence: Rc<QuiescenceState>,
}

// Connections for the simulation instance running on the current task.
// `CompiledSimInstance::run_without_launching` installs them with `CURRENT_SIM_CONNECTIONS.scope`.
tokio::task_local! {
    static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
}
81
82/// A handle to a compiled Hydro simulation, which can be instantiated and run.
83pub struct CompiledSim {
84    pub(super) _path: TempPath,
85    pub(super) lib: Library,
86    pub(super) externals_port_registry: SimExternalPortRegistry,
87    pub(super) unit_test_fuzz_iterations: usize,
88}
89
#[sealed::sealed]
/// A trait implemented by closures that can instantiate a compiled simulation.
///
/// Each call produces a fresh, independent [`CompiledSimInstance`].
///
/// This is needed to ensure [`RefUnwindSafe`] so instances can be created during fuzzing
/// (presumably because the fuzzer catches panics from individual executions).
///
/// The trait is sealed, so it cannot be implemented outside this crate. The blanket impl
/// that follows covers every qualifying closure.
pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
#[sealed::sealed]
// Blanket impl: any unwind-safe closure that returns an instance is an `Instantiator`.
impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
97
/// Print handler that discards every message; passed to the dylib when logging is disabled.
fn null_handler(_: fmt::Arguments) {}
99
/// Print handler that writes each formatted message to standard output, followed by a newline.
fn println_handler(args: fmt::Arguments) {
    println!("{args}");
}
103
/// Print handler that writes each formatted message to standard error, followed by a newline.
fn eprintln_handler(args: fmt::Arguments) {
    eprintln!("{args}");
}
107
/// Signature of the `__hydro_runtime` entry point exported by the compiled simulation dylib.
///
/// Creates a simulation instance, returning:
/// - A list of async DFIRs to run (all process / cluster logic outside a tick)
/// - A list of tick DFIRs to run (where the &'static str is for the tick location id)
/// - A mapping of hooks for non-deterministic decisions at tick-input boundaries
/// - A mapping of inline hooks for non-deterministic decisions inside ticks
///
/// Each DFIR entry is `(location id, Option<u32>, DFIR)`. The location id string is
/// JSON-encoded and is parsed with `serde_json` when the scheduler is built. The `Option<u32>`
/// is presumably the cluster member id (`None` for processes); confirm against the generated
/// code.
///
/// The four channel maps are out-parameters that the callee fills in. They are keyed by the
/// raw `usize` of each external port.
///
/// This alias must match the generated symbol's real signature exactly. Calling a symbol
/// through a mismatched type is undefined behavior.
type SimLoaded<'a> = libloading::Symbol<
    'a,
    unsafe extern "Rust" fn(
        // Forwarded from `colored`'s global should-colorize setting.
        should_color: bool,
        // Out: receivers for non-cluster external output ports.
        external_out: &mut HashMap<usize, UnboundedReceiverStream<Bytes>>,
        // Out: senders for non-cluster external input ports.
        external_in: &mut HashMap<usize, UnboundedSender<Bytes>>,
        // Out: per-member receivers for cluster external output ports.
        cluster_external_out: &mut HashMap<usize, HashMap<u32, UnboundedReceiverStream<Bytes>>>,
        // Out: per-member senders for cluster external input ports.
        cluster_external_in: &mut HashMap<usize, HashMap<u32, UnboundedSender<Bytes>>>,
        // Sinks for the dylib's printed output (presumably its println!/eprintln! calls;
        // confirm against the generated code). The caller passes no-op sinks when logging is off.
        println_handler: fn(fmt::Arguments<'_>),
        eprintln_handler: fn(fmt::Arguments<'_>),
    ) -> (
        Vec<(&'static str, Option<u32>, DfirErased)>,
        Vec<(&'static str, Option<u32>, DfirErased)>,
        Hooks<&'static str>,
        InlineHooks<&'static str>,
    ),
>;
130
impl CompiledSim {
    /// Executes the given closure with a single instance of the compiled simulation.
    ///
    /// Logging is always enabled for this instance.
    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
        self.with_instantiator(|instantiator| thunk(instantiator()), true)
    }

    /// Executes the given closure with an [`Instantiator`], which can be called to create
    /// independent instances of the simulation. This is useful for fuzzing, where we need to
    /// re-execute the simulation several times with different decisions.
    ///
    /// The `always_log` parameter controls whether to log tick executions and stream releases. If
    /// it is `true`, logging will always be enabled. If it is `false`, logging will only be
    /// enabled if the `HYDRO_SIM_LOG` environment variable is set to `1`.
    ///
    /// # Panics
    /// Panics if the loaded library does not export a `__hydro_runtime` symbol.
    pub fn with_instantiator<T>(
        &self,
        thunk: impl FnOnce(&dyn Instantiator) -> T,
        always_log: bool,
    ) -> T {
        // SAFETY: NOTE(review): `Library::get` is unsafe because the symbol's real type must
        // match `SimLoaded`. This relies on the generated dylib exporting `__hydro_runtime`
        // with exactly that signature; confirm the two are kept in sync.
        let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
        let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
        thunk(
            // Each call builds a fresh instance. It shares the loaded symbol, but the DFIRs
            // are created later, when the instance is run.
            &(|| CompiledSimInstance {
                func: func.clone(),
                externals_port_registry: self.externals_port_registry.clone(),
                dylib_result: None,
                log,
            }),
        )
    }

    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
    /// closure will be repeatedly executed with instances of the Hydro program where the
    /// batching boundaries, order of messages, and retries are varied.
    ///
    /// During development, you should run the test that invokes this function with the `cargo sim`
    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
    /// be executed, and if no reproducer is found a small number of random executions will be
    /// performed.
    pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
        // Find the user's test function in the backtrace by skipping frames inside the
        // simulator itself. The reproducer path is derived from that function.
        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
            .elements()
            .into_iter()
            .find(|e| {
                !e.fn_name.starts_with("hydro_lang::sim::compiled")
                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
                    && !e.fn_name.starts_with("fuzz<")
                    && !e.fn_name.starts_with("<hydro_lang::sim")
            })
            .unwrap();

        // Reproducers are stored at `<test source dir>/sim-failures/<fn path, `::` -> `__`>.bin`.
        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
        let repro_folder = caller_path.parent().unwrap().join("sim-failures");

        let caller_fuzz_repro_path = repro_folder
            .join(caller_fn.fn_name.replace("::", "__"))
            .with_extension("bin");

        if std::env::var("BOLERO_FUZZER").is_ok() {
            // Running under a fuzzer (`cargo sim`): libFuzzer keeps its corpus and crash
            // artifacts in `.fuzz-corpus` under the current directory. `-handle_abrt=0` stops
            // libFuzzer from intercepting SIGABRT.
            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
            std::fs::create_dir_all(&corpus_dir).unwrap();
            let libfuzzer_args = format!(
                "{} {} -artifact_prefix={}/ -handle_abrt=0",
                corpus_dir.to_str().unwrap(),
                corpus_dir.to_str().unwrap(),
                corpus_dir.to_str().unwrap(),
            );

            std::fs::create_dir_all(&repro_folder).unwrap();

            // Unless disabled with HYDRO_NO_FAILURE_OUTPUT=1, point bolero's failure output at
            // the reproducer path, presumably so a minimized failing input lands where the
            // `cargo test` branch below looks for it.
            if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
                // SAFETY: NOTE(review): `set_var` is unsafe because it races with environment
                // access from other threads. This assumes no other thread touches the
                // environment here; confirm (test harnesses may run tests in parallel).
                unsafe {
                    std::env::set_var(
                        "BOLERO_FAILURE_OUTPUT",
                        caller_fuzz_repro_path.to_str().unwrap(),
                    );
                }
            }

            // SAFETY: same assumption as the `set_var` call above.
            unsafe {
                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
            }

            self.with_instantiator(
                |instantiator| {
                    bolero::test(bolero::TargetLocation {
                        package_name: "",
                        manifest_dir: "",
                        module_path: "",
                        file: "",
                        line: 0,
                        item_path: "<unknown>::__bolero_item_path__",
                        test_name: None,
                    })
                    .run_with_replay(move |is_replay| {
                        let mut instance = instantiator();

                        // The banner is printed only if logging was already enabled. The check
                        // happens before the replay override below.
                        if instance.log {
                            eprintln!(
                                "{}",
                                "\n==== New Simulation Instance ===="
                                    .color(colored::Color::Cyan)
                                    .bold()
                            );
                        }

                        // When bolero replays a failing input, force logging so the trace is shown.
                        if is_replay {
                            instance.log = true;
                        }

                        // Each execution gets its own fresh single-threaded runtime.
                        tokio::runtime::Builder::new_current_thread()
                            .build()
                            .unwrap()
                            .block_on(async { instance.run(&mut thunk).await })
                    })
                },
                false,
            );
        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
            // No fuzzer, but a reproducer exists: replay it deterministically.
            self.fuzz_repro(existing_bytes, async |compiled| {
                compiled.launch();
                thunk().await
            });
        } else {
            // No fuzzer and no reproducer: fall back to a fixed number of random executions.
            eprintln!(
                "Running a fuzz test without `cargo sim` and no reproducer found at {}, using {} iterations with random inputs.",
                caller_fuzz_repro_path.display(),
                self.unit_test_fuzz_iterations,
            );
            self.with_instantiator(
                |instantiator| {
                    bolero::test(bolero::TargetLocation {
                        package_name: "",
                        manifest_dir: "",
                        module_path: "",
                        // NOTE(review): the other bolero targets in this impl use `file: ""`.
                        // Presumably "." is deliberate for bolero's test engine; confirm.
                        file: ".",
                        line: 0,
                        item_path: "<unknown>::__bolero_item_path__",
                        test_name: None,
                    })
                    .with_iterations(self.unit_test_fuzz_iterations)
                    .run(move || {
                        let instance = instantiator();
                        tokio::runtime::Builder::new_current_thread()
                            .build()
                            .unwrap()
                            .block_on(async { instance.run(&mut thunk).await })
                    })
                },
                false,
            );
        }
    }

    /// Executes the given closure with a single instance of the compiled simulation, using the
    /// provided bytes as the source of fuzzing decisions. This can be used to manually reproduce a
    /// failure found during fuzzing.
    ///
    /// The instance passed to `thunk` has not been launched yet. `thunk` is responsible for
    /// starting the scheduler.
    pub fn fuzz_repro<'a>(
        &'a self,
        bytes: Vec<u8>,
        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
    ) {
        self.with_instance(|instance| {
            // Install a byte-driven bolero driver for this scope, so that every value bolero
            // generates during the run is drawn from `bytes`.
            bolero::bolero_engine::any::scope::with(
                Box::new(bolero::bolero_engine::driver::object::Object(
                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
                )),
                || {
                    tokio::runtime::Builder::new_current_thread()
                        .build()
                        .unwrap()
                        .block_on(async { instance.run_without_launching(thunk).await })
                },
            )
        });
    }

    /// Exhaustively searches all possible executions of the simulation. The provided
    /// closure will be repeatedly executed with instances of the Hydro program where the
    /// batching boundaries, order of messages, and retries are varied.
    ///
    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
    ///
    /// Returns the number of distinct executions explored.
    ///
    /// The count is incremented once per execution that bolero runs, including a replay if
    /// one occurs. Aborts the process if `BOLERO_FUZZER` is set.
    pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
        if std::env::var("BOLERO_FUZZER").is_ok() {
            eprintln!(
                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
            );
            std::process::abort();
        }

        let mut count = 0;
        let count_mut = &mut count;

        self.with_instantiator(
            |instantiator| {
                bolero::test(bolero::TargetLocation {
                    package_name: "",
                    manifest_dir: "",
                    module_path: "",
                    file: "",
                    line: 0,
                    item_path: "<unknown>::__bolero_item_path__",
                    test_name: None,
                })
                .exhaustive()
                .run_with_replay(move |is_replay| {
                    *count_mut += 1;

                    let mut instance = instantiator();
                    if instance.log {
                        eprintln!(
                            "{}",
                            "\n==== New Simulation Instance ===="
                                .color(colored::Color::Cyan)
                                .bold()
                        );
                    }

                    // Replays of a failing execution always log their trace.
                    if is_replay {
                        instance.log = true;
                    }

                    tokio::runtime::Builder::new_current_thread()
                        .build()
                        .unwrap()
                        .block_on(async { instance.run(&mut thunk).await })
                })
            },
            false,
        );

        count
    }
}
371
// This must be a tuple because it is referenced from generated code in `graph.rs`.
// It has the same shape as the `SimLoaded` entry point's return value:
// (async DFIRs, tick DFIRs, tick-boundary hooks, inline hooks). Each DFIR entry is
// (JSON-encoded location id, Option<u32> (presumably the cluster member id), DFIR).
type DylibResult = (
    Vec<(&'static str, Option<u32>, DfirErased)>,
    Vec<(&'static str, Option<u32>, DfirErased)>,
    Hooks<&'static str>,
    InlineHooks<&'static str>,
);
379
/// A single instance of a compiled Hydro simulation, which provides methods to interactively
/// execute the simulation, feed inputs, and receive outputs.
pub struct CompiledSimInstance<'a> {
    /// The dylib's `__hydro_runtime` entry point, borrowed from the owning [`CompiledSim`].
    func: SimLoaded<'a>,
    /// External port registry. Its `registered` map goes from [`ExternalPortId`] to
    /// [`SimExternalPort`].
    externals_port_registry: SimExternalPortRegistry,
    /// DFIRs and hooks produced by calling `func`. It is `None` until the instance has been
    /// initialized, and is taken when the scheduler is built.
    dylib_result: Option<DylibResult>,
    /// Whether this instance logs its execution trace and the dylib's printed output.
    log: bool,
}
388
impl<'a> CompiledSimInstance<'a> {
    /// Sets up the instance, launches its scheduler, and then runs `thunk`, which drives the
    /// simulation through its sender/receiver handles.
    async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
        self.run_without_launching(async |instance| {
            instance.launch();
            thunk().await;
        })
        .await;
    }

    /// Instantiates the compiled program by calling the dylib entry point and wires up its
    /// external channels. It then runs `thunk` with the instance inside a
    /// [`tokio::task::LocalSet`], with the connections installed as the task-local state.
    ///
    /// The scheduler is not started here; `thunk` must start it (e.g. via `launch`).
    async fn run_without_launching(
        mut self,
        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
    ) {
        // Out-parameters that the dylib fills in, keyed by the raw `usize` of each external port.
        let mut external_out: HashMap<usize, UnboundedReceiverStream<Bytes>> = HashMap::new();
        let mut external_in: HashMap<usize, UnboundedSender<Bytes>> = HashMap::new();
        let mut cluster_external_out: HashMap<usize, HashMap<u32, UnboundedReceiverStream<Bytes>>> =
            HashMap::new();
        let mut cluster_external_in: HashMap<usize, HashMap<u32, UnboundedSender<Bytes>>> =
            HashMap::new();

        // SAFETY: NOTE(review): calling `func` is only sound if the loaded symbol really has
        // the `SimLoaded` signature; confirm the generated code stays in sync with that alias.
        let dylib_result = unsafe {
            (self.func)(
                colored::control::SHOULD_COLORIZE.should_colorize(),
                &mut external_out,
                &mut external_in,
                &mut cluster_external_out,
                &mut cluster_external_in,
                // The dylib's print output is discarded unless logging is enabled.
                if self.log {
                    println_handler
                } else {
                    null_handler
                },
                if self.log {
                    eprintln_handler
                } else {
                    null_handler
                },
            )
        };

        let registered = &self.externals_port_registry.registered;

        let quiescence = Rc::new(QuiescenceState {
            quiescent: Cell::new(false),
            quiescence_notify: Notify::new(),
            resume_notify: Notify::new(),
        });

        let mut input_senders = HashMap::new();
        let mut output_receivers = HashMap::new();
        let mut cluster_input_senders = HashMap::new();
        let mut cluster_output_receivers = HashMap::new();

        // Re-key each registered port's raw channels by `SimExternalPort`. Senders are wrapped
        // in `Rc` and receivers in `Rc<Mutex<..>>` so handles can share them. Channels whose
        // key is not registered stay in the local maps and are never exposed.
        #[expect(
            clippy::disallowed_methods,
            reason = "inserts into maps also unordered"
        )]
        for sim_port in registered.values() {
            let usize_key = sim_port.into_inner();
            if let Some(sender) = external_in.remove(&usize_key) {
                input_senders.insert(*sim_port, Rc::new(sender));
            }
            if let Some(receiver) = external_out.remove(&usize_key) {
                output_receivers.insert(*sim_port, Rc::new(Mutex::new(receiver)));
            }
            if let Some(senders) = cluster_external_in.remove(&usize_key) {
                cluster_input_senders.insert(
                    *sim_port,
                    senders
                        .into_iter()
                        .map(|(member, s)| (member, Rc::new(s)))
                        .collect(),
                );
            }
            if let Some(receivers) = cluster_external_out.remove(&usize_key) {
                cluster_output_receivers.insert(
                    *sim_port,
                    receivers
                        .into_iter()
                        .map(|(member, r)| (member, Rc::new(Mutex::new(r))))
                        .collect(),
                );
            }
        }

        // Stored here and taken later by `schedule_with_maybe_logger`.
        self.dylib_result = Some(dylib_result);

        // `launch` uses `spawn_local`, which requires running inside a `LocalSet`. Handles find
        // their channels through the `CURRENT_SIM_CONNECTIONS` scope installed here.
        let local_set = tokio::task::LocalSet::new();
        local_set
            .run_until(CURRENT_SIM_CONNECTIONS.scope(
                RefCell::new(SimConnections {
                    input_senders,
                    output_receivers,
                    cluster_input_senders,
                    cluster_output_receivers,
                    external_registered: self.externals_port_registry.registered.clone(),
                    quiescence: quiescence.clone(),
                }),
                async move {
                    thunk(self).await;
                },
            ))
            .await;
    }

    /// Launches the simulation by spawning its scheduler as a local task that simulates the
    /// Hydro program asynchronously. Call this before receiving any messages.
    ///
    /// Must be called from inside the `LocalSet` and connection scope set up by
    /// `run_without_launching`, because it uses `spawn_local` and reads the task-local
    /// connections.
    fn launch(self) {
        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
    }

    /// Returns a future that schedules simulation with the given logger for reporting the
    /// simulation trace.
    ///
    /// The logger is used only when logging is enabled for this instance; otherwise the trace
    /// is discarded. This method reads the task-local connections eagerly, so it must be called
    /// inside the scope set up by `run_without_launching`.
    pub fn schedule_with_logger<W: std::io::Write>(
        self,
        log_writer: W,
    ) -> impl use<W> + Future<Output = ()> {
        self.schedule_with_maybe_logger(Some(log_writer))
    }

    /// Converts the dylib's output into a `LaunchedSim`, parsing each JSON-encoded location id,
    /// and returns a future that runs its scheduler.
    ///
    /// Logging goes to `log_override` if one is given, to stderr otherwise, and nowhere when
    /// logging is disabled.
    ///
    /// # Panics
    /// Panics if the instance was not initialized by `run_without_launching`
    /// (`dylib_result` is `None`), or if a location id fails to parse.
    fn schedule_with_maybe_logger<W: std::io::Write>(
        mut self,
        log_override: Option<W>,
    ) -> impl use<W> + Future<Output = ()> {
        let (async_dfirs, tick_dfirs, hooks, inline_hooks) = self.dylib_result.take().unwrap();

        // `not_ready_observation` starts with the (location, Option<u32>) key of every async DFIR.
        let not_ready_observation = async_dfirs
            .iter()
            .map(|(lid, c_id, _)| (serde_json::from_str(lid).unwrap(), *c_id))
            .collect();

        let quiescence = CURRENT_SIM_CONNECTIONS.with(|connections| {
            let connections = connections.borrow();
            connections.quiescence.clone()
        });

        let mut launched = LaunchedSim {
            async_dfirs: async_dfirs
                .into_iter()
                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
                .collect(),
            possibly_ready_ticks: vec![],
            not_ready_ticks: tick_dfirs
                .into_iter()
                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
                .collect(),
            possibly_ready_observation: vec![],
            not_ready_observation,
            hooks: hooks
                .into_iter()
                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
                .collect(),
            inline_hooks: inline_hooks
                .into_iter()
                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
                .collect(),
            log: if self.log {
                if let Some(w) = log_override {
                    LogKind::Custom(w)
                } else {
                    LogKind::Stderr
                }
            } else {
                LogKind::Null
            },
            quiescence,
        };

        async move { launched.scheduler().await }
    }
}
560
561impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
562    fn clone(&self) -> Self {
563        *self
564    }
565}
566
567impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
568
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
    /// Runs `thunk` against a stream view of this receiver's output channel.
    ///
    /// The stream yields each received message, decoded with `bincode`. It ends (yields `None`)
    /// when the channel closes, or when no message is buffered and the scheduler is quiescent.
    /// If the scheduler is not yet quiescent, it waits until either a message arrives or the
    /// scheduler reaches quiescence. The channel's lock is held for the whole of `thunk`.
    ///
    /// # Panics
    /// Panics if called outside a running simulation's connection scope, if this receiver's
    /// port has no registered output channel, or if a message fails to decode.
    async fn with_stream<Out>(
        &self,
        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
    ) -> Out {
        let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
            let connections = connections.borrow();
            let port = connections.external_registered.get(&self.0).unwrap();
            (
                connections.output_receivers.get(port).unwrap().clone(),
                connections.quiescence.clone(),
            )
        });

        let mut receiver_stream = receiver.lock().await;
        // Created before the first poll so that no quiescence notification is missed.
        let mut notified_fut = pin!(quiescence.notified());
        let mut quiescence_aware = futures::stream::poll_fn(|cx| {
            use std::task::Poll;
            // Drain buffered messages first, so data produced before quiescence is never lost.
            match receiver_stream.poll_next_unpin(cx) {
                Poll::Ready(Some(bytes)) => {
                    return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
                }
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => {}
            }
            // Nothing buffered and the scheduler is already idle: no more data can arrive.
            if quiescence.is_quiescent() {
                return Poll::Ready(None);
            }
            // Otherwise wait for quiescence. Once it fires, re-arm the notification for any
            // later poll and end the stream.
            let () = ready!(notified_fut.as_mut().poll(cx));
            notified_fut.set(quiescence.notified());
            Poll::Ready(None)
        });
        thunk(&mut pin!(&mut quiescence_aware)).await
    }

    /// Asserts that the stream has ended and no more messages can possibly arrive.
    ///
    /// The returned future panics with a descriptive message if the stream yields another
    /// message.
    pub fn assert_no_more(self) -> impl Future<Output = ()>
    where
        T: Debug,
    {
        FutureTrackingCaller {
            future: async move {
                self.with_stream(async |stream| {
                    if let Some(next) = stream.next().await {
                        return Err(format!(
                            "Stream yielded unexpected message: {:?}, expected termination",
                            next
                        ));
                    }
                    Ok(())
                })
                .await
            },
        }
    }
}
625
626impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
627    /// Receives the next message from the external bincode stream. This will wait until a message
628    /// is available, or return `None` if no more messages can possibly arrive.
629    pub async fn next(&self) -> Option<T> {
630        self.with_stream(async |stream| stream.next().await).await
631    }
632
633    /// Collects all remaining messages from the external bincode stream into a collection. This
634    /// will wait until no more messages can possibly arrive.
635    pub async fn collect<C: Default + Extend<T>>(self) -> C {
636        self.with_stream(async |stream| stream.collect().await)
637            .await
638    }
639
640    /// Asserts that the stream yields exactly the expected sequence of messages, in order.
641    /// This does not check that the stream ends, use [`Self::assert_yields_only`] for that.
642    pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
643        &self,
644        expected: I,
645    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
646    where
647        T: Debug + PartialEq<T2>,
648    {
649        FutureTrackingCaller {
650            future: async {
651                let mut expected: VecDeque<T2> = expected.into_iter().collect();
652
653                while !expected.is_empty() {
654                    if let Some(next) = self.next().await {
655                        let next_expected = expected.pop_front().unwrap();
656                        if next != next_expected {
657                            return Err(format!(
658                                "Stream yielded unexpected message: {:?}, expected: {:?}",
659                                next, next_expected
660                            ));
661                        }
662                    } else {
663                        return Err(format!(
664                            "Stream ended early, still expected: {:?}",
665                            expected
666                        ));
667                    }
668                }
669
670                Ok(())
671            },
672        }
673    }
674
675    /// Asserts that the stream yields only the expected sequence of messages, in order,
676    /// and then ends.
677    pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
678        &self,
679        expected: I,
680    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
681    where
682        T: Debug + PartialEq<T2>,
683    {
684        ChainedFuture {
685            first: self.assert_yields(expected),
686            second: self.assert_no_more(),
687            first_done: false,
688        }
689    }
690}
691
692pin_project_lite::pin_project! {
693    // A future that tracks the location of the `.await` call for better panic messages.
694    //
695    // `#[track_caller]` is important for us to create assertion methods because it makes
696    // the panic backtrace show up at that method (instead of inside the call tree within
697    // that method). This is e.g. what `Option::unwrap` uses. Unfortunately, `#[track_caller]`
698    // does not work correctly for async methods (or `dyn Future` either), so we have to
699    // create these concrete future types that (1) have `#[track_caller]` on their `poll()`
700    // method and (2) have the `panic!` triggered in their `poll()` method (or in a directly
701    // nested concrete future).
702    struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
703        #[pin]
704        future: F,
705    }
706}
707
708impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
709    type Output = ();
710
711    #[track_caller]
712    fn poll(
713        mut self: Pin<&mut Self>,
714        cx: &mut std::task::Context<'_>,
715    ) -> std::task::Poll<Self::Output> {
716        match ready!(self.as_mut().project().future.poll(cx)) {
717            Ok(()) => std::task::Poll::Ready(()),
718            Err(e) => panic!("{}", e),
719        }
720    }
721}
722
723pin_project_lite::pin_project! {
724    // A future that first awaits the first future, then the second, propagating caller info.
725    //
726    // See [`FutureTrackingCaller`] for context.
727    struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
728        #[pin]
729        first: F1,
730        #[pin]
731        second: F2,
732        first_done: bool,
733    }
734}
735
736impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
737    type Output = ();
738
739    #[track_caller]
740    fn poll(
741        mut self: Pin<&mut Self>,
742        cx: &mut std::task::Context<'_>,
743    ) -> std::task::Poll<Self::Output> {
744        if !self.first_done {
745            ready!(self.as_mut().project().first.poll(cx));
746            *self.as_mut().project().first_done = true;
747        }
748
749        self.as_mut().project().second.poll(cx)
750    }
751}
752
753impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
754    /// Collects all remaining messages from the external bincode stream into a collection,
755    /// sorting them. This will wait until no more messages can possibly arrive.
756    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
757    where
758        T: Ord,
759    {
760        self.with_stream(async |stream| {
761            let mut collected: C = stream.collect().await;
762            collected.as_mut().sort();
763            collected
764        })
765        .await
766    }
767
768    /// Asserts that the stream yields exactly the expected sequence of messages, in some order.
769    /// This does not check that the stream ends, use [`Self::assert_yields_only_unordered`] for that.
770    pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
771        &self,
772        expected: I,
773    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
774    where
775        T: Debug + PartialEq<T2>,
776    {
777        FutureTrackingCaller {
778            future: async {
779                self.with_stream(async |stream| {
780                    let mut expected: Vec<T2> = expected.into_iter().collect();
781
782                    while !expected.is_empty() {
783                        if let Some(next) = stream.next().await {
784                            let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
785                            if let Some((i, _)) = idx {
786                                expected.swap_remove(i);
787                            } else {
788                                return Err(format!(
789                                    "Stream yielded unexpected message: {:?}",
790                                    next
791                                ));
792                            }
793                        } else {
794                            return Err(format!(
795                                "Stream ended early, still expected: {:?}",
796                                expected
797                            ));
798                        }
799                    }
800
801                    Ok(())
802                })
803                .await
804            },
805        }
806    }
807
808    /// Asserts that the stream yields only the expected sequence of messages, in some order,
809    /// and then ends.
810    pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
811        &self,
812        expected: I,
813    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
814    where
815        T: Debug + PartialEq<T2>,
816    {
817        ChainedFuture {
818            first: self.assert_yields_unordered(expected),
819            second: self.assert_no_more(),
820            first_done: false,
821        }
822    }
823}
824
825impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
826    fn with_sink<Out>(
827        &self,
828        thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
829    ) -> Out {
830        let (sender, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
831            let connections = connections.borrow();
832            (
833                connections
834                    .input_senders
835                    .get(connections.external_registered.get(&self.0).unwrap())
836                    .unwrap()
837                    .clone(),
838                connections.quiescence.clone(),
839            )
840        });
841
842        thunk(&move |t| {
843            let res = sender.send(bincode::serialize(&t).unwrap().into());
844            quiescence.resume();
845            res
846        })
847    }
848}
849
850impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
851    /// Sends several messages to the external bincode sink. The messages will be asynchronously
852    /// processed as part of the simulation, in non-deterministic order.
853    pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
854        self.with_sink(|send| {
855            for t in iter {
856                send(t).unwrap();
857            }
858        })
859    }
860}
861
862impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
863    /// Sends a message to the external bincode sink. The message will be asynchronously processed
864    /// as part of the simulation.
865    pub fn send(&self, t: T) {
866        self.with_sink(|send| send(t)).unwrap();
867    }
868
869    /// Sends several messages to the external bincode sink. The messages will be asynchronously
870    /// processed as part of the simulation.
871    pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
872        self.with_sink(|send| {
873            for t in iter {
874                send(t).unwrap();
875            }
876        })
877    }
878}
879
// Written by hand rather than derived: `#[derive(Clone)]` would add `Clone` bounds on `T`,
// `O`, and `R`, which this impl does not require.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone
    for SimClusterReceiver<T, O, R>
{
    // `SimClusterReceiver` also implements `Copy`, so a clone is just a copy.
    fn clone(&self) -> Self {
        *self
    }
}
887
// Marker impl without `Copy` bounds on `T`, `O`, or `R`; the compiler only checks that the
// struct's own fields are `Copy`.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy
    for SimClusterReceiver<T, O, R>
{
}
892
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterReceiver<T, O, R> {
    /// Runs `thunk` over a stream of the values that cluster member `member_id` has sent to
    /// this external port, deserialized with bincode, and returns the thunk's result.
    ///
    /// The member's receiver lock is held for the whole call. The stream yields buffered
    /// values first. When nothing is buffered, it ends if the simulation is already
    /// quiescent; otherwise it waits for the next quiescence signal and then ends.
    ///
    /// # Panics
    ///
    /// Panics if this port is not registered, if `member_id` has no receiver for this port,
    /// or if a received payload fails to deserialize as `T`.
    async fn with_member_stream<Out>(
        &self,
        member_id: u32,
        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
    ) -> Out {
        // Look up this port's receiver for `member_id`, plus the shared quiescence state.
        let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
            let connections = connections.borrow();
            let port = connections.external_registered.get(&self.0).unwrap();
            let receivers = connections.cluster_output_receivers.get(port).unwrap();
            (
                receivers[&member_id].clone(),
                connections.quiescence.clone(),
            )
        });

        // Held until `thunk` finishes.
        let mut lock = receiver.lock().await;
        let mut notified_fut = pin!(quiescence.notified());
        let mut quiescence_aware = futures::stream::poll_fn(|cx| {
            use std::task::Poll;
            // Buffered values always take priority over the quiescence checks below.
            match lock.poll_next_unpin(cx) {
                Poll::Ready(Some(bytes)) => {
                    return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
                }
                // The underlying stream has ended, so nothing more will arrive.
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => {}
            }
            // Nothing buffered and the scheduler is quiescent: no more values without new input.
            if quiescence.is_quiescent() {
                return Poll::Ready(None);
            }
            // Otherwise wait for the scheduler's next quiescence signal.
            let () = ready!(notified_fut.as_mut().poll(cx));
            // Re-arm for any later poll, then end the stream. Note that this ends the stream
            // even if new input was sent (clearing the quiescent flag) after the signal fired.
            notified_fut.set(quiescence.notified());
            Poll::Ready(None)
        });
        thunk(&mut pin!(&mut quiescence_aware)).await
    }
}
930
931impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, TotalOrder, ExactlyOnce> {
932    /// Receives the next value from a specific cluster member.
933    pub async fn next(&self, member_id: u32) -> Option<T> {
934        self.with_member_stream(member_id, async |stream| stream.next().await)
935            .await
936    }
937
938    /// Collects all remaining values from a specific cluster member into a collection.
939    pub async fn collect<C: Default + Extend<T>>(self, member_id: u32) -> C {
940        self.with_member_stream(member_id, async |stream| stream.collect().await)
941            .await
942    }
943}
944
945impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, NoOrder, ExactlyOnce> {
946    /// Collects all remaining values from a specific cluster member, sorted.
947    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self, member_id: u32) -> C
948    where
949        T: Ord,
950    {
951        self.with_member_stream(member_id, async |stream| {
952            let mut collected: C = stream.collect().await;
953            collected.as_mut().sort();
954            collected
955        })
956        .await
957    }
958}
959
960impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterSender<T, O, R> {
961    fn with_sink<Out>(
962        &self,
963        thunk: impl FnOnce(
964            &dyn Fn(u32, T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>,
965        ) -> Out,
966    ) -> Out {
967        let (senders, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
968            let connections = connections.borrow();
969            (
970                connections
971                    .cluster_input_senders
972                    .get(connections.external_registered.get(&self.0).unwrap())
973                    .unwrap()
974                    .clone(),
975                connections.quiescence.clone(),
976            )
977        });
978
979        thunk(&move |member_id: u32, t: T| {
980            let payload = bincode::serialize(&t).unwrap();
981            let res = senders[&member_id].send(Bytes::from(payload));
982            quiescence.resume();
983            res
984        })
985    }
986}
987
988impl<T: Serialize + DeserializeOwned> SimClusterSender<T, TotalOrder, ExactlyOnce> {
989    /// Sends a value to a specific cluster member.
990    pub fn send(&self, member_id: u32, t: T) {
991        self.with_sink(|send| send(member_id, t)).unwrap();
992    }
993
994    /// Sends multiple values to specific cluster members.
995    pub fn send_many<I: IntoIterator<Item = (u32, T)>>(&self, iter: I) {
996        self.with_sink(|send| {
997            for (member_id, t) in iter {
998                send(member_id, t).unwrap();
999            }
1000        })
1001    }
1002}
1003
/// Destination for simulation log output.
enum LogKind<W: std::io::Write> {
    /// Discard everything written.
    Null,
    /// Print to the process's standard error.
    Stderr,
    /// Forward bytes to a caller-provided `std::io::Write` sink.
    Custom(W),
}

// Adapts `std::fmt::Write` (what formatting helpers write into) onto any of the destinations
// above; via https://www.reddit.com/r/rust/comments/t69sld/is_there_a_way_to_allow_either_stdfmtwrite_or/
impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
        match self {
            // I/O error details cannot be carried by `fmt::Error`, so they are dropped.
            LogKind::Custom(sink) => sink
                .write_all(s.as_bytes())
                .map_err(|_io_error| std::fmt::Error),
            LogKind::Stderr => {
                eprint!("{s}");
                Ok(())
            }
            LogKind::Null => Ok(()),
        }
    }
}
1023
/// A running simulation, which manages the async DFIRs, tick DFIRs, and hook-based
/// scheduling decisions for non-deterministic operators like `batch` and `assume_ordering`.
///
/// The scheduler loops between three kinds of work:
/// - **Async DFIRs**: long-running top-level dataflows (one per process/cluster member) that
///   produce data consumed by ticks and observations.
/// - **Ticks**: tick-scoped DFIRs that execute a single tick. Before running, their associated
///   hooks (e.g. from `batch`) are resolved to decide what data to release into the tick.
/// - **Observations**: top-level locations that have hooks (e.g. from `assume_ordering` on a
///   non-tick stream) needing decisions, but no tick DFIR to execute. The scheduler just
///   resolves their hooks.
struct LaunchedSim<W: std::io::Write> {
    /// Top-level async DFIRs, one per process/cluster member. These run continuously and
    /// produce data that feeds into ticks and observations.
    async_dfirs: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Tick DFIRs whose parent async DFIR has made progress, so they may be ready to run.
    /// The scheduler further filters these by checking whether their hooks have pending decisions.
    possibly_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Tick DFIRs whose parent async DFIR has not yet made progress since they were last checked.
    not_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Top-level locations whose async DFIR has made progress and whose hooks (from top-level
    /// `assume_ordering`) may have ordering decisions to resolve. Unlike ticks, these have no
    /// DFIR to execute — only hook resolution.
    possibly_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Top-level locations whose async DFIR has not yet made progress since they were last checked.
    not_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Hooks keyed by (location, cluster_member_id). These are resolved *before* a tick runs
    /// (for `batch` hooks) or standalone (for top-level `assume_ordering` hooks via observations).
    hooks: Hooks<LocationId>,
    /// Inline hooks keyed by (tick location, cluster_member_id). These are resolved *during*
    /// tick execution via a `tokio::select!` loop, for operators like `assume_ordering` inside
    /// a tick that block on ordering decisions while the tick DFIR is running.
    inline_hooks: InlineHooks<LocationId>,
    /// Where the scheduler writes its "Running Tick" headers and the writer handed to hooks
    /// when their decisions are released.
    log: LogKind<W>,
    /// Represents quiescence state of the simulation.
    quiescence: Rc<QuiescenceState>,
}
1061
1062impl<W: std::io::Write> LaunchedSim<W> {
1063    async fn scheduler(&mut self) {
1064        loop {
1065            tokio::task::yield_now().await;
1066            let mut any_made_progress = false;
1067            for (loc, c_id, dfir) in &mut self.async_dfirs {
1068                if dfir.run_tick().await {
1069                    any_made_progress = true;
1070                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
1071                        .not_ready_ticks
1072                        .drain(..)
1073                        .partition(|(tick_loc, tick_c_id, _)| {
1074                            let LocationId::Tick(_, outer) = tick_loc else {
1075                                unreachable!()
1076                            };
1077                            outer.as_ref() == loc && tick_c_id == c_id
1078                        });
1079
1080                    self.possibly_ready_ticks.extend(now_ready);
1081                    self.not_ready_ticks.extend(still_not_ready);
1082
1083                    let (now_ready_obs, still_not_ready_obs): (Vec<_>, Vec<_>) = self
1084                        .not_ready_observation
1085                        .drain(..)
1086                        .partition(|(obs_loc, obs_c_id)| obs_loc == loc && obs_c_id == c_id);
1087
1088                    self.possibly_ready_observation.extend(now_ready_obs);
1089                    self.not_ready_observation.extend(still_not_ready_obs);
1090                }
1091            }
1092
1093            if any_made_progress {
1094                continue;
1095            } else {
1096                use bolero::generator::*;
1097
1098                let (ready_tick, mut not_ready_tick): (Vec<_>, Vec<_>) = self
1099                    .possibly_ready_ticks
1100                    .drain(..)
1101                    .partition(|(name, cid, _)| {
1102                        let hooks = self.hooks.get(&(name.clone(), *cid)).unwrap();
1103                        // All hooks must be ready (have received input or have a last value)
1104                        hooks.iter().all(|hook| hook.is_ready())
1105                            // And at least one hook must be able to make progress
1106                            && hooks.iter().any(|hook| {
1107                                hook.current_decision().unwrap_or(false)
1108                                    || hook.can_make_nontrivial_decision()
1109                            })
1110                    });
1111
1112                self.possibly_ready_ticks = ready_tick;
1113                self.not_ready_ticks.append(&mut not_ready_tick);
1114
1115                let (ready_obs, mut not_ready_obs): (Vec<_>, Vec<_>) = self
1116                    .possibly_ready_observation
1117                    .drain(..)
1118                    .partition(|(name, cid)| {
1119                        self.hooks
1120                            .get(&(name.clone(), *cid))
1121                            .into_iter()
1122                            .flatten()
1123                            .any(|hook| {
1124                                hook.current_decision().unwrap_or(false)
1125                                    || hook.can_make_nontrivial_decision()
1126                            })
1127                    });
1128
1129                self.possibly_ready_observation = ready_obs;
1130                self.not_ready_observation.append(&mut not_ready_obs);
1131
1132                if self.possibly_ready_ticks.is_empty()
1133                    && self.possibly_ready_observation.is_empty()
1134                {
1135                    // If any tick is blocked because a hook is not ready, that's a
1136                    // simulator bug — it means a singleton never received a value.
1137                    for (name, cid, _) in &self.not_ready_ticks {
1138                        let hooks = self.hooks.get(&(name.clone(), *cid)).unwrap();
1139                        assert!(
1140                            hooks.iter().all(|hook| hook.is_ready()),
1141                            "Simulator bug: tick has a hook that never became ready"
1142                        );
1143                    }
1144
1145                    // Signal quiescence and wait for new input.
1146                    self.quiescence.wait_for_resume().await;
1147                } else {
1148                    let next_tick_or_obs = (0..(self.possibly_ready_ticks.len()
1149                        + self.possibly_ready_observation.len()))
1150                        .any();
1151
1152                    if next_tick_or_obs < self.possibly_ready_ticks.len() {
1153                        let next_tick = next_tick_or_obs;
1154                        let mut removed = self.possibly_ready_ticks.remove(next_tick);
1155
1156                        match &mut self.log {
1157                            LogKind::Null => {}
1158                            LogKind::Stderr => {
1159                                if let Some(cid) = &removed.1 {
1160                                    eprintln!(
1161                                        "\n{}",
1162                                        format!("Running Tick (Cluster Member {})", cid)
1163                                            .color(colored::Color::Magenta)
1164                                            .bold()
1165                                    )
1166                                } else {
1167                                    eprintln!(
1168                                        "\n{}",
1169                                        "Running Tick".color(colored::Color::Magenta).bold()
1170                                    )
1171                                }
1172                            }
1173                            LogKind::Custom(writer) => {
1174                                writeln!(
1175                                    writer,
1176                                    "\n{}",
1177                                    "Running Tick".color(colored::Color::Magenta).bold()
1178                                )
1179                                .unwrap();
1180                            }
1181                        }
1182
1183                        let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
1184                            write.write_str(&"*".color(colored::Color::Magenta).bold())?;
1185                            write.write_str(" ")
1186                        };
1187
1188                        let mut tick_decision_writer = indenter::indented(&mut self.log)
1189                            .with_format(indenter::Format::Custom {
1190                                inserter: &mut asterisk_indenter,
1191                            });
1192
1193                        let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
1194                        run_hooks(&mut tick_decision_writer, hooks);
1195
1196                        let run_tick_future = removed.2.run_tick();
1197                        if let Some(inline_hooks) =
1198                            self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
1199                        {
1200                            let mut run_tick_future_pinned = pin!(run_tick_future);
1201
1202                            loop {
1203                                tokio::select! {
1204                                    biased;
1205                                    r = &mut run_tick_future_pinned => {
1206                                        assert!(r);
1207                                        break;
1208                                    }
1209                                    _ = async {} => {
1210                                        bolero_generator::any::scope::borrow_with(|driver| {
1211                                            for hook in inline_hooks.iter_mut() {
1212                                                if hook.pending_decision() {
1213                                                    if !hook.has_decision() {
1214                                                        hook.autonomous_decision(driver);
1215                                                    }
1216
1217                                                    hook.release_decision(&mut tick_decision_writer);
1218                                                }
1219                                            }
1220                                        });
1221                                    }
1222                                }
1223                            }
1224                        } else {
1225                            assert!(run_tick_future.await);
1226                        }
1227
1228                        self.possibly_ready_ticks.push(removed);
1229                    } else {
1230                        let next_obs = next_tick_or_obs - self.possibly_ready_ticks.len();
1231                        let mut default_hooks = vec![];
1232                        let hooks = self
1233                            .hooks
1234                            .get_mut(&self.possibly_ready_observation[next_obs])
1235                            .unwrap_or(&mut default_hooks);
1236
1237                        run_hooks(&mut self.log, hooks);
1238                    }
1239                }
1240            }
1241        }
1242    }
1243}
1244
/// Gives every hook in `hooks` a decision, then releases each decision by passing
/// `tick_decision_writer` to the hook's `release_decision`.
///
/// - Hooks that already hold a decision (manual decisions) keep it.
/// - Hooks that cannot make a nontrivial decision are given a trivial one in a first pass.
/// - The remaining hooks then decide autonomously using the bolero driver. If no nontrivial
///   decision has been made by the time the last undecided hook is reached, that hook's
///   `autonomous_decision` is called with `true` as its second argument (presumably "force a
///   nontrivial decision" — confirm against `SimHook`).
///
/// NOTE(review): the order of `autonomous_decision` calls presumably determines which
/// generated values each hook draws from the driver. Treat the call order as behavior.
fn run_hooks(tick_decision_writer: &mut impl std::fmt::Write, hooks: &mut Vec<Box<dyn SimHook>>) {
    // Number of hooks that do not yet hold a decision.
    let mut remaining_decision_count = hooks.len();
    // Whether any hook has made a nontrivial decision so far.
    let mut made_nontrivial_decision = false;

    bolero::generator::bolero_generator::any::scope::borrow_with(|driver| {
        // first, scan manual decisions
        hooks.iter_mut().for_each(|hook| {
            if let Some(is_nontrivial) = hook.current_decision() {
                made_nontrivial_decision |= is_nontrivial;
                remaining_decision_count -= 1;
            } else if !hook.can_make_nontrivial_decision() {
                // if no nontrivial decision is possible, make a trivial one
                // (we need to do this in the first pass to force nontrivial decisions
                // on the remaining hooks)
                hook.autonomous_decision(driver, false);
                remaining_decision_count -= 1;
            }
        });

        // second, decide for every still-undecided hook, then release each hook's decision
        hooks.iter_mut().for_each(|hook| {
            if hook.current_decision().is_none() {
                made_nontrivial_decision |= hook.autonomous_decision(
                    driver,
                    // only the last undecided hook is forced, and only if nothing nontrivial
                    // has been decided yet
                    !made_nontrivial_decision && remaining_decision_count == 1,
                );
                remaining_decision_count -= 1;
            }

            hook.release_decision(tick_decision_writer);
        });
    });
}