1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6use slotmap::{SecondaryMap, SlotMap};
7
8#[cfg(feature = "build")]
9use super::compiled::CompiledFlow;
10#[cfg(feature = "build")]
11use super::deploy::{DeployFlow, DeployResult};
12#[cfg(feature = "build")]
13use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
14use super::ir::HydroRoot;
15use crate::location::{Cluster, External, LocationKey, LocationType, Process};
16
/// Description of a sidecar recorded while a flow is being built.
///
/// Each variant stores the [`LocationKey`] it is associated with together with a boxed
/// [`syn::Expr`], i.e. the sidecar is kept as syntax rather than as a runtime value.
pub enum Sidecar {
    /// Sidecar described by a single expression.
    Simple {
        /// Key of the location this sidecar is associated with.
        location_key: LocationKey,
        /// Expression for the sidecar.
        // NOTE(review): going by the name, this should evaluate to a future — confirm against
        // the code that consumes `Sidecar::Simple`.
        future_expr: Box<syn::Expr>,
    },
    /// Sidecar that additionally carries a [`SidecarId`].
    // NOTE(review): `SidecarId::idents` derives a `..._stream` / `..._sink` identifier pair from
    // an id, so this variant presumably exchanges data in both directions — confirm.
    Bidi {
        /// Key of the location this sidecar is associated with.
        location_key: LocationKey,
        /// Id of this sidecar; see `SidecarId::idents` for the identifiers derived from it.
        sidecar_id: SidecarId,
        /// Expression for the sidecar.
        // NOTE(review): presumably a closure, per the field name — confirm its expected signature.
        sidecar_closure: Box<syn::Expr>,
    },
}
34#[cfg(feature = "sim")]
35#[cfg(stageleft_runtime)]
36use crate::sim::flow::SimFlow;
37use crate::staging_util::Invariant;
38
// Id newtypes generated by `crate::newtype_counter!`; the same names are listed in the
// `stageleft::export` attribute. In this file the ids are produced by `crate::Counter<_>`
// fields on `FlowStateInner`, and `CycleId` / `SidecarId` are formatted with `{}` to build
// identifiers.
#[stageleft::export(ExternalPortId, CycleId, ClockId, SidecarId, StmtId, HandoffId)]
crate::newtype_counter! {
    // Handed out by `FlowStateInner::next_external_port`.
    pub struct ExternalPortId(usize);

    // Handed out by `FlowStateInner::next_cycle_id`; `CycleId::as_ident` renders it as `cycle_<id>`.
    pub struct CycleId(usize);

    // Handed out by `FlowStateInner::next_clock_id`.
    pub struct ClockId(usize);

    // Handed out by `FlowStateInner::next_sidecar_id`; `SidecarId::idents` derives two idents from it.
    pub struct SidecarId(usize);

    // NOTE(review): no counter for this id is visible in this file — presumably allocated elsewhere.
    pub struct StmtId(usize);

    // NOTE(review): no counter for this id is visible in this file — presumably allocated elsewhere.
    pub struct HandoffId(usize);
}
59
60impl CycleId {
61 #[cfg(feature = "build")]
62 pub(crate) fn as_ident(&self) -> syn::Ident {
63 syn::Ident::new(&format!("cycle_{}", self), proc_macro2::Span::call_site())
64 }
65}
66
67impl SidecarId {
68 pub fn idents(&self) -> (syn::Ident, syn::Ident) {
70 let span = proc_macro2::Span::call_site();
71 (
72 syn::Ident::new(&format!("__hydro_sidecar_{}_stream", self), span),
73 syn::Ident::new(&format!("__hydro_sidecar_{}_sink", self), span),
74 )
75 }
76}
77
/// Shared, interior-mutable handle to a [`FlowStateInner`].
///
/// `FlowBuilder` keeps one handle and stores a clone of it in every `Process`, `Cluster` and
/// `External` it creates, so all of them mutate the same state through `borrow_mut()`.
pub(crate) type FlowState = Rc<RefCell<FlowStateInner>>;
79
/// State behind a [`FlowState`] handle: the IR roots collected so far, the id counters, and the
/// registered sidecars.
pub(crate) struct FlowStateInner {
    /// IR roots pushed so far.
    ///
    /// `None` once `FlowBuilder::finalize` has taken them, and for builders created by
    /// `FlowBuilder::from_built` until `replace_ir` supplies a new list.
    roots: Option<Vec<HydroRoot>>,

    /// Counter backing [`FlowStateInner::next_external_port`].
    next_external_port: crate::Counter<ExternalPortId>,

    /// Counter backing [`FlowStateInner::next_cycle_id`].
    next_cycle_id: crate::Counter<CycleId>,

    /// Counter backing [`FlowStateInner::next_clock_id`].
    next_clock_id: crate::Counter<ClockId>,

    /// Counter backing [`FlowStateInner::next_sidecar_id`].
    next_sidecar_id: crate::Counter<SidecarId>,

    /// Sidecars registered so far; `FlowBuilder::finalize` moves them out with `mem::take`.
    pub sidecars: Vec<Sidecar>,
}
102
103impl FlowStateInner {
104 pub fn next_external_port(&mut self) -> ExternalPortId {
105 self.next_external_port.get_and_increment()
106 }
107
108 pub fn next_cycle_id(&mut self) -> CycleId {
109 self.next_cycle_id.get_and_increment()
110 }
111
112 pub fn next_clock_id(&mut self) -> ClockId {
113 self.next_clock_id.get_and_increment()
114 }
115
116 pub fn next_sidecar_id(&mut self) -> SidecarId {
117 self.next_sidecar_id.get_and_increment()
118 }
119
120 pub fn push_root(&mut self, root: HydroRoot) {
121 self.roots
122 .as_mut()
123 .expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled.")
124 .push(root);
125 }
126
127 pub fn try_push_root(&mut self, root: HydroRoot) {
128 if let Some(roots) = self.roots.as_mut() {
129 roots.push(root);
130 }
131 }
132}
133
/// Builder for a flow: it creates locations and shares a [`FlowState`] with each of them.
///
/// A builder has to be finalized before it goes out of scope; its `Drop` implementation panics
/// otherwise (unless the thread is already panicking).
pub struct FlowBuilder<'a> {
    /// Shared state; a clone of this handle is stored in every location the builder creates.
    flow_state: FlowState,

    /// Every known location, stored as its [`LocationType`] under its [`LocationKey`].
    locations: SlotMap<LocationKey, LocationType>,
    /// Name recorded per location; locations created by this builder use the `type_name` of
    /// their tag type.
    location_names: SecondaryMap<LocationKey, String>,
    /// Version per location: `0` when inserted, overwritten with a higher value for clusters
    /// created by `next_version`.
    #[cfg(feature = "sim")]
    location_version: SecondaryMap<LocationKey, u32>,
    /// Group per location: the location's own key when inserted; clusters created by
    /// `next_version` are assigned the group of the cluster they were derived from.
    #[cfg(feature = "sim")]
    location_group: SecondaryMap<LocationKey, LocationKey>,

    /// Name of the flow; only read when the `build` feature is enabled (hence the conditional
    /// `dead_code` expectation).
    #[cfg_attr(
        not(feature = "build"),
        expect(dead_code, reason = "unused without build")
    )]
    flow_name: String,

    /// Whether the builder has been finalized; checked by the `Drop` implementation.
    finalized: bool,

    /// Marker that ties the builder to `'a` without storing data of that lifetime.
    // NOTE(review): `Invariant` presumably makes `'a` invariant — confirm in `staging_util`.
    _phantom: Invariant<'a>,
}
169
170impl Drop for FlowBuilder<'_> {
171 fn drop(&mut self) {
172 if !self.finalized && !std::thread::panicking() {
173 panic!(
174 "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
175 );
176 }
177 }
178}
179
180#[expect(missing_docs, reason = "TODO")]
181impl<'a> FlowBuilder<'a> {
182 #[expect(
184 clippy::new_without_default,
185 reason = "call `new` explicitly, not `default`"
186 )]
187 pub fn new() -> Self {
188 let mut name = std::env::var("CARGO_PKG_NAME").unwrap_or_else(|_| "unknown".to_owned());
189 if let Ok(bin_path) = std::env::current_exe()
190 && let Some(bin_name) = bin_path.file_stem()
191 {
192 name = format!("{}/{}", name, bin_name.display());
193 }
194 Self::with_name(name)
195 }
196
197 pub fn with_name(name: impl Into<String>) -> Self {
199 Self {
200 flow_state: Rc::new(RefCell::new(FlowStateInner {
201 roots: Some(vec![]),
202 next_external_port: crate::Counter::default(),
203 next_cycle_id: crate::Counter::default(),
204 next_clock_id: crate::Counter::default(),
205 next_sidecar_id: crate::Counter::default(),
206 sidecars: Vec::new(),
207 })),
208 locations: SlotMap::with_key(),
209 location_names: SecondaryMap::new(),
210 #[cfg(feature = "sim")]
211 location_version: SecondaryMap::new(),
212 #[cfg(feature = "sim")]
213 location_group: SecondaryMap::new(),
214 flow_name: name.into(),
215 finalized: false,
216 _phantom: PhantomData,
217 }
218 }
219
220 pub(crate) fn flow_state(&self) -> &FlowState {
221 &self.flow_state
222 }
223
224 fn insert_location(&mut self, ty: LocationType, name: String) -> LocationKey {
225 let key = self.locations.insert(ty);
226 self.location_names.insert(key, name);
227 #[cfg(feature = "sim")]
228 {
229 self.location_version.insert(key, 0);
230 self.location_group.insert(key, key);
231 }
232 key
233 }
234
235 pub fn process<P>(&mut self) -> Process<'a, P> {
236 let key = self.insert_location(LocationType::Process, type_name::<P>().to_owned());
237 Process {
238 key,
239 flow_state: self.flow_state().clone(),
240 _phantom: PhantomData,
241 }
242 }
243
244 pub fn cluster<C>(&mut self) -> Cluster<'a, C> {
245 let key = self.insert_location(LocationType::Cluster, type_name::<C>().to_owned());
246 Cluster {
247 key,
248 flow_state: self.flow_state().clone(),
249 _phantom: PhantomData,
250 }
251 }
252
253 pub fn external<E>(&mut self) -> External<'a, E> {
254 let key = self.insert_location(LocationType::External, type_name::<E>().to_owned());
255 External {
256 key,
257 flow_state: self.flow_state().clone(),
258 _phantom: PhantomData,
259 }
260 }
261
262 #[cfg(feature = "sim")]
263 pub fn next_version<C>(&mut self, cluster: &Cluster<'a, C>) -> Cluster<'a, C> {
264 let from = cluster.key;
265 let group = self.location_group[from];
266 let version = self
270 .location_group
271 .iter()
272 .filter(|&(_, &g)| g == group)
273 .map(|(k, _)| self.location_version[k])
274 .max()
275 .unwrap_or(0)
276 + 1;
277 let key = self.insert_location(LocationType::Cluster, type_name::<C>().to_owned());
278 self.location_version.insert(key, version);
279 self.location_group.insert(key, group);
280 Cluster {
281 key,
282 flow_state: self.flow_state().clone(),
283 _phantom: PhantomData,
284 }
285 }
286}
287
#[cfg(feature = "build")]
#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
impl<'a> FlowBuilder<'a> {
    /// Consumes the builder and produces a [`super::built::BuiltFlow`].
    ///
    /// Takes the IR roots and sidecars out of the shared flow state, runs
    /// `super::ir::unify_atomic_ticks` on the roots, and moves the builder's location tables and
    /// flow name into the result.
    ///
    /// # Panics
    ///
    /// Panics if the flow state holds no IR roots, e.g. for a builder created with
    /// [`FlowBuilder::from_built`] on which `replace_ir` was never called.
    pub fn finalize(mut self) -> super::built::BuiltFlow<'a> {
        // Mark first so the `Drop` check passes when `self` goes out of scope.
        self.finalized = true;

        // Release the `RefCell` borrow before the IR is processed further.
        let (mut ir, sidecars) = {
            let mut flow_state = self.flow_state.borrow_mut();
            let ir = flow_state.roots.take().expect(
                "Attempted to finalize a FlowBuilder that has no IR roots. A builder created with `from_built` must be given roots via `replace_ir` before it is finalized.",
            );
            let sidecars = std::mem::take(&mut flow_state.sidecars);
            (ir, sidecars)
        };

        super::ir::unify_atomic_ticks(&mut ir);

        // `FlowBuilder` implements `Drop`, so fields cannot be moved out of `self`; take them.
        super::built::BuiltFlow {
            ir,
            locations: std::mem::take(&mut self.locations),
            location_names: std::mem::take(&mut self.location_names),
            sidecars,
            flow_name: std::mem::take(&mut self.flow_name),
            #[cfg(feature = "sim")]
            location_version: std::mem::take(&mut self.location_version),
            #[cfg(feature = "sim")]
            location_group: std::mem::take(&mut self.location_group),
            _phantom: PhantomData,
        }
    }

    /// Shorthand for `self.finalize().with_default_optimize()`.
    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
        self.finalize().with_default_optimize()
    }

    /// Shorthand for `self.finalize().optimize_with(f)`; `f` receives the IR roots mutably.
    pub fn optimize_with(self, f: impl FnOnce(&mut [HydroRoot])) -> super::built::BuiltFlow<'a> {
        self.finalize().optimize_with(f)
    }

    /// Shorthand for `self.with_default_optimize().with_process(process, spec)`.
    pub fn with_process<P, D: Deploy<'a>>(
        self,
        process: &Process<P>,
        spec: impl IntoProcessSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_process(process, spec)
    }

    /// Shorthand for `self.with_default_optimize().with_remaining_processes(spec)`.
    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_processes(spec)
    }

    /// Shorthand for `self.with_default_optimize().with_external(process, spec)`.
    pub fn with_external<P, D: Deploy<'a>>(
        self,
        process: &External<P>,
        spec: impl ExternalSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_external(process, spec)
    }

    /// Shorthand for `self.with_default_optimize().with_remaining_externals(spec)`.
    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_externals(spec)
    }

    /// Shorthand for `self.with_default_optimize().with_cluster(cluster, spec)`.
    pub fn with_cluster<C, D: Deploy<'a>>(
        self,
        cluster: &Cluster<C>,
        spec: impl ClusterSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_cluster(cluster, spec)
    }

    /// Shorthand for `self.with_default_optimize().with_remaining_clusters(spec)`.
    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_clusters(spec)
    }

    /// Shorthand for `self.with_default_optimize::<D>().compile()`.
    ///
    /// Only available for deployment targets whose `InstantiateEnv` is `()`.
    pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
        self.with_default_optimize::<D>().compile()
    }

    /// Shorthand for `self.with_default_optimize().deploy(env)`.
    pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
        self.with_default_optimize().deploy(env)
    }

    /// Shorthand for `self.finalize().sim()`.
    #[cfg(feature = "sim")]
    pub fn sim(self) -> SimFlow<'a> {
        self.finalize().sim()
    }

    /// Creates a new builder that copies the location tables and flow name of `built`.
    ///
    /// The new builder has no IR roots (`roots` is `None`): call `replace_ir` before finalizing
    /// it. It is not finalized, so it must still be finalized before being dropped.
    // NOTE(review): all id counters restart from their default here, so ids allocated through
    // the new builder may coincide with ids already present in IR taken from `built` — confirm
    // callers account for this.
    // NOTE(review): `built`'s sidecars are not carried over; the new flow state starts with an
    // empty sidecar list — confirm this is intended.
    pub fn from_built<'b>(built: &super::built::BuiltFlow) -> FlowBuilder<'b> {
        FlowBuilder {
            flow_state: Rc::new(RefCell::new(FlowStateInner {
                roots: None,
                next_external_port: crate::Counter::default(),
                next_cycle_id: crate::Counter::default(),
                next_clock_id: crate::Counter::default(),
                next_sidecar_id: crate::Counter::default(),
                sidecars: Vec::new(),
            })),
            locations: built.locations.clone(),
            location_names: built.location_names.clone(),
            #[cfg(feature = "sim")]
            location_version: built.location_version.clone(),
            #[cfg(feature = "sim")]
            location_group: built.location_group.clone(),
            flow_name: built.flow_name.clone(),
            finalized: false,
            _phantom: PhantomData,
        }
    }

    /// Replaces the IR roots held in the shared flow state with `roots`.
    #[doc(hidden)]
    pub fn replace_ir(&mut self, roots: Vec<HydroRoot>) {
        self.flow_state.borrow_mut().roots = Some(roots);
    }

    /// Returns the next [`ClockId`] from the shared flow state's clock counter.
    #[doc(hidden)]
    pub fn next_clock_id(&mut self) -> ClockId {
        self.flow_state.borrow_mut().next_clock_id()
    }

    /// Returns the next [`CycleId`] from the shared flow state's cycle counter.
    #[doc(hidden)]
    pub fn next_cycle_id(&mut self) -> CycleId {
        self.flow_state.borrow_mut().next_cycle_id()
    }
}