196196< a href =#196 id =196 data-nosnippet > 196</ a > < span class ="self "> self</ span > .predicates
197197< a href =#197 id =197 data-nosnippet > 197</ a > .clone()
198198< a href =#198 id =198 data-nosnippet > 198</ a > .map_or(String::from(< span class ="string "> ""</ span > ), |p| < span class ="macro "> format!</ span > (< span class ="string "> "{p}"</ span > ))
199- < a href =#199 id =199 data-nosnippet > 199</ a > )
200- < a href =#200 id =200 data-nosnippet > 200</ a > }
201- < a href =#201 id =201 data-nosnippet > 201</ a > }
202- < a href =#202 id =202 data-nosnippet > 202</ a >
203- < a href =#203 id =203 data-nosnippet > 203</ a > < span class ="doccomment "> /// Asynchronously retrieves a stream of [`RecordBatch`] instances
204- < a href =#204 id =204 data-nosnippet > 204</ a > /// from a given table.
205- < a href =#205 id =205 data-nosnippet > 205</ a > ///
206- < a href =#206 id =206 data-nosnippet > 206</ a > /// This function initializes a [`TableScan`], builds it,
207- < a href =#207 id =207 data-nosnippet > 207</ a > /// and then converts it into a stream of Arrow [`RecordBatch`]es.
208- < a href =#208 id =208 data-nosnippet > 208</ a > </ span > < span class ="kw "> async fn </ span > get_batch_stream(
209- < a href =#209 id =209 data-nosnippet > 209</ a > table: Table,
210- < a href =#210 id =210 data-nosnippet > 210</ a > snapshot_id: < span class ="prelude-ty "> Option</ span > <i64>,
211- < a href =#211 id =211 data-nosnippet > 211</ a > column_names: < span class ="prelude-ty "> Option</ span > <Vec<String>>,
212- < a href =#212 id =212 data-nosnippet > 212</ a > predicates: < span class ="prelude-ty "> Option</ span > <Predicate>,
213- < a href =#213 id =213 data-nosnippet > 213</ a > ) -> DFResult<Pin<Box<< span class ="kw "> dyn </ span > Stream<Item = DFResult<RecordBatch>> + Send>>> {
214- < a href =#214 id =214 data-nosnippet > 214</ a > < span class ="kw "> let </ span > scan_builder = < span class ="kw "> match </ span > snapshot_id {
215- < a href =#215 id =215 data-nosnippet > 215</ a > < span class ="prelude-val "> Some</ span > (snapshot_id) => table.scan().snapshot_id(snapshot_id),
216- < a href =#216 id =216 data-nosnippet > 216</ a > < span class ="prelude-val "> None </ span > => table.scan(),
217- < a href =#217 id =217 data-nosnippet > 217</ a > };
218- < a href =#218 id =218 data-nosnippet > 218</ a >
219- < a href =#219 id =219 data-nosnippet > 219</ a > < span class ="kw "> let </ span > < span class ="kw-2 "> mut </ span > scan_builder = < span class ="kw "> match </ span > column_names {
220- < a href =#220 id =220 data-nosnippet > 220</ a > < span class ="prelude-val "> Some</ span > (column_names) => scan_builder.select(column_names),
221- < a href =#221 id =221 data-nosnippet > 221</ a > < span class ="prelude-val "> None </ span > => scan_builder.select_all(),
222- < a href =#222 id =222 data-nosnippet > 222</ a > };
223- < a href =#223 id =223 data-nosnippet > 223</ a > < span class ="kw "> if let </ span > < span class ="prelude-val "> Some</ span > (pred) = predicates {
224- < a href =#224 id =224 data-nosnippet > 224</ a > scan_builder = scan_builder.with_filter(pred);
225- < a href =#225 id =225 data-nosnippet > 225</ a > }
226- < a href =#226 id =226 data-nosnippet > 226</ a > < span class ="kw "> let </ span > table_scan = scan_builder.build().map_err(to_datafusion_error)< span class ="question-mark "> ?</ span > ;
227- < a href =#227 id =227 data-nosnippet > 227</ a >
228- < a href =#228 id =228 data-nosnippet > 228</ a > < span class ="kw "> let </ span > stream = table_scan
229- < a href =#229 id =229 data-nosnippet > 229</ a > .to_arrow()
230- < a href =#230 id =230 data-nosnippet > 230</ a > .< span class ="kw "> await
231- < a href =#231 id =231 data-nosnippet > 231</ a > </ span > .map_err(to_datafusion_error)< span class ="question-mark "> ?
232- < a href =#232 id =232 data-nosnippet > 232</ a > </ span > .map_err(to_datafusion_error);
233- < a href =#233 id =233 data-nosnippet > 233</ a > < span class ="prelude-val "> Ok</ span > (Box::pin(stream))
234- < a href =#234 id =234 data-nosnippet > 234</ a > }
235- < a href =#235 id =235 data-nosnippet > 235</ a >
236- < a href =#236 id =236 data-nosnippet > 236</ a > < span class ="kw "> fn </ span > get_column_names(
237- < a href =#237 id =237 data-nosnippet > 237</ a > schema: ArrowSchemaRef,
238- < a href =#238 id =238 data-nosnippet > 238</ a > projection: < span class ="prelude-ty "> Option</ span > << span class ="kw-2 "> &</ span > Vec<usize>>,
239- < a href =#239 id =239 data-nosnippet > 239</ a > ) -> < span class ="prelude-ty "> Option</ span > <Vec<String>> {
240- < a href =#240 id =240 data-nosnippet > 240</ a > projection.map(|v| {
241- < a href =#241 id =241 data-nosnippet > 241</ a > v.iter()
242- < a href =#242 id =242 data-nosnippet > 242</ a > .map(|p| schema.field(< span class ="kw-2 "> *</ span > p).name().clone())
243- < a href =#243 id =243 data-nosnippet > 243</ a > .collect::<Vec<String>>()
244- < a href =#244 id =244 data-nosnippet > 244</ a > })
245- < a href =#245 id =245 data-nosnippet > 245</ a > }
199+ < a href =#199 id =199 data-nosnippet > 199</ a > )< span class ="question-mark "> ?</ span > ;
200+ < a href =#200 id =200 data-nosnippet > 200</ a > < span class ="kw "> if let </ span > < span class ="prelude-val "> Some</ span > (limit) = < span class ="self "> self</ span > .limit {
201+ < a href =#201 id =201 data-nosnippet > 201</ a > < span class ="macro "> write!</ span > (f, < span class ="string "> " limit:[{limit}]"</ span > )< span class ="question-mark "> ?</ span > ;
202+ < a href =#202 id =202 data-nosnippet > 202</ a > }
203+ < a href =#203 id =203 data-nosnippet > 203</ a > < span class ="prelude-val "> Ok</ span > (())
204+ < a href =#204 id =204 data-nosnippet > 204</ a > }
205+ < a href =#205 id =205 data-nosnippet > 205</ a > }
206+ < a href =#206 id =206 data-nosnippet > 206</ a >
207+ < a href =#207 id =207 data-nosnippet > 207</ a > < span class ="doccomment "> /// Asynchronously retrieves a stream of [`RecordBatch`] instances
208+ < a href =#208 id =208 data-nosnippet > 208</ a > /// from a given table.
209+ < a href =#209 id =209 data-nosnippet > 209</ a > ///
210+ < a href =#210 id =210 data-nosnippet > 210</ a > /// This function initializes a [`TableScan`], builds it,
211+ < a href =#211 id =211 data-nosnippet > 211</ a > /// and then converts it into a stream of Arrow [`RecordBatch`]es.
212+ < a href =#212 id =212 data-nosnippet > 212</ a > </ span > < span class ="kw "> async fn </ span > get_batch_stream(
213+ < a href =#213 id =213 data-nosnippet > 213</ a > table: Table,
214+ < a href =#214 id =214 data-nosnippet > 214</ a > snapshot_id: < span class ="prelude-ty "> Option</ span > <i64>,
215+ < a href =#215 id =215 data-nosnippet > 215</ a > column_names: < span class ="prelude-ty "> Option</ span > <Vec<String>>,
216+ < a href =#216 id =216 data-nosnippet > 216</ a > predicates: < span class ="prelude-ty "> Option</ span > <Predicate>,
217+ < a href =#217 id =217 data-nosnippet > 217</ a > ) -> DFResult<Pin<Box<< span class ="kw "> dyn </ span > Stream<Item = DFResult<RecordBatch>> + Send>>> {
218+ < a href =#218 id =218 data-nosnippet > 218</ a > < span class ="kw "> let </ span > scan_builder = < span class ="kw "> match </ span > snapshot_id {
219+ < a href =#219 id =219 data-nosnippet > 219</ a > < span class ="prelude-val "> Some</ span > (snapshot_id) => table.scan().snapshot_id(snapshot_id),
220+ < a href =#220 id =220 data-nosnippet > 220</ a > < span class ="prelude-val "> None </ span > => table.scan(),
221+ < a href =#221 id =221 data-nosnippet > 221</ a > };
222+ < a href =#222 id =222 data-nosnippet > 222</ a >
223+ < a href =#223 id =223 data-nosnippet > 223</ a > < span class ="kw "> let </ span > < span class ="kw-2 "> mut </ span > scan_builder = < span class ="kw "> match </ span > column_names {
224+ < a href =#224 id =224 data-nosnippet > 224</ a > < span class ="prelude-val "> Some</ span > (column_names) => scan_builder.select(column_names),
225+ < a href =#225 id =225 data-nosnippet > 225</ a > < span class ="prelude-val "> None </ span > => scan_builder.select_all(),
226+ < a href =#226 id =226 data-nosnippet > 226</ a > };
227+ < a href =#227 id =227 data-nosnippet > 227</ a > < span class ="kw "> if let </ span > < span class ="prelude-val "> Some</ span > (pred) = predicates {
228+ < a href =#228 id =228 data-nosnippet > 228</ a > scan_builder = scan_builder.with_filter(pred);
229+ < a href =#229 id =229 data-nosnippet > 229</ a > }
230+ < a href =#230 id =230 data-nosnippet > 230</ a > < span class ="kw "> let </ span > table_scan = scan_builder.build().map_err(to_datafusion_error)< span class ="question-mark "> ?</ span > ;
231+ < a href =#231 id =231 data-nosnippet > 231</ a >
232+ < a href =#232 id =232 data-nosnippet > 232</ a > < span class ="kw "> let </ span > stream = table_scan
233+ < a href =#233 id =233 data-nosnippet > 233</ a > .to_arrow()
234+ < a href =#234 id =234 data-nosnippet > 234</ a > .< span class ="kw "> await
235+ < a href =#235 id =235 data-nosnippet > 235</ a > </ span > .map_err(to_datafusion_error)< span class ="question-mark "> ?
236+ < a href =#236 id =236 data-nosnippet > 236</ a > </ span > .map_err(to_datafusion_error);
237+ < a href =#237 id =237 data-nosnippet > 237</ a > < span class ="prelude-val "> Ok</ span > (Box::pin(stream))
238+ < a href =#238 id =238 data-nosnippet > 238</ a > }
239+ < a href =#239 id =239 data-nosnippet > 239</ a >
240+ < a href =#240 id =240 data-nosnippet > 240</ a > < span class ="kw "> fn </ span > get_column_names(
241+ < a href =#241 id =241 data-nosnippet > 241</ a > schema: ArrowSchemaRef,
242+ < a href =#242 id =242 data-nosnippet > 242</ a > projection: < span class ="prelude-ty "> Option</ span > << span class ="kw-2 "> &</ span > Vec<usize>>,
243+ < a href =#243 id =243 data-nosnippet > 243</ a > ) -> < span class ="prelude-ty "> Option</ span > <Vec<String>> {
244+ < a href =#244 id =244 data-nosnippet > 244</ a > projection.map(|v| {
245+ < a href =#245 id =245 data-nosnippet > 245</ a > v.iter()
246+ < a href =#246 id =246 data-nosnippet > 246</ a > .map(|p| schema.field(< span class ="kw-2 "> *</ span > p).name().clone())
247+ < a href =#247 id =247 data-nosnippet > 247</ a > .collect::<Vec<String>>()
248+ < a href =#248 id =248 data-nosnippet > 248</ a > })
249+ < a href =#249 id =249 data-nosnippet > 249</ a > }
246250</ code > </ pre > </ div > </ section > </ main > </ body > </ html >
0 commit comments