Skip to content

Commit b3b3afe

Browse files
committed
Move generate_series projection logic into LazyMemoryStream
1 parent 0a650a0 commit b3b3afe

File tree

4 files changed

+49
-28
lines changed

4 files changed

+49
-28
lines changed

datafusion/core/tests/execution/coop.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ fn make_lazy_exec_with_range(
148148
let generator: Arc<RwLock<dyn LazyBatchGenerator>> = Arc::new(RwLock::new(gen));
149149

150150
// Create a LazyMemoryExec with one partition using our generator
151-
let mut exec = LazyMemoryExec::try_new(schema, vec![generator]).unwrap();
151+
let mut exec = LazyMemoryExec::try_new(schema, None, vec![generator]).unwrap();
152152

153153
exec.add_ordering(vec![PhysicalSortExpr::new(
154154
Arc::new(Column::new(column_name, 0)),

datafusion/functions-table/src/generate_series.rs

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,6 @@ impl GenerateSeriesTable {
237237
pub fn as_generator(
238238
&self,
239239
batch_size: usize,
240-
projection: Option<Vec<usize>>,
241240
) -> Result<Arc<RwLock<dyn LazyBatchGenerator>>> {
242241
let generator: Arc<RwLock<dyn LazyBatchGenerator>> = match &self.args {
243242
GenSeriesArgs::ContainsNull { name } => Arc::new(RwLock::new(Empty { name })),
@@ -256,7 +255,6 @@ impl GenerateSeriesTable {
256255
batch_size,
257256
include_end: *include_end,
258257
name,
259-
projection,
260258
})),
261259
GenSeriesArgs::TimestampArgs {
262260
start,
@@ -297,7 +295,6 @@ impl GenerateSeriesTable {
297295
batch_size,
298296
include_end: *include_end,
299297
name,
300-
projection,
301298
}))
302299
}
303300
GenSeriesArgs::DateArgs {
@@ -327,7 +324,6 @@ impl GenerateSeriesTable {
327324
batch_size,
328325
include_end: *include_end,
329326
name,
330-
projection,
331327
})),
332328
};
333329

@@ -345,7 +341,6 @@ pub struct GenericSeriesState<T: SeriesValue> {
345341
current: T,
346342
include_end: bool,
347343
name: &'static str,
348-
projection: Option<Vec<usize>>,
349344
}
350345

351346
impl<T: SeriesValue> GenericSeriesState<T> {
@@ -401,11 +396,7 @@ impl<T: SeriesValue> LazyBatchGenerator for GenericSeriesState<T> {
401396

402397
let array = self.current.create_array(buf)?;
403398
let batch = RecordBatch::try_new(Arc::clone(&self.schema), vec![array])?;
404-
let projected = match self.projection.as_ref() {
405-
Some(projection) => batch.project(projection)?,
406-
None => batch,
407-
};
408-
Ok(Some(projected))
399+
Ok(Some(batch))
409400
}
410401
}
411402

@@ -481,14 +472,13 @@ impl TableProvider for GenerateSeriesTable {
481472
_limit: Option<usize>,
482473
) -> Result<Arc<dyn ExecutionPlan>> {
483474
let batch_size = state.config_options().execution.batch_size;
484-
let schema = match projection {
485-
Some(projection) => Arc::new(self.schema.project(projection)?),
486-
None => self.schema(),
487-
};
488-
489-
let generator = self.as_generator(batch_size, projection.cloned())?;
475+
let generator = self.as_generator(batch_size)?;
490476

491-
Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
477+
Ok(Arc::new(LazyMemoryExec::try_new(
478+
self.schema(),
479+
projection.cloned(),
480+
vec![generator],
481+
)?))
492482
}
493483
}
494484

datafusion/physical-plan/src/memory.rs

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
153153
pub struct LazyMemoryExec {
154154
/// Schema representing the data
155155
schema: SchemaRef,
156+
/// Optional projection for which columns to load
157+
projection: Option<Vec<usize>>,
156158
/// Functions to generate batches for each partition
157159
batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
158160
/// Plan properties cache storing equivalence properties, partitioning, and execution mode
@@ -165,6 +167,7 @@ impl LazyMemoryExec {
165167
/// Create a new lazy memory execution plan
166168
pub fn try_new(
167169
schema: SchemaRef,
170+
projection: Option<Vec<usize>>,
168171
generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
169172
) -> Result<Self> {
170173
let boundedness = generators
@@ -189,6 +192,11 @@ impl LazyMemoryExec {
189192
})
190193
.unwrap_or(Boundedness::Bounded);
191194

195+
let schema = match projection.as_ref() {
196+
Some(columns) => Arc::new(schema.project(columns)?),
197+
None => schema,
198+
};
199+
192200
let cache = PlanProperties::new(
193201
EquivalenceProperties::new(Arc::clone(&schema)),
194202
Partitioning::RoundRobinBatch(generators.len()),
@@ -199,6 +207,7 @@ impl LazyMemoryExec {
199207

200208
Ok(Self {
201209
schema,
210+
projection,
202211
batch_generators: generators,
203212
cache,
204213
metrics: ExecutionPlanMetricsSet::new(),
@@ -320,6 +329,7 @@ impl ExecutionPlan for LazyMemoryExec {
320329

321330
let stream = LazyMemoryStream {
322331
schema: Arc::clone(&self.schema),
332+
projection: self.projection.clone(),
323333
generator: Arc::clone(&self.batch_generators[partition]),
324334
baseline_metrics,
325335
};
@@ -338,6 +348,8 @@ impl ExecutionPlan for LazyMemoryExec {
338348
/// Stream that generates record batches on demand
339349
pub struct LazyMemoryStream {
340350
schema: SchemaRef,
351+
/// Optional projection for which columns to load
352+
projection: Option<Vec<usize>>,
341353
/// Generator to produce batches
342354
///
343355
/// Note: Idiomatically, DataFusion uses plan-time parallelism - each stream
@@ -361,7 +373,14 @@ impl Stream for LazyMemoryStream {
361373
let batch = self.generator.write().generate_next_batch();
362374

363375
let poll = match batch {
364-
Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
376+
Ok(Some(batch)) => {
377+
// return just the columns requested
378+
let batch = match self.projection.as_ref() {
379+
Some(columns) => batch.project(columns)?,
380+
None => batch,
381+
};
382+
Poll::Ready(Some(Ok(batch)))
383+
}
365384
Ok(None) => Poll::Ready(None),
366385
Err(e) => Poll::Ready(Some(Err(e))),
367386
};
@@ -434,8 +453,11 @@ mod lazy_memory_tests {
434453
schema: Arc::clone(&schema),
435454
};
436455

437-
let exec =
438-
LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?;
456+
let exec = LazyMemoryExec::try_new(
457+
schema,
458+
None,
459+
vec![Arc::new(RwLock::new(generator))],
460+
)?;
439461

440462
// Test schema
441463
assert_eq!(exec.schema().fields().len(), 1);
@@ -485,8 +507,11 @@ mod lazy_memory_tests {
485507
schema: Arc::clone(&schema),
486508
};
487509

488-
let exec =
489-
LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?;
510+
let exec = LazyMemoryExec::try_new(
511+
schema,
512+
None,
513+
vec![Arc::new(RwLock::new(generator))],
514+
)?;
490515

491516
// Test invalid partition
492517
let result = exec.execute(1, Arc::new(TaskContext::default()));
@@ -519,8 +544,11 @@ mod lazy_memory_tests {
519544
schema: Arc::clone(&schema),
520545
};
521546

522-
let exec =
523-
LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?;
547+
let exec = LazyMemoryExec::try_new(
548+
schema,
549+
None,
550+
vec![Arc::new(RwLock::new(generator))],
551+
)?;
524552
let task_ctx = Arc::new(TaskContext::default());
525553

526554
let stream = exec.execute(0, task_ctx)?;

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1940,10 +1940,13 @@ impl protobuf::PhysicalPlanNode {
19401940
};
19411941

19421942
let table = GenerateSeriesTable::new(Arc::clone(&schema), args);
1943-
let generator =
1944-
table.as_generator(generate_series.target_batch_size as usize, None)?;
1943+
let generator = table.as_generator(generate_series.target_batch_size as usize)?;
19451944

1946-
Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
1945+
Ok(Arc::new(LazyMemoryExec::try_new(
1946+
schema,
1947+
None,
1948+
vec![generator],
1949+
)?))
19471950
}
19481951

19491952
fn try_into_cooperative_physical_plan(

0 commit comments

Comments
 (0)