Skip to main content

hydro_lang/sim/
compiled.rs

1//! Interfaces for compiled Hydro simulators and concrete simulation instances.
2
3use core::{fmt, panic};
4use std::cell::RefCell;
5use std::collections::{HashMap, VecDeque};
6use std::fmt::Debug;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::{Pin, pin};
10use std::rc::Rc;
11use std::task::ready;
12
13use bytes::Bytes;
14use colored::Colorize;
15use dfir_rs::scheduled::graph::Dfir;
16use futures::{Stream, StreamExt};
17use libloading::Library;
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use tempfile::TempPath;
21use tokio::sync::Mutex;
22use tokio::sync::mpsc::UnboundedSender;
23use tokio_stream::wrappers::UnboundedReceiverStream;
24
25use super::runtime::{Hooks, InlineHooks};
26use super::{SimClusterReceiver, SimClusterSender, SimReceiver, SimSender};
27use crate::compile::builder::ExternalPortId;
28use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
29use crate::location::dynamic::LocationId;
30use crate::sim::graph::{SimExternalPort, SimExternalPortRegistry};
31use crate::sim::runtime::SimHook;
32
/// Channel endpoints connecting the test harness to the external ports of one running
/// simulation instance. Installed in `CURRENT_SIM_CONNECTIONS` (via `scope`) for the duration
/// of `CompiledSimInstance::run_without_launching`.
struct SimConnections {
    /// Senders that feed messages into the simulation, keyed by simulation port.
    input_senders: HashMap<SimExternalPort, Rc<UnboundedSender<Bytes>>>,
    /// Streams of messages emitted by the simulation. Wrapped in an async `Mutex` so a
    /// receiver can keep the stream locked across `.await` points.
    output_receivers: HashMap<SimExternalPort, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
    /// One input sender per cluster member for cluster ports (presumably indexed by member id,
    /// mirroring `cluster_output_receivers` — TODO confirm).
    cluster_input_senders: HashMap<SimExternalPort, Vec<Rc<UnboundedSender<Bytes>>>>,
    /// One output stream per cluster member for cluster ports, indexed by cluster member id.
    cluster_output_receivers:
        HashMap<SimExternalPort, Vec<Rc<Mutex<UnboundedReceiverStream<Bytes>>>>>,
    /// Maps the user-facing external port id to the simulation's internal port key.
    external_registered: HashMap<ExternalPortId, SimExternalPort>,
}
41
tokio::task_local! {
    // Connections of the simulation instance currently running on this task. It is set with
    // `CURRENT_SIM_CONNECTIONS.scope(...)` in `run_without_launching` and read by the
    // `SimSender` / `SimReceiver` / `SimClusterReceiver` handles. Tokio's `LocalKey::with`
    // panics if the value has not been set, so the handles only work inside that scope.
    static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
}
45
46/// A handle to a compiled Hydro simulation, which can be instantiated and run.
47pub struct CompiledSim {
48    pub(super) _path: TempPath,
49    pub(super) lib: Library,
50    pub(super) externals_port_registry: SimExternalPortRegistry,
51}
52
#[sealed::sealed]
/// A trait implemented by closures that can instantiate a compiled simulation.
///
/// This is needed to ensure [`RefUnwindSafe`] so instances can be created during fuzzing.
///
/// The trait is sealed. Its only implementation is the blanket impl for every
/// `RefUnwindSafe` closure of type `Fn() -> CompiledSimInstance<'a>`.
pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
// Blanket impl: any qualifying closure is an `Instantiator`, for example the one built in
// `CompiledSim::with_instantiator`.
#[sealed::sealed]
impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
60
/// Console handler that discards its output. It is installed when simulation logging is
/// disabled.
fn null_handler(_args: fmt::Arguments) {}

/// Console handler that forwards the formatted output to stdout.
fn println_handler(args: fmt::Arguments) {
    println!("{args}");
}

/// Console handler that forwards the formatted output to stderr.
fn eprintln_handler(args: fmt::Arguments) {
    eprintln!("{args}");
}
70
71/// Creates a simulation instance, returning:
72/// - A list of async DFIRs to run (all process / cluster logic outside a tick)
73/// - A list of tick DFIRs to run (where the &'static str is for the tick location id)
74/// - A mapping of hooks for non-deterministic decisions at tick-input boundaries
75/// - A mapping of inline hooks for non-deterministic decisions inside ticks
76type SimLoaded<'a> = libloading::Symbol<
77    'a,
78    unsafe extern "Rust" fn(
79        should_color: bool,
80        external_out: &mut HashMap<usize, UnboundedReceiverStream<Bytes>>,
81        external_in: &mut HashMap<usize, UnboundedSender<Bytes>>,
82        cluster_external_out: &mut HashMap<usize, Vec<UnboundedReceiverStream<Bytes>>>,
83        cluster_external_in: &mut HashMap<usize, Vec<UnboundedSender<Bytes>>>,
84        println_handler: fn(fmt::Arguments<'_>),
85        eprintln_handler: fn(fmt::Arguments<'_>),
86    ) -> (
87        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
88        Vec<(&'static str, Option<u32>, Dfir<'static>)>,
89        Hooks<&'static str>,
90        InlineHooks<&'static str>,
91    ),
92>;
93
94impl CompiledSim {
95    /// Executes the given closure with a single instance of the compiled simulation.
96    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
97        self.with_instantiator(|instantiator| thunk(instantiator()), true)
98    }
99
100    /// Executes the given closure with an [`Instantiator`], which can be called to create
101    /// independent instances of the simulation. This is useful for fuzzing, where we need to
102    /// re-execute the simulation several times with different decisions.
103    ///
104    /// The `always_log` parameter controls whether to log tick executions and stream releases. If
105    /// it is `true`, logging will always be enabled. If it is `false`, logging will only be
106    /// enabled if the `HYDRO_SIM_LOG` environment variable is set to `1`.
107    pub fn with_instantiator<T>(
108        &self,
109        thunk: impl FnOnce(&dyn Instantiator) -> T,
110        always_log: bool,
111    ) -> T {
112        let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
113        let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
114        thunk(
115            &(|| CompiledSimInstance {
116                func: func.clone(),
117                externals_port_registry: self.externals_port_registry.clone(),
118                dylib_result: None,
119                log,
120            }),
121        )
122    }
123
124    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
125    /// closure will be repeatedly executed with instances of the Hydro program where the
126    /// batching boundaries, order of messages, and retries are varied.
127    ///
128    /// During development, you should run the test that invokes this function with the `cargo sim`
129    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
130    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
131    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
132    /// be executed, and if no reproducer is found a small number of random executions will be
133    /// performed.
134    pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
135        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
136            .elements()
137            .into_iter()
138            .find(|e| {
139                !e.fn_name.starts_with("hydro_lang::sim::compiled")
140                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
141                    && !e.fn_name.starts_with("fuzz<")
142                    && !e.fn_name.starts_with("<hydro_lang::sim")
143            })
144            .unwrap();
145
146        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
147        let repro_folder = caller_path.parent().unwrap().join("sim-failures");
148
149        let caller_fuzz_repro_path = repro_folder
150            .join(caller_fn.fn_name.replace("::", "__"))
151            .with_extension("bin");
152
153        if std::env::var("BOLERO_FUZZER").is_ok() {
154            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
155            std::fs::create_dir_all(&corpus_dir).unwrap();
156            let libfuzzer_args = format!(
157                "{} {} -artifact_prefix={}/ -handle_abrt=0",
158                corpus_dir.to_str().unwrap(),
159                corpus_dir.to_str().unwrap(),
160                corpus_dir.to_str().unwrap(),
161            );
162
163            std::fs::create_dir_all(&repro_folder).unwrap();
164
165            if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
166                unsafe {
167                    std::env::set_var(
168                        "BOLERO_FAILURE_OUTPUT",
169                        caller_fuzz_repro_path.to_str().unwrap(),
170                    );
171                }
172            }
173
174            unsafe {
175                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
176            }
177
178            self.with_instantiator(
179                |instantiator| {
180                    bolero::test(bolero::TargetLocation {
181                        package_name: "",
182                        manifest_dir: "",
183                        module_path: "",
184                        file: "",
185                        line: 0,
186                        item_path: "<unknown>::__bolero_item_path__",
187                        test_name: None,
188                    })
189                    .run_with_replay(move |is_replay| {
190                        let mut instance = instantiator();
191
192                        if instance.log {
193                            eprintln!(
194                                "{}",
195                                "\n==== New Simulation Instance ===="
196                                    .color(colored::Color::Cyan)
197                                    .bold()
198                            );
199                        }
200
201                        if is_replay {
202                            instance.log = true;
203                        }
204
205                        tokio::runtime::Builder::new_current_thread()
206                            .build()
207                            .unwrap()
208                            .block_on(async { instance.run(&mut thunk).await })
209                    })
210                },
211                false,
212            );
213        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
214            self.fuzz_repro(existing_bytes, async |compiled| {
215                compiled.launch();
216                thunk().await
217            });
218        } else {
219            eprintln!(
220                "Running a fuzz test without `cargo sim` and no reproducer found at {}, defaulting to 8192 iterations with random inputs.",
221                caller_fuzz_repro_path.display()
222            );
223            self.with_instantiator(
224                |instantiator| {
225                    bolero::test(bolero::TargetLocation {
226                        package_name: "",
227                        manifest_dir: "",
228                        module_path: "",
229                        file: ".",
230                        line: 0,
231                        item_path: "<unknown>::__bolero_item_path__",
232                        test_name: None,
233                    })
234                    .with_iterations(8192)
235                    .run(move || {
236                        let instance = instantiator();
237                        tokio::runtime::Builder::new_current_thread()
238                            .build()
239                            .unwrap()
240                            .block_on(async { instance.run(&mut thunk).await })
241                    })
242                },
243                false,
244            );
245        }
246    }
247
248    /// Executes the given closure with a single instance of the compiled simulation, using the
249    /// provided bytes as the source of fuzzing decisions. This can be used to manually reproduce a
250    /// failure found during fuzzing.
251    pub fn fuzz_repro<'a>(
252        &'a self,
253        bytes: Vec<u8>,
254        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
255    ) {
256        self.with_instance(|instance| {
257            bolero::bolero_engine::any::scope::with(
258                Box::new(bolero::bolero_engine::driver::object::Object(
259                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
260                )),
261                || {
262                    tokio::runtime::Builder::new_current_thread()
263                        .build()
264                        .unwrap()
265                        .block_on(async { instance.run_without_launching(thunk).await })
266                },
267            )
268        });
269    }
270
271    /// Exhaustively searches all possible executions of the simulation. The provided
272    /// closure will be repeatedly executed with instances of the Hydro program where the
273    /// batching boundaries, order of messages, and retries are varied.
274    ///
275    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
276    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
277    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
278    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
279    ///
280    /// Returns the number of distinct executions explored.
281    pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
282        if std::env::var("BOLERO_FUZZER").is_ok() {
283            eprintln!(
284                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
285            );
286            std::process::abort();
287        }
288
289        let mut count = 0;
290        let count_mut = &mut count;
291
292        self.with_instantiator(
293            |instantiator| {
294                bolero::test(bolero::TargetLocation {
295                    package_name: "",
296                    manifest_dir: "",
297                    module_path: "",
298                    file: "",
299                    line: 0,
300                    item_path: "<unknown>::__bolero_item_path__",
301                    test_name: None,
302                })
303                .exhaustive()
304                .run_with_replay(move |is_replay| {
305                    *count_mut += 1;
306
307                    let mut instance = instantiator();
308                    if instance.log {
309                        eprintln!(
310                            "{}",
311                            "\n==== New Simulation Instance ===="
312                                .color(colored::Color::Cyan)
313                                .bold()
314                        );
315                    }
316
317                    if is_replay {
318                        instance.log = true;
319                    }
320
321                    tokio::runtime::Builder::new_current_thread()
322                        .build()
323                        .unwrap()
324                        .block_on(async { instance.run(&mut thunk).await })
325                })
326            },
327            false,
328        );
329
330        count
331    }
332}
333
// This must be a tuple because it is referenced from generated code in `graph.rs`.
//
// Elements, in order:
// - async DFIRs (logic outside any tick)
// - tick DFIRs
// - tick-input-boundary hooks
// - inline (in-tick) hooks
//
// Each DFIR is tagged with its location id, serialized as a JSON `&'static str` and parsed
// back with `serde_json` when scheduled. It also carries an `Option<u32>`, presumably the
// cluster member id (TODO confirm).
type DylibResult = (
    Vec<(&'static str, Option<u32>, Dfir<'static>)>,
    Vec<(&'static str, Option<u32>, Dfir<'static>)>,
    Hooks<&'static str>,
    InlineHooks<&'static str>,
);
341
/// A single instance of a compiled Hydro simulation, which provides methods to interactively
/// execute the simulation, feed inputs, and receive outputs.
pub struct CompiledSimInstance<'a> {
    /// The `__hydro_runtime` entry point, borrowed from the [`CompiledSim`]'s loaded library.
    func: SimLoaded<'a>,
    /// Mapping from external port ids to simulation ports, cloned from the [`CompiledSim`].
    externals_port_registry: SimExternalPortRegistry,
    /// Output of `func`. It is set by `run_without_launching` and taken once when the
    /// simulation is scheduled.
    dylib_result: Option<DylibResult>,
    /// Whether program output and the simulation trace are logged.
    log: bool,
}
350
impl<'a> CompiledSimInstance<'a> {
    /// Initializes this instance, launches the simulation in the background, and then runs
    /// `thunk`.
    async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
        self.run_without_launching(async |instance| {
            instance.launch();
            thunk().await;
        })
        .await;
    }

    /// Calls the library entry point to build the simulated program and wires its external
    /// channels into a [`SimConnections`] table. It then runs `thunk`, which receives this
    /// instance, on a [`tokio::task::LocalSet`] with [`CURRENT_SIM_CONNECTIONS`] set.
    ///
    /// The simulation is *not* scheduled here; `thunk` decides when (and whether) to launch it.
    async fn run_without_launching(
        mut self,
        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
    ) {
        // Passed in empty. The entry point is expected to fill them, keyed by
        // `SimExternalPort::into_inner()`.
        let mut external_out: HashMap<usize, UnboundedReceiverStream<Bytes>> = HashMap::new();
        let mut external_in: HashMap<usize, UnboundedSender<Bytes>> = HashMap::new();
        let mut cluster_external_out: HashMap<usize, Vec<UnboundedReceiverStream<Bytes>>> =
            HashMap::new();
        let mut cluster_external_in: HashMap<usize, Vec<UnboundedSender<Bytes>>> = HashMap::new();

        // SAFETY: `func` was resolved with the `SimLoaded` signature.
        // NOTE(review): soundness relies on the library exporting exactly that signature.
        let dylib_result = unsafe {
            (self.func)(
                colored::control::SHOULD_COLORIZE.should_colorize(),
                &mut external_out,
                &mut external_in,
                &mut cluster_external_out,
                &mut cluster_external_in,
                // Program output is only surfaced when logging is enabled.
                if self.log {
                    println_handler
                } else {
                    null_handler
                },
                if self.log {
                    eprintln_handler
                } else {
                    null_handler
                },
            )
        };

        let registered = &self.externals_port_registry.registered;

        let mut input_senders = HashMap::new();
        let mut output_receivers = HashMap::new();
        let mut cluster_input_senders = HashMap::new();
        let mut cluster_output_receivers = HashMap::new();

        // Move each registered port's channel(s) out of the raw `usize`-keyed maps and into
        // shareable handles keyed by `SimExternalPort`. Channels for ports missing from the
        // registry stay in the local maps and are dropped when this function returns.
        #[expect(
            clippy::disallowed_methods,
            reason = "inserts into maps also unordered"
        )]
        for sim_port in registered.values() {
            let usize_key = sim_port.into_inner();
            if let Some(sender) = external_in.remove(&usize_key) {
                input_senders.insert(*sim_port, Rc::new(sender));
            }
            if let Some(receiver) = external_out.remove(&usize_key) {
                output_receivers.insert(*sim_port, Rc::new(Mutex::new(receiver)));
            }
            if let Some(senders) = cluster_external_in.remove(&usize_key) {
                cluster_input_senders.insert(*sim_port, senders.into_iter().map(Rc::new).collect());
            }
            if let Some(receivers) = cluster_external_out.remove(&usize_key) {
                cluster_output_receivers.insert(
                    *sim_port,
                    receivers
                        .into_iter()
                        .map(|r| Rc::new(Mutex::new(r)))
                        .collect(),
                );
            }
        }

        // Stashed for `schedule_with_maybe_logger`, which takes it when the simulation starts.
        self.dylib_result = Some(dylib_result);

        // `launch` uses `spawn_local`, which must run inside a `LocalSet`.
        let local_set = tokio::task::LocalSet::new();
        local_set
            .run_until(CURRENT_SIM_CONNECTIONS.scope(
                RefCell::new(SimConnections {
                    input_senders,
                    output_receivers,
                    cluster_input_senders,
                    cluster_output_receivers,
                    external_registered: self.externals_port_registry.registered.clone(),
                }),
                async move {
                    thunk(self).await;
                },
            ))
            .await;
    }

    /// Launches the simulation, which will asynchronously simulate the Hydro program. This should
    /// be invoked before receiving any messages.
    ///
    /// The scheduler is spawned with `tokio::task::spawn_local`, so this must be called from
    /// within a `LocalSet`, such as the one set up by `run_without_launching`. When logging is
    /// enabled, the trace goes to stderr.
    fn launch(self) {
        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
    }

    /// Returns a future that schedules simulation with the given logger for reporting the
    /// simulation trace.
    ///
    /// `log_writer` is only used when logging is enabled for this instance; otherwise the
    /// trace is discarded.
    ///
    /// # Panics
    /// Panics immediately (not when awaited) if the instance has not been initialized by
    /// `run_without_launching`.
    pub fn schedule_with_logger<W: std::io::Write>(
        self,
        log_writer: W,
    ) -> impl use<W> + Future<Output = ()> {
        self.schedule_with_maybe_logger(Some(log_writer))
    }

    /// Builds the `LaunchedSim` scheduler state from the library's output and returns a future
    /// that runs it. The trace goes to `log_override` if given, otherwise to stderr, and is
    /// disabled entirely when `self.log` is false.
    fn schedule_with_maybe_logger<W: std::io::Write>(
        mut self,
        log_override: Option<W>,
    ) -> impl use<W> + Future<Output = ()> {
        // Panics if `run_without_launching` has not populated the result yet.
        let (async_dfirs, tick_dfirs, hooks, inline_hooks) = self.dylib_result.take().unwrap();

        // Location ids arrive from the library as JSON strings and are parsed back here. Every
        // async DFIR starts in the not-ready observation set.
        let not_ready_observation = async_dfirs
            .iter()
            .map(|(lid, c_id, _)| (serde_json::from_str(lid).unwrap(), *c_id))
            .collect();

        let mut launched = LaunchedSim {
            async_dfirs: async_dfirs
                .into_iter()
                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
                .collect(),
            possibly_ready_ticks: vec![],
            not_ready_ticks: tick_dfirs
                .into_iter()
                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
                .collect(),
            possibly_ready_observation: vec![],
            not_ready_observation,
            hooks: hooks
                .into_iter()
                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
                .collect(),
            inline_hooks: inline_hooks
                .into_iter()
                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
                .collect(),
            log: if self.log {
                if let Some(w) = log_override {
                    LogKind::Custom(w)
                } else {
                    LogKind::Stderr
                }
            } else {
                LogKind::Null
            },
        };

        async move { launched.scheduler().await }
    }
}
502
503impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
504    fn clone(&self) -> Self {
505        *self
506    }
507}
508
509impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
510
511impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
512    async fn with_stream<Out>(
513        &self,
514        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
515    ) -> Out {
516        let receiver = CURRENT_SIM_CONNECTIONS.with(|connections| {
517            let connections = &mut *connections.borrow_mut();
518            connections
519                .output_receivers
520                .get(connections.external_registered.get(&self.0).unwrap())
521                .unwrap()
522                .clone()
523        });
524
525        let mut receiver_stream = receiver.lock().await;
526        thunk(&mut pin!(
527            &mut receiver_stream
528                .by_ref()
529                .map(|b| bincode::deserialize(&b).unwrap())
530        ))
531        .await
532    }
533
534    /// Asserts that the stream has ended and no more messages can possibly arrive.
535    pub fn assert_no_more(self) -> impl Future<Output = ()>
536    where
537        T: Debug,
538    {
539        FutureTrackingCaller {
540            future: async move {
541                self.with_stream(async |stream| {
542                    if let Some(next) = stream.next().await {
543                        return Err(format!(
544                            "Stream yielded unexpected message: {:?}, expected termination",
545                            next
546                        ));
547                    }
548                    Ok(())
549                })
550                .await
551            },
552        }
553    }
554}
555
556impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
557    /// Receives the next message from the external bincode stream. This will wait until a message
558    /// is available, or return `None` if no more messages can possibly arrive.
559    pub async fn next(&self) -> Option<T> {
560        self.with_stream(async |stream| stream.next().await).await
561    }
562
563    /// Collects all remaining messages from the external bincode stream into a collection. This
564    /// will wait until no more messages can possibly arrive.
565    pub async fn collect<C: Default + Extend<T>>(self) -> C {
566        self.with_stream(async |stream| stream.collect().await)
567            .await
568    }
569
570    /// Asserts that the stream yields exactly the expected sequence of messages, in order.
571    /// This does not check that the stream ends, use [`Self::assert_yields_only`] for that.
572    pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
573        &self,
574        expected: I,
575    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
576    where
577        T: Debug + PartialEq<T2>,
578    {
579        FutureTrackingCaller {
580            future: async {
581                let mut expected: VecDeque<T2> = expected.into_iter().collect();
582
583                while !expected.is_empty() {
584                    if let Some(next) = self.next().await {
585                        let next_expected = expected.pop_front().unwrap();
586                        if next != next_expected {
587                            return Err(format!(
588                                "Stream yielded unexpected message: {:?}, expected: {:?}",
589                                next, next_expected
590                            ));
591                        }
592                    } else {
593                        return Err(format!(
594                            "Stream ended early, still expected: {:?}",
595                            expected
596                        ));
597                    }
598                }
599
600                Ok(())
601            },
602        }
603    }
604
605    /// Asserts that the stream yields only the expected sequence of messages, in order,
606    /// and then ends.
607    pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
608        &self,
609        expected: I,
610    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
611    where
612        T: Debug + PartialEq<T2>,
613    {
614        ChainedFuture {
615            first: self.assert_yields(expected),
616            second: self.assert_no_more(),
617            first_done: false,
618        }
619    }
620}
621
pin_project_lite::pin_project! {
    // A future that tracks the location of the `.await` call for better panic messages.
    //
    // `#[track_caller]` is important for us to create assertion methods because it makes
    // the panic backtrace show up at that method (instead of inside the call tree within
    // that method). This is e.g. what `Option::unwrap` uses. Unfortunately, `#[track_caller]`
    // does not work correctly for async methods (or `dyn Future` either), so we have to
    // create these concrete future types that (1) have `#[track_caller]` on their `poll()`
    // method and (2) have the `panic!` triggered in their `poll()` method (or in a directly
    // nested concrete future).
    //
    // The wrapped future reports an assertion failure as `Err(message)`; `poll` turns that
    // into a panic carrying `message`.
    struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
        #[pin]
        future: F,
    }
}
637
638impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
639    type Output = ();
640
641    #[track_caller]
642    fn poll(
643        mut self: Pin<&mut Self>,
644        cx: &mut std::task::Context<'_>,
645    ) -> std::task::Poll<Self::Output> {
646        match ready!(self.as_mut().project().future.poll(cx)) {
647            Ok(()) => std::task::Poll::Ready(()),
648            Err(e) => panic!("{}", e),
649        }
650    }
651}
652
pin_project_lite::pin_project! {
    // A future that first awaits the first future, then the second, propagating caller info.
    //
    // See `FutureTrackingCaller` for context.
    //
    // `first_done` records that `first` has completed, so it is never polled again after
    // returning `Ready`.
    struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
        #[pin]
        first: F1,
        #[pin]
        second: F2,
        first_done: bool,
    }
}
665
666impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
667    type Output = ();
668
669    #[track_caller]
670    fn poll(
671        mut self: Pin<&mut Self>,
672        cx: &mut std::task::Context<'_>,
673    ) -> std::task::Poll<Self::Output> {
674        if !self.first_done {
675            ready!(self.as_mut().project().first.poll(cx));
676            *self.as_mut().project().first_done = true;
677        }
678
679        self.as_mut().project().second.poll(cx)
680    }
681}
682
683impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
684    /// Collects all remaining messages from the external bincode stream into a collection,
685    /// sorting them. This will wait until no more messages can possibly arrive.
686    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
687    where
688        T: Ord,
689    {
690        self.with_stream(async |stream| {
691            let mut collected: C = stream.collect().await;
692            collected.as_mut().sort();
693            collected
694        })
695        .await
696    }
697
698    /// Asserts that the stream yields exactly the expected sequence of messages, in some order.
699    /// This does not check that the stream ends, use [`Self::assert_yields_only_unordered`] for that.
700    pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
701        &self,
702        expected: I,
703    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
704    where
705        T: Debug + PartialEq<T2>,
706    {
707        FutureTrackingCaller {
708            future: async {
709                self.with_stream(async |stream| {
710                    let mut expected: Vec<T2> = expected.into_iter().collect();
711
712                    while !expected.is_empty() {
713                        if let Some(next) = stream.next().await {
714                            let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
715                            if let Some((i, _)) = idx {
716                                expected.swap_remove(i);
717                            } else {
718                                return Err(format!(
719                                    "Stream yielded unexpected message: {:?}",
720                                    next
721                                ));
722                            }
723                        } else {
724                            return Err(format!(
725                                "Stream ended early, still expected: {:?}",
726                                expected
727                            ));
728                        }
729                    }
730
731                    Ok(())
732                })
733                .await
734            },
735        }
736    }
737
738    /// Asserts that the stream yields only the expected sequence of messages, in some order,
739    /// and then ends.
740    pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
741        &self,
742        expected: I,
743    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
744    where
745        T: Debug + PartialEq<T2>,
746    {
747        ChainedFuture {
748            first: self.assert_yields_unordered(expected),
749            second: self.assert_no_more(),
750            first_done: false,
751        }
752    }
753}
754
755impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
756    fn with_sink<Out>(
757        &self,
758        thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
759    ) -> Out {
760        let sender = CURRENT_SIM_CONNECTIONS.with(|connections| {
761            let connections = &mut *connections.borrow_mut();
762            connections
763                .input_senders
764                .get(connections.external_registered.get(&self.0).unwrap())
765                .unwrap()
766                .clone()
767        });
768
769        thunk(&move |t| sender.send(bincode::serialize(&t).unwrap().into()))
770    }
771}
772
773impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
774    /// Sends several messages to the external bincode sink. The messages will be asynchronously
775    /// processed as part of the simulation, in non-deterministic order.
776    pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
777        self.with_sink(|send| {
778            for t in iter {
779                send(t).unwrap();
780            }
781        })
782    }
783}
784
785impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
786    /// Sends a message to the external bincode sink. The message will be asynchronously processed
787    /// as part of the simulation.
788    pub fn send(&self, t: T) {
789        self.with_sink(|send| send(t)).unwrap();
790    }
791
792    /// Sends several messages to the external bincode sink. The messages will be asynchronously
793    /// processed as part of the simulation.
794    pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
795        self.with_sink(|send| {
796            for t in iter {
797                send(t).unwrap();
798            }
799        })
800    }
801}
802
// Cloning a `SimClusterReceiver` is just a bitwise copy, delegating to its `Copy` impl.
// Written by hand rather than derived, presumably so that no `Clone`/`Copy` bounds are
// imposed on `T`, `O`, or `R` (which `#[derive]` would add) — confirm.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone
    for SimClusterReceiver<T, O, R>
{
    fn clone(&self) -> Self {
        *self
    }
}
810
// Marker impl allowing `SimClusterReceiver` handles to be duplicated by bitwise copy (so
// methods like `collect` can take `self` by value without consuming the caller's handle).
// Hand-written, presumably to avoid the `T: Copy` etc. bounds a derive would add — confirm.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy
    for SimClusterReceiver<T, O, R>
{
}
815
816impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterReceiver<T, O, R> {
817    async fn with_member_stream<Out>(
818        &self,
819        member_id: u32,
820        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
821    ) -> Out {
822        let receiver = CURRENT_SIM_CONNECTIONS.with(|connections| {
823            let connections = &mut *connections.borrow_mut();
824            let receivers = connections
825                .cluster_output_receivers
826                .get(connections.external_registered.get(&self.0).unwrap())
827                .unwrap();
828            receivers[member_id as usize].clone()
829        });
830
831        let mut lock = receiver.lock().await;
832        thunk(&mut pin!(
833            lock.by_ref().map(|b| bincode::deserialize(&b).unwrap())
834        ))
835        .await
836    }
837}
838
839impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, TotalOrder, ExactlyOnce> {
840    /// Receives the next value from a specific cluster member.
841    pub async fn next(&self, member_id: u32) -> Option<T> {
842        self.with_member_stream(member_id, async |stream| stream.next().await)
843            .await
844    }
845
846    /// Collects all remaining values from a specific cluster member into a collection.
847    pub async fn collect<C: Default + Extend<T>>(self, member_id: u32) -> C {
848        self.with_member_stream(member_id, async |stream| stream.collect().await)
849            .await
850    }
851}
852
853impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, NoOrder, ExactlyOnce> {
854    /// Collects all remaining values from a specific cluster member, sorted.
855    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self, member_id: u32) -> C
856    where
857        T: Ord,
858    {
859        self.with_member_stream(member_id, async |stream| {
860            let mut collected: C = stream.collect().await;
861            collected.as_mut().sort();
862            collected
863        })
864        .await
865    }
866}
867
868impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterSender<T, O, R> {
869    fn with_sink<Out>(
870        &self,
871        thunk: impl FnOnce(
872            &dyn Fn(u32, T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>,
873        ) -> Out,
874    ) -> Out {
875        let senders = CURRENT_SIM_CONNECTIONS.with(|connections| {
876            let connections = &mut *connections.borrow_mut();
877            connections
878                .cluster_input_senders
879                .get(connections.external_registered.get(&self.0).unwrap())
880                .unwrap()
881                .clone()
882        });
883
884        thunk(&move |member_id: u32, t: T| {
885            let payload = bincode::serialize(&t).unwrap();
886            senders[member_id as usize].send(Bytes::from(payload))
887        })
888    }
889}
890
891impl<T: Serialize + DeserializeOwned> SimClusterSender<T, TotalOrder, ExactlyOnce> {
892    /// Sends a value to a specific cluster member.
893    pub fn send(&self, member_id: u32, t: T) {
894        self.with_sink(|send| send(member_id, t)).unwrap();
895    }
896
897    /// Sends multiple values to specific cluster members.
898    pub fn send_many<I: IntoIterator<Item = (u32, T)>>(&self, iter: I) {
899        self.with_sink(|send| {
900            for (member_id, t) in iter {
901                send(member_id, t).unwrap();
902            }
903        })
904    }
905}
906
/// Destination for the simulator's trace output.
enum LogKind<W: std::io::Write> {
    /// Discard everything.
    Null,
    /// Print to standard error.
    Stderr,
    /// Forward the bytes to a caller-supplied `std::io::Write`.
    Custom(W),
}

// via https://www.reddit.com/r/rust/comments/t69sld/is_there_a_way_to_allow_either_stdfmtwrite_or/
// Adapts the `std::io::Write` sinks to `std::fmt::Write` so `write!`/`indenter` can target them.
impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        let sink = match self {
            LogKind::Null => return Ok(()),
            LogKind::Stderr => {
                eprint!("{}", s);
                return Ok(());
            }
            LogKind::Custom(w) => w,
        };
        // Any I/O failure is collapsed into the opaque `fmt::Error`.
        sink.write_all(s.as_bytes()).map_err(|_| std::fmt::Error)
    }
}
926
/// A running simulation, which manages the async DFIR and tick DFIRs, and makes decisions
/// about scheduling ticks and choices for non-deterministic operators like batch.
///
/// DFIRs, observation points and hooks are all keyed by a `LocationId` plus an optional
/// cluster member id (`Some(id)` is logged as "Cluster Member {id}").
struct LaunchedSim<W: std::io::Write> {
    /// Top-level (non-tick) DFIRs; `scheduler` runs one tick of each on every loop iteration.
    async_dfirs: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    /// Tick DFIRs that are candidates for the scheduler to pick and run next.
    possibly_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    /// Tick DFIRs whose hooks had no decision to make. They move back to
    /// `possibly_ready_ticks` once the async DFIR of their outer location (same cluster member)
    /// makes progress.
    not_ready_ticks: Vec<(LocationId, Option<u32>, Dfir<'static>)>,
    /// Observation points (no DFIR attached) whose hooks may have a decision to make.
    possibly_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Observation points parked until the async DFIR at the same location / member progresses.
    not_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Decision hooks resolved before a tick runs (or when an observation point is chosen),
    /// keyed by `(location, cluster member)`.
    hooks: Hooks<LocationId>,
    /// Hooks resolved while a tick's `run_tick` future is still pending.
    inline_hooks: InlineHooks<LocationId>,
    /// Destination for tick announcements and logged hook decisions.
    log: LogKind<W>,
}
939
impl<W: std::io::Write> LaunchedSim<W> {
    /// Drives the simulation until no DFIR or observation point can make further progress.
    ///
    /// Each loop iteration first runs one tick of every async DFIR. Whenever one of them reports
    /// progress, two kinds of parked entries for the same cluster member move back to the
    /// "possibly ready" lists: tick DFIRs nested inside that location, and observation points at
    /// that location. The loop then starts over.
    ///
    /// Only once no async DFIR made progress does the bolero generator pick a single candidate:
    /// - a ready tick, which is run, with its hooks resolved before it starts and its inline
    ///   hooks resolved while it runs; or
    /// - a ready observation point, whose hooks are resolved.
    ///
    /// The loop exits when nothing is ready.
    async fn scheduler(&mut self) {
        loop {
            // Yield so other tasks on the runtime get a chance to run between iterations.
            tokio::task::yield_now().await;
            let mut any_made_progress = false;
            for (loc, c_id, dfir) in &mut self.async_dfirs {
                // A `true` result from `run_tick` is treated as "made progress".
                if dfir.run_tick().await {
                    any_made_progress = true;
                    // Un-park ticks nested in this async location for the same cluster member.
                    // Every parked tick is expected to have a `LocationId::Tick` location.
                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
                        .not_ready_ticks
                        .drain(..)
                        .partition(|(tick_loc, tick_c_id, _)| {
                            let LocationId::Tick(_, outer) = tick_loc else {
                                unreachable!()
                            };
                            outer.as_ref() == loc && tick_c_id == c_id
                        });

                    self.possibly_ready_ticks.extend(now_ready);
                    self.not_ready_ticks.extend(still_not_ready);

                    // Likewise un-park observation points at exactly this location / member.
                    let (now_ready_obs, still_not_ready_obs): (Vec<_>, Vec<_>) = self
                        .not_ready_observation
                        .drain(..)
                        .partition(|(obs_loc, obs_c_id)| obs_loc == loc && obs_c_id == c_id);

                    self.possibly_ready_observation.extend(now_ready_obs);
                    self.not_ready_observation.extend(still_not_ready_obs);
                }
            }

            if any_made_progress {
                continue;
            } else {
                // Brings bolero's generator items into scope: the zero-argument `.any()` on a
                // range below, and the `bolero_generator` path used in the inline-hook loop.
                use bolero::generator::*;

                // A tick stays ready only if some hook already holds a `true` decision or can
                // still make a non-trivial one; the others are parked in `not_ready_ticks`.
                // NOTE(review): this `unwrap` panics if a tick has no entry in `hooks`, whereas
                // the observation partition below treats a missing entry as "no hooks" —
                // confirm every tick is always registered in `hooks`.
                let (ready_tick, mut not_ready_tick): (Vec<_>, Vec<_>) = self
                    .possibly_ready_ticks
                    .drain(..)
                    .partition(|(name, cid, _)| {
                        self.hooks
                            .get(&(name.clone(), *cid))
                            .unwrap()
                            .iter()
                            .any(|hook| {
                                hook.current_decision().unwrap_or(false)
                                    || hook.can_make_nontrivial_decision()
                            })
                    });

                self.possibly_ready_ticks = ready_tick;
                self.not_ready_ticks.append(&mut not_ready_tick);

                // Same readiness rule for observation points; no hooks entry means "not ready".
                let (ready_obs, mut not_ready_obs): (Vec<_>, Vec<_>) = self
                    .possibly_ready_observation
                    .drain(..)
                    .partition(|(name, cid)| {
                        self.hooks
                            .get(&(name.clone(), *cid))
                            .into_iter()
                            .flatten()
                            .any(|hook| {
                                hook.current_decision().unwrap_or(false)
                                    || hook.can_make_nontrivial_decision()
                            })
                    });

                self.possibly_ready_observation = ready_obs;
                self.not_ready_observation.append(&mut not_ready_obs);

                // Nothing left to schedule: the simulation has quiesced.
                if self.possibly_ready_ticks.is_empty()
                    && self.possibly_ready_observation.is_empty()
                {
                    break;
                } else {
                    // Let bolero choose one index across both candidate lists. Indices below
                    // `possibly_ready_ticks.len()` select a tick; the rest select an
                    // observation point.
                    let next_tick_or_obs = (0..(self.possibly_ready_ticks.len()
                        + self.possibly_ready_observation.len()))
                        .any();

                    if next_tick_or_obs < self.possibly_ready_ticks.len() {
                        let next_tick = next_tick_or_obs;
                        // Take the tick out while it runs; it is pushed back (at the end) below.
                        let mut removed = self.possibly_ready_ticks.remove(next_tick);

                        // Announce the tick. NOTE(review): the `Custom` branch omits the cluster
                        // member id that the `Stderr` branch prints — confirm whether that
                        // difference is intentional.
                        match &mut self.log {
                            LogKind::Null => {}
                            LogKind::Stderr => {
                                if let Some(cid) = &removed.1 {
                                    eprintln!(
                                        "\n{}",
                                        format!("Running Tick (Cluster Member {})", cid)
                                            .color(colored::Color::Magenta)
                                            .bold()
                                    )
                                } else {
                                    eprintln!(
                                        "\n{}",
                                        "Running Tick".color(colored::Color::Magenta).bold()
                                    )
                                }
                            }
                            LogKind::Custom(writer) => {
                                writeln!(
                                    writer,
                                    "\n{}",
                                    "Running Tick".color(colored::Color::Magenta).bold()
                                )
                                .unwrap();
                            }
                        }

                        // Prefix every line of decision output for this tick with a bold
                        // magenta `* `.
                        let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
                            write.write_str(&"*".color(colored::Color::Magenta).bold())?;
                            write.write_str(" ")
                        };

                        let mut tick_decision_writer = indenter::indented(&mut self.log)
                            .with_format(indenter::Format::Custom {
                                inserter: &mut asterisk_indenter,
                            });

                        // Resolve (and log) this tick's hook decisions before running it.
                        let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
                        run_hooks(&mut tick_decision_writer, hooks);

                        let run_tick_future = removed.2.run_tick();
                        if let Some(inline_hooks) =
                            self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
                        {
                            let mut run_tick_future_pinned = pin!(run_tick_future);

                            // `biased` polls the tick first. While it is still pending, the
                            // always-ready `async {}` arm resolves and releases every inline
                            // hook waiting on a decision, then the loop polls the tick again.
                            // The chosen tick is expected to report `true` from `run_tick`.
                            loop {
                                tokio::select! {
                                    biased;
                                    r = &mut run_tick_future_pinned => {
                                        assert!(r);
                                        break;
                                    }
                                    _ = async {} => {
                                        bolero_generator::any::scope::borrow_with(|driver| {
                                            for hook in inline_hooks.iter_mut() {
                                                if hook.pending_decision() {
                                                    if !hook.has_decision() {
                                                        hook.autonomous_decision(driver);
                                                    }

                                                    hook.release_decision(&mut tick_decision_writer);
                                                }
                                            }
                                        });
                                    }
                                }
                            }
                        } else {
                            assert!(run_tick_future.await);
                        }

                        self.possibly_ready_ticks.push(removed);
                    } else {
                        // An observation point was chosen. Resolve its hooks, logging straight to
                        // `self.log` (no asterisk prefix). The point stays in
                        // `possibly_ready_observation`.
                        let next_obs = next_tick_or_obs - self.possibly_ready_ticks.len();
                        let mut default_hooks = vec![];
                        let hooks = self
                            .hooks
                            .get_mut(&self.possibly_ready_observation[next_obs])
                            .unwrap_or(&mut default_hooks);

                        run_hooks(&mut self.log, hooks);
                    }
                }
            }
        }
    }
}
1111
1112fn run_hooks(tick_decision_writer: &mut impl std::fmt::Write, hooks: &mut Vec<Box<dyn SimHook>>) {
1113    let mut remaining_decision_count = hooks.len();
1114    let mut made_nontrivial_decision = false;
1115
1116    bolero::generator::bolero_generator::any::scope::borrow_with(|driver| {
1117        // first, scan manual decisions
1118        hooks.iter_mut().for_each(|hook| {
1119            if let Some(is_nontrivial) = hook.current_decision() {
1120                made_nontrivial_decision |= is_nontrivial;
1121                remaining_decision_count -= 1;
1122            } else if !hook.can_make_nontrivial_decision() {
1123                // if no nontrivial decision is possible, make a trivial one
1124                // (we need to do this in the first pass to force nontrivial decisions
1125                // on the remaining hooks)
1126                hook.autonomous_decision(driver, false);
1127                remaining_decision_count -= 1;
1128            }
1129        });
1130
1131        hooks.iter_mut().for_each(|hook| {
1132            if hook.current_decision().is_none() {
1133                made_nontrivial_decision |= hook.autonomous_decision(
1134                    driver,
1135                    !made_nontrivial_decision && remaining_decision_count == 1,
1136                );
1137                remaining_decision_count -= 1;
1138            }
1139
1140            hook.release_decision(tick_decision_writer);
1141        });
1142    });
1143}