datafusion_physical_plan/sorts/stream.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::sorts::cursor::{ArrayValues, CursorArray, RowValues};
use crate::SendableRecordBatchStream;
use crate::{PhysicalExpr, PhysicalSortExpr};
use arrow::array::Array;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, Rows, SortField};
use datafusion_common::{internal_datafusion_err, Result};
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use futures::stream::{Fuse, StreamExt};
use std::marker::PhantomData;
use std::sync::Arc;
use std::task::{ready, Context, Poll};

/// A [`Stream`](futures::Stream) that has multiple partitions that can
/// be polled separately but not concurrently
///
/// Used by sort preserving merge to decouple the cursor merging logic from
/// the source of the cursors, so that any row encoding performed for
/// intermediate sorts can be preserved
pub trait PartitionedStream: std::fmt::Debug + Send {
    type Output;

    /// Returns the number of partitions
    fn partitions(&self) -> usize;

    /// Polls the partition identified by `stream_idx` for its next value
    fn poll_next(
        &mut self,
        cx: &mut Context<'_>,
        stream_idx: usize,
    ) -> Poll<Option<Self::Output>>;
}
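
// A minimal sketch of how a caller might drain a single partition of a
// [`PartitionedStream`]. It is purely illustrative and not part of the merge
// logic in this crate; `futures::future::poll_fn` is assumed to be available
// to the caller:
//
// async fn drain_partition<S: PartitionedStream>(
//     stream: &mut S,
//     stream_idx: usize,
// ) -> Vec<S::Output> {
//     let mut out = Vec::new();
//     while let Some(item) =
//         futures::future::poll_fn(|cx| stream.poll_next(cx, stream_idx)).await
//     {
//         out.push(item);
//     }
//     out
// }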

/// A newtype wrapper around a set of fused [`SendableRecordBatchStream`]
/// that implements `Debug` and skips over empty [`RecordBatch`]es
struct FusedStreams(Vec<Fuse<SendableRecordBatchStream>>);

impl std::fmt::Debug for FusedStreams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FusedStreams")
            .field("num_streams", &self.0.len())
            .finish()
    }
}

impl FusedStreams {
    fn poll_next(
        &mut self,
        cx: &mut Context<'_>,
        stream_idx: usize,
    ) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            match ready!(self.0[stream_idx].poll_next_unpin(cx)) {
                // Skip empty batches so downstream cursors never see
                // zero-row input
                Some(Ok(b)) if b.num_rows() == 0 => continue,
                r => return Poll::Ready(r),
            }
        }
    }
}
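
// For example, if one input partition produces batches with 3, 0, and 2 rows
// and then ends, `poll_next` for that partition yields only the 3-row and the
// 2-row batch before returning `None`. Because each input is wrapped in
// [`Fuse`], a caller that keeps polling a finished partition simply keeps
// getting `None` back. (The row counts above are made up for illustration.)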

/// A per-partition pair of `Arc<Rows>` that can be reused across polls
#[derive(Debug)]
struct ReusableRows {
    // inner[stream_idx] holds two Arcs:
    // at the start of a poll, .0 holds the rows produced by the previous poll
    // and .1 holds the rows that will be written to next;
    // at the end of a poll, .0 and .1 are swapped
    inner: Vec<[Option<Arc<Rows>>; 2]>,
}

impl ReusableRows {
    // Returns a `Rows` to write into, reusing the existing allocation (rather
    // than cloning) whenever the previous rows are no longer in use
    fn take_next(&mut self, stream_idx: usize) -> Result<Rows> {
        Arc::try_unwrap(self.inner[stream_idx][1].take().unwrap()).map_err(|_| {
            internal_datafusion_err!(
                "Rows from RowCursorStream is still in use by consumer"
            )
        })
    }

    // Saves the just-written `Rows`
    fn save(&mut self, stream_idx: usize, rows: Arc<Rows>) {
        self.inner[stream_idx][1] = Some(Arc::clone(&rows));
        // swap the current rows with the previous ones, so that the next poll
        // can reuse the `Rows` from the previous poll
        let [a, b] = &mut self.inner[stream_idx];
        std::mem::swap(a, b);
    }
}
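
// Illustrative walkthrough of the double buffering above (a commented sketch,
// not executed code; `reusable` and `stream_idx` are hypothetical bindings):
//
// // poll 1: take the spare `Rows` from slot .1, fill it, then park it
// let rows = Arc::new(reusable.take_next(stream_idx)?); // filled by the caller
// reusable.save(stream_idx, Arc::clone(&rows));         // slot .0 = poll-1 rows
//
// // poll 2: the other spare `Rows` is now in slot .1 and is reused the same
// // way; after `save`, slot .1 holds the poll-1 rows again
//
// // poll 3: `take_next` tries to unwrap the poll-1 `Rows`. That succeeds (and
// // reuses its allocation) only if the consumer has already dropped the
// // `RowValues` returned for poll 1; if the consumer is still buffering two
// // `RowValues` for this partition, `Arc::try_unwrap` fails and the stream
// // returns "Rows from RowCursorStream is still in use by consumer".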

/// A [`PartitionedStream`] that wraps a set of [`SendableRecordBatchStream`]
/// and computes [`RowValues`] based on the provided [`PhysicalSortExpr`]
///
/// Note: the stream returns an error if the consumer buffers more than one
/// [`RowValues`] (i.e. holds on to two `RowValues` from the same partition at
/// the same time).
#[derive(Debug)]
pub struct RowCursorStream {
    /// Converter to convert output of physical expressions
    converter: RowConverter,
    /// The physical expressions to sort by
    column_expressions: Vec<Arc<dyn PhysicalExpr>>,
    /// Input streams
    streams: FusedStreams,
    /// Tracks the memory used by `converter`
    reservation: MemoryReservation,
    /// Allocated rows for each partition; we keep two so the consumer of the
    /// stream can buffer one
    rows: ReusableRows,
}

impl RowCursorStream {
    pub fn try_new(
        schema: &Schema,
        expressions: &LexOrdering,
        streams: Vec<SendableRecordBatchStream>,
        reservation: MemoryReservation,
    ) -> Result<Self> {
        let sort_fields = expressions
            .iter()
            .map(|expr| {
                let data_type = expr.expr.data_type(schema)?;
                Ok(SortField::new_with_options(data_type, expr.options))
            })
            .collect::<Result<Vec<_>>>()?;

        let streams: Vec<_> = streams.into_iter().map(|s| s.fuse()).collect();
        let converter = RowConverter::new(sort_fields)?;
        let mut rows = Vec::with_capacity(streams.len());
        for _ in &streams {
            // Initialize each stream with a pair of empty Rows
            rows.push([
                Some(Arc::new(converter.empty_rows(0, 0))),
                Some(Arc::new(converter.empty_rows(0, 0))),
            ]);
        }
        Ok(Self {
            converter,
            reservation,
            column_expressions: expressions.iter().map(|x| Arc::clone(&x.expr)).collect(),
            streams: FusedStreams(streams),
            rows: ReusableRows { inner: rows },
        })
    }

    fn convert_batch(
        &mut self,
        batch: &RecordBatch,
        stream_idx: usize,
    ) -> Result<RowValues> {
        let cols = self
            .column_expressions
            .iter()
            .map(|expr| expr.evaluate(batch)?.into_array(batch.num_rows()))
            .collect::<Result<Vec<_>>>()?;

        // At this point, ownership of this Rows should be unique
        let mut rows = self.rows.take_next(stream_idx)?;

        rows.clear();

        self.converter.append(&mut rows, &cols)?;
        self.reservation.try_resize(self.converter.size())?;

        let rows = Arc::new(rows);

        self.rows.save(stream_idx, Arc::clone(&rows));

        // track the memory in the newly created Rows.
        let mut rows_reservation = self.reservation.new_empty();
        rows_reservation.try_grow(rows.size())?;
        Ok(RowValues::new(rows, rows_reservation))
    }
}

impl PartitionedStream for RowCursorStream {
    type Output = Result<(RowValues, RecordBatch)>;

    fn partitions(&self) -> usize {
        self.streams.0.len()
    }

    fn poll_next(
        &mut self,
        cx: &mut Context<'_>,
        stream_idx: usize,
    ) -> Poll<Option<Self::Output>> {
        Poll::Ready(ready!(self.streams.poll_next(cx, stream_idx)).map(|r| {
            r.and_then(|batch| {
                let cursor = self.convert_batch(&batch, stream_idx)?;
                Ok((cursor, batch))
            })
        }))
    }
}
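
// Commented sketch of wiring up a `RowCursorStream`; `memory_pool`, `schema`,
// `ordering`, `input_streams` and `cx` are placeholders, and registering a
// `MemoryConsumer` is shown as one plausible way to obtain a
// `MemoryReservation`:
//
// let reservation = MemoryConsumer::new("RowCursorStream").register(&memory_pool);
// let mut cursors = RowCursorStream::try_new(
//     schema.as_ref(),
//     &ordering,      // LexOrdering over one or more sort expressions
//     input_streams,  // Vec<SendableRecordBatchStream>, one per partition
//     reservation,
// )?;
//
// // Each successful poll yields `(RowValues, RecordBatch)`. Per the note on
// // [`RowCursorStream`], drop the previous `RowValues` of a partition before
// // buffering a second one, or a later poll for that partition will error.
// let _ = cursors.poll_next(cx, 0);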

/// Specialized stream for sorts on single primitive columns
pub struct FieldCursorStream<T: CursorArray> {
    /// The physical expression to sort by
    sort: PhysicalSortExpr,
    /// Input streams
    streams: FusedStreams,
    /// Used to create new reservations for each array
    reservation: MemoryReservation,
    phantom: PhantomData<fn(T) -> T>,
}

impl<T: CursorArray> std::fmt::Debug for FieldCursorStream<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FieldCursorStream")
            .field("num_streams", &self.streams)
            .finish()
    }
}

impl<T: CursorArray> FieldCursorStream<T> {
    pub fn new(
        sort: PhysicalSortExpr,
        streams: Vec<SendableRecordBatchStream>,
        reservation: MemoryReservation,
    ) -> Self {
        let streams = streams.into_iter().map(|s| s.fuse()).collect();
        Self {
            sort,
            streams: FusedStreams(streams),
            reservation,
            phantom: Default::default(),
        }
    }

    fn convert_batch(&mut self, batch: &RecordBatch) -> Result<ArrayValues<T::Values>> {
        let value = self.sort.expr.evaluate(batch)?;
        let array = value.into_array(batch.num_rows())?;
        // Measure the array before downcasting; the sort expression is
        // expected to evaluate to an array of type `T`
        let size_in_mem = array.get_buffer_memory_size();
        let array = array.as_any().downcast_ref::<T>().expect("field values");
        // Account for the array in a fresh reservation that travels with the
        // returned cursor
        let mut array_reservation = self.reservation.new_empty();
        array_reservation.try_grow(size_in_mem)?;
        Ok(ArrayValues::new(
            self.sort.options,
            array,
            array_reservation,
        ))
    }
}

impl<T: CursorArray> PartitionedStream for FieldCursorStream<T> {
    type Output = Result<(ArrayValues<T::Values>, RecordBatch)>;

    fn partitions(&self) -> usize {
        self.streams.0.len()
    }

    fn poll_next(
        &mut self,
        cx: &mut Context<'_>,
        stream_idx: usize,
    ) -> Poll<Option<Self::Output>> {
        Poll::Ready(ready!(self.streams.poll_next(cx, stream_idx)).map(|r| {
            r.and_then(|batch| {
                let cursor = self.convert_batch(&batch)?;
                Ok((cursor, batch))
            })
        }))
    }
}
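
// Commented sketch of the specialized path above: for a sort on a single
// primitive column the merge can use `FieldCursorStream` keyed by the concrete
// array type instead of the row-based path. `Int64Array` is an assumed example
// of a type implementing [`CursorArray`]; `sort_expr`, `input_streams`,
// `reservation` and `cx` are placeholders:
//
// use arrow::array::Int64Array;
//
// let mut cursors = FieldCursorStream::<Int64Array>::new(
//     sort_expr,      // a single PhysicalSortExpr on an Int64 column
//     input_streams,  // Vec<SendableRecordBatchStream>
//     reservation,
// );
// // Each successful poll yields `(ArrayValues<_>, RecordBatch)`
// let _ = cursors.poll_next(cx, 0);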