1use core::{fmt, panic};
4use std::cell::{Cell, RefCell};
5use std::collections::{HashMap, VecDeque};
6use std::fmt::Debug;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::{Pin, pin};
10use std::rc::Rc;
11use std::task::ready;
12
13use bytes::Bytes;
14use colored::Colorize;
15use dfir_rs::scheduled::context::DfirErased;
16use futures::{Stream, StreamExt};
17use libloading::Library;
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use tempfile::TempPath;
21use tokio::sync::mpsc::UnboundedSender;
22use tokio::sync::{Mutex, Notify};
23use tokio_stream::wrappers::UnboundedReceiverStream;
24
25use super::runtime::{Hooks, InlineHooks};
26use super::{SimClusterReceiver, SimClusterSender, SimReceiver, SimSender};
27use crate::compile::builder::ExternalPortId;
28use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
29use crate::location::dynamic::LocationId;
30use crate::sim::graph::{SimExternalPort, SimExternalPortRegistry};
31use crate::sim::runtime::SimHook;
32
33struct QuiescenceState {
34 quiescent: Cell<bool>,
36 quiescence_notify: Notify,
38 resume_notify: Notify,
40}
41
42impl QuiescenceState {
43 fn resume(&self) {
45 self.quiescent.set(false);
46 self.resume_notify.notify_waiters();
47 }
48
49 fn is_quiescent(&self) -> bool {
51 self.quiescent.get()
52 }
53
54 fn notified(&self) -> tokio::sync::futures::Notified<'_> {
56 self.quiescence_notify.notified()
57 }
58
59 async fn wait_for_resume(&self) {
61 self.quiescent.set(true);
62 self.quiescence_notify.notify_waiters();
63 self.resume_notify.notified().await;
64 self.quiescent.set(false);
65 }
66}
67
68struct SimConnections {
69 input_senders: HashMap<SimExternalPort, Rc<UnboundedSender<Bytes>>>,
70 output_receivers: HashMap<SimExternalPort, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
71 cluster_input_senders: HashMap<SimExternalPort, HashMap<u32, Rc<UnboundedSender<Bytes>>>>,
72 cluster_output_receivers:
73 HashMap<SimExternalPort, HashMap<u32, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>>,
74 external_registered: HashMap<ExternalPortId, SimExternalPort>,
75 quiescence: Rc<QuiescenceState>,
76}
77
78tokio::task_local! {
79 static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
80}
81
82pub struct CompiledSim {
84 pub(super) _path: TempPath,
85 pub(super) lib: Library,
86 pub(super) externals_port_registry: SimExternalPortRegistry,
87 pub(super) unit_test_fuzz_iterations: usize,
88}
89
90#[sealed::sealed]
91pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
95#[sealed::sealed]
96impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
97
98fn null_handler(_args: fmt::Arguments) {}
99
100fn println_handler(args: fmt::Arguments) {
101 println!("{}", args);
102}
103
104fn eprintln_handler(args: fmt::Arguments) {
105 eprintln!("{}", args);
106}
107
108type SimLoaded<'a> = libloading::Symbol<
114 'a,
115 unsafe extern "Rust" fn(
116 should_color: bool,
117 external_out: &mut HashMap<usize, UnboundedReceiverStream<Bytes>>,
118 external_in: &mut HashMap<usize, UnboundedSender<Bytes>>,
119 cluster_external_out: &mut HashMap<usize, HashMap<u32, UnboundedReceiverStream<Bytes>>>,
120 cluster_external_in: &mut HashMap<usize, HashMap<u32, UnboundedSender<Bytes>>>,
121 println_handler: fn(fmt::Arguments<'_>),
122 eprintln_handler: fn(fmt::Arguments<'_>),
123 ) -> (
124 Vec<(&'static str, Option<u32>, DfirErased)>,
125 Vec<(&'static str, Option<u32>, DfirErased)>,
126 Hooks<&'static str>,
127 InlineHooks<&'static str>,
128 ),
129>;
130
131impl CompiledSim {
132 pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
134 self.with_instantiator(|instantiator| thunk(instantiator()), true)
135 }
136
137 pub fn with_instantiator<T>(
145 &self,
146 thunk: impl FnOnce(&dyn Instantiator) -> T,
147 always_log: bool,
148 ) -> T {
149 let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
150 let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
151 thunk(
152 &(|| CompiledSimInstance {
153 func: func.clone(),
154 externals_port_registry: self.externals_port_registry.clone(),
155 dylib_result: None,
156 log,
157 }),
158 )
159 }
160
161 pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
172 let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
173 .elements()
174 .into_iter()
175 .find(|e| {
176 !e.fn_name.starts_with("hydro_lang::sim::compiled")
177 && !e.fn_name.starts_with("hydro_lang::sim::flow")
178 && !e.fn_name.starts_with("fuzz<")
179 && !e.fn_name.starts_with("<hydro_lang::sim")
180 })
181 .unwrap();
182
183 let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
184 let repro_folder = caller_path.parent().unwrap().join("sim-failures");
185
186 let caller_fuzz_repro_path = repro_folder
187 .join(caller_fn.fn_name.replace("::", "__"))
188 .with_extension("bin");
189
190 if std::env::var("BOLERO_FUZZER").is_ok() {
191 let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
192 std::fs::create_dir_all(&corpus_dir).unwrap();
193 let libfuzzer_args = format!(
194 "{} {} -artifact_prefix={}/ -handle_abrt=0",
195 corpus_dir.to_str().unwrap(),
196 corpus_dir.to_str().unwrap(),
197 corpus_dir.to_str().unwrap(),
198 );
199
200 std::fs::create_dir_all(&repro_folder).unwrap();
201
202 if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
203 unsafe {
204 std::env::set_var(
205 "BOLERO_FAILURE_OUTPUT",
206 caller_fuzz_repro_path.to_str().unwrap(),
207 );
208 }
209 }
210
211 unsafe {
212 std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
213 }
214
215 self.with_instantiator(
216 |instantiator| {
217 bolero::test(bolero::TargetLocation {
218 package_name: "",
219 manifest_dir: "",
220 module_path: "",
221 file: "",
222 line: 0,
223 item_path: "<unknown>::__bolero_item_path__",
224 test_name: None,
225 })
226 .run_with_replay(move |is_replay| {
227 let mut instance = instantiator();
228
229 if instance.log {
230 eprintln!(
231 "{}",
232 "\n==== New Simulation Instance ===="
233 .color(colored::Color::Cyan)
234 .bold()
235 );
236 }
237
238 if is_replay {
239 instance.log = true;
240 }
241
242 tokio::runtime::Builder::new_current_thread()
243 .build()
244 .unwrap()
245 .block_on(async { instance.run(&mut thunk).await })
246 })
247 },
248 false,
249 );
250 } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
251 self.fuzz_repro(existing_bytes, async |compiled| {
252 compiled.launch();
253 thunk().await
254 });
255 } else {
256 eprintln!(
257 "Running a fuzz test without `cargo sim` and no reproducer found at {}, using {} iterations with random inputs.",
258 caller_fuzz_repro_path.display(),
259 self.unit_test_fuzz_iterations,
260 );
261 self.with_instantiator(
262 |instantiator| {
263 bolero::test(bolero::TargetLocation {
264 package_name: "",
265 manifest_dir: "",
266 module_path: "",
267 file: ".",
268 line: 0,
269 item_path: "<unknown>::__bolero_item_path__",
270 test_name: None,
271 })
272 .with_iterations(self.unit_test_fuzz_iterations)
273 .run(move || {
274 let instance = instantiator();
275 tokio::runtime::Builder::new_current_thread()
276 .build()
277 .unwrap()
278 .block_on(async { instance.run(&mut thunk).await })
279 })
280 },
281 false,
282 );
283 }
284 }
285
286 pub fn fuzz_repro<'a>(
290 &'a self,
291 bytes: Vec<u8>,
292 thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
293 ) {
294 self.with_instance(|instance| {
295 bolero::bolero_engine::any::scope::with(
296 Box::new(bolero::bolero_engine::driver::object::Object(
297 bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
298 )),
299 || {
300 tokio::runtime::Builder::new_current_thread()
301 .build()
302 .unwrap()
303 .block_on(async { instance.run_without_launching(thunk).await })
304 },
305 )
306 });
307 }
308
309 pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
320 if std::env::var("BOLERO_FUZZER").is_ok() {
321 eprintln!(
322 "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
323 );
324 std::process::abort();
325 }
326
327 let mut count = 0;
328 let count_mut = &mut count;
329
330 self.with_instantiator(
331 |instantiator| {
332 bolero::test(bolero::TargetLocation {
333 package_name: "",
334 manifest_dir: "",
335 module_path: "",
336 file: "",
337 line: 0,
338 item_path: "<unknown>::__bolero_item_path__",
339 test_name: None,
340 })
341 .exhaustive()
342 .run_with_replay(move |is_replay| {
343 *count_mut += 1;
344
345 let mut instance = instantiator();
346 if instance.log {
347 eprintln!(
348 "{}",
349 "\n==== New Simulation Instance ===="
350 .color(colored::Color::Cyan)
351 .bold()
352 );
353 }
354
355 if is_replay {
356 instance.log = true;
357 }
358
359 tokio::runtime::Builder::new_current_thread()
360 .build()
361 .unwrap()
362 .block_on(async { instance.run(&mut thunk).await })
363 })
364 },
365 false,
366 );
367
368 count
369 }
370}
371
372type DylibResult = (
374 Vec<(&'static str, Option<u32>, DfirErased)>,
375 Vec<(&'static str, Option<u32>, DfirErased)>,
376 Hooks<&'static str>,
377 InlineHooks<&'static str>,
378);
379
380pub struct CompiledSimInstance<'a> {
383 func: SimLoaded<'a>,
384 externals_port_registry: SimExternalPortRegistry,
385 dylib_result: Option<DylibResult>,
386 log: bool,
387}
388
389impl<'a> CompiledSimInstance<'a> {
390 async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
391 self.run_without_launching(async |instance| {
392 instance.launch();
393 thunk().await;
394 })
395 .await;
396 }
397
398 async fn run_without_launching(
399 mut self,
400 thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
401 ) {
402 let mut external_out: HashMap<usize, UnboundedReceiverStream<Bytes>> = HashMap::new();
403 let mut external_in: HashMap<usize, UnboundedSender<Bytes>> = HashMap::new();
404 let mut cluster_external_out: HashMap<usize, HashMap<u32, UnboundedReceiverStream<Bytes>>> =
405 HashMap::new();
406 let mut cluster_external_in: HashMap<usize, HashMap<u32, UnboundedSender<Bytes>>> =
407 HashMap::new();
408
409 let dylib_result = unsafe {
410 (self.func)(
411 colored::control::SHOULD_COLORIZE.should_colorize(),
412 &mut external_out,
413 &mut external_in,
414 &mut cluster_external_out,
415 &mut cluster_external_in,
416 if self.log {
417 println_handler
418 } else {
419 null_handler
420 },
421 if self.log {
422 eprintln_handler
423 } else {
424 null_handler
425 },
426 )
427 };
428
429 let registered = &self.externals_port_registry.registered;
430
431 let quiescence = Rc::new(QuiescenceState {
432 quiescent: Cell::new(false),
433 quiescence_notify: Notify::new(),
434 resume_notify: Notify::new(),
435 });
436
437 let mut input_senders = HashMap::new();
438 let mut output_receivers = HashMap::new();
439 let mut cluster_input_senders = HashMap::new();
440 let mut cluster_output_receivers = HashMap::new();
441
442 #[expect(
443 clippy::disallowed_methods,
444 reason = "inserts into maps also unordered"
445 )]
446 for sim_port in registered.values() {
447 let usize_key = sim_port.into_inner();
448 if let Some(sender) = external_in.remove(&usize_key) {
449 input_senders.insert(*sim_port, Rc::new(sender));
450 }
451 if let Some(receiver) = external_out.remove(&usize_key) {
452 output_receivers.insert(*sim_port, Rc::new(Mutex::new(receiver)));
453 }
454 if let Some(senders) = cluster_external_in.remove(&usize_key) {
455 cluster_input_senders.insert(
456 *sim_port,
457 senders
458 .into_iter()
459 .map(|(member, s)| (member, Rc::new(s)))
460 .collect(),
461 );
462 }
463 if let Some(receivers) = cluster_external_out.remove(&usize_key) {
464 cluster_output_receivers.insert(
465 *sim_port,
466 receivers
467 .into_iter()
468 .map(|(member, r)| (member, Rc::new(Mutex::new(r))))
469 .collect(),
470 );
471 }
472 }
473
474 self.dylib_result = Some(dylib_result);
475
476 let local_set = tokio::task::LocalSet::new();
477 local_set
478 .run_until(CURRENT_SIM_CONNECTIONS.scope(
479 RefCell::new(SimConnections {
480 input_senders,
481 output_receivers,
482 cluster_input_senders,
483 cluster_output_receivers,
484 external_registered: self.externals_port_registry.registered.clone(),
485 quiescence: quiescence.clone(),
486 }),
487 async move {
488 thunk(self).await;
489 },
490 ))
491 .await;
492 }
493
494 fn launch(self) {
497 tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
498 }
499
500 pub fn schedule_with_logger<W: std::io::Write>(
503 self,
504 log_writer: W,
505 ) -> impl use<W> + Future<Output = ()> {
506 self.schedule_with_maybe_logger(Some(log_writer))
507 }
508
509 fn schedule_with_maybe_logger<W: std::io::Write>(
510 mut self,
511 log_override: Option<W>,
512 ) -> impl use<W> + Future<Output = ()> {
513 let (async_dfirs, tick_dfirs, hooks, inline_hooks) = self.dylib_result.take().unwrap();
514
515 let not_ready_observation = async_dfirs
516 .iter()
517 .map(|(lid, c_id, _)| (serde_json::from_str(lid).unwrap(), *c_id))
518 .collect();
519
520 let quiescence = CURRENT_SIM_CONNECTIONS.with(|connections| {
521 let connections = connections.borrow();
522 connections.quiescence.clone()
523 });
524
525 let mut launched = LaunchedSim {
526 async_dfirs: async_dfirs
527 .into_iter()
528 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
529 .collect(),
530 possibly_ready_ticks: vec![],
531 not_ready_ticks: tick_dfirs
532 .into_iter()
533 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
534 .collect(),
535 possibly_ready_observation: vec![],
536 not_ready_observation,
537 hooks: hooks
538 .into_iter()
539 .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
540 .collect(),
541 inline_hooks: inline_hooks
542 .into_iter()
543 .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
544 .collect(),
545 log: if self.log {
546 if let Some(w) = log_override {
547 LogKind::Custom(w)
548 } else {
549 LogKind::Stderr
550 }
551 } else {
552 LogKind::Null
553 },
554 quiescence,
555 };
556
557 async move { launched.scheduler().await }
558 }
559}
560
561impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
562 fn clone(&self) -> Self {
563 *self
564 }
565}
566
567impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
568
569impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
570 async fn with_stream<Out>(
571 &self,
572 thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
573 ) -> Out {
574 let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
575 let connections = connections.borrow();
576 let port = connections.external_registered.get(&self.0).unwrap();
577 (
578 connections.output_receivers.get(port).unwrap().clone(),
579 connections.quiescence.clone(),
580 )
581 });
582
583 let mut receiver_stream = receiver.lock().await;
584 let mut notified_fut = pin!(quiescence.notified());
585 let mut quiescence_aware = futures::stream::poll_fn(|cx| {
586 use std::task::Poll;
587 match receiver_stream.poll_next_unpin(cx) {
588 Poll::Ready(Some(bytes)) => {
589 return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
590 }
591 Poll::Ready(None) => return Poll::Ready(None),
592 Poll::Pending => {}
593 }
594 if quiescence.is_quiescent() {
595 return Poll::Ready(None);
596 }
597 let () = ready!(notified_fut.as_mut().poll(cx));
598 notified_fut.set(quiescence.notified());
599 Poll::Ready(None)
600 });
601 thunk(&mut pin!(&mut quiescence_aware)).await
602 }
603
604 pub fn assert_no_more(self) -> impl Future<Output = ()>
606 where
607 T: Debug,
608 {
609 FutureTrackingCaller {
610 future: async move {
611 self.with_stream(async |stream| {
612 if let Some(next) = stream.next().await {
613 return Err(format!(
614 "Stream yielded unexpected message: {:?}, expected termination",
615 next
616 ));
617 }
618 Ok(())
619 })
620 .await
621 },
622 }
623 }
624}
625
626impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
627 pub async fn next(&self) -> Option<T> {
630 self.with_stream(async |stream| stream.next().await).await
631 }
632
633 pub async fn collect<C: Default + Extend<T>>(self) -> C {
636 self.with_stream(async |stream| stream.collect().await)
637 .await
638 }
639
640 pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
643 &self,
644 expected: I,
645 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
646 where
647 T: Debug + PartialEq<T2>,
648 {
649 FutureTrackingCaller {
650 future: async {
651 let mut expected: VecDeque<T2> = expected.into_iter().collect();
652
653 while !expected.is_empty() {
654 if let Some(next) = self.next().await {
655 let next_expected = expected.pop_front().unwrap();
656 if next != next_expected {
657 return Err(format!(
658 "Stream yielded unexpected message: {:?}, expected: {:?}",
659 next, next_expected
660 ));
661 }
662 } else {
663 return Err(format!(
664 "Stream ended early, still expected: {:?}",
665 expected
666 ));
667 }
668 }
669
670 Ok(())
671 },
672 }
673 }
674
675 pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
678 &self,
679 expected: I,
680 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
681 where
682 T: Debug + PartialEq<T2>,
683 {
684 ChainedFuture {
685 first: self.assert_yields(expected),
686 second: self.assert_no_more(),
687 first_done: false,
688 }
689 }
690}
691
692pin_project_lite::pin_project! {
693 struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
703 #[pin]
704 future: F,
705 }
706}
707
708impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
709 type Output = ();
710
711 #[track_caller]
712 fn poll(
713 mut self: Pin<&mut Self>,
714 cx: &mut std::task::Context<'_>,
715 ) -> std::task::Poll<Self::Output> {
716 match ready!(self.as_mut().project().future.poll(cx)) {
717 Ok(()) => std::task::Poll::Ready(()),
718 Err(e) => panic!("{}", e),
719 }
720 }
721}
722
723pin_project_lite::pin_project! {
724 struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
728 #[pin]
729 first: F1,
730 #[pin]
731 second: F2,
732 first_done: bool,
733 }
734}
735
736impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
737 type Output = ();
738
739 #[track_caller]
740 fn poll(
741 mut self: Pin<&mut Self>,
742 cx: &mut std::task::Context<'_>,
743 ) -> std::task::Poll<Self::Output> {
744 if !self.first_done {
745 ready!(self.as_mut().project().first.poll(cx));
746 *self.as_mut().project().first_done = true;
747 }
748
749 self.as_mut().project().second.poll(cx)
750 }
751}
752
753impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
754 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
757 where
758 T: Ord,
759 {
760 self.with_stream(async |stream| {
761 let mut collected: C = stream.collect().await;
762 collected.as_mut().sort();
763 collected
764 })
765 .await
766 }
767
768 pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
771 &self,
772 expected: I,
773 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
774 where
775 T: Debug + PartialEq<T2>,
776 {
777 FutureTrackingCaller {
778 future: async {
779 self.with_stream(async |stream| {
780 let mut expected: Vec<T2> = expected.into_iter().collect();
781
782 while !expected.is_empty() {
783 if let Some(next) = stream.next().await {
784 let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
785 if let Some((i, _)) = idx {
786 expected.swap_remove(i);
787 } else {
788 return Err(format!(
789 "Stream yielded unexpected message: {:?}",
790 next
791 ));
792 }
793 } else {
794 return Err(format!(
795 "Stream ended early, still expected: {:?}",
796 expected
797 ));
798 }
799 }
800
801 Ok(())
802 })
803 .await
804 },
805 }
806 }
807
808 pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
811 &self,
812 expected: I,
813 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
814 where
815 T: Debug + PartialEq<T2>,
816 {
817 ChainedFuture {
818 first: self.assert_yields_unordered(expected),
819 second: self.assert_no_more(),
820 first_done: false,
821 }
822 }
823}
824
825impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
826 fn with_sink<Out>(
827 &self,
828 thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
829 ) -> Out {
830 let (sender, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
831 let connections = connections.borrow();
832 (
833 connections
834 .input_senders
835 .get(connections.external_registered.get(&self.0).unwrap())
836 .unwrap()
837 .clone(),
838 connections.quiescence.clone(),
839 )
840 });
841
842 thunk(&move |t| {
843 let res = sender.send(bincode::serialize(&t).unwrap().into());
844 quiescence.resume();
845 res
846 })
847 }
848}
849
850impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
851 pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
854 self.with_sink(|send| {
855 for t in iter {
856 send(t).unwrap();
857 }
858 })
859 }
860}
861
862impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
863 pub fn send(&self, t: T) {
866 self.with_sink(|send| send(t)).unwrap();
867 }
868
869 pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
872 self.with_sink(|send| {
873 for t in iter {
874 send(t).unwrap();
875 }
876 })
877 }
878}
879
880impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone
881 for SimClusterReceiver<T, O, R>
882{
883 fn clone(&self) -> Self {
884 *self
885 }
886}
887
888impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy
889 for SimClusterReceiver<T, O, R>
890{
891}
892
893impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterReceiver<T, O, R> {
894 async fn with_member_stream<Out>(
895 &self,
896 member_id: u32,
897 thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
898 ) -> Out {
899 let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
900 let connections = connections.borrow();
901 let port = connections.external_registered.get(&self.0).unwrap();
902 let receivers = connections.cluster_output_receivers.get(port).unwrap();
903 (
904 receivers[&member_id].clone(),
905 connections.quiescence.clone(),
906 )
907 });
908
909 let mut lock = receiver.lock().await;
910 let mut notified_fut = pin!(quiescence.notified());
911 let mut quiescence_aware = futures::stream::poll_fn(|cx| {
912 use std::task::Poll;
913 match lock.poll_next_unpin(cx) {
914 Poll::Ready(Some(bytes)) => {
915 return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
916 }
917 Poll::Ready(None) => return Poll::Ready(None),
918 Poll::Pending => {}
919 }
920 if quiescence.is_quiescent() {
921 return Poll::Ready(None);
922 }
923 let () = ready!(notified_fut.as_mut().poll(cx));
924 notified_fut.set(quiescence.notified());
925 Poll::Ready(None)
926 });
927 thunk(&mut pin!(&mut quiescence_aware)).await
928 }
929}
930
931impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, TotalOrder, ExactlyOnce> {
932 pub async fn next(&self, member_id: u32) -> Option<T> {
934 self.with_member_stream(member_id, async |stream| stream.next().await)
935 .await
936 }
937
938 pub async fn collect<C: Default + Extend<T>>(self, member_id: u32) -> C {
940 self.with_member_stream(member_id, async |stream| stream.collect().await)
941 .await
942 }
943}
944
945impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, NoOrder, ExactlyOnce> {
946 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self, member_id: u32) -> C
948 where
949 T: Ord,
950 {
951 self.with_member_stream(member_id, async |stream| {
952 let mut collected: C = stream.collect().await;
953 collected.as_mut().sort();
954 collected
955 })
956 .await
957 }
958}
959
960impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterSender<T, O, R> {
961 fn with_sink<Out>(
962 &self,
963 thunk: impl FnOnce(
964 &dyn Fn(u32, T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>,
965 ) -> Out,
966 ) -> Out {
967 let (senders, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
968 let connections = connections.borrow();
969 (
970 connections
971 .cluster_input_senders
972 .get(connections.external_registered.get(&self.0).unwrap())
973 .unwrap()
974 .clone(),
975 connections.quiescence.clone(),
976 )
977 });
978
979 thunk(&move |member_id: u32, t: T| {
980 let payload = bincode::serialize(&t).unwrap();
981 let res = senders[&member_id].send(Bytes::from(payload));
982 quiescence.resume();
983 res
984 })
985 }
986}
987
988impl<T: Serialize + DeserializeOwned> SimClusterSender<T, TotalOrder, ExactlyOnce> {
989 pub fn send(&self, member_id: u32, t: T) {
991 self.with_sink(|send| send(member_id, t)).unwrap();
992 }
993
994 pub fn send_many<I: IntoIterator<Item = (u32, T)>>(&self, iter: I) {
996 self.with_sink(|send| {
997 for (member_id, t) in iter {
998 send(member_id, t).unwrap();
999 }
1000 })
1001 }
1002}
1003
1004enum LogKind<W: std::io::Write> {
1005 Null,
1006 Stderr,
1007 Custom(W),
1008}
1009
1010impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
1012 fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
1013 match self {
1014 LogKind::Null => Ok(()),
1015 LogKind::Stderr => {
1016 eprint!("{}", s);
1017 Ok(())
1018 }
1019 LogKind::Custom(w) => w.write_all(s.as_bytes()).map_err(|_| std::fmt::Error),
1020 }
1021 }
1022}
1023
1024struct LaunchedSim<W: std::io::Write> {
1036 async_dfirs: Vec<(LocationId, Option<u32>, DfirErased)>,
1039 possibly_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
1042 not_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
1044 possibly_ready_observation: Vec<(LocationId, Option<u32>)>,
1048 not_ready_observation: Vec<(LocationId, Option<u32>)>,
1050 hooks: Hooks<LocationId>,
1053 inline_hooks: InlineHooks<LocationId>,
1057 log: LogKind<W>,
1058 quiescence: Rc<QuiescenceState>,
1060}
1061
1062impl<W: std::io::Write> LaunchedSim<W> {
1063 async fn scheduler(&mut self) {
1064 loop {
1065 tokio::task::yield_now().await;
1066 let mut any_made_progress = false;
1067 for (loc, c_id, dfir) in &mut self.async_dfirs {
1068 if dfir.run_tick().await {
1069 any_made_progress = true;
1070 let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
1071 .not_ready_ticks
1072 .drain(..)
1073 .partition(|(tick_loc, tick_c_id, _)| {
1074 let LocationId::Tick(_, outer) = tick_loc else {
1075 unreachable!()
1076 };
1077 outer.as_ref() == loc && tick_c_id == c_id
1078 });
1079
1080 self.possibly_ready_ticks.extend(now_ready);
1081 self.not_ready_ticks.extend(still_not_ready);
1082
1083 let (now_ready_obs, still_not_ready_obs): (Vec<_>, Vec<_>) = self
1084 .not_ready_observation
1085 .drain(..)
1086 .partition(|(obs_loc, obs_c_id)| obs_loc == loc && obs_c_id == c_id);
1087
1088 self.possibly_ready_observation.extend(now_ready_obs);
1089 self.not_ready_observation.extend(still_not_ready_obs);
1090 }
1091 }
1092
1093 if any_made_progress {
1094 continue;
1095 } else {
1096 use bolero::generator::*;
1097
1098 let (ready_tick, mut not_ready_tick): (Vec<_>, Vec<_>) = self
1099 .possibly_ready_ticks
1100 .drain(..)
1101 .partition(|(name, cid, _)| {
1102 let hooks = self.hooks.get(&(name.clone(), *cid)).unwrap();
1103 hooks.iter().all(|hook| hook.is_ready())
1105 && hooks.iter().any(|hook| {
1107 hook.current_decision().unwrap_or(false)
1108 || hook.can_make_nontrivial_decision()
1109 })
1110 });
1111
1112 self.possibly_ready_ticks = ready_tick;
1113 self.not_ready_ticks.append(&mut not_ready_tick);
1114
1115 let (ready_obs, mut not_ready_obs): (Vec<_>, Vec<_>) = self
1116 .possibly_ready_observation
1117 .drain(..)
1118 .partition(|(name, cid)| {
1119 self.hooks
1120 .get(&(name.clone(), *cid))
1121 .into_iter()
1122 .flatten()
1123 .any(|hook| {
1124 hook.current_decision().unwrap_or(false)
1125 || hook.can_make_nontrivial_decision()
1126 })
1127 });
1128
1129 self.possibly_ready_observation = ready_obs;
1130 self.not_ready_observation.append(&mut not_ready_obs);
1131
1132 if self.possibly_ready_ticks.is_empty()
1133 && self.possibly_ready_observation.is_empty()
1134 {
1135 for (name, cid, _) in &self.not_ready_ticks {
1138 let hooks = self.hooks.get(&(name.clone(), *cid)).unwrap();
1139 assert!(
1140 hooks.iter().all(|hook| hook.is_ready()),
1141 "Simulator bug: tick has a hook that never became ready"
1142 );
1143 }
1144
1145 self.quiescence.wait_for_resume().await;
1147 } else {
1148 let next_tick_or_obs = (0..(self.possibly_ready_ticks.len()
1149 + self.possibly_ready_observation.len()))
1150 .any();
1151
1152 if next_tick_or_obs < self.possibly_ready_ticks.len() {
1153 let next_tick = next_tick_or_obs;
1154 let mut removed = self.possibly_ready_ticks.remove(next_tick);
1155
1156 match &mut self.log {
1157 LogKind::Null => {}
1158 LogKind::Stderr => {
1159 if let Some(cid) = &removed.1 {
1160 eprintln!(
1161 "\n{}",
1162 format!("Running Tick (Cluster Member {})", cid)
1163 .color(colored::Color::Magenta)
1164 .bold()
1165 )
1166 } else {
1167 eprintln!(
1168 "\n{}",
1169 "Running Tick".color(colored::Color::Magenta).bold()
1170 )
1171 }
1172 }
1173 LogKind::Custom(writer) => {
1174 writeln!(
1175 writer,
1176 "\n{}",
1177 "Running Tick".color(colored::Color::Magenta).bold()
1178 )
1179 .unwrap();
1180 }
1181 }
1182
1183 let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
1184 write.write_str(&"*".color(colored::Color::Magenta).bold())?;
1185 write.write_str(" ")
1186 };
1187
1188 let mut tick_decision_writer = indenter::indented(&mut self.log)
1189 .with_format(indenter::Format::Custom {
1190 inserter: &mut asterisk_indenter,
1191 });
1192
1193 let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
1194 run_hooks(&mut tick_decision_writer, hooks);
1195
1196 let run_tick_future = removed.2.run_tick();
1197 if let Some(inline_hooks) =
1198 self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
1199 {
1200 let mut run_tick_future_pinned = pin!(run_tick_future);
1201
1202 loop {
1203 tokio::select! {
1204 biased;
1205 r = &mut run_tick_future_pinned => {
1206 assert!(r);
1207 break;
1208 }
1209 _ = async {} => {
1210 bolero_generator::any::scope::borrow_with(|driver| {
1211 for hook in inline_hooks.iter_mut() {
1212 if hook.pending_decision() {
1213 if !hook.has_decision() {
1214 hook.autonomous_decision(driver);
1215 }
1216
1217 hook.release_decision(&mut tick_decision_writer);
1218 }
1219 }
1220 });
1221 }
1222 }
1223 }
1224 } else {
1225 assert!(run_tick_future.await);
1226 }
1227
1228 self.possibly_ready_ticks.push(removed);
1229 } else {
1230 let next_obs = next_tick_or_obs - self.possibly_ready_ticks.len();
1231 let mut default_hooks = vec![];
1232 let hooks = self
1233 .hooks
1234 .get_mut(&self.possibly_ready_observation[next_obs])
1235 .unwrap_or(&mut default_hooks);
1236
1237 run_hooks(&mut self.log, hooks);
1238 }
1239 }
1240 }
1241 }
1242 }
1243}
1244
1245fn run_hooks(tick_decision_writer: &mut impl std::fmt::Write, hooks: &mut Vec<Box<dyn SimHook>>) {
1246 let mut remaining_decision_count = hooks.len();
1247 let mut made_nontrivial_decision = false;
1248
1249 bolero::generator::bolero_generator::any::scope::borrow_with(|driver| {
1250 hooks.iter_mut().for_each(|hook| {
1252 if let Some(is_nontrivial) = hook.current_decision() {
1253 made_nontrivial_decision |= is_nontrivial;
1254 remaining_decision_count -= 1;
1255 } else if !hook.can_make_nontrivial_decision() {
1256 hook.autonomous_decision(driver, false);
1260 remaining_decision_count -= 1;
1261 }
1262 });
1263
1264 hooks.iter_mut().for_each(|hook| {
1265 if hook.current_decision().is_none() {
1266 made_nontrivial_decision |= hook.autonomous_decision(
1267 driver,
1268 !made_nontrivial_decision && remaining_decision_count == 1,
1269 );
1270 remaining_decision_count -= 1;
1271 }
1272
1273 hook.release_decision(tick_decision_writer);
1274 });
1275 });
1276}