Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Breaking changes

- `ToStreamBuilder` is now generic over the item type instead of the container builder; the container builder moves to a method-level generic. This enables method-call syntax: `(0..3).to_stream_with_builder::<_, CapacityContainerBuilder<_>>(scope)` instead of the UFCS form `ToStreamBuilder::<CapacityContainerBuilder<_>>::to_stream_with_builder(0..3, scope)`.

## [0.29.0](https://github.com/TimelyDataflow/timely-dataflow/compare/timely-v0.28.1...timely-v0.29.0) - 2026-04-13

The theme in this release is simplifying specialization by removing monomorphization sprawl.
Expand Down
20 changes: 12 additions & 8 deletions timely/src/dataflow/operators/core/to_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use crate::dataflow::{Stream, Scope};

/// Converts to a timely [Stream], using a container builder.
pub trait ToStreamBuilder<CB: ContainerBuilder> {
pub trait ToStreamBuilder<Item> {
Comment thread
frankmcsherry marked this conversation as resolved.
Outdated
/// Converts to a timely [Stream], using the supplied container builder type.
///
/// # Examples
Expand All @@ -18,23 +18,27 @@
/// use timely::container::CapacityContainerBuilder;
///
/// let (data1, data2) = timely::example(|scope| {
/// let data1 = ToStreamBuilder::<CapacityContainerBuilder<_>>::to_stream_with_builder(0..3, scope)
/// let data1 = (0..3).to_stream_with_builder::<_, CapacityContainerBuilder<_>>(scope)
/// .container::<Vec<_>>()
/// .capture();
/// let data2 = ToStreamBuilder::<CapacityContainerBuilder<_>>::to_stream_with_builder(vec![0,1,2], scope)
/// let data2 = vec![0,1,2].to_stream_with_builder::<_, CapacityContainerBuilder<_>>(scope)
/// .container::<Vec<_>>()
/// .capture();
/// (data1, data2)
/// });
///
/// assert_eq!(data1.extract(), data2.extract());
/// ```
fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container>;
fn to_stream_with_builder<'scope, T: Timestamp, CB: ContainerBuilder>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container>

Check warning on line 32 in timely/src/dataflow/operators/core/to_stream.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

bound is defined in more than one place
where
CB: PushInto<Item>;
}

impl<CB: ContainerBuilder, I: IntoIterator+'static> ToStreamBuilder<CB> for I where CB: PushInto<I::Item> {
fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container> {

impl<I: IntoIterator+'static> ToStreamBuilder<I::Item> for I {
fn to_stream_with_builder<'scope, T: Timestamp, CB: ContainerBuilder>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container>

Check warning on line 38 in timely/src/dataflow/operators/core/to_stream.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

bound is defined in more than one place
where
CB: PushInto<I::Item>
{
source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| {

// Acquire an activator, so that the operator can rescheduled itself.
Expand Down Expand Up @@ -84,6 +88,6 @@

impl<C: Container + SizableContainer, I: IntoIterator+'static> ToStream<C> for I where C: PushInto<I::Item> {
fn to_stream<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C> {
ToStreamBuilder::<CapacityContainerBuilder<C>>::to_stream_with_builder(self, scope)
ToStreamBuilder::to_stream_with_builder::<_, CapacityContainerBuilder<C>>(self, scope)
}
}
Loading