AsyncIterable Stream Helpers
Pure functions for composing AsyncIterables. No EventEmitters, no callbacks.
Philosophy
Pull-based: Consumer controls flow via for await...of. Natural backpressure.
Lazy: Iterators don't start until first next(). No work until consumed.
Cleanup: All helpers honor return() for early-break cleanup.
Core Helpers
merge(...sources)
Merge multiple AsyncIterables into one. Values emit in arrival order.
import { merge, map } from "@silvery/term/streams"
const keys = term.keys()
const resizes = term.resizes()
const events = merge(
map(keys, (k) => ({ type: "key", ...k })),
map(resizes, (r) => ({ type: "resize", ...r })),
)
for await (const event of events) {
// Process any event
}Behavior:
- First-come ordering (non-deterministic if simultaneous)
- Completes when ALL sources complete
- Errors propagate, remaining sources cleaned up
- Fresh iterable per call (don't share between consumers)
map(source, fn)
Transform each value.
const keyEvents = map(keys, (k) => ({ type: "key", key: k }))filter(source, predicate)
Keep values matching predicate.
const letters = filter(keys, (k) => /^[a-z]$/.test(k.key))filterMap(source, fn)
Filter + map in one pass. Return undefined to skip.
const keyEvents = filterMap(events, (e) => (e.type === "key" ? e : undefined))takeUntil(source, signal)
Stop when AbortSignal fires. Graceful completion (no error).
const controller = new AbortController()
// Later: controller.abort() ends iteration
for await (const event of takeUntil(events, controller.signal)) {
// ...
}take(source, n)
Take first n values.
const first3 = take(events, 3)Composition Helpers
concat(...sources)
Concatenate in sequence (not interleaved).
const all = concat(header, body, footer)zip(...sources)
Zip together. Completes at shortest source.
const pairs = zip(keys, timestamps) // [key, timestamp][]batch(source, size)
Collect into arrays of size n.
const batched = batch(events, 10) // AsyncIterable<Event[]>Rate Limiting
throttle(source, ms)
Emit first, then ignore for duration.
const throttled = throttle(mouseMoves, 16) // ~60fpsdebounce(source, ms)
Note: True debouncing is complex with pull-based iterables. This implementation yields only the final value after source completes.
const debounced = debounce(source, 300) // Last value after source endsTesting Helpers
fromArray(items)
Create AsyncIterable from array.
const events = fromArray([
{ type: "key", key: "j" },
{ type: "key", key: "k" },
])fromArrayWithDelay(items, ms)
Create with delay between items.
const slow = fromArrayWithDelay([1, 2, 3], 100) // 100ms gapsCleanup Guarantee
All helpers clean up properly on:
- Normal completion (source exhausted)
- Early break (
breakinfor await) - Errors (thrown and propagated)
for await (const event of merge(a, b, c)) {
if (done) break // All 3 sources get return() called
}Known Limitations
Multiple consumers: Don't share merged iterables. Each
merge()call creates fresh iterable.Slow consumer blocks producer: If you await slow work, next event is blocked.
typescriptfor await (const e of events) { await slowWork() // Blocks next event }Mitigation: Use separate async task for slow work.
Debounce is limited: Pull-based iterables can't do real-time debouncing. Use push-based patterns for that.
Merge ordering: If two sources yield "simultaneously", order is implementation-defined (but consistent within one iteration).
React Integration
Bridge to React hooks via AbortController:
useEffect(() => {
const controller = new AbortController()
const events = takeUntil(runtime.events(), controller.signal)
;(async () => {
for await (const event of events) {
if (event.type === "key") handler(event.key)
}
})()
return () => controller.abort()
}, [])