Module Stream

Pull-based streams with fallible completion and explicit cancellation.

A stream can end successfully, end with an error, or yield one item at a time. Consumers may stop early by calling .cancel, which lets the producer close any underlying resource before the stream is discarded.

type Stream<e, a> = recursive either {
  .end Try<e, !>,
  .item choice {
    .cancel => Try<e, !>,
    .get => (a) self,
  },
}

A pull-based sequence of values of type a whose completion can fail with an error of type e.

Cases:

  • .end means the stream is exhausted. Its payload is .ok! on normal completion, or .err if the producer failed.
  • .item offers one item. The consumer must either call .get to receive it and continue, or .cancel to stop early.
dec Stream.All : <e, a>[Stream<e, a>] [box [a] Bool] Try<e, Bool>

Returns .ok.true! if the test function holds for every item. Short-circuits and cancels the source on the first .false!.

dec Stream.Any : <e, a>[Stream<e, a>] [box [a] Bool] Try<e, Bool>

Returns .ok.true! if the test function holds for at least one item. Short-circuits and cancels the source on the first .true!.

dec Stream.Cancel : <e, a>[Stream<e, a>] Try<e, !>

Cancels a stream if it has not already ended, returning any cancellation or completion error.

dec Stream.Concat : <e: box, a>[List<Stream<e, a>>] Stream<e, a>

Concatenates a list of streams into a single stream.

dec Stream.Drop : <e, a: box>[Stream<e, a>] [Nat] Stream<e, a>

Skips the requested number of items and yields the rest.

dec Stream.DropWhile : <e, a: box>[Stream<e, a>] [box [a] Bool] Stream<e, a>

Skips items while the test function returns .true!, then yields the rest.

dec Stream.Filter : <e, a: box>[Stream<e, a>] [box [a] Bool] Stream<e, a>

Keeps the items for which the test function returns .true!.

dec Stream.FlatMap : <e: box, a>[Stream<e, a>] <b>[box [a] Stream<e, b>] Stream<e, b>

Maps each item to a stream, then flattens the result.

dec Stream.ForEach : <r>[r] <e, a>[Stream<e, a>] [box [r, a] r] (Try<e, !>) r

Folds over the stream. The returned pair contains the stream completion status and the final accumulator.

dec Stream.FromList : <a: box>[List<a>] Stream<either {}, a>

Creates an infallible stream from a list.

dec Stream.Map : <e, a>[Stream<e, a>] <b>[box [a] b] Stream<e, b>

Applies a mapping function to each item of a stream.

dec Stream.MapErr : <e1, a>[Stream<e1, a>] <e2>[box [e1] e2] Stream<e2, a>

Applies a mapping function to the stream's error type.

dec Stream.Sum : <e, a: number>[Stream<e, a>] Try<e, a>

Calculates the sum of all stream items.

dec Stream.Take : <e, a: box>[Stream<e, a>] [Nat] Stream<e, a>

Yields at most the requested number of items, cancelling the source if enough items were taken before it ended.

dec Stream.TakeWhile : <e, a: box>[Stream<e, a>] [box [a] Bool] Stream<e, a>

Yields items while the test function returns .true!, then cancels the source.

dec Stream.ToList : <e, a: box>[Stream<e, a>] Try<e, List<a>>

Collects the stream into a list, preserving any stream error.