Skip to main content

hydro_lang/compile/
builder.rs

1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6use slotmap::{SecondaryMap, SlotMap};
7
8#[cfg(feature = "build")]
9use super::compiled::CompiledFlow;
10#[cfg(feature = "build")]
11use super::deploy::{DeployFlow, DeployResult};
12#[cfg(feature = "build")]
13use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
14use super::ir::HydroRoot;
15use crate::location::{Cluster, External, LocationKey, LocationType, Process};
16
17/// A compile-time directive to spawn a future on a location's `LocalSet`
18/// alongside the DFIR scheduler.
19pub enum Sidecar {
20    /// A ready-to-go future expression (e.g. telemetry metrics collection).
21    Simple {
22        location_key: LocationKey,
23        future_expr: Box<syn::Expr>,
24    },
25    /// A user-owned sidecar that returns a `(Stream, Sink)` pair to the framework.
26    /// The closure is called at startup; the returned stream feeds items into the
27    /// dataflow and the returned sink receives items from the dataflow.
28    Bidi {
29        location_key: LocationKey,
30        sidecar_id: SidecarId,
31        sidecar_closure: Box<syn::Expr>,
32    },
33}
34#[cfg(feature = "sim")]
35#[cfg(stageleft_runtime)]
36use crate::sim::flow::SimFlow;
37use crate::staging_util::Invariant;
38
// Strongly-typed `usize` ID newtypes generated by `crate::newtype_counter!` (defined
// elsewhere in the crate). The first four are allocated from the counters held in
// `FlowStateInner`; `StmtId` and `HandoffId` are allocated outside this file.
#[stageleft::export(ExternalPortId, CycleId, ClockId, SidecarId, StmtId, HandoffId)]
crate::newtype_counter! {
    /// ID for an external output; allocated by `FlowStateInner::next_external_port`.
    pub struct ExternalPortId(usize);

    /// ID for a [`crate::location::Location::forward_ref`] cycle; allocated by
    /// `FlowStateInner::next_cycle_id`.
    pub struct CycleId(usize);

    /// ID for clocks (ticks); allocated by `FlowStateInner::next_clock_id`.
    pub struct ClockId(usize);

    /// ID for user-owned sidecars; allocated by `FlowStateInner::next_sidecar_id`.
    pub struct SidecarId(usize);

    /// ID for a statement in the emitted DFIR graph.
    pub struct StmtId(usize);

    /// ID for a handoff channel in the simulator.
    pub struct HandoffId(usize);
}
59
60impl CycleId {
61    #[cfg(feature = "build")]
62    pub(crate) fn as_ident(&self) -> syn::Ident {
63        syn::Ident::new(&format!("cycle_{}", self), proc_macro2::Span::call_site())
64    }
65}
66
67impl SidecarId {
68    /// Derives the two idents for a bidi sidecar: `(stream, sink)`.
69    pub fn idents(&self) -> (syn::Ident, syn::Ident) {
70        let span = proc_macro2::Span::call_site();
71        (
72            syn::Ident::new(&format!("__hydro_sidecar_{}_stream", self), span),
73            syn::Ident::new(&format!("__hydro_sidecar_{}_sink", self), span),
74        )
75    }
76}
77
/// Shared, interior-mutable handle to a flow's IR roots, ID counters, and sidecars.
/// A clone of it is handed to every location a `FlowBuilder` creates.
pub(crate) type FlowState = Rc<RefCell<FlowStateInner>>;

/// Mutable state shared between a `FlowBuilder` and the locations it creates.
pub(crate) struct FlowStateInner {
    /// Tracks the roots of the dataflow IR. This is referenced by
    /// `Stream` and `HfCycle` to build the IR. The inner option will
    /// be set to `None` when this builder is finalized; it also starts out as
    /// `None` for a builder created by `FlowBuilder::from_built` until
    /// `FlowBuilder::replace_ir` installs roots.
    roots: Option<Vec<HydroRoot>>,

    /// Counter for generating unique external output identifiers.
    next_external_port: crate::Counter<ExternalPortId>,

    /// Counter for generating identifiers for cycles.
    next_cycle_id: crate::Counter<CycleId>,

    /// Counter for clock IDs.
    next_clock_id: crate::Counter<ClockId>,

    /// Counter for generating unique sidecar identifiers, not used for anything else.
    next_sidecar_id: crate::Counter<SidecarId>,

    /// Compile-time sidecar directives. Processed during compilation,
    /// not part of the dataflow IR. Moved out by `FlowBuilder::finalize`.
    pub sidecars: Vec<Sidecar>,
}
102
103impl FlowStateInner {
104    pub fn next_external_port(&mut self) -> ExternalPortId {
105        self.next_external_port.get_and_increment()
106    }
107
108    pub fn next_cycle_id(&mut self) -> CycleId {
109        self.next_cycle_id.get_and_increment()
110    }
111
112    pub fn next_clock_id(&mut self) -> ClockId {
113        self.next_clock_id.get_and_increment()
114    }
115
116    pub fn next_sidecar_id(&mut self) -> SidecarId {
117        self.next_sidecar_id.get_and_increment()
118    }
119
120    pub fn push_root(&mut self, root: HydroRoot) {
121        self.roots
122            .as_mut()
123            .expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled.")
124            .push(root);
125    }
126
127    pub fn try_push_root(&mut self, root: HydroRoot) {
128        if let Some(roots) = self.roots.as_mut() {
129            roots.push(root);
130        }
131    }
132}
133
/// Builder for a Hydro program: it creates locations (processes, clusters, and
/// externals) and accumulates the dataflow IR they produce in a shared flow state.
///
/// A `FlowBuilder` must be finalized before it is dropped; dropping an unfinalized
/// builder panics (unless the thread is already panicking).
pub struct FlowBuilder<'a> {
    /// Hydro IR and associated counters
    flow_state: FlowState,

    /// Locations and their type.
    locations: SlotMap<LocationKey, LocationType>,
    /// Map from raw location ID to name (including externals).
    location_names: SecondaryMap<LocationKey, String>,
    /// The program version each location belongs to. Every location has an entry (0 unless it is a
    /// `next_version` successor); populated eagerly at location creation.
    #[cfg(feature = "sim")]
    location_version: SecondaryMap<LocationKey, u32>,
    /// Maps each location to the root key of its cross-version correspondence chain. Every location
    /// has an entry (its own key unless it is a `next_version` successor); populated eagerly at
    /// location creation.
    #[cfg(feature = "sim")]
    location_group: SecondaryMap<LocationKey, LocationKey>,

    /// Application name used in telemetry.
    #[cfg_attr(
        not(feature = "build"),
        expect(dead_code, reason = "unused without build")
    )]
    flow_name: String,

    /// Tracks whether this flow has been finalized; it is an error to
    /// drop without finalizing.
    finalized: bool,

    /// 'a on a FlowBuilder is used to ensure that staged code does not
    /// capture more data than it is allowed to; 'a is generated at the
    /// entrypoint of the staged code and we keep it invariant here
    /// to enforce the appropriate constraints
    _phantom: Invariant<'a>,
}
169
170impl Drop for FlowBuilder<'_> {
171    fn drop(&mut self) {
172        if !self.finalized && !std::thread::panicking() {
173            panic!(
174                "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
175            );
176        }
177    }
178}
179
180#[expect(missing_docs, reason = "TODO")]
181impl<'a> FlowBuilder<'a> {
182    /// Creates a new `FlowBuilder` to construct a Hydro program, using the Cargo package name as the program name.
183    #[expect(
184        clippy::new_without_default,
185        reason = "call `new` explicitly, not `default`"
186    )]
187    pub fn new() -> Self {
188        let mut name = std::env::var("CARGO_PKG_NAME").unwrap_or_else(|_| "unknown".to_owned());
189        if let Ok(bin_path) = std::env::current_exe()
190            && let Some(bin_name) = bin_path.file_stem()
191        {
192            name = format!("{}/{}", name, bin_name.display());
193        }
194        Self::with_name(name)
195    }
196
197    /// Creates a new `FlowBuilder` to construct a Hydro program, with the given program name.
198    pub fn with_name(name: impl Into<String>) -> Self {
199        Self {
200            flow_state: Rc::new(RefCell::new(FlowStateInner {
201                roots: Some(vec![]),
202                next_external_port: crate::Counter::default(),
203                next_cycle_id: crate::Counter::default(),
204                next_clock_id: crate::Counter::default(),
205                next_sidecar_id: crate::Counter::default(),
206                sidecars: Vec::new(),
207            })),
208            locations: SlotMap::with_key(),
209            location_names: SecondaryMap::new(),
210            #[cfg(feature = "sim")]
211            location_version: SecondaryMap::new(),
212            #[cfg(feature = "sim")]
213            location_group: SecondaryMap::new(),
214            flow_name: name.into(),
215            finalized: false,
216            _phantom: PhantomData,
217        }
218    }
219
220    pub(crate) fn flow_state(&self) -> &FlowState {
221        &self.flow_state
222    }
223
224    fn insert_location(&mut self, ty: LocationType, name: String) -> LocationKey {
225        let key = self.locations.insert(ty);
226        self.location_names.insert(key, name);
227        #[cfg(feature = "sim")]
228        {
229            self.location_version.insert(key, 0);
230            self.location_group.insert(key, key);
231        }
232        key
233    }
234
235    pub fn process<P>(&mut self) -> Process<'a, P> {
236        let key = self.insert_location(LocationType::Process, type_name::<P>().to_owned());
237        Process {
238            key,
239            flow_state: self.flow_state().clone(),
240            _phantom: PhantomData,
241        }
242    }
243
244    pub fn cluster<C>(&mut self) -> Cluster<'a, C> {
245        let key = self.insert_location(LocationType::Cluster, type_name::<C>().to_owned());
246        Cluster {
247            key,
248            flow_state: self.flow_state().clone(),
249            _phantom: PhantomData,
250        }
251    }
252
253    pub fn external<E>(&mut self) -> External<'a, E> {
254        let key = self.insert_location(LocationType::External, type_name::<E>().to_owned());
255        External {
256            key,
257            flow_state: self.flow_state().clone(),
258            _phantom: PhantomData,
259        }
260    }
261
262    #[cfg(feature = "sim")]
263    pub fn next_version<C>(&mut self, cluster: &Cluster<'a, C>) -> Cluster<'a, C> {
264        let from = cluster.key;
265        let group = self.location_group[from];
266        // Compute the next version from the highest version already assigned to any location in
267        // the group, rather than from `from` directly. Otherwise calling `next_version` twice on
268        // the same source cluster would produce two clusters with the same version number.
269        let version = self
270            .location_group
271            .iter()
272            .filter(|&(_, &g)| g == group)
273            .map(|(k, _)| self.location_version[k])
274            .max()
275            .unwrap_or(0)
276            + 1;
277        let key = self.insert_location(LocationType::Cluster, type_name::<C>().to_owned());
278        self.location_version.insert(key, version);
279        self.location_group.insert(key, group);
280        Cluster {
281            key,
282            flow_state: self.flow_state().clone(),
283            _phantom: PhantomData,
284        }
285    }
286}
287
#[cfg(feature = "build")]
#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
impl<'a> FlowBuilder<'a> {
    /// Finalizes this builder into a `BuiltFlow`, moving out its IR roots, sidecars,
    /// locations, location names, and flow name.
    ///
    /// `unify_atomic_ticks` is run over the IR before it is returned.
    ///
    /// # Panics
    /// Panics if the builder has no IR roots. This happens when it was created with
    /// [`FlowBuilder::from_built`] and [`FlowBuilder::replace_ir`] was never called.
    pub fn finalize(mut self) -> super::built::BuiltFlow<'a> {
        // Mark as finalized up front: the fields are moved out below, and the `Drop`
        // check must not report this builder as abandoned.
        self.finalized = true;

        let mut flow_state = self.flow_state.borrow_mut();
        let mut ir = flow_state.roots.take().expect(
            "Attempted to finalize a FlowBuilder that has no IR roots; a builder created with `from_built` must have its IR set with `replace_ir` before it is finalized.",
        );
        let sidecars = std::mem::take(&mut flow_state.sidecars);
        // End the mutable borrow of the shared state before running IR passes.
        drop(flow_state);

        super::ir::unify_atomic_ticks(&mut ir);

        super::built::BuiltFlow {
            ir,
            locations: std::mem::take(&mut self.locations),
            location_names: std::mem::take(&mut self.location_names),
            sidecars,
            flow_name: std::mem::take(&mut self.flow_name),
            #[cfg(feature = "sim")]
            location_version: std::mem::take(&mut self.location_version),
            #[cfg(feature = "sim")]
            location_group: std::mem::take(&mut self.location_group),
            _phantom: PhantomData,
        }
    }

    /// Finalizes the flow and calls `BuiltFlow::with_default_optimize` on it, producing a
    /// [`DeployFlow`] for the deployment backend `D`.
    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
        self.finalize().with_default_optimize()
    }

    /// Finalizes the flow and passes its IR roots to `f` via `BuiltFlow::optimize_with`,
    /// allowing custom IR transformations.
    pub fn optimize_with(self, f: impl FnOnce(&mut [HydroRoot])) -> super::built::BuiltFlow<'a> {
        self.finalize().optimize_with(f)
    }

    /// Shorthand for `self.with_default_optimize().with_process(process, spec)`.
    pub fn with_process<P, D: Deploy<'a>>(
        self,
        process: &Process<P>,
        spec: impl IntoProcessSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_process(process, spec)
    }

    /// Shorthand for `self.with_default_optimize().with_remaining_processes(spec)`.
    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_processes(spec)
    }

    /// Shorthand for `self.with_default_optimize().with_external(process, spec)`.
    pub fn with_external<P, D: Deploy<'a>>(
        self,
        process: &External<P>,
        spec: impl ExternalSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_external(process, spec)
    }

    /// Shorthand for `self.with_default_optimize().with_remaining_externals(spec)`.
    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_externals(spec)
    }

    /// Shorthand for `self.with_default_optimize().with_cluster(cluster, spec)`.
    pub fn with_cluster<C, D: Deploy<'a>>(
        self,
        cluster: &Cluster<C>,
        spec: impl ClusterSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_cluster(cluster, spec)
    }

    /// Shorthand for `self.with_default_optimize().with_remaining_clusters(spec)`.
    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_clusters(spec)
    }

    /// Shorthand for `self.with_default_optimize::<D>().compile()`. This is only available for
    /// deployment backends whose `InstantiateEnv` is `()`.
    pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
        self.with_default_optimize::<D>().compile()
    }

    /// Shorthand for `self.with_default_optimize().deploy(env)`.
    pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
        self.with_default_optimize().deploy(env)
    }

    /// Creates a simulation for this builder, which can be used to run deterministic simulations
    /// of the Hydro program.
    #[cfg(feature = "sim")]
    pub fn sim(self) -> SimFlow<'a> {
        self.finalize().sim()
    }

    /// Creates an unfinalized builder that carries over `built`'s locations, location names,
    /// flow name, and (with the `sim` feature) location version/group maps.
    ///
    /// The new builder has no IR roots; call [`FlowBuilder::replace_ir`] before finalizing it.
    /// Its ID counters start from zero, and `built`'s sidecars are not carried over.
    /// NOTE(review): IDs allocated later through `next_clock_id`/`next_cycle_id` may
    /// therefore coincide with IDs already present in IR installed via `replace_ir`;
    /// confirm callers account for this.
    pub fn from_built<'b>(built: &super::built::BuiltFlow) -> FlowBuilder<'b> {
        FlowBuilder {
            flow_state: Rc::new(RefCell::new(FlowStateInner {
                roots: None,
                next_external_port: crate::Counter::default(),
                next_cycle_id: crate::Counter::default(),
                next_clock_id: crate::Counter::default(),
                next_sidecar_id: crate::Counter::default(),
                sidecars: Vec::new(),
            })),
            locations: built.locations.clone(),
            location_names: built.location_names.clone(),
            #[cfg(feature = "sim")]
            location_version: built.location_version.clone(),
            #[cfg(feature = "sim")]
            location_group: built.location_group.clone(),
            flow_name: built.flow_name.clone(),
            finalized: false,
            _phantom: PhantomData,
        }
    }

    /// Replaces this builder's IR roots with `roots` (unstable API).
    #[doc(hidden)] // TODO(mingwei): This is an unstable API for now
    pub fn replace_ir(&mut self, roots: Vec<HydroRoot>) {
        self.flow_state.borrow_mut().roots = Some(roots);
    }

    /// Allocates a fresh [`ClockId`] from this builder's shared counter (unstable API).
    #[doc(hidden)] // TODO(mingwei): This is an unstable API for now
    pub fn next_clock_id(&mut self) -> ClockId {
        self.flow_state.borrow_mut().next_clock_id()
    }

    /// Allocates a fresh [`CycleId`] from this builder's shared counter (unstable API).
    #[doc(hidden)] // TODO(mingwei): This is an unstable API for now
    pub fn next_cycle_id(&mut self) -> CycleId {
        self.flow_state.borrow_mut().next_cycle_id()
    }
}