datafusion_physical_plan/sorts/
stream.rs1use crate::sorts::cursor::{ArrayValues, CursorArray, RowValues};
19use crate::SendableRecordBatchStream;
20use crate::{PhysicalExpr, PhysicalSortExpr};
21use arrow::array::Array;
22use arrow::datatypes::Schema;
23use arrow::record_batch::RecordBatch;
24use arrow::row::{RowConverter, Rows, SortField};
25use datafusion_common::{internal_datafusion_err, Result};
26use datafusion_execution::memory_pool::MemoryReservation;
27use datafusion_physical_expr_common::sort_expr::LexOrdering;
28use futures::stream::{Fuse, StreamExt};
29use std::marker::PhantomData;
30use std::sync::Arc;
31use std::task::{ready, Context, Poll};
32
/// A set of streams that are polled individually by index.
///
/// Implementors report how many partitions they hold through
/// [`PartitionedStream::partitions`] and produce items for a single
/// partition at a time through [`PartitionedStream::poll_next`].
pub trait PartitionedStream: std::fmt::Debug + Send {
    /// The item type produced by a successful poll of any partition.
    type Output;

    /// Returns the number of partitions held by this stream.
    fn partitions(&self) -> usize;

    /// Polls the partition selected by `stream_idx` for its next item.
    ///
    /// The return value follows the usual polling shape: `Poll::Pending`,
    /// `Poll::Ready(Some(item))`, or `Poll::Ready(None)`.
    ///
    /// NOTE(review): presumably `stream_idx` must be less than
    /// `partitions()`; the implementations in this file index a `Vec` with it
    /// and would panic otherwise.
    fn poll_next(
        &mut self,
        cx: &mut Context<'_>,
        stream_idx: usize,
    ) -> Poll<Option<Self::Output>>;
}
51
/// Newtype around a list of fused [`SendableRecordBatchStream`]s, addressed
/// by their position in the vector.
struct FusedStreams(Vec<Fuse<SendableRecordBatchStream>>);
55
56impl std::fmt::Debug for FusedStreams {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("FusedStreams")
59 .field("num_streams", &self.0.len())
60 .finish()
61 }
62}
63
64impl FusedStreams {
65 fn poll_next(
66 &mut self,
67 cx: &mut Context<'_>,
68 stream_idx: usize,
69 ) -> Poll<Option<Result<RecordBatch>>> {
70 loop {
71 match ready!(self.0[stream_idx].poll_next_unpin(cx)) {
72 Some(Ok(b)) if b.num_rows() == 0 => continue,
73 r => return Poll::Ready(r),
74 }
75 }
76 }
77}
78
/// Per-stream pairs of shared [`Rows`] buffers that are handed out and stored
/// back so their allocations can be reused across conversions.
#[derive(Debug)]
struct ReusableRows {
    /// One two-slot array per stream, indexed by `stream_idx`.
    ///
    /// `take_next` removes the buffer in slot 1; `save` writes a buffer into
    /// slot 1 and then swaps the two slots, so the buffer saved most recently
    /// ends up in slot 0 and the one saved before it becomes the next to be
    /// taken.
    inner: Vec<[Option<Arc<Rows>>; 2]>,
}
89
90impl ReusableRows {
91 fn take_next(&mut self, stream_idx: usize) -> Result<Rows> {
94 Arc::try_unwrap(self.inner[stream_idx][1].take().unwrap()).map_err(|_| {
95 internal_datafusion_err!(
96 "Rows from RowCursorStream is still in use by consumer"
97 )
98 })
99 }
100 fn save(&mut self, stream_idx: usize, rows: Arc<Rows>) {
102 self.inner[stream_idx][1] = Some(Arc::clone(&rows));
103 let [a, b] = &mut self.inner[stream_idx];
105 std::mem::swap(a, b);
106 }
107}
108
/// A [`PartitionedStream`] that pairs each incoming [`RecordBatch`] with a
/// [`RowValues`] built by converting the batch's sort columns into the arrow
/// row format.
#[derive(Debug)]
pub struct RowCursorStream {
    /// Converter created from the sort fields in `try_new`; used to append
    /// the evaluated sort columns of each batch into a [`Rows`] buffer.
    converter: RowConverter,
    /// The sort expressions evaluated against every batch to produce the
    /// columns handed to `converter`.
    column_expressions: Vec<Arc<dyn PhysicalExpr>>,
    /// The fused input streams, one per partition.
    streams: FusedStreams,
    /// Reservation resized to `converter.size()` after each conversion; also
    /// the source of the per-batch reservations created with `new_empty`.
    reservation: MemoryReservation,
    /// Reusable [`Rows`] buffers, two per input stream.
    rows: ReusableRows,
}
127
128impl RowCursorStream {
129 pub fn try_new(
130 schema: &Schema,
131 expressions: &LexOrdering,
132 streams: Vec<SendableRecordBatchStream>,
133 reservation: MemoryReservation,
134 ) -> Result<Self> {
135 let sort_fields = expressions
136 .iter()
137 .map(|expr| {
138 let data_type = expr.expr.data_type(schema)?;
139 Ok(SortField::new_with_options(data_type, expr.options))
140 })
141 .collect::<Result<Vec<_>>>()?;
142
143 let streams: Vec<_> = streams.into_iter().map(|s| s.fuse()).collect();
144 let converter = RowConverter::new(sort_fields)?;
145 let mut rows = Vec::with_capacity(streams.len());
146 for _ in &streams {
147 rows.push([
149 Some(Arc::new(converter.empty_rows(0, 0))),
150 Some(Arc::new(converter.empty_rows(0, 0))),
151 ]);
152 }
153 Ok(Self {
154 converter,
155 reservation,
156 column_expressions: expressions.iter().map(|x| Arc::clone(&x.expr)).collect(),
157 streams: FusedStreams(streams),
158 rows: ReusableRows { inner: rows },
159 })
160 }
161
162 fn convert_batch(
163 &mut self,
164 batch: &RecordBatch,
165 stream_idx: usize,
166 ) -> Result<RowValues> {
167 let cols = self
168 .column_expressions
169 .iter()
170 .map(|expr| expr.evaluate(batch)?.into_array(batch.num_rows()))
171 .collect::<Result<Vec<_>>>()?;
172
173 let mut rows = self.rows.take_next(stream_idx)?;
175
176 rows.clear();
177
178 self.converter.append(&mut rows, &cols)?;
179 self.reservation.try_resize(self.converter.size())?;
180
181 let rows = Arc::new(rows);
182
183 self.rows.save(stream_idx, Arc::clone(&rows));
184
185 let mut rows_reservation = self.reservation.new_empty();
187 rows_reservation.try_grow(rows.size())?;
188 Ok(RowValues::new(rows, rows_reservation))
189 }
190}
191
192impl PartitionedStream for RowCursorStream {
193 type Output = Result<(RowValues, RecordBatch)>;
194
195 fn partitions(&self) -> usize {
196 self.streams.0.len()
197 }
198
199 fn poll_next(
200 &mut self,
201 cx: &mut Context<'_>,
202 stream_idx: usize,
203 ) -> Poll<Option<Self::Output>> {
204 Poll::Ready(ready!(self.streams.poll_next(cx, stream_idx)).map(|r| {
205 r.and_then(|batch| {
206 let cursor = self.convert_batch(&batch, stream_idx)?;
207 Ok((cursor, batch))
208 })
209 }))
210 }
211}
212
/// A [`PartitionedStream`] that pairs each incoming [`RecordBatch`] with
/// [`ArrayValues`] built from a single sort expression whose result is
/// downcast to the array type `T`.
pub struct FieldCursorStream<T: CursorArray> {
    /// The sort expression evaluated against every batch; its options are
    /// passed on to the produced [`ArrayValues`].
    sort: PhysicalSortExpr,
    /// The fused input streams, one per partition.
    streams: FusedStreams,
    /// Source of the per-array reservations created with `new_empty`.
    reservation: MemoryReservation,
    /// Marks the array type `T` without storing a value of it.
    phantom: PhantomData<fn(T) -> T>,
}
223
224impl<T: CursorArray> std::fmt::Debug for FieldCursorStream<T> {
225 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226 f.debug_struct("PrimitiveCursorStream")
227 .field("num_streams", &self.streams)
228 .finish()
229 }
230}
231
232impl<T: CursorArray> FieldCursorStream<T> {
233 pub fn new(
234 sort: PhysicalSortExpr,
235 streams: Vec<SendableRecordBatchStream>,
236 reservation: MemoryReservation,
237 ) -> Self {
238 let streams = streams.into_iter().map(|s| s.fuse()).collect();
239 Self {
240 sort,
241 streams: FusedStreams(streams),
242 reservation,
243 phantom: Default::default(),
244 }
245 }
246
247 fn convert_batch(&mut self, batch: &RecordBatch) -> Result<ArrayValues<T::Values>> {
248 let value = self.sort.expr.evaluate(batch)?;
249 let array = value.into_array(batch.num_rows())?;
250 let size_in_mem = array.get_buffer_memory_size();
251 let array = array.as_any().downcast_ref::<T>().expect("field values");
252 let mut array_reservation = self.reservation.new_empty();
253 array_reservation.try_grow(size_in_mem)?;
254 Ok(ArrayValues::new(
255 self.sort.options,
256 array,
257 array_reservation,
258 ))
259 }
260}
261
262impl<T: CursorArray> PartitionedStream for FieldCursorStream<T> {
263 type Output = Result<(ArrayValues<T::Values>, RecordBatch)>;
264
265 fn partitions(&self) -> usize {
266 self.streams.0.len()
267 }
268
269 fn poll_next(
270 &mut self,
271 cx: &mut Context<'_>,
272 stream_idx: usize,
273 ) -> Poll<Option<Self::Output>> {
274 Poll::Ready(ready!(self.streams.poll_next(cx, stream_idx)).map(|r| {
275 r.and_then(|batch| {
276 let cursor = self.convert_batch(&batch)?;
277 Ok((cursor, batch))
278 })
279 }))
280 }
281}