1use core::{fmt, panic};
4use std::cell::RefCell;
5use std::collections::{HashMap, VecDeque};
6use std::fmt::Debug;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::{Pin, pin};
10use std::rc::Rc;
11use std::task::ready;
12
13use bytes::Bytes;
14use colored::Colorize;
15use dfir_rs::scheduled::graph::Dfir;
16use futures::{Stream, StreamExt};
17use libloading::Library;
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use tempfile::TempPath;
21use tokio::sync::Mutex;
22use tokio::sync::mpsc::UnboundedSender;
23use tokio_stream::wrappers::UnboundedReceiverStream;
24
25use super::runtime::{Hooks, InlineHooks};
26use super::{SimClusterReceiver, SimClusterSender, SimReceiver, SimSender};
27use crate::compile::builder::ExternalPortId;
28use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
29use crate::location::dynamic::LocationId;
30use crate::sim::graph::{SimExternalPort, SimExternalPortRegistry};
31use crate::sim::runtime::SimHook;
32
/// Channel endpoints for the external ports of a running simulation.
///
/// All endpoint maps are keyed by [`SimExternalPort`]; `external_registered` translates the
/// [`ExternalPortId`] stored in a sender/receiver handle into that key.
struct SimConnections {
    /// Senders for pushing serialized payloads into the simulation, one per port.
    input_senders: HashMap<SimExternalPort, Rc<UnboundedSender<Bytes>>>,
    /// Streams of serialized payloads coming out of the simulation, one per port.
    /// Each stream sits behind an async mutex so that one consumer reads at a time.
    output_receivers: HashMap<SimExternalPort, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
    /// Per-port list of senders; the handles index this list by cluster member id.
    cluster_input_senders: HashMap<SimExternalPort, Vec<Rc<UnboundedSender<Bytes>>>>,
    /// Per-port list of output streams; the handles index this list by cluster member id.
    cluster_output_receivers:
        HashMap<SimExternalPort, Vec<Rc<Mutex<UnboundedReceiverStream<Bytes>>>>>,
    /// Lookup from the id carried by a handle to the key used by the maps above.
    external_registered: HashMap<ExternalPortId, SimExternalPort>,
}
41
tokio::task_local! {
    // Connections of the simulation instance that drives the current task. The value is scoped
    // in `CompiledSimInstance::run_without_launching` and looked up by the sender/receiver
    // handles defined in this file.
    static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
}
45
/// A simulation whose runtime has been loaded from a dynamic library.
pub struct CompiledSim {
    /// Temporary path kept alive alongside the library.
    /// NOTE(review): presumably the on-disk file backing `lib` — confirm at the construction site.
    pub(super) _path: TempPath,
    /// The loaded library; the `__hydro_runtime` symbol is resolved from it.
    pub(super) lib: Library,
    /// Registry of external ports; it is cloned into every instance created from this simulation.
    pub(super) externals_port_registry: SimExternalPortRegistry,
}
52
/// A callable that produces a new [`CompiledSimInstance`] on every invocation.
///
/// The trait is sealed, so it cannot be implemented outside this crate; any
/// `RefUnwindSafe + Fn() -> CompiledSimInstance<'a>` value satisfies it through a blanket impl.
#[sealed::sealed]
pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
// Blanket implementation: every unwind-safe closure returning a `CompiledSimInstance` is an
// `Instantiator`.
#[sealed::sealed]
impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
60
/// Output handler that discards its arguments; passed to the simulation runtime when logging
/// is disabled.
fn null_handler(_args: fmt::Arguments) {}
62
/// Output handler that prints the formatted arguments, followed by a newline, to stdout.
fn println_handler(args: fmt::Arguments) {
    println!("{}", args);
}
66
/// Output handler that prints the formatted arguments, followed by a newline, to stderr.
fn eprintln_handler(args: fmt::Arguments) {
    eprintln!("{}", args);
}
70
/// Signature of the `__hydro_runtime` entry point resolved from the simulation library.
///
/// The caller passes four empty maps keyed by `usize` and reads them back after the call to
/// obtain the channel endpoints for each external port, together with the two output handlers
/// the runtime should use for stdout-style and stderr-style output.
///
/// The returned tuple is destructured by the caller as
/// `(async_dfirs, tick_dfirs, hooks, inline_hooks)`. The `&'static str` location keys are
/// JSON strings that are later parsed into `LocationId`s.
type SimLoaded<'a> = libloading::Symbol<
    'a,
    unsafe extern "Rust" fn(
        should_color: bool,
        external_out: &mut HashMap<usize, UnboundedReceiverStream<Bytes>>,
        external_in: &mut HashMap<usize, UnboundedSender<Bytes>>,
        cluster_external_out: &mut HashMap<usize, Vec<UnboundedReceiverStream<Bytes>>>,
        cluster_external_in: &mut HashMap<usize, Vec<UnboundedSender<Bytes>>>,
        println_handler: fn(fmt::Arguments<'_>),
        eprintln_handler: fn(fmt::Arguments<'_>),
    ) -> (
        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
        Hooks<&'static str>,
        InlineHooks<&'static str>,
    ),
>;
93
impl CompiledSim {
    /// Creates one simulation instance and passes it to `thunk`, returning `thunk`'s result.
    ///
    /// Logging is always enabled for the instance (`always_log` is passed as `true`).
    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
        self.with_instantiator(|instantiator| thunk(instantiator()), true)
    }

    /// Passes `thunk` an [`Instantiator`] that creates a fresh, not-yet-started instance on
    /// every call, and returns `thunk`'s result.
    ///
    /// Instances log when `always_log` is `true` or when the `HYDRO_SIM_LOG` environment
    /// variable equals `1`.
    ///
    /// # Panics
    /// Panics if the `__hydro_runtime` symbol cannot be resolved from the loaded library.
    pub fn with_instantiator<T>(
        &self,
        thunk: impl FnOnce(&dyn Instantiator) -> T,
        always_log: bool,
    ) -> T {
        // The symbol's actual signature cannot be checked here; it is trusted to match
        // `SimLoaded`.
        let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
        let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
        thunk(
            &(|| CompiledSimInstance {
                func: func.clone(),
                externals_port_registry: self.externals_port_registry.clone(),
                dylib_result: None,
                log,
            }),
        )
    }

    /// Runs `thunk` against many simulation instances with bolero-driven decisions.
    ///
    /// The mode is chosen as follows:
    /// 1. If the `BOLERO_FUZZER` environment variable is set, the test runs under the fuzzer
    ///    with a corpus in `.fuzz-corpus` below the current directory. Unless
    ///    `HYDRO_NO_FAILURE_OUTPUT` equals `1`, `BOLERO_FAILURE_OUTPUT` is pointed at the
    ///    reproducer path described below.
    /// 2. Otherwise, if a reproducer file exists, its bytes are replayed via
    ///    [`Self::fuzz_repro`].
    /// 3. Otherwise 8192 iterations are run with random inputs.
    ///
    /// The reproducer path is `sim-failures/<caller fn name with "::" replaced by "__">.bin`
    /// next to the source file of the first backtrace frame that is not part of the simulator.
    ///
    /// # Panics
    /// Panics if no caller frame is found, if the frame has no file name, or if any of the
    /// directory, path-to-string or runtime-construction steps fail.
    pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
        // Walk the backtrace to the first frame outside the simulator's own modules.
        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
            .elements()
            .into_iter()
            .find(|e| {
                !e.fn_name.starts_with("hydro_lang::sim::compiled")
                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
                    && !e.fn_name.starts_with("fuzz<")
                    && !e.fn_name.starts_with("<hydro_lang::sim")
            })
            .unwrap();

        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
        let repro_folder = caller_path.parent().unwrap().join("sim-failures");

        let caller_fuzz_repro_path = repro_folder
            .join(caller_fn.fn_name.replace("::", "__"))
            .with_extension("bin");

        if std::env::var("BOLERO_FUZZER").is_ok() {
            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
            std::fs::create_dir_all(&corpus_dir).unwrap();
            // The corpus directory is passed twice as positional arguments and once as the
            // artifact prefix.
            let libfuzzer_args = format!(
                "{} {} -artifact_prefix={}/ -handle_abrt=0",
                corpus_dir.to_str().unwrap(),
                corpus_dir.to_str().unwrap(),
                corpus_dir.to_str().unwrap(),
            );

            std::fs::create_dir_all(&repro_folder).unwrap();

            if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
                // Mutating the process environment is unsafe.
                // NOTE(review): assumes no other thread accesses the environment concurrently.
                unsafe {
                    std::env::set_var(
                        "BOLERO_FAILURE_OUTPUT",
                        caller_fuzz_repro_path.to_str().unwrap(),
                    );
                }
            }

            unsafe {
                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
            }

            self.with_instantiator(
                |instantiator| {
                    bolero::test(bolero::TargetLocation {
                        package_name: "",
                        manifest_dir: "",
                        module_path: "",
                        file: "",
                        line: 0,
                        item_path: "<unknown>::__bolero_item_path__",
                        test_name: None,
                    })
                    .run_with_replay(move |is_replay| {
                        let mut instance = instantiator();

                        if instance.log {
                            eprintln!(
                                "{}",
                                "\n==== New Simulation Instance ===="
                                    .color(colored::Color::Cyan)
                                    .bold()
                            );
                        }

                        // Replayed inputs always log, regardless of `HYDRO_SIM_LOG`.
                        if is_replay {
                            instance.log = true;
                        }

                        // Each instance runs on its own single-threaded runtime.
                        tokio::runtime::Builder::new_current_thread()
                            .build()
                            .unwrap()
                            .block_on(async { instance.run(&mut thunk).await })
                    })
                },
                false,
            );
        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
            // A stored reproducer exists: replay exactly that input.
            self.fuzz_repro(existing_bytes, async |compiled| {
                compiled.launch();
                thunk().await
            });
        } else {
            eprintln!(
                "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
                caller_fuzz_repro_path.display()
            );
            self.with_instantiator(
                |instantiator| {
                    bolero::test(bolero::TargetLocation {
                        package_name: "",
                        manifest_dir: "",
                        module_path: "",
                        file: ".",
                        line: 0,
                        item_path: "<unknown>::__bolero_item_path__",
                        test_name: None,
                    })
                    .with_iterations(8192)
                    .run(move || {
                        let instance = instantiator();
                        tokio::runtime::Builder::new_current_thread()
                            .build()
                            .unwrap()
                            .block_on(async { instance.run(&mut thunk).await })
                    })
                },
                false,
            );
        }
    }

    /// Runs one simulation instance whose decisions are driven by `bytes`.
    ///
    /// A bolero byte driver built from `bytes` is installed as the scoped driver while the
    /// instance runs on a new single-threaded Tokio runtime. The instance is handed to `thunk`
    /// without having been launched, so `thunk` decides when and how to start it. Logging is
    /// always enabled.
    pub fn fuzz_repro<'a>(
        &'a self,
        bytes: Vec<u8>,
        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
    ) {
        self.with_instance(|instance| {
            bolero::bolero_engine::any::scope::with(
                Box::new(bolero::bolero_engine::driver::object::Object(
                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
                )),
                || {
                    tokio::runtime::Builder::new_current_thread()
                        .build()
                        .unwrap()
                        .block_on(async { instance.run_without_launching(thunk).await })
                },
            )
        });
    }

    /// Runs `thunk` under bolero's exhaustive mode and returns how many times the simulation
    /// closure was invoked.
    ///
    /// If the `BOLERO_FUZZER` environment variable is set, an error is printed and the process
    /// is aborted, because exhaustive runs cannot be combined with a fuzzer.
    pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
        if std::env::var("BOLERO_FUZZER").is_ok() {
            eprintln!(
                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
            );
            std::process::abort();
        }

        let mut count = 0;
        // Borrowed separately so the `move` closure below captures the reference, not `count`.
        let count_mut = &mut count;

        self.with_instantiator(
            |instantiator| {
                bolero::test(bolero::TargetLocation {
                    package_name: "",
                    manifest_dir: "",
                    module_path: "",
                    file: "",
                    line: 0,
                    item_path: "<unknown>::__bolero_item_path__",
                    test_name: None,
                })
                .exhaustive()
                .run_with_replay(move |is_replay| {
                    *count_mut += 1;

                    let mut instance = instantiator();
                    if instance.log {
                        eprintln!(
                            "{}",
                            "\n==== New Simulation Instance ===="
                                .color(colored::Color::Cyan)
                                .bold()
                        );
                    }

                    // Replayed inputs always log, regardless of `HYDRO_SIM_LOG`.
                    if is_replay {
                        instance.log = true;
                    }

                    tokio::runtime::Builder::new_current_thread()
                        .build()
                        .unwrap()
                        .block_on(async { instance.run(&mut thunk).await })
                })
            },
            false,
        );

        count
    }
}
333
/// Value returned by the `__hydro_runtime` entry point, destructured by the scheduler setup as
/// `(async_dfirs, tick_dfirs, hooks, inline_hooks)`. Location keys are JSON strings.
type DylibResult = (
    Vec<(&'static str, Option<u32>, Dfir<'static>)>,
    Vec<(&'static str, Option<u32>, Dfir<'static>)>,
    Hooks<&'static str>,
    InlineHooks<&'static str>,
);
341
/// One instance of a compiled simulation, created through [`CompiledSim`].
pub struct CompiledSimInstance<'a> {
    /// Entry point of the loaded simulation library.
    func: SimLoaded<'a>,
    /// External ports known to this simulation.
    externals_port_registry: SimExternalPortRegistry,
    /// Result of calling `func`; `None` until `run_without_launching` has called it and again
    /// after the scheduler has taken it.
    dylib_result: Option<DylibResult>,
    /// Whether simulation output and scheduler decisions are logged.
    log: bool,
}
350
impl<'a> CompiledSimInstance<'a> {
    /// Sets up the instance, launches its scheduler as a local task, then awaits `thunk`.
    async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
        self.run_without_launching(async |instance| {
            instance.launch();
            thunk().await;
        })
        .await;
    }

    /// Calls the library entry point, wires up the external-port channels and awaits `thunk`
    /// with the prepared instance.
    ///
    /// `thunk` runs inside a `LocalSet` and with `CURRENT_SIM_CONNECTIONS` scoped to this
    /// instance's channels, so the sender/receiver handles work while it executes. The
    /// scheduler is not started here; `thunk` is responsible for that.
    async fn run_without_launching(
        mut self,
        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
    ) {
        // Empty maps handed to the entry point; they are read back below.
        let mut external_out: HashMap<usize, UnboundedReceiverStream<Bytes>> = HashMap::new();
        let mut external_in: HashMap<usize, UnboundedSender<Bytes>> = HashMap::new();
        let mut cluster_external_out: HashMap<usize, Vec<UnboundedReceiverStream<Bytes>>> =
            HashMap::new();
        let mut cluster_external_in: HashMap<usize, Vec<UnboundedSender<Bytes>>> = HashMap::new();

        // Calls into the dynamically loaded library; the signature is trusted to match
        // `SimLoaded`. Both output handlers become no-ops when logging is disabled.
        let dylib_result = unsafe {
            (self.func)(
                colored::control::SHOULD_COLORIZE.should_colorize(),
                &mut external_out,
                &mut external_in,
                &mut cluster_external_out,
                &mut cluster_external_in,
                if self.log {
                    println_handler
                } else {
                    null_handler
                },
                if self.log {
                    eprintln_handler
                } else {
                    null_handler
                },
            )
        };

        let registered = &self.externals_port_registry.registered;

        let mut input_senders = HashMap::new();
        let mut output_receivers = HashMap::new();
        let mut cluster_input_senders = HashMap::new();
        let mut cluster_output_receivers = HashMap::new();

        // Move every endpoint that belongs to a registered port into the typed maps, wrapping
        // each in `Rc` (and receivers in an async `Mutex`) so handles can share them.
        #[expect(
            clippy::disallowed_methods,
            reason = "inserts into maps also unordered"
        )]
        for sim_port in registered.values() {
            let usize_key = sim_port.into_inner();
            if let Some(sender) = external_in.remove(&usize_key) {
                input_senders.insert(*sim_port, Rc::new(sender));
            }
            if let Some(receiver) = external_out.remove(&usize_key) {
                output_receivers.insert(*sim_port, Rc::new(Mutex::new(receiver)));
            }
            if let Some(senders) = cluster_external_in.remove(&usize_key) {
                cluster_input_senders.insert(*sim_port, senders.into_iter().map(Rc::new).collect());
            }
            if let Some(receivers) = cluster_external_out.remove(&usize_key) {
                cluster_output_receivers.insert(
                    *sim_port,
                    receivers
                        .into_iter()
                        .map(|r| Rc::new(Mutex::new(r)))
                        .collect(),
                );
            }
        }

        // Stored so that the scheduler can take it once the instance is launched.
        self.dylib_result = Some(dylib_result);

        let local_set = tokio::task::LocalSet::new();
        local_set
            .run_until(CURRENT_SIM_CONNECTIONS.scope(
                RefCell::new(SimConnections {
                    input_senders,
                    output_receivers,
                    cluster_input_senders,
                    cluster_output_receivers,
                    external_registered: self.externals_port_registry.registered.clone(),
                }),
                async move {
                    thunk(self).await;
                },
            ))
            .await;
    }

    /// Spawns the scheduler as a local task with no custom log writer.
    fn launch(self) {
        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
    }

    /// Returns the scheduler future, directing scheduler logs to `log_writer` when logging is
    /// enabled for this instance.
    pub fn schedule_with_logger<W: std::io::Write>(
        self,
        log_writer: W,
    ) -> impl use<W> + Future<Output = ()> {
        self.schedule_with_maybe_logger(Some(log_writer))
    }

    /// Builds the scheduler state from the stored entry-point result and returns a future that
    /// runs the scheduler.
    ///
    /// Log destination: nothing when logging is disabled; otherwise `log_override` if given,
    /// else stderr.
    ///
    /// # Panics
    /// Panics if the entry-point result has not been stored or was already taken, or if a
    /// location key is not valid JSON for a `LocationId`.
    fn schedule_with_maybe_logger<W: std::io::Write>(
        mut self,
        log_override: Option<W>,
    ) -> impl use<W> + Future<Output = ()> {
        let (async_dfirs, tick_dfirs, hooks, inline_hooks) = self.dylib_result.take().unwrap();

        // Observations start out "not ready", one per async graph (location + cluster id).
        let not_ready_observation = async_dfirs
            .iter()
            .map(|(lid, c_id, _)| (serde_json::from_str(lid).unwrap(), *c_id))
            .collect();

        let mut launched = LaunchedSim {
            async_dfirs: async_dfirs
                .into_iter()
                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
                .collect(),
            possibly_ready_ticks: vec![],
            not_ready_ticks: tick_dfirs
                .into_iter()
                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
                .collect(),
            possibly_ready_observation: vec![],
            not_ready_observation,
            hooks: hooks
                .into_iter()
                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
                .collect(),
            inline_hooks: inline_hooks
                .into_iter()
                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
                .collect(),
            log: if self.log {
                if let Some(w) = log_override {
                    LogKind::Custom(w)
                } else {
                    LogKind::Stderr
                }
            } else {
                LogKind::Null
            },
        };

        async move { launched.scheduler().await }
    }
}
502
// Implemented by hand with the same bounds as the `Copy` impl; cloning is a plain bit copy of
// the handle.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
    fn clone(&self) -> Self {
        *self
    }
}
508
// Receiver handles are `Copy`, so one handle can be used by several assertions.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
510
511impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
512 async fn with_stream<Out>(
513 &self,
514 thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
515 ) -> Out {
516 let receiver = CURRENT_SIM_CONNECTIONS.with(|connections| {
517 let connections = &mut *connections.borrow_mut();
518 connections
519 .output_receivers
520 .get(connections.external_registered.get(&self.0).unwrap())
521 .unwrap()
522 .clone()
523 });
524
525 let mut receiver_stream = receiver.lock().await;
526 thunk(&mut pin!(
527 &mut receiver_stream
528 .by_ref()
529 .map(|b| bincode::deserialize(&b).unwrap())
530 ))
531 .await
532 }
533
534 pub fn assert_no_more(self) -> impl Future<Output = ()>
536 where
537 T: Debug,
538 {
539 FutureTrackingCaller {
540 future: async move {
541 self.with_stream(async |stream| {
542 if let Some(next) = stream.next().await {
543 return Err(format!(
544 "Stream yielded unexpected message: {:?}, expected termination",
545 next
546 ));
547 }
548 Ok(())
549 })
550 .await
551 },
552 }
553 }
554}
555
556impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
557 pub async fn next(&self) -> Option<T> {
560 self.with_stream(async |stream| stream.next().await).await
561 }
562
563 pub async fn collect<C: Default + Extend<T>>(self) -> C {
566 self.with_stream(async |stream| stream.collect().await)
567 .await
568 }
569
570 pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
573 &self,
574 expected: I,
575 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
576 where
577 T: Debug + PartialEq<T2>,
578 {
579 FutureTrackingCaller {
580 future: async {
581 let mut expected: VecDeque<T2> = expected.into_iter().collect();
582
583 while !expected.is_empty() {
584 if let Some(next) = self.next().await {
585 let next_expected = expected.pop_front().unwrap();
586 if next != next_expected {
587 return Err(format!(
588 "Stream yielded unexpected message: {:?}, expected: {:?}",
589 next, next_expected
590 ));
591 }
592 } else {
593 return Err(format!(
594 "Stream ended early, still expected: {:?}",
595 expected
596 ));
597 }
598 }
599
600 Ok(())
601 },
602 }
603 }
604
605 pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
608 &self,
609 expected: I,
610 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
611 where
612 T: Debug + PartialEq<T2>,
613 {
614 ChainedFuture {
615 first: self.assert_yields(expected),
616 second: self.assert_no_more(),
617 first_done: false,
618 }
619 }
620}
621
pin_project_lite::pin_project! {
    // Adapter around a future that resolves to `Result<(), String>`. Its `Future` impl resolves
    // to `()` on `Ok` and panics with the error message on `Err`.
    struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
        // The wrapped future; structurally pinned so it can be polled in place.
        #[pin]
        future: F,
    }
}
637
638impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
639 type Output = ();
640
641 #[track_caller]
642 fn poll(
643 mut self: Pin<&mut Self>,
644 cx: &mut std::task::Context<'_>,
645 ) -> std::task::Poll<Self::Output> {
646 match ready!(self.as_mut().project().future.poll(cx)) {
647 Ok(()) => std::task::Poll::Ready(()),
648 Err(e) => panic!("{}", e),
649 }
650 }
651}
652
pin_project_lite::pin_project! {
    // Runs `first` to completion and then `second`, both of which resolve to `()`.
    struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
        #[pin]
        first: F1,
        #[pin]
        second: F2,
        // Set once `first` has completed so that it is never polled again.
        first_done: bool,
    }
}
665
666impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
667 type Output = ();
668
669 #[track_caller]
670 fn poll(
671 mut self: Pin<&mut Self>,
672 cx: &mut std::task::Context<'_>,
673 ) -> std::task::Poll<Self::Output> {
674 if !self.first_done {
675 ready!(self.as_mut().project().first.poll(cx));
676 *self.as_mut().project().first_done = true;
677 }
678
679 self.as_mut().project().second.poll(cx)
680 }
681}
682
683impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
684 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
687 where
688 T: Ord,
689 {
690 self.with_stream(async |stream| {
691 let mut collected: C = stream.collect().await;
692 collected.as_mut().sort();
693 collected
694 })
695 .await
696 }
697
698 pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
701 &self,
702 expected: I,
703 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
704 where
705 T: Debug + PartialEq<T2>,
706 {
707 FutureTrackingCaller {
708 future: async {
709 self.with_stream(async |stream| {
710 let mut expected: Vec<T2> = expected.into_iter().collect();
711
712 while !expected.is_empty() {
713 if let Some(next) = stream.next().await {
714 let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
715 if let Some((i, _)) = idx {
716 expected.swap_remove(i);
717 } else {
718 return Err(format!(
719 "Stream yielded unexpected message: {:?}",
720 next
721 ));
722 }
723 } else {
724 return Err(format!(
725 "Stream ended early, still expected: {:?}",
726 expected
727 ));
728 }
729 }
730
731 Ok(())
732 })
733 .await
734 },
735 }
736 }
737
738 pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
741 &self,
742 expected: I,
743 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
744 where
745 T: Debug + PartialEq<T2>,
746 {
747 ChainedFuture {
748 first: self.assert_yields_unordered(expected),
749 second: self.assert_no_more(),
750 first_done: false,
751 }
752 }
753}
754
755impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
756 fn with_sink<Out>(
757 &self,
758 thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
759 ) -> Out {
760 let sender = CURRENT_SIM_CONNECTIONS.with(|connections| {
761 let connections = &mut *connections.borrow_mut();
762 connections
763 .input_senders
764 .get(connections.external_registered.get(&self.0).unwrap())
765 .unwrap()
766 .clone()
767 });
768
769 thunk(&move |t| sender.send(bincode::serialize(&t).unwrap().into()))
770 }
771}
772
773impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
774 pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
777 self.with_sink(|send| {
778 for t in iter {
779 send(t).unwrap();
780 }
781 })
782 }
783}
784
785impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
786 pub fn send(&self, t: T) {
789 self.with_sink(|send| send(t)).unwrap();
790 }
791
792 pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
795 self.with_sink(|send| {
796 for t in iter {
797 send(t).unwrap();
798 }
799 })
800 }
801}
802
// Implemented by hand with the same bounds as the `Copy` impl; cloning is a plain bit copy of
// the handle.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone
    for SimClusterReceiver<T, O, R>
{
    fn clone(&self) -> Self {
        *self
    }
}
810
// Cluster receiver handles are `Copy`, like the single-location receiver handles.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy
    for SimClusterReceiver<T, O, R>
{
}
815
816impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterReceiver<T, O, R> {
817 async fn with_member_stream<Out>(
818 &self,
819 member_id: u32,
820 thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
821 ) -> Out {
822 let receiver = CURRENT_SIM_CONNECTIONS.with(|connections| {
823 let connections = &mut *connections.borrow_mut();
824 let receivers = connections
825 .cluster_output_receivers
826 .get(connections.external_registered.get(&self.0).unwrap())
827 .unwrap();
828 receivers[member_id as usize].clone()
829 });
830
831 let mut lock = receiver.lock().await;
832 thunk(&mut pin!(
833 lock.by_ref().map(|b| bincode::deserialize(&b).unwrap())
834 ))
835 .await
836 }
837}
838
839impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, TotalOrder, ExactlyOnce> {
840 pub async fn next(&self, member_id: u32) -> Option<T> {
842 self.with_member_stream(member_id, async |stream| stream.next().await)
843 .await
844 }
845
846 pub async fn collect<C: Default + Extend<T>>(self, member_id: u32) -> C {
848 self.with_member_stream(member_id, async |stream| stream.collect().await)
849 .await
850 }
851}
852
853impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, NoOrder, ExactlyOnce> {
854 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self, member_id: u32) -> C
856 where
857 T: Ord,
858 {
859 self.with_member_stream(member_id, async |stream| {
860 let mut collected: C = stream.collect().await;
861 collected.as_mut().sort();
862 collected
863 })
864 .await
865 }
866}
867
868impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterSender<T, O, R> {
869 fn with_sink<Out>(
870 &self,
871 thunk: impl FnOnce(
872 &dyn Fn(u32, T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>,
873 ) -> Out,
874 ) -> Out {
875 let senders = CURRENT_SIM_CONNECTIONS.with(|connections| {
876 let connections = &mut *connections.borrow_mut();
877 connections
878 .cluster_input_senders
879 .get(connections.external_registered.get(&self.0).unwrap())
880 .unwrap()
881 .clone()
882 });
883
884 thunk(&move |member_id: u32, t: T| {
885 let payload = bincode::serialize(&t).unwrap();
886 senders[member_id as usize].send(Bytes::from(payload))
887 })
888 }
889}
890
891impl<T: Serialize + DeserializeOwned> SimClusterSender<T, TotalOrder, ExactlyOnce> {
892 pub fn send(&self, member_id: u32, t: T) {
894 self.with_sink(|send| send(member_id, t)).unwrap();
895 }
896
897 pub fn send_many<I: IntoIterator<Item = (u32, T)>>(&self, iter: I) {
899 self.with_sink(|send| {
900 for (member_id, t) in iter {
901 send(member_id, t).unwrap();
902 }
903 })
904 }
905}
906
/// Destination for the scheduler's log output.
enum LogKind<W: std::io::Write> {
    /// Discard all output.
    Null,
    /// Write to the process's stderr.
    Stderr,
    /// Write to a caller-supplied writer.
    Custom(W),
}
912
913impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
915 fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
916 match self {
917 LogKind::Null => Ok(()),
918 LogKind::Stderr => {
919 eprint!("{}", s);
920 Ok(())
921 }
922 LogKind::Custom(w) => w.write_all(s.as_bytes()).map_err(|_| std::fmt::Error),
923 }
924 }
925}
926
/// Scheduler state of a launched simulation.
///
/// Graphs and observations are identified by a location and an optional cluster member id.
struct LaunchedSim<W: std::io::Write> {
    /// Graphs that the scheduler runs on every round.
    async_dfirs: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    /// Tick graphs that may be run next; filtered by their hooks before one is chosen.
    possibly_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    /// Tick graphs waiting for their enclosing location to make progress.
    not_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    /// Observations that may be processed next; filtered by their hooks before one is chosen.
    possibly_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Observations waiting for their location to make progress.
    not_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Hooks consulted before a tick or observation runs, keyed by location and member id.
    hooks: Hooks<LocationId>,
    /// Hooks serviced while a tick is in progress, keyed by location and member id.
    inline_hooks: InlineHooks<LocationId>,
    /// Where scheduler decisions are logged.
    log: LogKind<W>,
}
939
impl<W: std::io::Write> LaunchedSim<W> {
    /// Runs the simulation until no tick or observation is ready any more.
    ///
    /// Each round:
    /// 1. Yields to the runtime, then runs every async graph once. Whenever one makes
    ///    progress, the ticks nested in that location and the observations at that location
    ///    (same cluster member) move from "not ready" to "possibly ready".
    /// 2. If anything made progress, the round restarts.
    /// 3. Otherwise the possibly-ready ticks and observations are filtered down to those with
    ///    a hook that already holds a nontrivial decision or can make one; the rest go back to
    ///    "not ready".
    /// 4. If nothing is left, the scheduler stops. Otherwise one candidate is picked through
    ///    the bolero generator and its hooks are run; for a tick, the graph is then run too.
    ///
    /// # Panics
    /// Panics if a tick has no hooks entry, if a tick's location is not `LocationId::Tick`,
    /// if running a selected tick reports no work, or if writing to a custom log fails.
    async fn scheduler(&mut self) {
        loop {
            tokio::task::yield_now().await;
            let mut any_made_progress = false;
            for (loc, c_id, dfir) in &mut self.async_dfirs {
                if dfir.run_tick().await {
                    any_made_progress = true;
                    // Ticks whose enclosing location is `loc` (same member) may now have input.
                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
                        .not_ready_ticks
                        .drain(..)
                        .partition(|(tick_loc, tick_c_id, _)| {
                            let LocationId::Tick(_, outer) = tick_loc else {
                                unreachable!()
                            };
                            outer.as_ref() == loc && tick_c_id == c_id
                        });

                    self.possibly_ready_ticks.extend(now_ready);
                    self.not_ready_ticks.extend(still_not_ready);

                    // Same promotion for observations registered at exactly this location.
                    let (now_ready_obs, still_not_ready_obs): (Vec<_>, Vec<_>) = self
                        .not_ready_observation
                        .drain(..)
                        .partition(|(obs_loc, obs_c_id)| obs_loc == loc && obs_c_id == c_id);

                    self.possibly_ready_observation.extend(now_ready_obs);
                    self.not_ready_observation.extend(still_not_ready_obs);
                }
            }

            if any_made_progress {
                continue;
            } else {
                use bolero::generator::*;

                // Keep only ticks for which some hook holds, or can make, a nontrivial
                // decision. A missing hooks entry for a tick is treated as a bug (`unwrap`).
                let (ready_tick, mut not_ready_tick): (Vec<_>, Vec<_>) = self
                    .possibly_ready_ticks
                    .drain(..)
                    .partition(|(name, cid, _)| {
                        self.hooks
                            .get(&(name.clone(), *cid))
                            .unwrap()
                            .iter()
                            .any(|hook| {
                                hook.current_decision().unwrap_or(false)
                                    || hook.can_make_nontrivial_decision()
                            })
                    });

                self.possibly_ready_ticks = ready_tick;
                self.not_ready_ticks.append(&mut not_ready_tick);

                // Same filter for observations; here a missing hooks entry simply means
                // "no hooks", so the observation is not ready.
                let (ready_obs, mut not_ready_obs): (Vec<_>, Vec<_>) = self
                    .possibly_ready_observation
                    .drain(..)
                    .partition(|(name, cid)| {
                        self.hooks
                            .get(&(name.clone(), *cid))
                            .into_iter()
                            .flatten()
                            .any(|hook| {
                                hook.current_decision().unwrap_or(false)
                                    || hook.can_make_nontrivial_decision()
                            })
                    });

                self.possibly_ready_observation = ready_obs;
                self.not_ready_observation.append(&mut not_ready_obs);

                if self.possibly_ready_ticks.is_empty()
                    && self.possibly_ready_observation.is_empty()
                {
                    // Nothing can make a nontrivial decision any more: the simulation is done.
                    break;
                } else {
                    // One index over ticks followed by observations, produced by the bolero
                    // generator.
                    let next_tick_or_obs = (0..(self.possibly_ready_ticks.len()
                        + self.possibly_ready_observation.len()))
                        .any();

                    if next_tick_or_obs < self.possibly_ready_ticks.len() {
                        let next_tick = next_tick_or_obs;
                        let mut removed = self.possibly_ready_ticks.remove(next_tick);

                        match &mut self.log {
                            LogKind::Null => {}
                            LogKind::Stderr => {
                                if let Some(cid) = &removed.1 {
                                    eprintln!(
                                        "\n{}",
                                        format!("Running Tick (Cluster Member {})", cid)
                                            .color(colored::Color::Magenta)
                                            .bold()
                                    )
                                } else {
                                    eprintln!(
                                        "\n{}",
                                        "Running Tick".color(colored::Color::Magenta).bold()
                                    )
                                }
                            }
                            // NOTE(review): unlike the stderr branch, this header never
                            // includes the cluster member id — confirm this is intentional.
                            LogKind::Custom(writer) => {
                                writeln!(
                                    writer,
                                    "\n{}",
                                    "Running Tick".color(colored::Color::Magenta).bold()
                                )
                                .unwrap();
                            }
                        }

                        // Every line of hook output is prefixed with a bold magenta "* ".
                        let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
                            write.write_str(&"*".color(colored::Color::Magenta).bold())?;
                            write.write_str(" ")
                        };

                        let mut tick_decision_writer = indenter::indented(&mut self.log)
                            .with_format(indenter::Format::Custom {
                                inserter: &mut asterisk_indenter,
                            });

                        let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
                        run_hooks(&mut tick_decision_writer, hooks);

                        let run_tick_future = removed.2.run_tick();
                        if let Some(inline_hooks) =
                            self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
                        {
                            let mut run_tick_future_pinned = pin!(run_tick_future);

                            // `biased` polls the tick first; whenever it is still pending, the
                            // always-ready second branch services pending inline hooks before
                            // the tick is polled again.
                            loop {
                                tokio::select! {
                                    biased;
                                    r = &mut run_tick_future_pinned => {
                                        assert!(r);
                                        break;
                                    }
                                    _ = async {} => {
                                        bolero_generator::any::scope::borrow_with(|driver| {
                                            for hook in inline_hooks.iter_mut() {
                                                if hook.pending_decision() {
                                                    if !hook.has_decision() {
                                                        hook.autonomous_decision(driver);
                                                    }

                                                    hook.release_decision(&mut tick_decision_writer);
                                                }
                                            }
                                        });
                                    }
                                }
                            }
                        } else {
                            assert!(run_tick_future.await);
                        }

                        // The tick stays a candidate; the hook filter above decides next round.
                        self.possibly_ready_ticks.push(removed);
                    } else {
                        // An observation was chosen; it is not removed from the candidate list.
                        let next_obs = next_tick_or_obs - self.possibly_ready_ticks.len();
                        let mut default_hooks = vec![];
                        let hooks = self
                            .hooks
                            .get_mut(&self.possibly_ready_observation[next_obs])
                            .unwrap_or(&mut default_hooks);

                        run_hooks(&mut self.log, hooks);
                    }
                }
            }
        }
    }
}
1111
1112fn run_hooks(tick_decision_writer: &mut impl std::fmt::Write, hooks: &mut Vec<Box<dyn SimHook>>) {
1113 let mut remaining_decision_count = hooks.len();
1114 let mut made_nontrivial_decision = false;
1115
1116 bolero::generator::bolero_generator::any::scope::borrow_with(|driver| {
1117 hooks.iter_mut().for_each(|hook| {
1119 if let Some(is_nontrivial) = hook.current_decision() {
1120 made_nontrivial_decision |= is_nontrivial;
1121 remaining_decision_count -= 1;
1122 } else if !hook.can_make_nontrivial_decision() {
1123 hook.autonomous_decision(driver, false);
1127 remaining_decision_count -= 1;
1128 }
1129 });
1130
1131 hooks.iter_mut().for_each(|hook| {
1132 if hook.current_decision().is_none() {
1133 made_nontrivial_decision |= hook.autonomous_decision(
1134 driver,
1135 !made_nontrivial_decision && remaining_decision_count == 1,
1136 );
1137 remaining_decision_count -= 1;
1138 }
1139
1140 hook.release_decision(tick_decision_writer);
1141 });
1142 });
1143}