1use core::panic;
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
/// A closure expression together with the IR nodes it references.
///
/// `Hash`, `Debug` and `Display` for this type only look at `expr`; the
/// referenced nodes are carried alongside for cloning, traversal and code
/// emission.
pub struct ClosureExpr {
    /// The expression itself.
    pub(crate) expr: DebugExpr,
    /// Nodes referenced by the expression, each paired with a flag saying
    /// whether the reference is mutable (see `has_mut_ref`). `Clone` and
    /// `emit_tokens` panic unless every node is a `HydroNode::Reference`.
    pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
}
51
52impl Clone for ClosureExpr {
53 fn clone(&self) -> Self {
54 Self {
55 expr: self.expr.clone(),
56 singleton_refs: self
57 .singleton_refs
58 .iter()
59 .map(|(node, is_mut)| {
60 let HydroNode::Reference {
61 inner,
62 kind,
63 access_counter,
64 metadata,
65 } = node
66 else {
67 panic!("singleton_refs should only contain HydroNode::Reference");
68 };
69 (
70 HydroNode::Reference {
71 inner: SharedNode(Rc::clone(&inner.0)),
72 kind: *kind,
73 access_counter: access_counter.freeze(),
74 metadata: metadata.clone(),
75 },
76 *is_mut,
77 )
78 })
79 .collect(),
80 }
81 }
82}
83
84impl Hash for ClosureExpr {
85 fn hash<H: Hasher>(&self, state: &mut H) {
86 self.expr.hash(state);
87 }
91}
92
93impl serde::Serialize for ClosureExpr {
94 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95 use serde::ser::SerializeStruct;
96 let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
97 s.serialize_field("expr", &self.expr)?;
98 s.serialize_field(
99 "singleton_refs",
100 &SerializableSingletonRefs(&self.singleton_refs),
101 )?;
102 s.end()
103 }
104}
105
/// Borrowed view over a `(node, is_mut)` slice, used by `ClosureExpr`'s
/// `Serialize` impl to emit the pairs as a sequence.
struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
107
108impl serde::Serialize for SerializableSingletonRefs<'_> {
109 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
110 use serde::ser::SerializeSeq;
111 let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
112 for (node, is_mut) in self.0.iter() {
113 seq.serialize_element(&(node, is_mut))?;
114 }
115 seq.end()
116 }
117}
118
119impl Debug for ClosureExpr {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 Debug::fmt(&self.expr, f)
122 }
123}
124
125impl Display for ClosureExpr {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 Display::fmt(&self.expr, f)
128 }
129}
130
131impl From<syn::Expr> for ClosureExpr {
132 fn from(expr: syn::Expr) -> Self {
133 Self {
134 expr: DebugExpr(Box::new(expr)),
135 singleton_refs: Vec::new(),
136 }
137 }
138}
139
140impl From<DebugExpr> for ClosureExpr {
141 fn from(expr: DebugExpr) -> Self {
142 Self {
143 expr,
144 singleton_refs: Vec::new(),
145 }
146 }
147}
148
149impl ClosureExpr {
150 pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
151 Self {
152 expr,
153 singleton_refs,
154 }
155 }
156
157 pub fn has_mut_ref(&self) -> bool {
158 self.singleton_refs.iter().any(|(_, is_mut)| *is_mut)
159 }
160
161 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
162 Self {
163 expr: self.expr.clone(),
164 singleton_refs: self
165 .singleton_refs
166 .iter()
167 .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
168 .collect(),
169 }
170 }
171
172 pub fn transform_children(
173 &mut self,
174 transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
175 seen_tees: &mut SeenSharedNodes,
176 ) {
177 for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
178 transform(ref_node, seen_tees);
179 }
180 }
181
182 #[cfg(feature = "build")]
185 pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
186 if self.singleton_refs.is_empty() {
187 self.expr.0.to_token_stream()
188 } else {
189 assert!(
190 ident_stack.len() >= self.singleton_refs.len(),
191 "ident_stack has {} entries but expected at least {} for singleton_refs",
192 ident_stack.len(),
193 self.singleton_refs.len()
194 );
195 let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
196
197 let mut let_bindings = Vec::new();
198 for ((i, (ref_node, is_mut)), ref_ident) in
199 self.singleton_refs.iter().enumerate().zip(ref_idents)
200 {
201 let HydroNode::Reference { access_counter, .. } = ref_node else {
202 panic!("ClosureExpression expected references to `HydroNode::Reference`");
203 };
204 let group = access_counter.frozen_group();
205 let local_ident = handoff_ref_ident(i);
207 let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
208 let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
209 let mut_token = is_mut.then(|| quote!(mut));
210 let binding = quote! {
211 let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
212 };
213 let_bindings.push(binding);
214 }
215
216 let expr = &self.expr.0;
217 quote! {
218 {
219 #( #let_bindings )*
220 #expr
221 }
222 }
223 }
224 }
225}
226
/// A boxed `syn::Expr` with text-oriented trait impls: `Debug` prints the raw
/// tokens, `Display` prints a `q!(...)` form, and `Serialize` emits the
/// `Display` string.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
232
233impl serde::Serialize for DebugExpr {
234 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
235 serializer.serialize_str(&self.to_string())
236 }
237}
238
239impl From<syn::Expr> for DebugExpr {
240 fn from(expr: syn::Expr) -> Self {
241 Self(Box::new(expr))
242 }
243}
244
245impl Deref for DebugExpr {
246 type Target = syn::Expr;
247
248 fn deref(&self) -> &Self::Target {
249 &self.0
250 }
251}
252
253impl ToTokens for DebugExpr {
254 fn to_tokens(&self, tokens: &mut TokenStream) {
255 self.0.to_tokens(tokens);
256 }
257}
258
259impl Debug for DebugExpr {
260 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 write!(f, "{}", self.0.to_token_stream())
262 }
263}
264
265impl Display for DebugExpr {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 let original = self.0.as_ref().clone();
268 let simplified = simplify_q_macro(original);
269
270 write!(f, "q!({})", quote::quote!(#simplified))
273 }
274}
275
276fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
278 if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
279 && is_stageleft_runtime_support_call(&path_expr.path)
281 && let syn::Expr::Block(b) = &call.args[0]
282 && b.block.stmts.len() == 3
283 && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
284 {
286 let mut e = e.clone();
287 while let syn::Expr::Block(ref mut block) = e
288 && block.block.stmts.len() == 1
289 && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
290 {
291 e = inner_e;
292 }
293
294 e
295 } else {
296 expr
297 }
298}
299
300fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
301 if let Some(last_segment) = path.segments.last() {
303 let fn_name = last_segment.ident.to_string();
304 path.segments.len() > 2
305 && path.segments[0].ident == "stageleft"
306 && path.segments[1].ident == "runtime_support"
307 && fn_name.contains("_type_hint")
308 } else {
309 false
310 }
311}
312
/// A boxed `syn::Type` whose `Debug` and `Serialize` impls render the type's
/// token stream as text.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
318
319impl From<syn::Type> for DebugType {
320 fn from(t: syn::Type) -> Self {
321 Self(Box::new(t))
322 }
323}
324
325impl Deref for DebugType {
326 type Target = syn::Type;
327
328 fn deref(&self) -> &Self::Target {
329 &self.0
330 }
331}
332
333impl ToTokens for DebugType {
334 fn to_tokens(&self, tokens: &mut TokenStream) {
335 self.0.to_tokens(tokens);
336 }
337}
338
339impl Debug for DebugType {
340 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341 write!(f, "{}", self.0.to_token_stream())
342 }
343}
344
345impl serde::Serialize for DebugType {
346 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
347 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
348 }
349}
350
351fn serialize_backtrace_as_span<S: serde::Serializer>(
352 backtrace: &Backtrace,
353 serializer: S,
354) -> Result<S::Ok, S::Error> {
355 match backtrace.format_span() {
356 Some(span) => serializer.serialize_some(&span),
357 None => serializer.serialize_none(),
358 }
359}
360
361fn serialize_ident<S: serde::Serializer>(
362 ident: &syn::Ident,
363 serializer: S,
364) -> Result<S::Ok, S::Error> {
365 serializer.serialize_str(&ident.to_string())
366}
367
/// Instantiation state of a network endpoint in the IR.
///
/// Nodes start as `Building`; `HydroRoot::compile_network` replaces that with
/// `Finalized` once sink/source expressions and a connect callback exist.
pub enum DebugInstantiate {
    /// Not yet instantiated.
    Building,
    /// Instantiated; boxed so the enum stays small.
    Finalized(Box<DebugInstantiateFinalized>),
}
372
373impl serde::Serialize for DebugInstantiate {
374 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
375 match self {
376 DebugInstantiate::Building => {
377 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
378 }
379 DebugInstantiate::Finalized(_) => {
380 panic!(
381 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
382 )
383 }
384 }
385 }
386}
387
/// Payload of `DebugInstantiate::Finalized`: the sink and source expressions
/// for an endpoint plus a one-shot callback that connects it.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side.
    sink: syn::Expr,
    /// Expression for the receiving side.
    source: syn::Expr,
    /// Connect callback; `HydroRoot::connect_network` takes it out of the
    /// `Option` and calls it.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
400
401impl From<DebugInstantiateFinalized> for DebugInstantiate {
402 fn from(f: DebugInstantiateFinalized) -> Self {
403 Self::Finalized(Box::new(f))
404 }
405}
406
407impl Debug for DebugInstantiate {
408 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409 write!(f, "<network instantiate>")
410 }
411}
412
impl Hash for DebugInstantiate {
    /// Intentionally feeds nothing into the hasher, so the instantiation state
    /// never affects the hash of a containing node.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
418
419impl Clone for DebugInstantiate {
420 fn clone(&self) -> Self {
421 match self {
422 DebugInstantiate::Building => DebugInstantiate::Building,
423 DebugInstantiate::Finalized(_) => {
424 panic!("DebugInstantiate::Finalized should not be cloned")
425 }
426 }
427 }
428}
429
/// Resolution state of a `HydroSource::ClusterMembers` source.
///
/// `HydroRoot::compile_network` moves `Uninit` to `Stream` the first time a
/// given (location, cluster) pair is seen and to `Tee` on later sightings; it
/// panics if the state is already `Stream` or `Tee`.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not yet resolved.
    Uninit,
    /// Resolved to the membership stream expression supplied by the deploy
    /// provider.
    Stream(DebugExpr),
    /// A repeat of an already-resolved pair; holds the location the source
    /// lives at and the cluster location it refers to.
    Tee(LocationId, LocationId),
}
450
/// The kinds of input a `HydroNode::Source` can read from.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// Source described by an expression. NOTE(review): presumably an
    /// expression evaluating to a stream — confirm.
    Stream(DebugExpr),
    /// NOTE(review): semantics not visible in this chunk.
    ExternalNetwork(),
    /// Source described by an expression. NOTE(review): presumably an
    /// expression evaluating to an iterator — confirm.
    Iter(DebugExpr),
    /// NOTE(review): semantics not visible in this chunk.
    Spin(),
    /// Membership of a cluster: the cluster's location plus the resolution
    /// state that `HydroRoot::compile_network` fills in.
    ClusterMembers(LocationId, ClusterMembersState),
    /// Named embedded input registered as a stream input during
    /// `compile_network`; serialized as the identifier's string.
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// Named embedded input registered as a singleton input during
    /// `compile_network`; serialized as the identifier's string.
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
462
/// Receiver for the DFIR code generated per IR operator.
///
/// Every method is handed the identifiers, locations and collection kinds of
/// one operator. The `SecondaryMap<LocationKey, FlatGraphBuilder>`
/// implementation in this file records the corresponding statements on the
/// per-location `FlatGraphBuilder`; other implementations may do more.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// NOTE(review): exact meaning is not visible in this chunk; the
    /// `SecondaryMap` implementation returns `false`.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the mutable `FlatGraphBuilder` used for `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the code for a batch step that consumes `in_ident` and defines
    /// `out_ident`.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );
    /// Emits the code for yielding `in_ident` out of a tick as `out_ident`.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the code for entering an atomic section, defining `out_ident`
    /// from `in_ident`.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code for leaving an atomic section, defining `out_ident`
    /// from `in_ident`.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the code for a nondeterminism observation from `in_ident` (of
    /// `in_kind`) to `out_ident` (of `out_kind`).
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the code combining `first_ident` and `second_ident` into
    /// `out_ident`, optionally labelled with `operator_tag`.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network channel: the sending code at `from`
    /// (fed by `input_ident`, writing to `sink`) and the receiving code at
    /// `to` (reading `source`, defining `out_ident`). `serialize` and
    /// `deserialize` are optional mapping expressions for each side.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits the code at `on` that reads from `source_expr` and defines
    /// `out_ident`, optionally mapping through `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Emits the code at `on` that sends `input_ident` into `sink_expr`,
    /// optionally mapping through `serialize`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Optionally emits a fold hook for `in_ident`, returning an identifier
    /// when one is produced. NOTE(review): the hook's purpose is not visible
    /// in this chunk; the `SecondaryMap` implementation always returns `None`.
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;

    /// Emits the code for a consistency assertion from `in_ident` to
    /// `out_ident`.
    fn assert_is_consistent(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    );

    /// Emits the code for observing `in_ident` for mutation, defining
    /// `out_ident`. NOTE(review): precise semantics not visible in this chunk.
    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the sending side of a versioned network channel for a set of
    /// senders, each given as (location, input identifier, optional serializer).
    fn create_versioned_network_fork(
        &mut self,
        channel_id: u32,
        dest: &LocationId,
        senders: Vec<(LocationId, syn::Ident, Option<DebugExpr>)>,
        tag_id: StmtId,
    );

    /// Emits the receiving side of a versioned network channel, defining
    /// `out_ident`.
    fn create_versioned_network(
        &mut self,
        channel_id: u32,
        source: &LocationId,
        dest: &LocationId,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );
}
621
/// Builder that records statements on one `FlatGraphBuilder` per root
/// location. Most operators are plain pass-throughs (`out = in`).
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Looks up the builder for the root of `location`, creating a default
    /// one on first use.
    ///
    /// # Panics
    /// Panics with "location was removed" if the map entry is unavailable.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    /// Pass-through, except that bounded singleton-like inputs (singleton,
    /// optional, keyed singleton) get `persist::<'static>()` appended and
    /// must be at a top-level location.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );
        if !needs_persist {
            builder.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident;
                },
                None,
                None,
            );
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Pass-through on the builder of the input's root location.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Pass-through on the builder of the input's root location.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Pass-through on the builder of the input's root location.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Pass-through; this builder ignores `trusted` and the collection kinds.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Wires both inputs into a `union()` named `out_ident`, first input on
    /// port 0 and second on port 1.
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    /// Adds a `dest_sink` at `from` and a `source_stream` at `to`, inserting
    /// `map(...)` stages when serializers are supplied. The statements are
    /// tagged `send<tag_id>` and `recv<tag_id>`.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        let sender_builder = self.get_dfir_mut(from);
        let send_tag = format!("send{}", tag_id);
        match serialize {
            Some(serialize_pipeline) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }

        let receiver_builder = self.get_dfir_mut(to);
        let recv_tag = format!("recv{}", tag_id);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Adds a `source_stream` at `on`, tagged `recv<tag_id>`, with an optional
    /// `map(...)` stage for deserialization.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let receiver_builder = self.get_dfir_mut(on);
        let recv_tag = format!("recv{}", tag_id);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Adds a `dest_sink` at `on`, tagged `send<tag_id>`, with an optional
    /// `map(...)` stage for serialization.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let sender_builder = self.get_dfir_mut(on);
        let send_tag = format!("send{}", tag_id);
        match serialize {
            Some(serialize_fn) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }
    }

    /// This builder never emits a fold hook.
    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        None
    }

    /// Pass-through; this builder ignores `trusted`.
    fn assert_is_consistent(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Pass-through.
    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Not supported by this builder; always panics via `unreachable!`.
    fn create_versioned_network_fork(
        &mut self,
        _channel_id: u32,
        _dest: &LocationId,
        _senders: Vec<(LocationId, syn::Ident, Option<DebugExpr>)>,
        _tag_id: StmtId,
    ) {
        unreachable!(
            "HydroNode::VersionedNetworkFork is only produced by the multi-version simulator merge \
             pass and cannot be emitted by the non-simulation builder"
        );
    }

    /// Not supported by this builder; always panics via `unreachable!`.
    fn create_versioned_network(
        &mut self,
        _channel_id: u32,
        _source: &LocationId,
        _dest: &LocationId,
        _out_ident: &syn::Ident,
        _deserialize: Option<&DebugExpr>,
        _tag_id: StmtId,
    ) {
        unreachable!(
            "HydroNode::VersionedNetwork is only produced by the multi-version simulator merge \
             pass and cannot be emitted by the non-simulation builder"
        );
    }
}
951
/// Either a `DfirBuilder` to emit into, or a pair of callbacks: one taking
/// each `HydroRoot` and one taking each `HydroNode`, both alongside a
/// `StmtId` counter.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    N: FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
{
    /// Emit code through the borrowed builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke the root callback (`L`) and node callback (`N`) instead.
    Callback(L, N),
}
961
/// A root of the Hydro IR. Every variant owns the `input` node that feeds it
/// and the operator metadata for the root itself.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroRoot {
    /// Root carrying a closure `f` alongside its input. NOTE(review):
    /// presumably `f` is applied to each input element — confirm.
    ForEach {
        f: ClosureExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the input to a port on an external location. `instantiate_fn`
    /// starts as `Building` and is finalized by `compile_network`.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root carrying a `sink` expression alongside its input.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root identified by a `cycle_id`. NOTE(review): presumably the sink end
    /// of a cycle — confirm.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Named embedded output; `compile_network` registers it with the deploy
    /// environment and requires the input to be a `Stream`.
    EmbeddedOutput {
        #[serde(serialize_with = "serialize_ident")]
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root with no fields beyond the input and metadata.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
1003
1004impl HydroRoot {
1005 #[cfg(feature = "build")]
1006 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
1007 pub fn compile_network<'a, D>(
1008 &mut self,
1009 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
1010 seen_tees: &mut SeenSharedNodes,
1011 seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
1012 processes: &SparseSecondaryMap<LocationKey, D::Process>,
1013 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
1014 externals: &SparseSecondaryMap<LocationKey, D::External>,
1015 env: &mut D::InstantiateEnv,
1016 ) where
1017 D: Deploy<'a>,
1018 {
1019 let refcell_extra_stmts = RefCell::new(extra_stmts);
1020 let refcell_env = RefCell::new(env);
1021 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
1022 self.transform_bottom_up(
1023 &mut |l| {
1024 if let HydroRoot::SendExternal {
1025 input,
1026 to_external_key,
1027 to_port_id,
1028 to_many,
1029 unpaired,
1030 instantiate_fn,
1031 ..
1032 } = l
1033 {
1034 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1035 DebugInstantiate::Building => {
1036 let to_node = externals
1037 .get(*to_external_key)
1038 .unwrap_or_else(|| {
1039 panic!("A external used in the graph was not instantiated: {}", to_external_key)
1040 })
1041 .clone();
1042
1043 match input.metadata().location_id.root() {
1044 &LocationId::Process(process_key) => {
1045 if *to_many {
1046 (
1047 (
1048 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1049 parse_quote!(DUMMY),
1050 ),
1051 Box::new(|| {}) as Box<dyn FnOnce()>,
1052 )
1053 } else {
1054 let from_node = processes
1055 .get(process_key)
1056 .unwrap_or_else(|| {
1057 panic!("A process used in the graph was not instantiated: {}", process_key)
1058 })
1059 .clone();
1060
1061 let sink_port = from_node.next_port();
1062 let source_port = to_node.next_port();
1063
1064 if *unpaired {
1065 use stageleft::quote_type;
1066 use tokio_util::codec::LengthDelimitedCodec;
1067
1068 to_node.register(*to_port_id, source_port.clone());
1069
1070 let _ = D::e2o_source(
1071 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1072 &to_node, &source_port,
1073 &from_node, &sink_port,
1074 "e_type::<LengthDelimitedCodec>(),
1075 format!("{}_{}", *to_external_key, *to_port_id)
1076 );
1077 }
1078
1079 (
1080 (
1081 D::o2e_sink(
1082 &from_node,
1083 &sink_port,
1084 &to_node,
1085 &source_port,
1086 format!("{}_{}", *to_external_key, *to_port_id)
1087 ),
1088 parse_quote!(DUMMY),
1089 ),
1090 if *unpaired {
1091 D::e2o_connect(
1092 &to_node,
1093 &source_port,
1094 &from_node,
1095 &sink_port,
1096 *to_many,
1097 NetworkHint::Auto,
1098 )
1099 } else {
1100 Box::new(|| {}) as Box<dyn FnOnce()>
1101 },
1102 )
1103 }
1104 }
1105 LocationId::Cluster(cluster_key) => {
1106 let from_node = clusters
1107 .get(*cluster_key)
1108 .unwrap_or_else(|| {
1109 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1110 })
1111 .clone();
1112
1113 let sink_port = from_node.next_port();
1114 let source_port = to_node.next_port();
1115
1116 if *unpaired {
1117 to_node.register(*to_port_id, source_port.clone());
1118 }
1119
1120 (
1121 (
1122 D::m2e_sink(
1123 &from_node,
1124 &sink_port,
1125 &to_node,
1126 &source_port,
1127 format!("{}_{}", *to_external_key, *to_port_id)
1128 ),
1129 parse_quote!(DUMMY),
1130 ),
1131 Box::new(|| {}) as Box<dyn FnOnce()>,
1132 )
1133 }
1134 _ => panic!()
1135 }
1136 },
1137
1138 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1139 };
1140
1141 *instantiate_fn = DebugInstantiateFinalized {
1142 sink: sink_expr,
1143 source: source_expr,
1144 connect_fn: Some(connect_fn),
1145 }
1146 .into();
1147 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1148 let element_type = match &input.metadata().collection_kind {
1149 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1150 _ => panic!("Embedded output must have Stream collection kind"),
1151 };
1152 let location_key = match input.metadata().location_id.root() {
1153 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1154 _ => panic!("Embedded output must be on a process or cluster"),
1155 };
1156 D::register_embedded_output(
1157 &mut refcell_env.borrow_mut(),
1158 location_key,
1159 ident,
1160 &element_type,
1161 );
1162 }
1163 },
1164 &mut |n| {
1165 if let HydroNode::Network {
1166 name,
1167 networking_info,
1168 input,
1169 instantiate_fn,
1170 metadata,
1171 ..
1172 } = n
1173 {
1174 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1175 DebugInstantiate::Building => instantiate_network::<D>(
1176 &mut refcell_env.borrow_mut(),
1177 input.metadata().location_id.root(),
1178 metadata.location_id.root(),
1179 processes,
1180 clusters,
1181 name.as_deref(),
1182 networking_info,
1183 ),
1184
1185 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1186 };
1187
1188 *instantiate_fn = DebugInstantiateFinalized {
1189 sink: sink_expr,
1190 source: source_expr,
1191 connect_fn: Some(connect_fn),
1192 }
1193 .into();
1194 } else if let HydroNode::ExternalInput {
1195 from_external_key,
1196 from_port_id,
1197 from_many,
1198 codec_type,
1199 port_hint,
1200 instantiate_fn,
1201 metadata,
1202 ..
1203 } = n
1204 {
1205 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1206 DebugInstantiate::Building => {
1207 let from_node = externals
1208 .get(*from_external_key)
1209 .unwrap_or_else(|| {
1210 panic!(
1211 "A external used in the graph was not instantiated: {}",
1212 from_external_key,
1213 )
1214 })
1215 .clone();
1216
1217 match metadata.location_id.root() {
1218 &LocationId::Process(process_key) => {
1219 let to_node = processes
1220 .get(process_key)
1221 .unwrap_or_else(|| {
1222 panic!("A process used in the graph was not instantiated: {}", process_key)
1223 })
1224 .clone();
1225
1226 let sink_port = from_node.next_port();
1227 let source_port = to_node.next_port();
1228
1229 from_node.register(*from_port_id, sink_port.clone());
1230
1231 (
1232 (
1233 parse_quote!(DUMMY),
1234 if *from_many {
1235 D::e2o_many_source(
1236 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1237 &to_node, &source_port,
1238 codec_type.0.as_ref(),
1239 format!("{}_{}", *from_external_key, *from_port_id)
1240 )
1241 } else {
1242 D::e2o_source(
1243 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1244 &from_node, &sink_port,
1245 &to_node, &source_port,
1246 codec_type.0.as_ref(),
1247 format!("{}_{}", *from_external_key, *from_port_id)
1248 )
1249 },
1250 ),
1251 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1252 )
1253 }
1254 LocationId::Cluster(cluster_key) => {
1255 let to_node = clusters
1256 .get(*cluster_key)
1257 .unwrap_or_else(|| {
1258 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1259 })
1260 .clone();
1261
1262 let sink_port = from_node.next_port();
1263 let source_port = to_node.next_port();
1264
1265 from_node.register(*from_port_id, sink_port.clone());
1266
1267 (
1268 (
1269 parse_quote!(DUMMY),
1270 D::e2m_source(
1271 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1272 &from_node, &sink_port,
1273 &to_node, &source_port,
1274 codec_type.0.as_ref(),
1275 format!("{}_{}", *from_external_key, *from_port_id)
1276 ),
1277 ),
1278 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1279 )
1280 }
1281 _ => panic!()
1282 }
1283 },
1284
1285 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1286 };
1287
1288 *instantiate_fn = DebugInstantiateFinalized {
1289 sink: sink_expr,
1290 source: source_expr,
1291 connect_fn: Some(connect_fn),
1292 }
1293 .into();
1294 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1295 let element_type = match &metadata.collection_kind {
1296 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1297 _ => panic!("Embedded source must have Stream collection kind"),
1298 };
1299 let location_key = match metadata.location_id.root() {
1300 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1301 _ => panic!("Embedded source must be on a process or cluster"),
1302 };
1303 D::register_embedded_stream_input(
1304 &mut refcell_env.borrow_mut(),
1305 location_key,
1306 ident,
1307 &element_type,
1308 );
1309 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1310 let element_type = match &metadata.collection_kind {
1311 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1312 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1313 };
1314 let location_key = match metadata.location_id.root() {
1315 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1316 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1317 };
1318 D::register_embedded_singleton_input(
1319 &mut refcell_env.borrow_mut(),
1320 location_key,
1321 ident,
1322 &element_type,
1323 );
1324 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1325 match state {
1326 ClusterMembersState::Uninit => {
1327 let at_location = metadata.location_id.root().clone();
1328 let key = (at_location.clone(), location_id.key());
1329 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1330 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1332 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1333 &(),
1334 );
1335 *state = ClusterMembersState::Stream(expr.into());
1336 } else {
1337 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1339 }
1340 }
1341 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1342 panic!("cluster members already finalized");
1343 }
1344 }
1345 }
1346 },
1347 seen_tees,
1348 false,
1349 );
1350 }
1351
1352 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1353 self.transform_bottom_up(
1354 &mut |l| {
1355 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1356 match instantiate_fn {
1357 DebugInstantiate::Building => panic!("network not built"),
1358
1359 DebugInstantiate::Finalized(finalized) => {
1360 (finalized.connect_fn.take().unwrap())();
1361 }
1362 }
1363 }
1364 },
1365 &mut |n| {
1366 if let HydroNode::Network { instantiate_fn, .. }
1367 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1368 {
1369 match instantiate_fn {
1370 DebugInstantiate::Building => panic!("network not built"),
1371
1372 DebugInstantiate::Finalized(finalized) => {
1373 (finalized.connect_fn.take().unwrap())();
1374 }
1375 }
1376 }
1377 },
1378 seen_tees,
1379 false,
1380 );
1381 }
1382
1383 pub fn transform_bottom_up(
1384 &mut self,
1385 transform_root: &mut impl FnMut(&mut HydroRoot),
1386 transform_node: &mut impl FnMut(&mut HydroNode),
1387 seen_tees: &mut SeenSharedNodes,
1388 check_well_formed: bool,
1389 ) {
1390 self.transform_children(
1391 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1392 seen_tees,
1393 );
1394
1395 transform_root(self);
1396 }
1397
1398 pub fn transform_children(
1399 &mut self,
1400 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1401 seen_tees: &mut SeenSharedNodes,
1402 ) {
1403 match self {
1404 HydroRoot::ForEach { f, input, .. } => {
1405 f.transform_children(&mut transform, seen_tees);
1406 transform(input, seen_tees);
1407 }
1408 HydroRoot::SendExternal { input, .. }
1409 | HydroRoot::DestSink { input, .. }
1410 | HydroRoot::CycleSink { input, .. }
1411 | HydroRoot::EmbeddedOutput { input, .. }
1412 | HydroRoot::Null { input, .. } => {
1413 transform(input, seen_tees);
1414 }
1415 }
1416 }
1417
1418 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1419 match self {
1420 HydroRoot::ForEach {
1421 f,
1422 input,
1423 op_metadata,
1424 } => HydroRoot::ForEach {
1425 f: f.deep_clone(seen_tees),
1426 input: Box::new(input.deep_clone(seen_tees)),
1427 op_metadata: op_metadata.clone(),
1428 },
1429 HydroRoot::SendExternal {
1430 to_external_key,
1431 to_port_id,
1432 to_many,
1433 unpaired,
1434 serialize_fn,
1435 instantiate_fn,
1436 input,
1437 op_metadata,
1438 } => HydroRoot::SendExternal {
1439 to_external_key: *to_external_key,
1440 to_port_id: *to_port_id,
1441 to_many: *to_many,
1442 unpaired: *unpaired,
1443 serialize_fn: serialize_fn.clone(),
1444 instantiate_fn: instantiate_fn.clone(),
1445 input: Box::new(input.deep_clone(seen_tees)),
1446 op_metadata: op_metadata.clone(),
1447 },
1448 HydroRoot::DestSink {
1449 sink,
1450 input,
1451 op_metadata,
1452 } => HydroRoot::DestSink {
1453 sink: sink.clone(),
1454 input: Box::new(input.deep_clone(seen_tees)),
1455 op_metadata: op_metadata.clone(),
1456 },
1457 HydroRoot::CycleSink {
1458 cycle_id,
1459 input,
1460 op_metadata,
1461 } => HydroRoot::CycleSink {
1462 cycle_id: *cycle_id,
1463 input: Box::new(input.deep_clone(seen_tees)),
1464 op_metadata: op_metadata.clone(),
1465 },
1466 HydroRoot::EmbeddedOutput {
1467 ident,
1468 input,
1469 op_metadata,
1470 } => HydroRoot::EmbeddedOutput {
1471 ident: ident.clone(),
1472 input: Box::new(input.deep_clone(seen_tees)),
1473 op_metadata: op_metadata.clone(),
1474 },
1475 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1476 input: Box::new(input.deep_clone(seen_tees)),
1477 op_metadata: op_metadata.clone(),
1478 },
1479 }
1480 }
1481
1482 #[cfg(feature = "build")]
1483 pub fn emit(
1484 &mut self,
1485 graph_builders: &mut dyn DfirBuilder,
1486 seen_tees: &mut SeenSharedNodes,
1487 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1488 next_stmt_id: &mut crate::Counter<StmtId>,
1489 fold_hooked_idents: &mut HashSet<String>,
1490 ) {
1491 self.emit_core(
1492 &mut BuildersOrCallback::<
1493 fn(&mut HydroRoot, &mut crate::Counter<StmtId>),
1494 fn(&mut HydroNode, &mut crate::Counter<StmtId>),
1495 >::Builders(graph_builders),
1496 seen_tees,
1497 built_tees,
1498 next_stmt_id,
1499 fold_hooked_idents,
1500 );
1501 }
1502
    /// Emits this root, either into DFIR graph builders or by invoking a callback.
    ///
    /// Every arm first emits its `input` subtree through `HydroNode::emit_core`, which yields
    /// the identifier that the root's statement consumes. All arms except `CycleSink` then
    /// take a fresh statement id from `next_stmt_id`.
    ///
    /// * In `BuildersOrCallback::Builders` mode, a DFIR statement is added to the builder
    ///   selected by the input's location (or `create_external_output` is called for
    ///   `SendExternal`).
    /// * In `BuildersOrCallback::Callback` mode, the root callback is invoked with `self` and
    ///   the statement counter; `CycleSink` does nothing in this mode.
    ///
    /// # Panics
    ///
    /// In builders mode, panics for `ForEach` if one of the closure's `singleton_refs` is not
    /// a `HydroNode::Reference`, or if its shared node has no entry in `built_tees`.
    #[cfg(feature = "build")]
    pub fn emit_core(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
        >,
        seen_tees: &mut SeenSharedNodes,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
        next_stmt_id: &mut crate::Counter<StmtId>,
        fold_hooked_idents: &mut HashSet<String>,
    ) {
        match self {
            HydroRoot::ForEach { f, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                // Allocated after the input so the input's statements get lower ids.
                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // One identifier per singleton reference, in `singleton_refs` order.
                        let mut ident_stack: Vec<syn::Ident> = Vec::new();

                        for (ref_node, _is_mut) in f.singleton_refs.iter() {
                            let HydroNode::Reference { inner, .. } = ref_node else {
                                panic!("singleton_refs should only contain HydroNode::Reference");
                            };
                            // `built_tees` is keyed by the address of the shared `RefCell`.
                            let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
                            let idents = built_tees.get(&ptr).expect(
                                "ForEach singleton ref not found in built_tees — ref node was not emitted",
                            );
                            ident_stack.push(idents[0].clone());
                        }

                        let f_tokens = f.emit_tokens(&mut ident_stack);

                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f_tokens);
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Only the sink half is used here; placeholder expressions stand in
                        // while the network has not been finalized yet.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_id,
                            sink_expr,
                            &input_ident,
                            serialize_fn.as_ref(),
                            stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            HydroRoot::CycleSink {
                cycle_id, input, ..
            } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                // Unlike the other arms, no statement id is allocated for a cycle sink.
                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Keyed collections flow as `(key, value)` tuples; the others flow as
                        // their element type.
                        let elem_type: syn::Type = match &input.metadata().collection_kind {
                            CollectionKind::KeyedSingleton {
                                key_type,
                                value_type,
                                ..
                            }
                            | CollectionKind::KeyedStream {
                                key_type,
                                value_type,
                                ..
                            } => {
                                parse_quote!((#key_type, #value_type))
                            }
                            CollectionKind::Stream { element_type, .. }
                            | CollectionKind::Singleton { element_type, .. }
                            | CollectionKind::Optional { element_type, .. } => {
                                parse_quote!(#element_type)
                            }
                        };

                        let cycle_id_ident = cycle_id.as_ident();
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
                                },
                                None,
                                None,
                            );
                    }
                    // The root callback is not invoked for cycle sinks.
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }

            HydroRoot::EmbeddedOutput { ident, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(&mut #ident);
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            HydroRoot::Null { input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // The input is drained by a no-op consumer.
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(|_| {});
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }
        }
    }
1741
1742 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1743 match self {
1744 HydroRoot::ForEach { op_metadata, .. }
1745 | HydroRoot::SendExternal { op_metadata, .. }
1746 | HydroRoot::DestSink { op_metadata, .. }
1747 | HydroRoot::CycleSink { op_metadata, .. }
1748 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1749 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1750 }
1751 }
1752
1753 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1754 match self {
1755 HydroRoot::ForEach { op_metadata, .. }
1756 | HydroRoot::SendExternal { op_metadata, .. }
1757 | HydroRoot::DestSink { op_metadata, .. }
1758 | HydroRoot::CycleSink { op_metadata, .. }
1759 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1760 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1761 }
1762 }
1763
1764 pub fn input(&self) -> &HydroNode {
1765 match self {
1766 HydroRoot::ForEach { input, .. }
1767 | HydroRoot::SendExternal { input, .. }
1768 | HydroRoot::DestSink { input, .. }
1769 | HydroRoot::CycleSink { input, .. }
1770 | HydroRoot::EmbeddedOutput { input, .. }
1771 | HydroRoot::Null { input, .. } => input,
1772 }
1773 }
1774
1775 pub fn input_metadata(&self) -> &HydroIrMetadata {
1776 self.input().metadata()
1777 }
1778
1779 pub fn print_root(&self) -> String {
1780 match self {
1781 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1782 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1783 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1784 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1785 HydroRoot::EmbeddedOutput { ident, .. } => {
1786 format!("EmbeddedOutput({})", ident)
1787 }
1788 HydroRoot::Null { .. } => "Null".to_owned(),
1789 }
1790 }
1791
1792 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1793 match self {
1794 HydroRoot::ForEach { f, .. } => {
1795 transform(&mut f.expr);
1796 }
1797 HydroRoot::DestSink { sink, .. } => {
1798 transform(sink);
1799 }
1800 HydroRoot::SendExternal { .. }
1801 | HydroRoot::CycleSink { .. }
1802 | HydroRoot::EmbeddedOutput { .. }
1803 | HydroRoot::Null { .. } => {}
1804 }
1805 }
1806}
1807
/// Returns the clock id of the first `Tick` found by looking through any number of
/// `Atomic` wrappers around `loc`, or `None` if another kind of location is reached first.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    let mut current = loc;
    loop {
        match current {
            LocationId::Tick(id, _) => return Some(*id),
            // Look through the atomic wrapper and keep searching.
            LocationId::Atomic(inner) => current = &**inner,
            _ => return None,
        }
    }
}
1816
/// Rewrites, in place, every tick clock id inside `loc` to its union-find representative.
///
/// `Tick` and `Atomic` wrappers are traversed recursively; `Process` and `Cluster` contain
/// no clock ids and are left unchanged.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    match loc {
        LocationId::Process(_) | LocationId::Cluster(_) => {}
        LocationId::Atomic(wrapped) => remap_location(wrapped, uf),
        LocationId::Tick(clock, wrapped) => {
            let representative = uf_find(uf, *clock);
            *clock = representative;
            remap_location(wrapped, uf);
        }
    }
}
1830
/// Union-find lookup: returns the representative of `x`.
///
/// An id with no entry in `parent` (or one mapped to itself) is its own representative.
/// Every non-root id on the path from `x` is re-pointed directly at the representative.
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // First pass: follow parent links until reaching an id that is its own parent.
    let mut root = x;
    while let Some(&next) = parent.get(&root) {
        if next == root {
            break;
        }
        root = next;
    }

    // Second pass: point every id on the path straight at the root.
    let mut node = x;
    while node != root {
        let next = *parent.get(&node).unwrap_or(&node);
        parent.insert(node, root);
        node = next;
    }

    root
}
1841
/// Union-find merge: joins the sets containing `a` and `b`.
///
/// When the two representatives differ, the representative of `a` is attached beneath the
/// representative of `b`.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let root_a = uf_find(parent, a);
    let root_b = uf_find(parent, b);
    if root_a == root_b {
        // Already in the same set.
        return;
    }
    parent.insert(root_a, root_b);
}
1850
/// Merges tick clock ids that are connected through certain nodes, then rewrites every
/// node's location to use the merged ids.
///
/// Two passes are made over `ir`:
///
/// 1. For `Batch`/`YieldConcat` (child `inner`) and `Chain`/`ChainFirst`/`MergeOrdered`
///    (children `first` and `second`), whenever both the child's location and the node's
///    own location resolve to a tick via `tick_of`, the two clock ids are unioned.
/// 2. Every node's `location_id` is remapped to the representative clock ids.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect the unions.
    {
        let mut link = |child: &LocationId, parent: &LocationId| {
            if let (Some(a), Some(b)) = (tick_of(child), tick_of(parent)) {
                uf_union(&mut uf, a, b);
            }
        };

        transform_bottom_up(
            ir,
            &mut |_| {},
            &mut |node: &mut HydroNode| match node {
                HydroNode::Batch { inner, metadata }
                | HydroNode::YieldConcat { inner, metadata } => {
                    link(&inner.metadata().location_id, &metadata.location_id);
                }
                HydroNode::Chain {
                    first,
                    second,
                    metadata,
                }
                | HydroNode::ChainFirst {
                    first,
                    second,
                    metadata,
                }
                | HydroNode::MergeOrdered {
                    first,
                    second,
                    metadata,
                } => {
                    link(&first.metadata().location_id, &metadata.location_id);
                    link(&second.metadata().location_id, &metadata.location_id);
                }
                _ => {}
            },
            false,
        );
    }

    // Pass 2: rewrite every node location using the representatives.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let location = &mut node.metadata_mut().location_id;
            remap_location(location, &mut uf);
        },
        false,
    );
}
1914
/// Emits every root in `ir` and returns the resulting per-location graph builders.
///
/// All roots share one set of traversal state (seen/built shared nodes, the statement id
/// counter and the fold-hook identifier set), each starting out empty.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders: SecondaryMap<LocationKey, FlatGraphBuilder> = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();
    let mut fold_hooked_idents = HashSet::new();

    ir.iter_mut().for_each(|leaf| {
        leaf.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    });

    builders
}
1933
/// Walks `ir` in emission order without building any graph, invoking `transform_root` and
/// `transform_node` instead.
///
/// This is `emit_core` in "callback" mode; both callbacks also receive the statement id
/// counter.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    transform_node: impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
) {
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();
    let mut fold_hooked_idents = HashSet::new();
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);

    for leaf in ir.iter_mut() {
        leaf.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1955
1956pub fn transform_bottom_up(
1957 ir: &mut [HydroRoot],
1958 transform_root: &mut impl FnMut(&mut HydroRoot),
1959 transform_node: &mut impl FnMut(&mut HydroNode),
1960 check_well_formed: bool,
1961) {
1962 let mut seen_tees = HashMap::new();
1963 ir.iter_mut().for_each(|leaf| {
1964 leaf.transform_bottom_up(
1965 transform_root,
1966 transform_node,
1967 &mut seen_tees,
1968 check_well_formed,
1969 );
1970 });
1971}
1972
1973pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1974 let mut seen_tees = HashMap::new();
1975 ir.iter()
1976 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1977 .collect()
1978}
1979
/// Per-thread state for numbering shared nodes: `None` while no scope is active, otherwise
/// the next id to hand out together with a map from a shared node's address to its id.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Numbering state for `Debug` output of `SharedNode`; activated by `dbg_dedup_tee`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    // Numbering state for serialization of `SharedNode`; activated by
    // `serialize_dedup_shared`.
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
1989
1990pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1991 PRINTED_TEES.with(|printed_tees| {
1992 let mut printed_tees_mut = printed_tees.borrow_mut();
1993 *printed_tees_mut = Some((0, HashMap::new()));
1994 drop(printed_tees_mut);
1995
1996 let ret = f();
1997
1998 let mut printed_tees_mut = printed_tees.borrow_mut();
1999 *printed_tees_mut = None;
2000
2001 ret
2002 })
2003}
2004
2005pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
2010 let _guard = SerializedSharedGuard::enter();
2011 f()
2012}
2013
/// Scope guard for `SERIALIZED_SHARED`: created by `enter`, and on drop it writes
/// `previous` back into the thread-local.
struct SerializedSharedGuard {
    /// Contents of `SERIALIZED_SHARED` at the time the scope was entered.
    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
}
2019
2020impl SerializedSharedGuard {
2021 fn enter() -> Self {
2022 let previous = SERIALIZED_SHARED.with(|cell| {
2023 let mut guard = cell.borrow_mut();
2024 guard.replace((0, HashMap::new()))
2025 });
2026 Self { previous }
2027 }
2028}
2029
2030impl Drop for SerializedSharedGuard {
2031 fn drop(&mut self) {
2032 SERIALIZED_SHARED.with(|cell| {
2033 *cell.borrow_mut() = self.previous.take();
2034 });
2035 }
2036}
2037
/// A reference-counted, interior-mutable handle to a `HydroNode`, allowing one node to be
/// referenced from several places in the IR.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2039
2040impl serde::Serialize for SharedNode {
2041 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2052 SERIALIZED_SHARED.with(|cell| {
2053 let mut guard = cell.borrow_mut();
2054 let state = guard.as_mut().ok_or_else(|| {
2056 serde::ser::Error::custom(
2057 "SharedNode serialization requires an active serialize_dedup_shared scope",
2058 )
2059 })?;
2060 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2061
2062 if let Some(&id) = state.1.get(&ptr) {
2063 drop(guard);
2064 use serde::ser::SerializeMap;
2065 let mut map = serializer.serialize_map(Some(1))?;
2066 map.serialize_entry("$shared_ref", &id)?;
2067 map.end()
2068 } else {
2069 let id = state.0;
2070 state.0 += 1;
2071 state.1.insert(ptr, id);
2072 drop(guard);
2073
2074 use serde::ser::SerializeMap;
2075 let mut map = serializer.serialize_map(Some(2))?;
2076 map.serialize_entry("$shared", &id)?;
2077 map.serialize_entry("node", &*self.0.borrow())?;
2078 map.end()
2079 }
2080 })
2081 }
2082}
2083
2084impl SharedNode {
2085 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2086 Rc::as_ptr(&self.0)
2087 }
2088}
2089
2090impl Debug for SharedNode {
2091 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2092 PRINTED_TEES.with(|printed_tees| {
2093 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2094 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2095
2096 if let Some(printed_tees_mut) = printed_tees_mut {
2097 if let Some(existing) = printed_tees_mut
2098 .1
2099 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2100 {
2101 write!(f, "<shared {}>", existing)
2102 } else {
2103 let next_id = printed_tees_mut.0;
2104 printed_tees_mut.0 += 1;
2105 printed_tees_mut
2106 .1
2107 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2108 drop(printed_tees_mut_borrow);
2109 write!(f, "<shared {}>: ", next_id)?;
2110 Debug::fmt(&self.0.borrow(), f)
2111 }
2112 } else {
2113 drop(printed_tees_mut_borrow);
2114 write!(f, "<shared>: ")?;
2115 Debug::fmt(&self.0.borrow(), f)
2116 }
2117 })
2118 }
2119}
2120
2121impl Hash for SharedNode {
2122 fn hash<H: Hasher>(&self, state: &mut H) {
2123 self.0.borrow_mut().hash(state);
2124 }
2125}
2126
/// A group counter that is either still counting (interior-mutable) or frozen at a value.
#[derive(Debug)]
pub enum AccessCounter {
    /// Live counter; `next_group` may advance it through a shared reference.
    Counting(Cell<u32>),
    /// Fixed group number.
    Frozen(u32),
}

impl AccessCounter {
    /// Creates a live counter starting at zero.
    pub fn new() -> Self {
        AccessCounter::Counting(Cell::new(0))
    }

    /// Returns the frozen group for the next access.
    ///
    /// A non-mutable access gets the current count and leaves the counter unchanged. A
    /// mutable access gets `count + 1` and leaves the counter at `count + 2`.
    ///
    /// # Panics
    ///
    /// Panics if called on a `Frozen` counter.
    pub fn next_group(&self, is_mut: bool) -> Self {
        match self {
            AccessCounter::Frozen(_) => panic!("Cannot count on `AccessCounter::Frozen`"),
            AccessCounter::Counting(cell) => {
                let current = cell.get();
                if !is_mut {
                    return AccessCounter::Frozen(current);
                }
                let group = current + 1;
                cell.set(group + 1);
                AccessCounter::Frozen(group)
            }
        }
    }

    /// Returns a `Frozen` counter holding the current value.
    pub fn freeze(&self) -> Self {
        let snapshot = match self {
            AccessCounter::Counting(cell) => cell.get(),
            AccessCounter::Frozen(value) => *value,
        };
        AccessCounter::Frozen(snapshot)
    }

    /// Returns the value of a frozen counter.
    ///
    /// # Panics
    ///
    /// Panics if the counter is still `Counting`.
    pub fn frozen_group(&self) -> u32 {
        match self {
            AccessCounter::Frozen(value) => *value,
            AccessCounter::Counting(_) => panic!("`AccessCounter` not frozen"),
        }
    }
}

impl Default for AccessCounter {
    /// Same as [`AccessCounter::new`].
    fn default() -> Self {
        AccessCounter::new()
    }
}

impl Hash for AccessCounter {
    /// Contributes nothing to the hash: the counter value is ignored when hashing.
    fn hash<H: Hasher>(&self, _state: &mut H) {}
}
2186
2187impl serde::Serialize for AccessCounter {
2188 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2189 let count = match self {
2190 AccessCounter::Counting(count) => count.get(),
2191 AccessCounter::Frozen(count) => *count,
2192 };
2193 count.serialize(serializer)
2194 }
2195}
2196
/// Boundedness marker used by the `Stream`, `Optional` and `KeyedStream` variants of
/// `CollectionKind`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    /// The value for which `CollectionKind::is_bounded` returns `true`.
    Bounded,
}
2202
/// Ordering marker used by the `Stream` and `KeyedStream` variants of `CollectionKind`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    /// The ordering required by `CollectionKind::is_strict`.
    TotalOrder,
}
2208
/// Retry marker used by the `Stream` and `KeyedStream` variants of `CollectionKind`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    /// The retry mode required by `CollectionKind::is_strict`.
    ExactlyOnce,
}
2214
/// Boundedness marker used by `CollectionKind::KeyedSingleton`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicKeys,
    MonotonicValue,
    BoundedValue,
    /// The only value for which `CollectionKind::is_bounded` returns `true`.
    Bounded,
}
2223
/// Boundedness marker used by `CollectionKind::Singleton`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    /// The only value for which `CollectionKind::is_bounded` returns `true`.
    Bounded,
}
2230
/// Describes the kind of collection an IR node produces, together with its element (or
/// key/value) type and its boundedness, ordering and retry markers where applicable.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    /// A stream of `element_type` values.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton of `element_type`.
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    /// An optional of `element_type`.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; the ordering and retry markers apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton of `key_type` to `value_type`.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
2260
2261impl CollectionKind {
2262 pub fn is_bounded(&self) -> bool {
2263 matches!(
2264 self,
2265 CollectionKind::Stream {
2266 bound: BoundKind::Bounded,
2267 ..
2268 } | CollectionKind::Singleton {
2269 bound: SingletonBoundKind::Bounded,
2270 ..
2271 } | CollectionKind::Optional {
2272 bound: BoundKind::Bounded,
2273 ..
2274 } | CollectionKind::KeyedStream {
2275 bound: BoundKind::Bounded,
2276 ..
2277 } | CollectionKind::KeyedSingleton {
2278 bound: KeyedSingletonBoundKind::Bounded,
2279 ..
2280 }
2281 )
2282 }
2283
2284 pub fn is_strict(&self) -> bool {
2287 match self {
2288 CollectionKind::Stream { order, retry, .. } => {
2289 *order == StreamOrder::TotalOrder && *retry == StreamRetry::ExactlyOnce
2290 }
2291 CollectionKind::KeyedStream {
2292 value_order,
2293 value_retry,
2294 ..
2295 } => {
2296 *value_order == StreamOrder::TotalOrder && *value_retry == StreamRetry::ExactlyOnce
2297 }
2298 CollectionKind::Singleton { .. }
2301 | CollectionKind::Optional { .. }
2302 | CollectionKind::KeyedSingleton { .. } => true,
2303 }
2304 }
2305
2306 pub fn strict_kind(&self) -> CollectionKind {
2308 match self {
2309 CollectionKind::Stream {
2310 bound,
2311 element_type,
2312 ..
2313 } => CollectionKind::Stream {
2314 bound: bound.clone(),
2315 order: StreamOrder::TotalOrder,
2316 retry: StreamRetry::ExactlyOnce,
2317 element_type: element_type.clone(),
2318 },
2319 CollectionKind::KeyedStream {
2320 bound,
2321 key_type,
2322 value_type,
2323 ..
2324 } => CollectionKind::KeyedStream {
2325 bound: bound.clone(),
2326 value_order: StreamOrder::TotalOrder,
2327 value_retry: StreamRetry::ExactlyOnce,
2328 key_type: key_type.clone(),
2329 value_type: value_type.clone(),
2330 },
2331 other => other.clone(),
2332 }
2333 }
2334}
2335
/// Metadata attached to every `HydroNode`.
///
/// The hand-written `Hash` and `PartialEq` impls for this type ignore all fields, and its
/// `Debug` impl prints only `location_id` and `collection_kind`.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Location the node is placed at.
    pub location_id: LocationId,
    /// Kind of collection the node produces.
    pub collection_kind: CollectionKind,
    pub consistency: Option<ClusterConsistency>,
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Operator-level metadata.
    pub op: HydroIrOpMetadata,
}
2345
impl Hash for HydroIrMetadata {
    /// Deliberately feeds nothing into the hasher, so metadata never affects a node's hash.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
2350
impl PartialEq for HydroIrMetadata {
    /// Always returns `true`: any two metadata values compare equal, which is consistent
    /// with the no-op `Hash` impl.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}

impl Eq for HydroIrMetadata {}
2358
2359impl Debug for HydroIrMetadata {
2360 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2361 f.debug_struct("HydroIrMetadata")
2362 .field("location_id", &self.location_id)
2363 .field("collection_kind", &self.collection_kind)
2364 .finish()
2365 }
2366}
2367
/// Operator-level metadata stored on IR roots and, through `HydroIrMetadata::op`, on nodes.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created; serialized under the name `span`
    /// through `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    /// `None` when created through `new`.
    pub cpu_usage: Option<f64>,
    /// `None` when created through `new`.
    pub network_recv_cpu_usage: Option<f64>,
    /// `None` when created through `new`.
    pub id: Option<usize>,
}
2378
impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and all optional fields unset.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Creates metadata whose backtrace is captured with `2 + skip_count` passed to
    /// `Backtrace::get_backtrace`.
    ///
    /// NOTE(review): the argument is presumably the number of innermost frames to omit, in
    /// which case the call depth of `new` -> `new_with_skip` must not change — confirm
    /// against `Backtrace::get_backtrace`.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
2397
2398impl Debug for HydroIrOpMetadata {
2399 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2400 f.debug_struct("HydroIrOpMetadata").finish()
2401 }
2402}
2403
impl Hash for HydroIrOpMetadata {
    /// Deliberately feeds nothing into the hasher, so operator metadata never affects a
    /// hash.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
2407
/// An intermediate node of the Hydro IR.
///
/// Every variant except `Placeholder` carries a `metadata: HydroIrMetadata`. Single-input
/// operators hold their upstream as a boxed `inner`/`input` node, two-input operators hold
/// two boxed nodes, and variants that can be reached from several places hold a
/// `SharedNode` instead.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    /// Variant without any payload.
    // NOTE(review): presumably a temporary stand-in while a node is moved out — confirm
    // against the code that constructs it.
    Placeholder,

    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    /// A source of data, described by a `HydroSource`.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    /// The reading end of a cycle identified by `cycle_id`; `HydroRoot::CycleSink` carries
    /// the same kind of id on the writing end.
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// One consumer of a shared upstream node.
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// A reference to a shared node, as stored in `ClosureExpr::singleton_refs`.
    Reference {
        inner: SharedNode,
        kind: crate::handoff_ref::HandoffRefKind,
        access_counter: AccessCounter,
        metadata: HydroIrMetadata,
    },

    /// One side of a partition of a shared upstream node by the closure `f`.
    // NOTE(review): `is_true` presumably selects which side of the predicate this node
    // receives — confirm against the emission code.
    Partition {
        inner: SharedNode,
        f: ClosureExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    MergeOrdered {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMapStreamBlocking {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ScanAsyncBlocking {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyedWatermark {
        f: ClosureExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// A network hop. `instantiate_fn` starts out unbuilt and, once finalized, holds the
    /// sink/source expressions and the connection callback used by
    /// `HydroRoot::connect_network`.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    VersionedNetworkFork {
        channel_id: u32,
        channel_name: String,
        senders: Vec<(u32, Box<HydroNode>, Option<DebugExpr>)>,
        metadata: HydroIrMetadata,
    },

    VersionedNetwork {
        fork: SharedNode,
        version: u32,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    /// Input arriving from an external location through port `from_port_id`.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        // Excluded from serialized output.
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AssertIsConsistent {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    UnboundSingleton {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2717
/// Memo table used while walking the IR: keyed by the address of an original shared
/// node cell, valued by the cell that stands in for it in the result. `transform_children`
/// and `deep_clone` consult it so each shared cell is processed once and every handle
/// that pointed at the same original cell ends up pointing at the same replacement.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a shared node cell to a `LocationId`.
// NOTE(review): presumably records the location associated with each already-visited
// shared node; the code that fills this map is not visible here — confirm against callers.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2720
#[cfg(feature = "build")]
/// Decides which identifier a closure-bearing operator should read its input from.
///
/// When the closure `f` holds a mutable reference and the input collection kind is not
/// strict, a fresh statement id is consumed, a new `stream_<id>` identifier is created,
/// and (in builder mode only) `observe_for_mut` is emitted between `in_ident` and the
/// new identifier. The new identifier is returned. In every other case `in_ident` is
/// returned untouched and no statement id is consumed.
fn maybe_observe_for_mut(
    f: &ClosureExpr,
    in_ident: syn::Ident,
    in_location: &LocationId,
    in_kind: &CollectionKind,
    op_meta: &HydroIrOpMetadata,
    builders_or_callback: &mut BuildersOrCallback<
        impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
        impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
    >,
    next_stmt_id: &mut crate::Counter<StmtId>,
) -> syn::Ident {
    // Guard: nothing to insert unless the closure mutates through a reference and the
    // input kind is non-strict. `&&` keeps the original short-circuit order.
    let needs_observe = f.has_mut_ref() && !in_kind.is_strict();
    if !needs_observe {
        return in_ident;
    }

    // The id is consumed in both builder and callback mode so numbering stays aligned.
    let fresh_id = next_stmt_id.get_and_increment();
    let fresh_ident = syn::Ident::new(&format!("stream_{}", fresh_id), Span::call_site());

    if let BuildersOrCallback::Builders(graph_builders) = builders_or_callback {
        graph_builders.observe_for_mut(in_location, in_ident, in_kind, &fresh_ident, op_meta);
    }

    fresh_ident
}
2749
2750impl HydroNode {
2751 pub fn transform_bottom_up(
2752 &mut self,
2753 transform: &mut impl FnMut(&mut HydroNode),
2754 seen_tees: &mut SeenSharedNodes,
2755 check_well_formed: bool,
2756 ) {
2757 self.transform_children(
2758 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2759 seen_tees,
2760 );
2761
2762 transform(self);
2763
2764 let self_location = self.metadata().location_id.root();
2765
2766 if check_well_formed {
2767 match &*self {
2768 HydroNode::Network { .. } => {}
2769 _ => {
2770 self.input_metadata().iter().for_each(|i| {
2771 if i.location_id.root() != self_location {
2772 panic!(
2773 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2774 i,
2775 i.location_id.root(),
2776 self,
2777 self_location
2778 )
2779 }
2780 });
2781 }
2782 }
2783 }
2784 }
2785
2786 #[inline(always)]
2787 pub fn transform_children(
2788 &mut self,
2789 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2790 seen_tees: &mut SeenSharedNodes,
2791 ) {
2792 match self {
2793 HydroNode::Placeholder => {
2794 panic!();
2795 }
2796
2797 HydroNode::Source { .. }
2798 | HydroNode::SingletonSource { .. }
2799 | HydroNode::CycleSource { .. }
2800 | HydroNode::ExternalInput { .. } => {}
2801
2802 HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2803 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2804 *inner = SharedNode(transformed.clone());
2805 } else {
2806 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2807 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2808 let mut orig = inner.0.replace(HydroNode::Placeholder);
2809 transform(&mut orig, seen_tees);
2810 *transformed_cell.borrow_mut() = orig;
2811 *inner = SharedNode(transformed_cell);
2812 }
2813 }
2814
2815 HydroNode::Partition { inner, f, .. } => {
2816 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2817 *inner = SharedNode(transformed.clone());
2818 } else {
2819 f.transform_children(&mut transform, seen_tees);
2820 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2821 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2822 let mut orig = inner.0.replace(HydroNode::Placeholder);
2823 transform(&mut orig, seen_tees);
2824 *transformed_cell.borrow_mut() = orig;
2825 *inner = SharedNode(transformed_cell);
2826 }
2827 }
2828
2829 HydroNode::Cast { inner, .. }
2830 | HydroNode::ObserveNonDet { inner, .. }
2831 | HydroNode::BeginAtomic { inner, .. }
2832 | HydroNode::EndAtomic { inner, .. }
2833 | HydroNode::Batch { inner, .. }
2834 | HydroNode::YieldConcat { inner, .. }
2835 | HydroNode::UnboundSingleton { inner, .. }
2836 | HydroNode::AssertIsConsistent { inner, .. } => {
2837 transform(inner.as_mut(), seen_tees);
2838 }
2839
2840 HydroNode::Chain { first, second, .. } => {
2841 transform(first.as_mut(), seen_tees);
2842 transform(second.as_mut(), seen_tees);
2843 }
2844
2845 HydroNode::MergeOrdered { first, second, .. } => {
2846 transform(first.as_mut(), seen_tees);
2847 transform(second.as_mut(), seen_tees);
2848 }
2849
2850 HydroNode::ChainFirst { first, second, .. } => {
2851 transform(first.as_mut(), seen_tees);
2852 transform(second.as_mut(), seen_tees);
2853 }
2854
2855 HydroNode::CrossSingleton { left, right, .. }
2856 | HydroNode::CrossProduct { left, right, .. }
2857 | HydroNode::Join { left, right, .. }
2858 | HydroNode::JoinHalf { left, right, .. } => {
2859 transform(left.as_mut(), seen_tees);
2860 transform(right.as_mut(), seen_tees);
2861 }
2862
2863 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2864 transform(pos.as_mut(), seen_tees);
2865 transform(neg.as_mut(), seen_tees);
2866 }
2867
2868 HydroNode::Map { f, input, .. } => {
2869 f.transform_children(&mut transform, seen_tees);
2870 transform(input.as_mut(), seen_tees);
2871 }
2872 HydroNode::FlatMap { f, input, .. }
2873 | HydroNode::FlatMapStreamBlocking { f, input, .. }
2874 | HydroNode::Filter { f, input, .. }
2875 | HydroNode::FilterMap { f, input, .. }
2876 | HydroNode::Inspect { f, input, .. }
2877 | HydroNode::Reduce { f, input, .. }
2878 | HydroNode::ReduceKeyed { f, input, .. } => {
2879 f.transform_children(&mut transform, seen_tees);
2880 transform(input.as_mut(), seen_tees);
2881 }
2882 HydroNode::ReduceKeyedWatermark {
2883 f,
2884 input,
2885 watermark,
2886 ..
2887 } => {
2888 f.transform_children(&mut transform, seen_tees);
2889 transform(input.as_mut(), seen_tees);
2890 transform(watermark.as_mut(), seen_tees);
2891 }
2892 HydroNode::Fold {
2893 init, acc, input, ..
2894 }
2895 | HydroNode::Scan {
2896 init, acc, input, ..
2897 }
2898 | HydroNode::ScanAsyncBlocking {
2899 init, acc, input, ..
2900 }
2901 | HydroNode::FoldKeyed {
2902 init, acc, input, ..
2903 } => {
2904 init.transform_children(&mut transform, seen_tees);
2905 acc.transform_children(&mut transform, seen_tees);
2906 transform(input.as_mut(), seen_tees);
2907 }
2908 HydroNode::ResolveFutures { input, .. }
2909 | HydroNode::ResolveFuturesBlocking { input, .. }
2910 | HydroNode::ResolveFuturesOrdered { input, .. }
2911 | HydroNode::Sort { input, .. }
2912 | HydroNode::DeferTick { input, .. }
2913 | HydroNode::Enumerate { input, .. }
2914 | HydroNode::Unique { input, .. }
2915 | HydroNode::Network { input, .. }
2916 | HydroNode::Counter { input, .. } => {
2917 transform(input.as_mut(), seen_tees);
2918 }
2919
2920 HydroNode::VersionedNetworkFork { senders, .. } => {
2921 for (_version, sender, _serialize) in senders.iter_mut() {
2922 transform(sender.as_mut(), seen_tees);
2923 }
2924 }
2925
2926 HydroNode::VersionedNetwork { fork, .. } => {
2927 if let Some(transformed) = seen_tees.get(&fork.as_ptr()) {
2928 *fork = SharedNode(transformed.clone());
2929 } else {
2930 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2931 seen_tees.insert(fork.as_ptr(), transformed_cell.clone());
2932 let mut orig = fork.0.replace(HydroNode::Placeholder);
2933 transform(&mut orig, seen_tees);
2934 *transformed_cell.borrow_mut() = orig;
2935 *fork = SharedNode(transformed_cell);
2936 }
2937 }
2938 }
2939 }
2940
2941 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2942 match self {
2943 HydroNode::Placeholder => HydroNode::Placeholder,
2944 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2945 inner: Box::new(inner.deep_clone(seen_tees)),
2946 metadata: metadata.clone(),
2947 },
2948 HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2949 inner: Box::new(inner.deep_clone(seen_tees)),
2950 metadata: metadata.clone(),
2951 },
2952 HydroNode::ObserveNonDet {
2953 inner,
2954 trusted,
2955 metadata,
2956 } => HydroNode::ObserveNonDet {
2957 inner: Box::new(inner.deep_clone(seen_tees)),
2958 trusted: *trusted,
2959 metadata: metadata.clone(),
2960 },
2961 HydroNode::AssertIsConsistent {
2962 inner,
2963 trusted,
2964 metadata,
2965 } => HydroNode::AssertIsConsistent {
2966 inner: Box::new(inner.deep_clone(seen_tees)),
2967 trusted: *trusted,
2968 metadata: metadata.clone(),
2969 },
2970 HydroNode::Source { source, metadata } => HydroNode::Source {
2971 source: source.clone(),
2972 metadata: metadata.clone(),
2973 },
2974 HydroNode::SingletonSource {
2975 value,
2976 first_tick_only,
2977 metadata,
2978 } => HydroNode::SingletonSource {
2979 value: value.clone(),
2980 first_tick_only: *first_tick_only,
2981 metadata: metadata.clone(),
2982 },
2983 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2984 cycle_id: *cycle_id,
2985 metadata: metadata.clone(),
2986 },
2987 HydroNode::Tee { inner, metadata }
2988 | HydroNode::Reference {
2989 inner, metadata, ..
2990 } => {
2991 let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2992 SharedNode(transformed.clone())
2993 } else {
2994 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2995 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2996 let cloned = inner.0.borrow().deep_clone(seen_tees);
2997 *new_rc.borrow_mut() = cloned;
2998 SharedNode(new_rc)
2999 };
3000 if let HydroNode::Reference {
3001 kind,
3002 access_counter,
3003 ..
3004 } = self
3005 {
3006 HydroNode::Reference {
3007 inner: cloned_inner,
3008 kind: *kind,
3009 access_counter: access_counter.freeze(),
3010 metadata: metadata.clone(),
3011 }
3012 } else {
3013 HydroNode::Tee {
3014 inner: cloned_inner,
3015 metadata: metadata.clone(),
3016 }
3017 }
3018 }
3019 HydroNode::Partition {
3020 inner,
3021 f,
3022 is_true,
3023 metadata,
3024 } => {
3025 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
3026 HydroNode::Partition {
3027 inner: SharedNode(transformed.clone()),
3028 f: f.deep_clone(seen_tees),
3029 is_true: *is_true,
3030 metadata: metadata.clone(),
3031 }
3032 } else {
3033 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
3034 seen_tees.insert(inner.as_ptr(), new_rc.clone());
3035 let cloned = inner.0.borrow().deep_clone(seen_tees);
3036 *new_rc.borrow_mut() = cloned;
3037 HydroNode::Partition {
3038 inner: SharedNode(new_rc),
3039 f: f.deep_clone(seen_tees),
3040 is_true: *is_true,
3041 metadata: metadata.clone(),
3042 }
3043 }
3044 }
3045 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
3046 inner: Box::new(inner.deep_clone(seen_tees)),
3047 metadata: metadata.clone(),
3048 },
3049 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
3050 inner: Box::new(inner.deep_clone(seen_tees)),
3051 metadata: metadata.clone(),
3052 },
3053 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
3054 inner: Box::new(inner.deep_clone(seen_tees)),
3055 metadata: metadata.clone(),
3056 },
3057 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
3058 inner: Box::new(inner.deep_clone(seen_tees)),
3059 metadata: metadata.clone(),
3060 },
3061 HydroNode::Chain {
3062 first,
3063 second,
3064 metadata,
3065 } => HydroNode::Chain {
3066 first: Box::new(first.deep_clone(seen_tees)),
3067 second: Box::new(second.deep_clone(seen_tees)),
3068 metadata: metadata.clone(),
3069 },
3070 HydroNode::MergeOrdered {
3071 first,
3072 second,
3073 metadata,
3074 } => HydroNode::MergeOrdered {
3075 first: Box::new(first.deep_clone(seen_tees)),
3076 second: Box::new(second.deep_clone(seen_tees)),
3077 metadata: metadata.clone(),
3078 },
3079 HydroNode::ChainFirst {
3080 first,
3081 second,
3082 metadata,
3083 } => HydroNode::ChainFirst {
3084 first: Box::new(first.deep_clone(seen_tees)),
3085 second: Box::new(second.deep_clone(seen_tees)),
3086 metadata: metadata.clone(),
3087 },
3088 HydroNode::CrossProduct {
3089 left,
3090 right,
3091 metadata,
3092 } => HydroNode::CrossProduct {
3093 left: Box::new(left.deep_clone(seen_tees)),
3094 right: Box::new(right.deep_clone(seen_tees)),
3095 metadata: metadata.clone(),
3096 },
3097 HydroNode::CrossSingleton {
3098 left,
3099 right,
3100 metadata,
3101 } => HydroNode::CrossSingleton {
3102 left: Box::new(left.deep_clone(seen_tees)),
3103 right: Box::new(right.deep_clone(seen_tees)),
3104 metadata: metadata.clone(),
3105 },
3106 HydroNode::Join {
3107 left,
3108 right,
3109 metadata,
3110 } => HydroNode::Join {
3111 left: Box::new(left.deep_clone(seen_tees)),
3112 right: Box::new(right.deep_clone(seen_tees)),
3113 metadata: metadata.clone(),
3114 },
3115 HydroNode::JoinHalf {
3116 left,
3117 right,
3118 metadata,
3119 } => HydroNode::JoinHalf {
3120 left: Box::new(left.deep_clone(seen_tees)),
3121 right: Box::new(right.deep_clone(seen_tees)),
3122 metadata: metadata.clone(),
3123 },
3124 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
3125 pos: Box::new(pos.deep_clone(seen_tees)),
3126 neg: Box::new(neg.deep_clone(seen_tees)),
3127 metadata: metadata.clone(),
3128 },
3129 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
3130 pos: Box::new(pos.deep_clone(seen_tees)),
3131 neg: Box::new(neg.deep_clone(seen_tees)),
3132 metadata: metadata.clone(),
3133 },
3134 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
3135 input: Box::new(input.deep_clone(seen_tees)),
3136 metadata: metadata.clone(),
3137 },
3138 HydroNode::ResolveFuturesBlocking { input, metadata } => {
3139 HydroNode::ResolveFuturesBlocking {
3140 input: Box::new(input.deep_clone(seen_tees)),
3141 metadata: metadata.clone(),
3142 }
3143 }
3144 HydroNode::ResolveFuturesOrdered { input, metadata } => {
3145 HydroNode::ResolveFuturesOrdered {
3146 input: Box::new(input.deep_clone(seen_tees)),
3147 metadata: metadata.clone(),
3148 }
3149 }
3150 HydroNode::Map { f, input, metadata } => HydroNode::Map {
3151 f: f.deep_clone(seen_tees),
3152 input: Box::new(input.deep_clone(seen_tees)),
3153 metadata: metadata.clone(),
3154 },
3155 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
3156 f: f.deep_clone(seen_tees),
3157 input: Box::new(input.deep_clone(seen_tees)),
3158 metadata: metadata.clone(),
3159 },
3160 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
3161 HydroNode::FlatMapStreamBlocking {
3162 f: f.deep_clone(seen_tees),
3163 input: Box::new(input.deep_clone(seen_tees)),
3164 metadata: metadata.clone(),
3165 }
3166 }
3167 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
3168 f: f.deep_clone(seen_tees),
3169 input: Box::new(input.deep_clone(seen_tees)),
3170 metadata: metadata.clone(),
3171 },
3172 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
3173 f: f.deep_clone(seen_tees),
3174 input: Box::new(input.deep_clone(seen_tees)),
3175 metadata: metadata.clone(),
3176 },
3177 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
3178 input: Box::new(input.deep_clone(seen_tees)),
3179 metadata: metadata.clone(),
3180 },
3181 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
3182 input: Box::new(input.deep_clone(seen_tees)),
3183 metadata: metadata.clone(),
3184 },
3185 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
3186 f: f.deep_clone(seen_tees),
3187 input: Box::new(input.deep_clone(seen_tees)),
3188 metadata: metadata.clone(),
3189 },
3190 HydroNode::Unique { input, metadata } => HydroNode::Unique {
3191 input: Box::new(input.deep_clone(seen_tees)),
3192 metadata: metadata.clone(),
3193 },
3194 HydroNode::Sort { input, metadata } => HydroNode::Sort {
3195 input: Box::new(input.deep_clone(seen_tees)),
3196 metadata: metadata.clone(),
3197 },
3198 HydroNode::Fold {
3199 init,
3200 acc,
3201 input,
3202 metadata,
3203 } => HydroNode::Fold {
3204 init: init.deep_clone(seen_tees),
3205 acc: acc.deep_clone(seen_tees),
3206 input: Box::new(input.deep_clone(seen_tees)),
3207 metadata: metadata.clone(),
3208 },
3209 HydroNode::Scan {
3210 init,
3211 acc,
3212 input,
3213 metadata,
3214 } => HydroNode::Scan {
3215 init: init.deep_clone(seen_tees),
3216 acc: acc.deep_clone(seen_tees),
3217 input: Box::new(input.deep_clone(seen_tees)),
3218 metadata: metadata.clone(),
3219 },
3220 HydroNode::ScanAsyncBlocking {
3221 init,
3222 acc,
3223 input,
3224 metadata,
3225 } => HydroNode::ScanAsyncBlocking {
3226 init: init.deep_clone(seen_tees),
3227 acc: acc.deep_clone(seen_tees),
3228 input: Box::new(input.deep_clone(seen_tees)),
3229 metadata: metadata.clone(),
3230 },
3231 HydroNode::FoldKeyed {
3232 init,
3233 acc,
3234 input,
3235 metadata,
3236 } => HydroNode::FoldKeyed {
3237 init: init.deep_clone(seen_tees),
3238 acc: acc.deep_clone(seen_tees),
3239 input: Box::new(input.deep_clone(seen_tees)),
3240 metadata: metadata.clone(),
3241 },
3242 HydroNode::ReduceKeyedWatermark {
3243 f,
3244 input,
3245 watermark,
3246 metadata,
3247 } => HydroNode::ReduceKeyedWatermark {
3248 f: f.deep_clone(seen_tees),
3249 input: Box::new(input.deep_clone(seen_tees)),
3250 watermark: Box::new(watermark.deep_clone(seen_tees)),
3251 metadata: metadata.clone(),
3252 },
3253 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3254 f: f.deep_clone(seen_tees),
3255 input: Box::new(input.deep_clone(seen_tees)),
3256 metadata: metadata.clone(),
3257 },
3258 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3259 f: f.deep_clone(seen_tees),
3260 input: Box::new(input.deep_clone(seen_tees)),
3261 metadata: metadata.clone(),
3262 },
3263 HydroNode::Network {
3264 name,
3265 networking_info,
3266 serialize_fn,
3267 instantiate_fn,
3268 deserialize_fn,
3269 input,
3270 metadata,
3271 } => HydroNode::Network {
3272 name: name.clone(),
3273 networking_info: networking_info.clone(),
3274 serialize_fn: serialize_fn.clone(),
3275 instantiate_fn: instantiate_fn.clone(),
3276 deserialize_fn: deserialize_fn.clone(),
3277 input: Box::new(input.deep_clone(seen_tees)),
3278 metadata: metadata.clone(),
3279 },
3280 HydroNode::ExternalInput {
3281 from_external_key,
3282 from_port_id,
3283 from_many,
3284 codec_type,
3285 port_hint,
3286 instantiate_fn,
3287 deserialize_fn,
3288 metadata,
3289 } => HydroNode::ExternalInput {
3290 from_external_key: *from_external_key,
3291 from_port_id: *from_port_id,
3292 from_many: *from_many,
3293 codec_type: codec_type.clone(),
3294 port_hint: *port_hint,
3295 instantiate_fn: instantiate_fn.clone(),
3296 deserialize_fn: deserialize_fn.clone(),
3297 metadata: metadata.clone(),
3298 },
3299 HydroNode::Counter {
3300 tag,
3301 duration,
3302 prefix,
3303 input,
3304 metadata,
3305 } => HydroNode::Counter {
3306 tag: tag.clone(),
3307 duration: duration.clone(),
3308 prefix: prefix.clone(),
3309 input: Box::new(input.deep_clone(seen_tees)),
3310 metadata: metadata.clone(),
3311 },
3312 HydroNode::VersionedNetworkFork {
3313 channel_id,
3314 channel_name,
3315 senders,
3316 metadata,
3317 } => HydroNode::VersionedNetworkFork {
3318 channel_id: *channel_id,
3319 channel_name: channel_name.clone(),
3320 senders: senders
3321 .iter()
3322 .map(|(version, sender, serialize)| {
3323 (
3324 *version,
3325 Box::new(sender.deep_clone(seen_tees)),
3326 serialize.clone(),
3327 )
3328 })
3329 .collect(),
3330 metadata: metadata.clone(),
3331 },
3332 HydroNode::VersionedNetwork {
3333 fork,
3334 version,
3335 deserialize_fn,
3336 metadata,
3337 } => {
3338 let cloned_fork = if let Some(transformed) = seen_tees.get(&fork.as_ptr()) {
3339 SharedNode(transformed.clone())
3340 } else {
3341 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
3342 seen_tees.insert(fork.as_ptr(), new_rc.clone());
3343 let cloned = fork.0.borrow().deep_clone(seen_tees);
3344 *new_rc.borrow_mut() = cloned;
3345 SharedNode(new_rc)
3346 };
3347 HydroNode::VersionedNetwork {
3348 fork: cloned_fork,
3349 version: *version,
3350 deserialize_fn: deserialize_fn.clone(),
3351 metadata: metadata.clone(),
3352 }
3353 }
3354 }
3355 }
3356
3357 #[cfg(feature = "build")]
3358 pub fn emit_core(
3359 &mut self,
3360 builders_or_callback: &mut BuildersOrCallback<
3361 impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
3362 impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
3363 >,
3364 seen_tees: &mut SeenSharedNodes,
3365 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3366 next_stmt_id: &mut crate::Counter<StmtId>,
3367 fold_hooked_idents: &mut HashSet<String>,
3368 ) -> syn::Ident {
3369 let mut ident_stack: Vec<syn::Ident> = Vec::new();
3370
3371 self.transform_bottom_up(
3372 &mut |node: &mut HydroNode| {
3373 let out_location = node.metadata().location_id.clone();
3374 match node {
3375 HydroNode::Placeholder => {
3376 panic!()
3377 }
3378
3379 HydroNode::Cast { .. } => {
3380 let _ = next_stmt_id.get_and_increment();
3383 match builders_or_callback {
3384 BuildersOrCallback::Builders(_) => {}
3385 BuildersOrCallback::Callback(_, node_callback) => {
3386 node_callback(node, next_stmt_id);
3387 }
3388 }
3389 }
3391
3392 HydroNode::UnboundSingleton { .. } => {
3393 let inner_ident = ident_stack.pop().unwrap();
3394
3395 let stmt_id = next_stmt_id.get_and_increment();
3396 let out_ident =
3397 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3398
3399 match builders_or_callback {
3400 BuildersOrCallback::Builders(graph_builders) => {
3401 if graph_builders.singleton_intermediates() {
3402 let builder = graph_builders.get_dfir_mut(&out_location);
3403 builder.add_dfir(
3404 parse_quote! {
3405 #out_ident = #inner_ident;
3406 },
3407 None,
3408 None,
3409 );
3410 } else {
3411 let builder = graph_builders.get_dfir_mut(&out_location);
3412 builder.add_dfir(
3413 parse_quote! {
3414 #out_ident = #inner_ident -> persist::<'static>();
3415 },
3416 None,
3417 None,
3418 );
3419 }
3420 }
3421 BuildersOrCallback::Callback(_, node_callback) => {
3422 node_callback(node, next_stmt_id);
3423 }
3424 }
3425
3426 ident_stack.push(out_ident);
3427 }
3428
3429 HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3430 let inner_ident = ident_stack.pop().unwrap();
3431
3432 let stmt_id = next_stmt_id.get_and_increment();
3433 let out_ident =
3434 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3435
3436 match builders_or_callback {
3437 BuildersOrCallback::Builders(graph_builders) => {
3438 graph_builders.assert_is_consistent(
3439 *trusted,
3440 &inner.metadata().location_id,
3441 inner_ident,
3442 &out_ident,
3443 );
3444 }
3445 BuildersOrCallback::Callback(_, node_callback) => {
3446 node_callback(node, next_stmt_id);
3447 }
3448 }
3449
3450 ident_stack.push(out_ident);
3451 }
3452
3453 HydroNode::ObserveNonDet {
3454 inner,
3455 trusted,
3456 metadata,
3457 ..
3458 } => {
3459 let inner_ident = ident_stack.pop().unwrap();
3460
3461 let stmt_id = next_stmt_id.get_and_increment();
3462 let observe_ident =
3463 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3464
3465 match builders_or_callback {
3466 BuildersOrCallback::Builders(graph_builders) => {
3467 graph_builders.observe_nondet(
3468 *trusted,
3469 &inner.metadata().location_id,
3470 inner_ident,
3471 &inner.metadata().collection_kind,
3472 &observe_ident,
3473 &metadata.collection_kind,
3474 &metadata.op,
3475 );
3476 }
3477 BuildersOrCallback::Callback(_, node_callback) => {
3478 node_callback(node, next_stmt_id);
3479 }
3480 }
3481
3482 ident_stack.push(observe_ident);
3483 }
3484
3485 HydroNode::Batch {
3486 inner, metadata, ..
3487 } => {
3488 let inner_ident = ident_stack.pop().unwrap();
3489
3490 let stmt_id = next_stmt_id.get_and_increment();
3491 let batch_ident =
3492 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3493
3494 match builders_or_callback {
3495 BuildersOrCallback::Builders(graph_builders) => {
3496 graph_builders.batch(
3497 inner_ident,
3498 &inner.metadata().location_id,
3499 &inner.metadata().collection_kind,
3500 &batch_ident,
3501 &out_location,
3502 &metadata.op,
3503 fold_hooked_idents,
3504 );
3505 }
3506 BuildersOrCallback::Callback(_, node_callback) => {
3507 node_callback(node, next_stmt_id);
3508 }
3509 }
3510
3511 ident_stack.push(batch_ident);
3512 }
3513
3514 HydroNode::YieldConcat { inner, .. } => {
3515 let inner_ident = ident_stack.pop().unwrap();
3516
3517 let stmt_id = next_stmt_id.get_and_increment();
3518 let yield_ident =
3519 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3520
3521 match builders_or_callback {
3522 BuildersOrCallback::Builders(graph_builders) => {
3523 graph_builders.yield_from_tick(
3524 inner_ident,
3525 &inner.metadata().location_id,
3526 &inner.metadata().collection_kind,
3527 &yield_ident,
3528 &out_location,
3529 );
3530 }
3531 BuildersOrCallback::Callback(_, node_callback) => {
3532 node_callback(node, next_stmt_id);
3533 }
3534 }
3535
3536 ident_stack.push(yield_ident);
3537 }
3538
3539 HydroNode::BeginAtomic { inner, metadata } => {
3540 let inner_ident = ident_stack.pop().unwrap();
3541
3542 let stmt_id = next_stmt_id.get_and_increment();
3543 let begin_ident =
3544 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3545
3546 match builders_or_callback {
3547 BuildersOrCallback::Builders(graph_builders) => {
3548 graph_builders.begin_atomic(
3549 inner_ident,
3550 &inner.metadata().location_id,
3551 &inner.metadata().collection_kind,
3552 &begin_ident,
3553 &out_location,
3554 &metadata.op,
3555 );
3556 }
3557 BuildersOrCallback::Callback(_, node_callback) => {
3558 node_callback(node, next_stmt_id);
3559 }
3560 }
3561
3562 ident_stack.push(begin_ident);
3563 }
3564
3565 HydroNode::EndAtomic { inner, .. } => {
3566 let inner_ident = ident_stack.pop().unwrap();
3567
3568 let stmt_id = next_stmt_id.get_and_increment();
3569 let end_ident =
3570 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3571
3572 match builders_or_callback {
3573 BuildersOrCallback::Builders(graph_builders) => {
3574 graph_builders.end_atomic(
3575 inner_ident,
3576 &inner.metadata().location_id,
3577 &inner.metadata().collection_kind,
3578 &end_ident,
3579 );
3580 }
3581 BuildersOrCallback::Callback(_, node_callback) => {
3582 node_callback(node, next_stmt_id);
3583 }
3584 }
3585
3586 ident_stack.push(end_ident);
3587 }
3588
3589 HydroNode::Source {
3590 source, metadata, ..
3591 } => {
3592 if let HydroSource::ExternalNetwork() = source {
3593 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3594 } else {
3595 let stmt_id = next_stmt_id.get_and_increment();
3596 let source_ident =
3597 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3598
3599 let source_stmt = match source {
3600 HydroSource::Stream(expr) => {
3601 debug_assert!(metadata.location_id.is_top_level());
3602 parse_quote! {
3603 #source_ident = source_stream(#expr);
3604 }
3605 }
3606
3607 HydroSource::ExternalNetwork() => {
3608 unreachable!()
3609 }
3610
3611 HydroSource::Iter(expr) => {
3612 if metadata.location_id.is_top_level() {
3613 parse_quote! {
3614 #source_ident = source_iter(#expr);
3615 }
3616 } else {
3617 parse_quote! {
3619 #source_ident = source_iter(#expr) -> persist::<'static>();
3620 }
3621 }
3622 }
3623
3624 HydroSource::Spin() => {
3625 debug_assert!(metadata.location_id.is_top_level());
3626 parse_quote! {
3627 #source_ident = spin();
3628 }
3629 }
3630
3631 HydroSource::ClusterMembers(target_loc, state) => {
3632 debug_assert!(metadata.location_id.is_top_level());
3633
3634 let members_tee_ident = syn::Ident::new(
3635 &format!(
3636 "__cluster_members_tee_{}_{}",
3637 metadata.location_id.root().key(),
3638 target_loc.key(),
3639 ),
3640 Span::call_site(),
3641 );
3642
3643 match state {
3644 ClusterMembersState::Stream(d) => {
3645 parse_quote! {
3646 #members_tee_ident = source_stream(#d) -> tee();
3647 #source_ident = #members_tee_ident;
3648 }
3649 },
3650 ClusterMembersState::Uninit => syn::parse_quote! {
3651 #source_ident = source_stream(DUMMY);
3652 },
3653 ClusterMembersState::Tee(..) => parse_quote! {
3654 #source_ident = #members_tee_ident;
3655 },
3656 }
3657 }
3658
3659 HydroSource::Embedded(ident) => {
3660 parse_quote! {
3661 #source_ident = source_stream(#ident);
3662 }
3663 }
3664
3665 HydroSource::EmbeddedSingleton(ident) => {
3666 parse_quote! {
3667 #source_ident = source_iter([#ident]);
3668 }
3669 }
3670 };
3671
3672 match builders_or_callback {
3673 BuildersOrCallback::Builders(graph_builders) => {
3674 let builder = graph_builders.get_dfir_mut(&out_location);
3675 builder.add_dfir(source_stmt, None, Some(&stmt_id.to_string()));
3676 }
3677 BuildersOrCallback::Callback(_, node_callback) => {
3678 node_callback(node, next_stmt_id);
3679 }
3680 }
3681
3682 ident_stack.push(source_ident);
3683 }
3684 }
3685
3686 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3687 let stmt_id = next_stmt_id.get_and_increment();
3688 let source_ident =
3689 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3690
3691 match builders_or_callback {
3692 BuildersOrCallback::Builders(graph_builders) => {
3693 let builder = graph_builders.get_dfir_mut(&out_location);
3694
3695 if *first_tick_only {
3696 assert!(
3697 !metadata.location_id.is_top_level(),
3698 "first_tick_only SingletonSource must be inside a tick"
3699 );
3700 }
3701
3702 if *first_tick_only
3703 || (metadata.location_id.is_top_level()
3704 && metadata.collection_kind.is_bounded())
3705 {
3706 builder.add_dfir(
3707 parse_quote! {
3708 #source_ident = source_iter([#value]);
3709 },
3710 None,
3711 Some(&stmt_id.to_string()),
3712 );
3713 } else {
3714 builder.add_dfir(
3715 parse_quote! {
3716 #source_ident = source_iter([#value]) -> persist::<'static>();
3717 },
3718 None,
3719 Some(&stmt_id.to_string()),
3720 );
3721 }
3722 }
3723 BuildersOrCallback::Callback(_, node_callback) => {
3724 node_callback(node, next_stmt_id);
3725 }
3726 }
3727
3728 ident_stack.push(source_ident);
3729 }
3730
3731 HydroNode::CycleSource { cycle_id, .. } => {
3732 let ident = cycle_id.as_ident();
3733
3734 let _ = next_stmt_id.get_and_increment();
3736
3737 match builders_or_callback {
3738 BuildersOrCallback::Builders(_) => {}
3739 BuildersOrCallback::Callback(_, node_callback) => {
3740 node_callback(node, next_stmt_id);
3741 }
3742 }
3743
3744 ident_stack.push(ident);
3745 }
3746
3747 HydroNode::Tee { inner, .. } => {
3748 let stmt_id = next_stmt_id.get_and_increment();
3751
3752 let ret_ident = if let Some(built_idents) =
3753 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3754 {
3755 match builders_or_callback {
3756 BuildersOrCallback::Builders(_) => {}
3757 BuildersOrCallback::Callback(_, node_callback) => {
3758 node_callback(node, next_stmt_id);
3759 }
3760 }
3761
3762 built_idents[0].clone()
3763 } else {
3764 let inner_ident = ident_stack.pop().unwrap();
3767
3768 let tee_ident =
3769 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3770
3771 built_tees.insert(
3772 inner.0.as_ref() as *const RefCell<HydroNode>,
3773 vec![tee_ident.clone()],
3774 );
3775
3776 match builders_or_callback {
3777 BuildersOrCallback::Builders(graph_builders) => {
3778 if fold_hooked_idents.contains(&inner_ident.to_string()) {
3790 fold_hooked_idents.insert(tee_ident.to_string());
3791 }
3792 let builder = graph_builders.get_dfir_mut(&out_location);
3793 builder.add_dfir(
3794 parse_quote! {
3795 #tee_ident = #inner_ident -> tee();
3796 },
3797 None,
3798 Some(&stmt_id.to_string()),
3799 );
3800 }
3801 BuildersOrCallback::Callback(_, node_callback) => {
3802 node_callback(node, next_stmt_id);
3803 }
3804 }
3805
3806 tee_ident
3807 };
3808
3809 ident_stack.push(ret_ident);
3810 }
3811
3812 HydroNode::Reference { inner, kind, .. } => {
3813 let stmt_id = next_stmt_id.get_and_increment();
3816
3817 let ret_ident = if let Some(built_idents) =
3818 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3819 {
3820 built_idents[0].clone()
3821 } else {
3822 let inner_ident = ident_stack.pop().unwrap();
3823
3824 let ref_ident =
3825 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3826
3827 built_tees.insert(
3828 inner.0.as_ref() as *const RefCell<HydroNode>,
3829 vec![ref_ident.clone()],
3830 );
3831
3832 match builders_or_callback {
3833 BuildersOrCallback::Builders(graph_builders) => {
3834 let builder = graph_builders.get_dfir_mut(&out_location);
3835 let op_ident = syn::Ident::new(
3836 match kind {
3837 crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3838 crate::handoff_ref::HandoffRefKind::Optional => "optional",
3839 crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3840 },
3841 Span::call_site(),
3842 );
3843 builder.add_dfir(
3844 parse_quote! {
3845 #ref_ident = #inner_ident -> #op_ident();
3846 },
3847 None,
3848 Some(&stmt_id.to_string()),
3849 );
3850 }
3851 BuildersOrCallback::Callback(_, node_callback) => {
3852 node_callback(node, next_stmt_id);
3853 }
3854 }
3855
3856 ref_ident
3857 };
3858
3859 ident_stack.push(ret_ident);
3860 }
3861
3862 HydroNode::Partition {
3863 inner, f, is_true, metadata,
3864 } => {
3865 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3867 let stmt_id = next_stmt_id.get_and_increment();
3868
3869 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3870 match builders_or_callback {
3871 BuildersOrCallback::Builders(_) => {}
3872 BuildersOrCallback::Callback(_, node_callback) => {
3873 node_callback(node, next_stmt_id);
3874 }
3875 }
3876
3877 let idx = if is_true { 0 } else { 1 };
3878 built_idents[idx].clone()
3879 } else {
3880 let inner_ident = ident_stack.pop().unwrap();
3883 let f_tokens = f.emit_tokens(&mut ident_stack);
3884
3885 let inner_ident = {
3886 let inner_borrow = inner.0.borrow();
3887 maybe_observe_for_mut(
3888 f, inner_ident,
3889 &inner_borrow.metadata().location_id,
3890 &inner_borrow.metadata().collection_kind,
3891 &metadata.op,
3892 builders_or_callback, next_stmt_id,
3893 )
3894 };
3895
3896 let partition_ident = syn::Ident::new(
3897 &format!("stream_{}_partition", stmt_id),
3898 Span::call_site(),
3899 );
3900 let true_ident = syn::Ident::new(
3901 &format!("stream_{}_true", stmt_id),
3902 Span::call_site(),
3903 );
3904 let false_ident = syn::Ident::new(
3905 &format!("stream_{}_false", stmt_id),
3906 Span::call_site(),
3907 );
3908
3909 built_tees.insert(
3910 ptr,
3911 vec![true_ident.clone(), false_ident.clone()],
3912 );
3913
3914 let stmt_id = next_stmt_id.get_and_increment();
3915 match builders_or_callback {
3916 BuildersOrCallback::Builders(graph_builders) => {
3917 let builder = graph_builders.get_dfir_mut(&out_location);
3918 builder.add_dfir(
3919 parse_quote! {
3920 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3921 #true_ident = #partition_ident[0];
3922 #false_ident = #partition_ident[1];
3923 },
3924 None,
3925 Some(&stmt_id.to_string()),
3926 );
3927 }
3928 BuildersOrCallback::Callback(_, node_callback) => {
3929 node_callback(node, next_stmt_id);
3930 }
3931 }
3932
3933 if is_true { true_ident } else { false_ident }
3934 };
3935
3936 ident_stack.push(ret_ident);
3937 }
3938
3939 HydroNode::Chain { .. } => {
3940 let second_ident = ident_stack.pop().unwrap();
3942 let first_ident = ident_stack.pop().unwrap();
3943
3944 let stmt_id = next_stmt_id.get_and_increment();
3945 let chain_ident =
3946 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3947
3948 match builders_or_callback {
3949 BuildersOrCallback::Builders(graph_builders) => {
3950 let builder = graph_builders.get_dfir_mut(&out_location);
3951 builder.add_dfir(
3952 parse_quote! {
3953 #chain_ident = chain();
3954 #first_ident -> [0]#chain_ident;
3955 #second_ident -> [1]#chain_ident;
3956 },
3957 None,
3958 Some(&stmt_id.to_string()),
3959 );
3960 }
3961 BuildersOrCallback::Callback(_, node_callback) => {
3962 node_callback(node, next_stmt_id);
3963 }
3964 }
3965
3966 ident_stack.push(chain_ident);
3967 }
3968
3969 HydroNode::MergeOrdered { first, metadata, .. } => {
3970 let second_ident = ident_stack.pop().unwrap();
3971 let first_ident = ident_stack.pop().unwrap();
3972
3973 let stmt_id = next_stmt_id.get_and_increment();
3974 let merge_ident =
3975 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3976
3977 match builders_or_callback {
3978 BuildersOrCallback::Builders(graph_builders) => {
3979 graph_builders.merge_ordered(
3980 &first.metadata().location_id,
3981 first_ident,
3982 second_ident,
3983 &merge_ident,
3984 &first.metadata().collection_kind,
3985 &metadata.op,
3986 Some(&stmt_id.to_string()),
3987 );
3988 }
3989 BuildersOrCallback::Callback(_, node_callback) => {
3990 node_callback(node, next_stmt_id);
3991 }
3992 }
3993
3994 ident_stack.push(merge_ident);
3995 }
3996
3997 HydroNode::ChainFirst { .. } => {
3998 let second_ident = ident_stack.pop().unwrap();
3999 let first_ident = ident_stack.pop().unwrap();
4000
4001 let stmt_id = next_stmt_id.get_and_increment();
4002 let chain_ident =
4003 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4004
4005 match builders_or_callback {
4006 BuildersOrCallback::Builders(graph_builders) => {
4007 let builder = graph_builders.get_dfir_mut(&out_location);
4008 builder.add_dfir(
4009 parse_quote! {
4010 #chain_ident = chain_first_n(1);
4011 #first_ident -> [0]#chain_ident;
4012 #second_ident -> [1]#chain_ident;
4013 },
4014 None,
4015 Some(&stmt_id.to_string()),
4016 );
4017 }
4018 BuildersOrCallback::Callback(_, node_callback) => {
4019 node_callback(node, next_stmt_id);
4020 }
4021 }
4022
4023 ident_stack.push(chain_ident);
4024 }
4025
4026 HydroNode::CrossSingleton { right, .. } => {
4027 let right_ident = ident_stack.pop().unwrap();
4028 let left_ident = ident_stack.pop().unwrap();
4029
4030 let stmt_id = next_stmt_id.get_and_increment();
4031 let cross_ident =
4032 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4033
4034 match builders_or_callback {
4035 BuildersOrCallback::Builders(graph_builders) => {
4036 let builder = graph_builders.get_dfir_mut(&out_location);
4037
4038 if right.metadata().location_id.is_top_level()
4039 && right.metadata().collection_kind.is_bounded()
4040 {
4041 builder.add_dfir(
4042 parse_quote! {
4043 #cross_ident = cross_singleton::<'static>();
4044 #left_ident -> [input]#cross_ident;
4045 #right_ident -> [single]#cross_ident;
4046 },
4047 None,
4048 Some(&stmt_id.to_string()),
4049 );
4050 } else {
4051 builder.add_dfir(
4052 parse_quote! {
4053 #cross_ident = cross_singleton();
4054 #left_ident -> [input]#cross_ident;
4055 #right_ident -> [single]#cross_ident;
4056 },
4057 None,
4058 Some(&stmt_id.to_string()),
4059 );
4060 }
4061 }
4062 BuildersOrCallback::Callback(_, node_callback) => {
4063 node_callback(node, next_stmt_id);
4064 }
4065 }
4066
4067 ident_stack.push(cross_ident);
4068 }
4069
4070 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
4071 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
4072 parse_quote!(cross_join_multiset)
4073 } else {
4074 parse_quote!(join_multiset)
4075 };
4076
4077 let (HydroNode::CrossProduct { left, right, .. }
4078 | HydroNode::Join { left, right, .. }) = node
4079 else {
4080 unreachable!()
4081 };
4082
4083 let is_top_level = left.metadata().location_id.is_top_level()
4084 && right.metadata().location_id.is_top_level();
4085 let left_lifetime = if left.metadata().location_id.is_top_level() {
4086 quote!('static)
4087 } else {
4088 quote!('tick)
4089 };
4090
4091 let right_lifetime = if right.metadata().location_id.is_top_level() {
4092 quote!('static)
4093 } else {
4094 quote!('tick)
4095 };
4096
4097 let right_ident = ident_stack.pop().unwrap();
4098 let left_ident = ident_stack.pop().unwrap();
4099
4100 let stmt_id = next_stmt_id.get_and_increment();
4101 let stream_ident =
4102 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4103
4104 match builders_or_callback {
4105 BuildersOrCallback::Builders(graph_builders) => {
4106 let builder = graph_builders.get_dfir_mut(&out_location);
4107 builder.add_dfir(
4108 if is_top_level {
4109 parse_quote! {
4112 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
4113 #left_ident -> [0]#stream_ident;
4114 #right_ident -> [1]#stream_ident;
4115 }
4116 } else {
4117 parse_quote! {
4118 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
4119 #left_ident -> [0]#stream_ident;
4120 #right_ident -> [1]#stream_ident;
4121 }
4122 }
4123 ,
4124 None,
4125 Some(&stmt_id.to_string()),
4126 );
4127 }
4128 BuildersOrCallback::Callback(_, node_callback) => {
4129 node_callback(node, next_stmt_id);
4130 }
4131 }
4132
4133 ident_stack.push(stream_ident);
4134 }
4135
4136 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
4137 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
4138 parse_quote!(difference)
4139 } else {
4140 parse_quote!(anti_join)
4141 };
4142
4143 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
4144 node
4145 else {
4146 unreachable!()
4147 };
4148
4149 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
4150 quote!('static)
4151 } else {
4152 quote!('tick)
4153 };
4154
4155 let neg_ident = ident_stack.pop().unwrap();
4156 let pos_ident = ident_stack.pop().unwrap();
4157
4158 let stmt_id = next_stmt_id.get_and_increment();
4159 let stream_ident =
4160 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4161
4162 match builders_or_callback {
4163 BuildersOrCallback::Builders(graph_builders) => {
4164 let builder = graph_builders.get_dfir_mut(&out_location);
4165 builder.add_dfir(
4166 parse_quote! {
4167 #stream_ident = #operator::<'tick, #neg_lifetime>();
4168 #pos_ident -> [pos]#stream_ident;
4169 #neg_ident -> [neg]#stream_ident;
4170 },
4171 None,
4172 Some(&stmt_id.to_string()),
4173 );
4174 }
4175 BuildersOrCallback::Callback(_, node_callback) => {
4176 node_callback(node, next_stmt_id);
4177 }
4178 }
4179
4180 ident_stack.push(stream_ident);
4181 }
4182
4183 HydroNode::JoinHalf { .. } => {
4184 let HydroNode::JoinHalf { right, .. } = node else {
4185 unreachable!()
4186 };
4187
4188 assert!(
4189 right.metadata().collection_kind.is_bounded(),
4190 "JoinHalf requires the right (build) side to be Bounded, got {:?}",
4191 right.metadata().collection_kind
4192 );
4193
4194 let build_lifetime = if right.metadata().location_id.is_top_level() {
4195 quote!('static)
4196 } else {
4197 quote!('tick)
4198 };
4199
4200 let build_ident = ident_stack.pop().unwrap();
4201 let probe_ident = ident_stack.pop().unwrap();
4202
4203 let stmt_id = next_stmt_id.get_and_increment();
4204 let stream_ident =
4205 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4206
4207 match builders_or_callback {
4208 BuildersOrCallback::Builders(graph_builders) => {
4209 let builder = graph_builders.get_dfir_mut(&out_location);
4210 builder.add_dfir(
4211 parse_quote! {
4212 #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
4213 #probe_ident -> [probe]#stream_ident;
4214 #build_ident -> [build]#stream_ident;
4215 },
4216 None,
4217 Some(&stmt_id.to_string()),
4218 );
4219 }
4220 BuildersOrCallback::Callback(_, node_callback) => {
4221 node_callback(node, next_stmt_id);
4222 }
4223 }
4224
4225 ident_stack.push(stream_ident);
4226 }
4227
4228 HydroNode::ResolveFutures { .. } => {
4229 let input_ident = ident_stack.pop().unwrap();
4230
4231 let stmt_id = next_stmt_id.get_and_increment();
4232 let futures_ident =
4233 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4234
4235 match builders_or_callback {
4236 BuildersOrCallback::Builders(graph_builders) => {
4237 let builder = graph_builders.get_dfir_mut(&out_location);
4238 builder.add_dfir(
4239 parse_quote! {
4240 #futures_ident = #input_ident -> resolve_futures();
4241 },
4242 None,
4243 Some(&stmt_id.to_string()),
4244 );
4245 }
4246 BuildersOrCallback::Callback(_, node_callback) => {
4247 node_callback(node, next_stmt_id);
4248 }
4249 }
4250
4251 ident_stack.push(futures_ident);
4252 }
4253
4254 HydroNode::ResolveFuturesBlocking { .. } => {
4255 let input_ident = ident_stack.pop().unwrap();
4256
4257 let stmt_id = next_stmt_id.get_and_increment();
4258 let futures_ident =
4259 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4260
4261 match builders_or_callback {
4262 BuildersOrCallback::Builders(graph_builders) => {
4263 let builder = graph_builders.get_dfir_mut(&out_location);
4264 builder.add_dfir(
4265 parse_quote! {
4266 #futures_ident = #input_ident -> resolve_futures_blocking();
4267 },
4268 None,
4269 Some(&stmt_id.to_string()),
4270 );
4271 }
4272 BuildersOrCallback::Callback(_, node_callback) => {
4273 node_callback(node, next_stmt_id);
4274 }
4275 }
4276
4277 ident_stack.push(futures_ident);
4278 }
4279
4280 HydroNode::ResolveFuturesOrdered { .. } => {
4281 let input_ident = ident_stack.pop().unwrap();
4282
4283 let stmt_id = next_stmt_id.get_and_increment();
4284 let futures_ident =
4285 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4286
4287 match builders_or_callback {
4288 BuildersOrCallback::Builders(graph_builders) => {
4289 let builder = graph_builders.get_dfir_mut(&out_location);
4290 builder.add_dfir(
4291 parse_quote! {
4292 #futures_ident = #input_ident -> resolve_futures_ordered();
4293 },
4294 None,
4295 Some(&stmt_id.to_string()),
4296 );
4297 }
4298 BuildersOrCallback::Callback(_, node_callback) => {
4299 node_callback(node, next_stmt_id);
4300 }
4301 }
4302
4303 ident_stack.push(futures_ident);
4304 }
4305
4306 HydroNode::Map {
4307 f,
4308 input,
4309 metadata,
4310 } => {
4311 let input_ident = ident_stack.pop().unwrap();
4313 let f_tokens = f.emit_tokens(&mut ident_stack);
4314
4315 let input_ident = maybe_observe_for_mut(
4316 f,
4317 input_ident,
4318 &input.metadata().location_id,
4319 &input.metadata().collection_kind,
4320 &metadata.op,
4321 builders_or_callback,
4322 next_stmt_id,
4323 );
4324
4325 let stmt_id = next_stmt_id.get_and_increment();
4326 let map_ident =
4327 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4328
4329 match builders_or_callback {
4330 BuildersOrCallback::Builders(graph_builders) => {
4331 let builder = graph_builders.get_dfir_mut(&out_location);
4332 builder.add_dfir(
4333 parse_quote! {
4334 #map_ident = #input_ident -> map(#f_tokens);
4335 },
4336 None,
4337 Some(&stmt_id.to_string()),
4338 );
4339 }
4340 BuildersOrCallback::Callback(_, node_callback) => {
4341 node_callback(node, next_stmt_id);
4342 }
4343 }
4344
4345 ident_stack.push(map_ident);
4346 }
4347
4348 HydroNode::FlatMap { f, input, metadata } => {
4349 let input_ident = ident_stack.pop().unwrap();
4350 let f_tokens = f.emit_tokens(&mut ident_stack);
4351
4352 let input_ident = maybe_observe_for_mut(
4353 f, input_ident,
4354 &input.metadata().location_id,
4355 &input.metadata().collection_kind,
4356 &metadata.op,
4357 builders_or_callback, next_stmt_id,
4358 );
4359
4360 let stmt_id = next_stmt_id.get_and_increment();
4361 let flat_map_ident =
4362 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4363
4364 match builders_or_callback {
4365 BuildersOrCallback::Builders(graph_builders) => {
4366 let builder = graph_builders.get_dfir_mut(&out_location);
4367 builder.add_dfir(
4368 parse_quote! {
4369 #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4370 },
4371 None,
4372 Some(&stmt_id.to_string()),
4373 );
4374 }
4375 BuildersOrCallback::Callback(_, node_callback) => {
4376 node_callback(node, next_stmt_id);
4377 }
4378 }
4379
4380 ident_stack.push(flat_map_ident);
4381 }
4382
4383 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
4384 let input_ident = ident_stack.pop().unwrap();
4385 let f_tokens = f.emit_tokens(&mut ident_stack);
4386
4387 let input_ident = maybe_observe_for_mut(
4388 f, input_ident,
4389 &input.metadata().location_id,
4390 &input.metadata().collection_kind,
4391 &metadata.op,
4392 builders_or_callback, next_stmt_id,
4393 );
4394
4395 let stmt_id = next_stmt_id.get_and_increment();
4396 let flat_map_stream_blocking_ident =
4397 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4398
4399 match builders_or_callback {
4400 BuildersOrCallback::Builders(graph_builders) => {
4401 let builder = graph_builders.get_dfir_mut(&out_location);
4402 builder.add_dfir(
4403 parse_quote! {
4404 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4405 },
4406 None,
4407 Some(&stmt_id.to_string()),
4408 );
4409 }
4410 BuildersOrCallback::Callback(_, node_callback) => {
4411 node_callback(node, next_stmt_id);
4412 }
4413 }
4414
4415 ident_stack.push(flat_map_stream_blocking_ident);
4416 }
4417
4418 HydroNode::Filter { f, input, metadata } => {
4419 let input_ident = ident_stack.pop().unwrap();
4420 let f_tokens = f.emit_tokens(&mut ident_stack);
4421
4422 let input_ident = maybe_observe_for_mut(
4423 f, input_ident,
4424 &input.metadata().location_id,
4425 &input.metadata().collection_kind,
4426 &metadata.op,
4427 builders_or_callback, next_stmt_id,
4428 );
4429
4430 let stmt_id = next_stmt_id.get_and_increment();
4431 let filter_ident =
4432 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4433
4434 match builders_or_callback {
4435 BuildersOrCallback::Builders(graph_builders) => {
4436 let builder = graph_builders.get_dfir_mut(&out_location);
4437 builder.add_dfir(
4438 parse_quote! {
4439 #filter_ident = #input_ident -> filter(#f_tokens);
4440 },
4441 None,
4442 Some(&stmt_id.to_string()),
4443 );
4444 }
4445 BuildersOrCallback::Callback(_, node_callback) => {
4446 node_callback(node, next_stmt_id);
4447 }
4448 }
4449
4450 ident_stack.push(filter_ident);
4451 }
4452
4453 HydroNode::FilterMap { f, input, metadata } => {
4454 let input_ident = ident_stack.pop().unwrap();
4455 let f_tokens = f.emit_tokens(&mut ident_stack);
4456
4457 let input_ident = maybe_observe_for_mut(
4458 f, input_ident,
4459 &input.metadata().location_id,
4460 &input.metadata().collection_kind,
4461 &metadata.op,
4462 builders_or_callback, next_stmt_id,
4463 );
4464
4465 let stmt_id = next_stmt_id.get_and_increment();
4466 let filter_map_ident =
4467 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4468
4469 match builders_or_callback {
4470 BuildersOrCallback::Builders(graph_builders) => {
4471 let builder = graph_builders.get_dfir_mut(&out_location);
4472 builder.add_dfir(
4473 parse_quote! {
4474 #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4475 },
4476 None,
4477 Some(&stmt_id.to_string()),
4478 );
4479 }
4480 BuildersOrCallback::Callback(_, node_callback) => {
4481 node_callback(node, next_stmt_id);
4482 }
4483 }
4484
4485 ident_stack.push(filter_map_ident);
4486 }
4487
4488 HydroNode::Sort { .. } => {
4489 let input_ident = ident_stack.pop().unwrap();
4490
4491 let stmt_id = next_stmt_id.get_and_increment();
4492 let sort_ident =
4493 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4494
4495 match builders_or_callback {
4496 BuildersOrCallback::Builders(graph_builders) => {
4497 let builder = graph_builders.get_dfir_mut(&out_location);
4498 builder.add_dfir(
4499 parse_quote! {
4500 #sort_ident = #input_ident -> sort();
4501 },
4502 None,
4503 Some(&stmt_id.to_string()),
4504 );
4505 }
4506 BuildersOrCallback::Callback(_, node_callback) => {
4507 node_callback(node, next_stmt_id);
4508 }
4509 }
4510
4511 ident_stack.push(sort_ident);
4512 }
4513
4514 HydroNode::DeferTick { .. } => {
4515 let input_ident = ident_stack.pop().unwrap();
4516
4517 let stmt_id = next_stmt_id.get_and_increment();
4518 let defer_tick_ident =
4519 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4520
4521 match builders_or_callback {
4522 BuildersOrCallback::Builders(graph_builders) => {
4523 let builder = graph_builders.get_dfir_mut(&out_location);
4524 builder.add_dfir(
4525 parse_quote! {
4526 #defer_tick_ident = #input_ident -> defer_tick_lazy();
4527 },
4528 None,
4529 Some(&stmt_id.to_string()),
4530 );
4531 }
4532 BuildersOrCallback::Callback(_, node_callback) => {
4533 node_callback(node, next_stmt_id);
4534 }
4535 }
4536
4537 ident_stack.push(defer_tick_ident);
4538 }
4539
4540 HydroNode::Enumerate { input, .. } => {
4541 let input_ident = ident_stack.pop().unwrap();
4542
4543 let stmt_id = next_stmt_id.get_and_increment();
4544 let enumerate_ident =
4545 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4546
4547 match builders_or_callback {
4548 BuildersOrCallback::Builders(graph_builders) => {
4549 let builder = graph_builders.get_dfir_mut(&out_location);
4550 let lifetime = if input.metadata().location_id.is_top_level() {
4551 quote!('static)
4552 } else {
4553 quote!('tick)
4554 };
4555 builder.add_dfir(
4556 parse_quote! {
4557 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4558 },
4559 None,
4560 Some(&stmt_id.to_string()),
4561 );
4562 }
4563 BuildersOrCallback::Callback(_, node_callback) => {
4564 node_callback(node, next_stmt_id);
4565 }
4566 }
4567
4568 ident_stack.push(enumerate_ident);
4569 }
4570
4571 HydroNode::Inspect { f, input, metadata } => {
4572 let input_ident = ident_stack.pop().unwrap();
4573 let f_tokens = f.emit_tokens(&mut ident_stack);
4574
4575 let input_ident = maybe_observe_for_mut(
4576 f, input_ident,
4577 &input.metadata().location_id,
4578 &input.metadata().collection_kind,
4579 &metadata.op,
4580 builders_or_callback, next_stmt_id,
4581 );
4582
4583 let stmt_id = next_stmt_id.get_and_increment();
4584 let inspect_ident =
4585 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4586
4587 match builders_or_callback {
4588 BuildersOrCallback::Builders(graph_builders) => {
4589 let builder = graph_builders.get_dfir_mut(&out_location);
4590 builder.add_dfir(
4591 parse_quote! {
4592 #inspect_ident = #input_ident -> inspect(#f_tokens);
4593 },
4594 None,
4595 Some(&stmt_id.to_string()),
4596 );
4597 }
4598 BuildersOrCallback::Callback(_, node_callback) => {
4599 node_callback(node, next_stmt_id);
4600 }
4601 }
4602
4603 ident_stack.push(inspect_ident);
4604 }
4605
4606 HydroNode::Unique { input, .. } => {
4607 let input_ident = ident_stack.pop().unwrap();
4608
4609 let stmt_id = next_stmt_id.get_and_increment();
4610 let unique_ident =
4611 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4612
4613 match builders_or_callback {
4614 BuildersOrCallback::Builders(graph_builders) => {
4615 let builder = graph_builders.get_dfir_mut(&out_location);
4616 let lifetime = if input.metadata().location_id.is_top_level() {
4617 quote!('static)
4618 } else {
4619 quote!('tick)
4620 };
4621
4622 builder.add_dfir(
4623 parse_quote! {
4624 #unique_ident = #input_ident -> unique::<#lifetime>();
4625 },
4626 None,
4627 Some(&stmt_id.to_string()),
4628 );
4629 }
4630 BuildersOrCallback::Callback(_, node_callback) => {
4631 node_callback(node, next_stmt_id);
4632 }
4633 }
4634
4635 ident_stack.push(unique_ident);
4636 }
4637
4638 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4639 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4640 if input.metadata().location_id.is_top_level()
4641 && input.metadata().collection_kind.is_bounded()
4642 {
4643 parse_quote!(fold_no_replay)
4644 } else {
4645 parse_quote!(fold)
4646 }
4647 } else if matches!(node, HydroNode::Scan { .. }) {
4648 parse_quote!(scan)
4649 } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4650 parse_quote!(scan_async_blocking)
4651 } else if let HydroNode::FoldKeyed { input, .. } = node {
4652 if input.metadata().location_id.is_top_level()
4653 && input.metadata().collection_kind.is_bounded()
4654 {
4655 todo!("Fold keyed on a top-level bounded collection is not yet supported")
4656 } else {
4657 parse_quote!(fold_keyed)
4658 }
4659 } else {
4660 unreachable!()
4661 };
4662
4663 let (HydroNode::Fold { input, .. }
4664 | HydroNode::FoldKeyed { input, .. }
4665 | HydroNode::Scan { input, .. }
4666 | HydroNode::ScanAsyncBlocking { input, .. }) = node
4667 else {
4668 unreachable!()
4669 };
4670
4671 let lifetime = if input.metadata().location_id.is_top_level() {
4672 quote!('static)
4673 } else {
4674 quote!('tick)
4675 };
4676
4677 let input_ident = ident_stack.pop().unwrap();
4678
4679 let (HydroNode::Fold { init, acc, .. }
4680 | HydroNode::FoldKeyed { init, acc, .. }
4681 | HydroNode::Scan { init, acc, .. }
4682 | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4683 else {
4684 unreachable!()
4685 };
4686
4687 let acc_tokens = acc.emit_tokens(&mut ident_stack);
4688 let init_tokens = init.emit_tokens(&mut ident_stack);
4689
4690 let stmt_id = next_stmt_id.get_and_increment();
4691 let fold_ident =
4692 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4693
4694 match builders_or_callback {
4695 BuildersOrCallback::Builders(graph_builders) => {
4696 if matches!(node, HydroNode::Fold { .. })
4697 && node.metadata().location_id.is_top_level()
4698 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4699 && graph_builders.singleton_intermediates()
4700 && !node.metadata().collection_kind.is_bounded()
4701 {
4702 let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4703 let hooked_input_ident = graph_builders.emit_fold_hook(
4704 &input.metadata().location_id,
4705 &input_ident,
4706 &input.metadata().collection_kind,
4707 &node.metadata().op,
4708 );
4709
4710 let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4711 let acc: syn::Expr = parse_quote!({
4712 let mut __inner = #acc_tokens;
4713 move |__state, __batch: Vec<_>| {
4714 if __batch.is_empty() {
4715 return None;
4716 }
4717 for __value in __batch {
4718 __inner(__state, __value);
4719 }
4720 Some(__state.clone())
4721 }
4722 });
4723 (hooked, acc)
4724 } else {
4725 let acc: syn::Expr = parse_quote!({
4726 let mut __inner = #acc_tokens;
4727 move |__state, __value| {
4728 __inner(__state, __value);
4729 Some(__state.clone())
4730 }
4731 });
4732 (&input_ident, acc)
4733 };
4734
4735 let builder = graph_builders.get_dfir_mut(&out_location);
4736 builder.add_dfir(
4737 parse_quote! {
4738 source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4739 #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4740 #fold_ident = chain();
4741 },
4742 None,
4743 Some(&stmt_id.to_string()),
4744 );
4745
4746 if hooked_input_ident.is_some() {
4747 fold_hooked_idents.insert(fold_ident.to_string());
4748 }
4749 } else if matches!(node, HydroNode::FoldKeyed { .. })
4750 && node.metadata().location_id.is_top_level()
4751 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4752 && graph_builders.singleton_intermediates()
4753 && !node.metadata().collection_kind.is_bounded()
4754 {
4755 let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4756 let hooked_input_ident = graph_builders.emit_fold_hook(
4757 &input.metadata().location_id,
4758 &input_ident,
4759 &input.metadata().collection_kind,
4760 &node.metadata().op,
4761 );
4762 let builder = graph_builders.get_dfir_mut(&out_location);
4763
4764 let wrapped_acc: syn::Expr = parse_quote!({
4765 let mut __init = #init_tokens;
4766 let mut __inner = #acc_tokens;
4767 move |__state, __kv: (_, _)| {
4768 let __state = __state
4770 .entry(::std::clone::Clone::clone(&__kv.0))
4771 .or_insert_with(|| (__init)());
4772 __inner(__state, __kv.1);
4773 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4774 }
4775 });
4776
4777 if let Some(hooked_input_ident) = hooked_input_ident {
4778 builder.add_dfir(
4779 parse_quote! {
4780 #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4781 },
4782 None,
4783 Some(&stmt_id.to_string()),
4784 );
4785
4786 fold_hooked_idents.insert(fold_ident.to_string());
4787 } else {
4788 builder.add_dfir(
4789 parse_quote! {
4790 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4791 },
4792 None,
4793 Some(&stmt_id.to_string()),
4794 );
4795 }
4796 } else if (matches!(node, HydroNode::Fold { .. })
4797 || matches!(node, HydroNode::FoldKeyed { .. }))
4798 && !node.metadata().location_id.is_top_level()
4799 && graph_builders.singleton_intermediates()
4800 {
4801 let input_ref = match &*node {
4802 HydroNode::Fold { input, .. } => input,
4803 HydroNode::FoldKeyed { input, .. } => input,
4804 _ => unreachable!(),
4805 };
4806 let hooked_input_ident = graph_builders.emit_fold_hook(
4807 &input_ref.metadata().location_id,
4808 &input_ident,
4809 &input_ref.metadata().collection_kind,
4810 &node.metadata().op,
4811 );
4812
4813 let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4814 let builder = graph_builders.get_dfir_mut(&out_location);
4815 builder.add_dfir(
4816 parse_quote! {
4817 #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4818 },
4819 None,
4820 Some(&stmt_id.to_string()),
4821 );
4822 } else {
4823 let builder = graph_builders.get_dfir_mut(&out_location);
4824 builder.add_dfir(
4825 parse_quote! {
4826 #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4827 },
4828 None,
4829 Some(&stmt_id.to_string()),
4830 );
4831 }
4832 }
4833 BuildersOrCallback::Callback(_, node_callback) => {
4834 node_callback(node, next_stmt_id);
4835 }
4836 }
4837
4838 ident_stack.push(fold_ident);
4839 }
4840
4841 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4842 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4843 if input.metadata().location_id.is_top_level()
4844 && input.metadata().collection_kind.is_bounded()
4845 {
4846 parse_quote!(reduce_no_replay)
4847 } else {
4848 parse_quote!(reduce)
4849 }
4850 } else if let HydroNode::ReduceKeyed { input, .. } = node {
4851 if input.metadata().location_id.is_top_level()
4852 && input.metadata().collection_kind.is_bounded()
4853 {
4854 todo!(
4855 "Calling keyed reduce on a top-level bounded collection is not supported"
4856 )
4857 } else {
4858 parse_quote!(reduce_keyed)
4859 }
4860 } else {
4861 unreachable!()
4862 };
4863
4864 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4865 else {
4866 unreachable!()
4867 };
4868
4869 let lifetime = if input.metadata().location_id.is_top_level() {
4870 quote!('static)
4871 } else {
4872 quote!('tick)
4873 };
4874
4875 let input_ident = ident_stack.pop().unwrap();
4876
4877 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4878 else {
4879 unreachable!()
4880 };
4881
4882 let f_tokens = f.emit_tokens(&mut ident_stack);
4883
4884 let stmt_id = next_stmt_id.get_and_increment();
4885 let reduce_ident =
4886 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4887
4888 match builders_or_callback {
4889 BuildersOrCallback::Builders(graph_builders) => {
4890 if matches!(node, HydroNode::Reduce { .. })
4891 && node.metadata().location_id.is_top_level()
4892 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4893 && graph_builders.singleton_intermediates()
4894 && !node.metadata().collection_kind.is_bounded()
4895 {
4896 todo!(
4897 "Reduce with optional intermediates is not yet supported in simulator"
4898 );
4899 } else if matches!(node, HydroNode::ReduceKeyed { .. })
4900 && node.metadata().location_id.is_top_level()
4901 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4902 && graph_builders.singleton_intermediates()
4903 && !node.metadata().collection_kind.is_bounded()
4904 {
4905 todo!(
4906 "Reduce keyed with optional intermediates is not yet supported in simulator"
4907 );
4908 } else {
4909 let builder = graph_builders.get_dfir_mut(&out_location);
4910 builder.add_dfir(
4911 parse_quote! {
4912 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4913 },
4914 None,
4915 Some(&stmt_id.to_string()),
4916 );
4917 }
4918 }
4919 BuildersOrCallback::Callback(_, node_callback) => {
4920 node_callback(node, next_stmt_id);
4921 }
4922 }
4923
4924 ident_stack.push(reduce_ident);
4925 }
4926
4927 HydroNode::ReduceKeyedWatermark {
4928 f,
4929 input,
4930 metadata,
4931 ..
4932 } => {
4933 let lifetime = if input.metadata().location_id.is_top_level() {
4934 quote!('static)
4935 } else {
4936 quote!('tick)
4937 };
4938
4939 let watermark_ident = ident_stack.pop().unwrap();
4941 let input_ident = ident_stack.pop().unwrap();
4942 let f_tokens = f.emit_tokens(&mut ident_stack);
4943
4944 let stmt_id = next_stmt_id.get_and_increment();
4945 let chain_ident = syn::Ident::new(
4946 &format!("reduce_keyed_watermark_chain_{}", stmt_id),
4947 Span::call_site(),
4948 );
4949
4950 let fold_ident =
4951 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4952
4953 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4954 && input.metadata().collection_kind.is_bounded()
4955 {
4956 parse_quote!(fold_no_replay)
4957 } else {
4958 parse_quote!(fold)
4959 };
4960
4961 match builders_or_callback {
4962 BuildersOrCallback::Builders(graph_builders) => {
4963 if metadata.location_id.is_top_level()
4964 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4965 && graph_builders.singleton_intermediates()
4966 && !metadata.collection_kind.is_bounded()
4967 {
4968 todo!(
4969 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4970 )
4971 } else {
4972 let builder = graph_builders.get_dfir_mut(&out_location);
4973 builder.add_dfir(
4974 parse_quote! {
4975 #chain_ident = chain();
4976 #input_ident
4977 -> map(|x| (Some(x), None))
4978 -> [0]#chain_ident;
4979 #watermark_ident
4980 -> map(|watermark| (None, Some(watermark)))
4981 -> [1]#chain_ident;
4982
4983 #fold_ident = #chain_ident
4984 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4985 let __reduce_keyed_fn = #f_tokens;
4986 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4987 if let Some((k, v)) = opt_payload {
4988 if let Some(curr_watermark) = *opt_curr_watermark {
4989 if k < curr_watermark {
4990 return;
4991 }
4992 }
4993 match map.entry(k) {
4994 ::std::collections::hash_map::Entry::Vacant(e) => {
4995 e.insert(v);
4996 }
4997 ::std::collections::hash_map::Entry::Occupied(mut e) => {
4998 __reduce_keyed_fn(e.get_mut(), v);
4999 }
5000 }
5001 } else {
5002 let watermark = opt_watermark.unwrap();
5003 if let Some(curr_watermark) = *opt_curr_watermark {
5004 if watermark <= curr_watermark {
5005 return;
5006 }
5007 }
5008 map.retain(|k, _| *k >= watermark);
5009 *opt_curr_watermark = Some(watermark);
5010 }
5011 }
5012 })
5013 -> flat_map(|(map, _curr_watermark)| map);
5014 },
5015 None,
5016 Some(&stmt_id.to_string()),
5017 );
5018 }
5019 }
5020 BuildersOrCallback::Callback(_, node_callback) => {
5021 node_callback(node, next_stmt_id);
5022 }
5023 }
5024
5025 ident_stack.push(fold_ident);
5026 }
5027
5028 HydroNode::Network {
5029 networking_info,
5030 serialize_fn: serialize_pipeline,
5031 instantiate_fn,
5032 deserialize_fn: deserialize_pipeline,
5033 input,
5034 ..
5035 } => {
5036 let input_ident = ident_stack.pop().unwrap();
5037
5038 let stmt_id = next_stmt_id.get_and_increment();
5039 let receiver_stream_ident =
5040 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5041
5042 match builders_or_callback {
5043 BuildersOrCallback::Builders(graph_builders) => {
5044 let (sink_expr, source_expr) = match instantiate_fn {
5045 DebugInstantiate::Building => (
5046 syn::parse_quote!(DUMMY_SINK),
5047 syn::parse_quote!(DUMMY_SOURCE),
5048 ),
5049
5050 DebugInstantiate::Finalized(finalized) => {
5051 (finalized.sink.clone(), finalized.source.clone())
5052 }
5053 };
5054
5055 graph_builders.create_network(
5056 &input.metadata().location_id,
5057 &out_location,
5058 input_ident,
5059 &receiver_stream_ident,
5060 serialize_pipeline.as_ref(),
5061 sink_expr,
5062 source_expr,
5063 deserialize_pipeline.as_ref(),
5064 stmt_id,
5065 networking_info,
5066 );
5067 }
5068 BuildersOrCallback::Callback(_, node_callback) => {
5069 node_callback(node, next_stmt_id);
5070 }
5071 }
5072
5073 ident_stack.push(receiver_stream_ident);
5074 }
5075
5076 HydroNode::ExternalInput {
5077 instantiate_fn,
5078 deserialize_fn: deserialize_pipeline,
5079 ..
5080 } => {
5081 let stmt_id = next_stmt_id.get_and_increment();
5082 let receiver_stream_ident =
5083 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5084
5085 match builders_or_callback {
5086 BuildersOrCallback::Builders(graph_builders) => {
5087 let (_, source_expr) = match instantiate_fn {
5088 DebugInstantiate::Building => (
5089 syn::parse_quote!(DUMMY_SINK),
5090 syn::parse_quote!(DUMMY_SOURCE),
5091 ),
5092
5093 DebugInstantiate::Finalized(finalized) => {
5094 (finalized.sink.clone(), finalized.source.clone())
5095 }
5096 };
5097
5098 graph_builders.create_external_source(
5099 &out_location,
5100 source_expr,
5101 &receiver_stream_ident,
5102 deserialize_pipeline.as_ref(),
5103 stmt_id,
5104 );
5105 }
5106 BuildersOrCallback::Callback(_, node_callback) => {
5107 node_callback(node, next_stmt_id);
5108 }
5109 }
5110
5111 ident_stack.push(receiver_stream_ident);
5112 }
5113
5114 HydroNode::Counter {
5115 tag,
5116 duration,
5117 prefix,
5118 ..
5119 } => {
5120 let input_ident = ident_stack.pop().unwrap();
5121
5122 let stmt_id = next_stmt_id.get_and_increment();
5123 let counter_ident =
5124 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5125
5126 match builders_or_callback {
5127 BuildersOrCallback::Builders(graph_builders) => {
5128 let arg = format!("{}({})", prefix, tag);
5129 let builder = graph_builders.get_dfir_mut(&out_location);
5130 builder.add_dfir(
5131 parse_quote! {
5132 #counter_ident = #input_ident -> _counter(#arg, #duration);
5133 },
5134 None,
5135 Some(&stmt_id.to_string()),
5136 );
5137 }
5138 BuildersOrCallback::Callback(_, node_callback) => {
5139 node_callback(node, next_stmt_id);
5140 }
5141 }
5142
5143 ident_stack.push(counter_ident);
5144 }
5145
5146 HydroNode::VersionedNetworkFork {
5147 channel_id,
5148 senders,
5149 metadata,
5150 ..
5151 } => {
5152 let mut sender_idents: Vec<syn::Ident> =
5153 (0..senders.len()).map(|_| ident_stack.pop().unwrap()).collect();
5154 sender_idents.reverse();
5155
5156 let stmt_id = next_stmt_id.get_and_increment();
5157
5158 match builders_or_callback {
5159 BuildersOrCallback::Builders(graph_builders) => {
5160 let sender_args: Vec<(LocationId, syn::Ident, Option<DebugExpr>)> =
5161 senders
5162 .iter()
5163 .zip(sender_idents)
5164 .map(|((_version, sender, serialize), ident)| {
5165 (
5166 sender.metadata().location_id.clone(),
5167 ident,
5168 serialize.clone(),
5169 )
5170 })
5171 .collect();
5172 graph_builders.create_versioned_network_fork(
5173 *channel_id,
5174 &metadata.location_id,
5175 sender_args,
5176 stmt_id,
5177 );
5178 }
5179 BuildersOrCallback::Callback(_, node_callback) => {
5180 node_callback(node, next_stmt_id);
5181 }
5182 }
5183 }
5184
5185 HydroNode::VersionedNetwork {
5186 fork,
5187 deserialize_fn,
5188 metadata,
5189 ..
5190 } => {
5191 let stmt_id = next_stmt_id.get_and_increment();
5192 let receiver_stream_ident =
5193 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5194
5195 let (channel_id, source_loc) = {
5198 let fork_ref = fork.0.borrow();
5199 let HydroNode::VersionedNetworkFork {
5200 channel_id,
5201 senders,
5202 ..
5203 } = &*fork_ref
5204 else {
5205 unreachable!("VersionedNetwork.fork must be a VersionedNetworkFork");
5206 };
5207 let source_loc = senders
5208 .first()
5209 .map(|(_v, sender, _s)| sender.metadata().location_id.clone())
5210 .expect("a VersionedNetworkFork always has at least one sender");
5211 (*channel_id, source_loc)
5212 };
5213
5214 match builders_or_callback {
5215 BuildersOrCallback::Builders(graph_builders) => {
5216 graph_builders.create_versioned_network(
5217 channel_id,
5218 &source_loc,
5219 &metadata.location_id,
5220 &receiver_stream_ident,
5221 deserialize_fn.as_ref(),
5222 stmt_id,
5223 );
5224 }
5225 BuildersOrCallback::Callback(_, node_callback) => {
5226 node_callback(node, next_stmt_id);
5227 }
5228 }
5229
5230 ident_stack.push(receiver_stream_ident);
5231 }
5232 }
5233 },
5234 seen_tees,
5235 false,
5236 );
5237
5238 let ret = ident_stack
5239 .pop()
5240 .expect("ident_stack should have exactly one element after traversal");
5241 assert!(
5242 ident_stack.is_empty(),
5243 "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
5244 This indicates a bug in the code gen: some node pushed idents that were never consumed.",
5245 ident_stack.len()
5246 );
5247 ret
5248 }
5249
5250 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
5251 match self {
5252 HydroNode::Placeholder => {
5253 panic!()
5254 }
5255 HydroNode::Cast { .. }
5256 | HydroNode::ObserveNonDet { .. }
5257 | HydroNode::UnboundSingleton { .. }
5258 | HydroNode::AssertIsConsistent { .. } => {}
5259 HydroNode::Source { source, .. } => match source {
5260 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
5261 HydroSource::ExternalNetwork()
5262 | HydroSource::Spin()
5263 | HydroSource::ClusterMembers(_, _)
5264 | HydroSource::Embedded(_)
5265 | HydroSource::EmbeddedSingleton(_) => {} },
5267 HydroNode::SingletonSource { value, .. } => {
5268 transform(value);
5269 }
5270 HydroNode::CycleSource { .. }
5271 | HydroNode::Tee { .. }
5272 | HydroNode::Reference { .. }
5273 | HydroNode::YieldConcat { .. }
5274 | HydroNode::BeginAtomic { .. }
5275 | HydroNode::EndAtomic { .. }
5276 | HydroNode::Batch { .. }
5277 | HydroNode::Chain { .. }
5278 | HydroNode::MergeOrdered { .. }
5279 | HydroNode::ChainFirst { .. }
5280 | HydroNode::CrossProduct { .. }
5281 | HydroNode::CrossSingleton { .. }
5282 | HydroNode::ResolveFutures { .. }
5283 | HydroNode::ResolveFuturesBlocking { .. }
5284 | HydroNode::ResolveFuturesOrdered { .. }
5285 | HydroNode::Join { .. }
5286 | HydroNode::JoinHalf { .. }
5287 | HydroNode::Difference { .. }
5288 | HydroNode::AntiJoin { .. }
5289 | HydroNode::DeferTick { .. }
5290 | HydroNode::Enumerate { .. }
5291 | HydroNode::Unique { .. }
5292 | HydroNode::Sort { .. }
5293 | HydroNode::VersionedNetworkFork { .. }
5294 | HydroNode::VersionedNetwork { .. } => {}
5295 HydroNode::Map { f, .. }
5296 | HydroNode::FlatMap { f, .. }
5297 | HydroNode::FlatMapStreamBlocking { f, .. }
5298 | HydroNode::Filter { f, .. }
5299 | HydroNode::FilterMap { f, .. }
5300 | HydroNode::Inspect { f, .. }
5301 | HydroNode::Partition { f, .. }
5302 | HydroNode::Reduce { f, .. }
5303 | HydroNode::ReduceKeyed { f, .. }
5304 | HydroNode::ReduceKeyedWatermark { f, .. } => {
5305 transform(&mut f.expr);
5306 }
5307 HydroNode::Fold { init, acc, .. }
5308 | HydroNode::Scan { init, acc, .. }
5309 | HydroNode::ScanAsyncBlocking { init, acc, .. }
5310 | HydroNode::FoldKeyed { init, acc, .. } => {
5311 transform(&mut init.expr);
5312 transform(&mut acc.expr);
5313 }
5314 HydroNode::Network {
5315 serialize_fn,
5316 deserialize_fn,
5317 ..
5318 } => {
5319 if let Some(serialize_fn) = serialize_fn {
5320 transform(serialize_fn);
5321 }
5322 if let Some(deserialize_fn) = deserialize_fn {
5323 transform(deserialize_fn);
5324 }
5325 }
5326 HydroNode::ExternalInput { deserialize_fn, .. } => {
5327 if let Some(deserialize_fn) = deserialize_fn {
5328 transform(deserialize_fn);
5329 }
5330 }
5331 HydroNode::Counter { duration, .. } => {
5332 transform(duration);
5333 }
5334 }
5335 }
5336
5337 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
5338 &self.metadata().op
5339 }
5340
5341 pub fn metadata(&self) -> &HydroIrMetadata {
5342 match self {
5343 HydroNode::Placeholder => {
5344 panic!()
5345 }
5346 HydroNode::VersionedNetworkFork { metadata, .. }
5347 | HydroNode::VersionedNetwork { metadata, .. } => metadata,
5348 HydroNode::Cast { metadata, .. }
5349 | HydroNode::ObserveNonDet { metadata, .. }
5350 | HydroNode::AssertIsConsistent { metadata, .. }
5351 | HydroNode::UnboundSingleton { metadata, .. }
5352 | HydroNode::Source { metadata, .. }
5353 | HydroNode::SingletonSource { metadata, .. }
5354 | HydroNode::CycleSource { metadata, .. }
5355 | HydroNode::Tee { metadata, .. }
5356 | HydroNode::Reference { metadata, .. }
5357 | HydroNode::Partition { metadata, .. }
5358 | HydroNode::YieldConcat { metadata, .. }
5359 | HydroNode::BeginAtomic { metadata, .. }
5360 | HydroNode::EndAtomic { metadata, .. }
5361 | HydroNode::Batch { metadata, .. }
5362 | HydroNode::Chain { metadata, .. }
5363 | HydroNode::MergeOrdered { metadata, .. }
5364 | HydroNode::ChainFirst { metadata, .. }
5365 | HydroNode::CrossProduct { metadata, .. }
5366 | HydroNode::CrossSingleton { metadata, .. }
5367 | HydroNode::Join { metadata, .. }
5368 | HydroNode::JoinHalf { metadata, .. }
5369 | HydroNode::Difference { metadata, .. }
5370 | HydroNode::AntiJoin { metadata, .. }
5371 | HydroNode::ResolveFutures { metadata, .. }
5372 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5373 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5374 | HydroNode::Map { metadata, .. }
5375 | HydroNode::FlatMap { metadata, .. }
5376 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5377 | HydroNode::Filter { metadata, .. }
5378 | HydroNode::FilterMap { metadata, .. }
5379 | HydroNode::DeferTick { metadata, .. }
5380 | HydroNode::Enumerate { metadata, .. }
5381 | HydroNode::Inspect { metadata, .. }
5382 | HydroNode::Unique { metadata, .. }
5383 | HydroNode::Sort { metadata, .. }
5384 | HydroNode::Scan { metadata, .. }
5385 | HydroNode::ScanAsyncBlocking { metadata, .. }
5386 | HydroNode::Fold { metadata, .. }
5387 | HydroNode::FoldKeyed { metadata, .. }
5388 | HydroNode::Reduce { metadata, .. }
5389 | HydroNode::ReduceKeyed { metadata, .. }
5390 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5391 | HydroNode::ExternalInput { metadata, .. }
5392 | HydroNode::Network { metadata, .. }
5393 | HydroNode::Counter { metadata, .. } => metadata,
5394 }
5395 }
5396
5397 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5398 &mut self.metadata_mut().op
5399 }
5400
5401 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5402 match self {
5403 HydroNode::Placeholder => {
5404 panic!()
5405 }
5406 HydroNode::VersionedNetworkFork { metadata, .. }
5407 | HydroNode::VersionedNetwork { metadata, .. } => metadata,
5408 HydroNode::Cast { metadata, .. }
5409 | HydroNode::ObserveNonDet { metadata, .. }
5410 | HydroNode::AssertIsConsistent { metadata, .. }
5411 | HydroNode::UnboundSingleton { metadata, .. }
5412 | HydroNode::Source { metadata, .. }
5413 | HydroNode::SingletonSource { metadata, .. }
5414 | HydroNode::CycleSource { metadata, .. }
5415 | HydroNode::Tee { metadata, .. }
5416 | HydroNode::Reference { metadata, .. }
5417 | HydroNode::Partition { metadata, .. }
5418 | HydroNode::YieldConcat { metadata, .. }
5419 | HydroNode::BeginAtomic { metadata, .. }
5420 | HydroNode::EndAtomic { metadata, .. }
5421 | HydroNode::Batch { metadata, .. }
5422 | HydroNode::Chain { metadata, .. }
5423 | HydroNode::MergeOrdered { metadata, .. }
5424 | HydroNode::ChainFirst { metadata, .. }
5425 | HydroNode::CrossProduct { metadata, .. }
5426 | HydroNode::CrossSingleton { metadata, .. }
5427 | HydroNode::Join { metadata, .. }
5428 | HydroNode::JoinHalf { metadata, .. }
5429 | HydroNode::Difference { metadata, .. }
5430 | HydroNode::AntiJoin { metadata, .. }
5431 | HydroNode::ResolveFutures { metadata, .. }
5432 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5433 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5434 | HydroNode::Map { metadata, .. }
5435 | HydroNode::FlatMap { metadata, .. }
5436 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5437 | HydroNode::Filter { metadata, .. }
5438 | HydroNode::FilterMap { metadata, .. }
5439 | HydroNode::DeferTick { metadata, .. }
5440 | HydroNode::Enumerate { metadata, .. }
5441 | HydroNode::Inspect { metadata, .. }
5442 | HydroNode::Unique { metadata, .. }
5443 | HydroNode::Sort { metadata, .. }
5444 | HydroNode::Scan { metadata, .. }
5445 | HydroNode::ScanAsyncBlocking { metadata, .. }
5446 | HydroNode::Fold { metadata, .. }
5447 | HydroNode::FoldKeyed { metadata, .. }
5448 | HydroNode::Reduce { metadata, .. }
5449 | HydroNode::ReduceKeyed { metadata, .. }
5450 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5451 | HydroNode::ExternalInput { metadata, .. }
5452 | HydroNode::Network { metadata, .. }
5453 | HydroNode::Counter { metadata, .. } => metadata,
5454 }
5455 }
5456
5457 pub fn input(&self) -> Vec<&HydroNode> {
5458 match self {
5459 HydroNode::Placeholder => {
5460 panic!()
5461 }
5462 HydroNode::Source { .. }
5463 | HydroNode::SingletonSource { .. }
5464 | HydroNode::ExternalInput { .. }
5465 | HydroNode::CycleSource { .. }
5466 | HydroNode::Tee { .. }
5467 | HydroNode::Reference { .. }
5468 | HydroNode::Partition { .. }
5469 | HydroNode::VersionedNetwork { .. } => {
5470 vec![]
5472 }
5473 HydroNode::Cast { inner, .. }
5474 | HydroNode::ObserveNonDet { inner, .. }
5475 | HydroNode::YieldConcat { inner, .. }
5476 | HydroNode::BeginAtomic { inner, .. }
5477 | HydroNode::EndAtomic { inner, .. }
5478 | HydroNode::Batch { inner, .. }
5479 | HydroNode::UnboundSingleton { inner, .. }
5480 | HydroNode::AssertIsConsistent { inner, .. } => {
5481 vec![inner]
5482 }
5483 HydroNode::Chain { first, second, .. } => {
5484 vec![first, second]
5485 }
5486 HydroNode::MergeOrdered { first, second, .. } => {
5487 vec![first, second]
5488 }
5489 HydroNode::ChainFirst { first, second, .. } => {
5490 vec![first, second]
5491 }
5492 HydroNode::CrossProduct { left, right, .. }
5493 | HydroNode::CrossSingleton { left, right, .. }
5494 | HydroNode::Join { left, right, .. }
5495 | HydroNode::JoinHalf { left, right, .. } => {
5496 vec![left, right]
5497 }
5498 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5499 vec![pos, neg]
5500 }
5501 HydroNode::Map { input, .. }
5502 | HydroNode::FlatMap { input, .. }
5503 | HydroNode::FlatMapStreamBlocking { input, .. }
5504 | HydroNode::Filter { input, .. }
5505 | HydroNode::FilterMap { input, .. }
5506 | HydroNode::Sort { input, .. }
5507 | HydroNode::DeferTick { input, .. }
5508 | HydroNode::Enumerate { input, .. }
5509 | HydroNode::Inspect { input, .. }
5510 | HydroNode::Unique { input, .. }
5511 | HydroNode::Network { input, .. }
5512 | HydroNode::Counter { input, .. }
5513 | HydroNode::ResolveFutures { input, .. }
5514 | HydroNode::ResolveFuturesBlocking { input, .. }
5515 | HydroNode::ResolveFuturesOrdered { input, .. }
5516 | HydroNode::Fold { input, .. }
5517 | HydroNode::FoldKeyed { input, .. }
5518 | HydroNode::Reduce { input, .. }
5519 | HydroNode::ReduceKeyed { input, .. }
5520 | HydroNode::Scan { input, .. }
5521 | HydroNode::ScanAsyncBlocking { input, .. } => {
5522 vec![input]
5523 }
5524 HydroNode::ReduceKeyedWatermark {
5525 input, watermark, ..
5526 } => {
5527 vec![input, watermark]
5528 }
5529 HydroNode::VersionedNetworkFork { senders, .. } => senders
5530 .iter()
5531 .map(|(_version, sender, _serialize)| sender.as_ref())
5532 .collect(),
5533 }
5534 }
5535
5536 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5537 self.input()
5538 .iter()
5539 .map(|input_node| input_node.metadata())
5540 .collect()
5541 }
5542
5543 pub fn is_shared_with_others(&self) -> bool {
5547 match self {
5548 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5549 Rc::strong_count(&inner.0) > 1
5550 }
5551 HydroNode::Reference { .. } => false,
5554 _ => false,
5555 }
5556 }
5557
5558 pub fn print_root(&self) -> String {
5559 match self {
5560 HydroNode::Placeholder => {
5561 panic!()
5562 }
5563 HydroNode::Cast { .. } => "Cast()".to_owned(),
5564 HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5565 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5566 HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5567 HydroNode::Source { source, .. } => format!("Source({:?})", source),
5568 HydroNode::SingletonSource {
5569 value,
5570 first_tick_only,
5571 ..
5572 } => format!(
5573 "SingletonSource({:?}, first_tick_only={})",
5574 value, first_tick_only
5575 ),
5576 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5577 HydroNode::Tee { inner, .. } => {
5578 format!("Tee({})", inner.0.borrow().print_root())
5579 }
5580 HydroNode::Reference { inner, kind, .. } => {
5581 format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5582 }
5583 HydroNode::Partition { f, is_true, .. } => {
5584 format!("Partition({:?}, is_true={})", f, is_true)
5585 }
5586 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5587 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5588 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5589 HydroNode::Batch { .. } => "Batch()".to_owned(),
5590 HydroNode::Chain { first, second, .. } => {
5591 format!("Chain({}, {})", first.print_root(), second.print_root())
5592 }
5593 HydroNode::MergeOrdered { first, second, .. } => {
5594 format!(
5595 "MergeOrdered({}, {})",
5596 first.print_root(),
5597 second.print_root()
5598 )
5599 }
5600 HydroNode::ChainFirst { first, second, .. } => {
5601 format!(
5602 "ChainFirst({}, {})",
5603 first.print_root(),
5604 second.print_root()
5605 )
5606 }
5607 HydroNode::CrossProduct { left, right, .. } => {
5608 format!(
5609 "CrossProduct({}, {})",
5610 left.print_root(),
5611 right.print_root()
5612 )
5613 }
5614 HydroNode::CrossSingleton { left, right, .. } => {
5615 format!(
5616 "CrossSingleton({}, {})",
5617 left.print_root(),
5618 right.print_root()
5619 )
5620 }
5621 HydroNode::Join { left, right, .. } => {
5622 format!("Join({}, {})", left.print_root(), right.print_root())
5623 }
5624 HydroNode::JoinHalf { left, right, .. } => {
5625 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5626 }
5627 HydroNode::Difference { pos, neg, .. } => {
5628 format!("Difference({}, {})", pos.print_root(), neg.print_root())
5629 }
5630 HydroNode::AntiJoin { pos, neg, .. } => {
5631 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5632 }
5633 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5634 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5635 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5636 HydroNode::Map { f, .. } => format!("Map({:?})", f),
5637 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5638 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5639 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5640 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5641 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5642 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5643 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5644 HydroNode::Unique { .. } => "Unique()".to_owned(),
5645 HydroNode::Sort { .. } => "Sort()".to_owned(),
5646 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5647 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5648 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5649 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5650 }
5651 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5652 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5653 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5654 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5655 HydroNode::Network { .. } => "Network()".to_owned(),
5656 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5657 HydroNode::Counter { tag, duration, .. } => {
5658 format!("Counter({:?}, {:?})", tag, duration)
5659 }
5660 HydroNode::VersionedNetworkFork {
5661 channel_name,
5662 senders,
5663 ..
5664 } => {
5665 let versions: Vec<u32> = senders.iter().map(|(v, _, _)| *v).collect();
5666 format!(
5667 "VersionedNetworkFork({}, senders={:?})",
5668 channel_name, versions
5669 )
5670 }
5671 HydroNode::VersionedNetwork { version, .. } => {
5672 format!("VersionedNetwork(v{})", version)
5673 }
5674 }
5675 }
5676}
5677
/// Builds the sink and source expressions, plus the deferred connection step,
/// for a network edge between two deployed locations.
///
/// The deploy hooks used depend on the kinds of the two endpoints:
///
/// * process → process: `D::o2o_sink_source` / `D::o2o_connect`
/// * process → cluster: `D::o2m_sink_source` / `D::o2m_connect`
/// * cluster → process: `D::m2o_sink_source` / `D::m2o_connect`
/// * cluster → cluster: `D::m2m_sink_source` / `D::m2m_connect`
///
/// For the selected pair, the sending node is looked up first, then the
/// receiving node; a port is then requested from the sender and from the
/// receiver (in that order) before the hooks are invoked.
///
/// Returns `(sink, source, connect_fn)`.
///
/// # Panics
///
/// * If a referenced process or cluster is missing from `processes` /
///   `clusters`.
/// * If either endpoint is a `LocationId::Tick` or `LocationId::Atomic`
///   location; only processes and clusters are handled here.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetch and clone the deployed process registered under `key`.
    let process_node = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| {
                panic!("A process used in the graph was not instantiated: {}", key)
            })
            .clone()
    };
    // Fetch and clone the deployed cluster registered under `key`.
    let cluster_node = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| {
                panic!("A cluster used in the graph was not instantiated: {}", key)
            })
            .clone()
    };

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = process_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = process_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = cluster_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = cluster_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Only processes and clusters can be network endpoints; say which side
        // was wrong instead of panicking without a message.
        (LocationId::Tick(_, _), _) => {
            panic!("cannot instantiate a network whose sending location is a tick")
        }
        (_, LocationId::Tick(_, _)) => {
            panic!("cannot instantiate a network whose receiving location is a tick")
        }
        (LocationId::Atomic(_), _) => {
            panic!("cannot instantiate a network whose sending location is atomic")
        }
        (_, LocationId::Atomic(_)) => {
            panic!("cannot instantiate a network whose receiving location is atomic")
        }
    };
    (sink, source, connect_fn)
}
5819
5820#[cfg(test)]
5821mod serde_test;
5822
#[cfg(test)]
mod test {
    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` so that growth is noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(std::mem::size_of::<HydroNode>(), 264);
    }

    /// Pins the in-memory size of `HydroRoot` so that growth is noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(std::mem::size_of::<HydroRoot>(), 136);
    }

    /// A plain expression must pass through `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(plain_expr.clone());
        assert_eq!(result, plain_expr);
    }

    /// Snapshot of `simplify_q_macro` applied to a spliced `q!` closure.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshot of `simplify_q_macro` applied to a `q!` body that is a block
    /// ending in a closure rather than starting with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}