3030import java .util .Map ;
3131import java .util .Objects ;
3232import java .util .concurrent .TimeUnit ;
33+ import java .util .function .BiFunction ;
3334import java .util .logging .Logger ;
3435import java .util .stream .Collectors ;
3536import java .util .stream .IntStream ;
@@ -187,14 +188,8 @@ public Stream<Object[]> query(@Nonnull final String query, @Nonnull final Map<St
187188 public Stream <Object []> query (@ Nonnull final String query ,
188189 @ Nonnull final Map <String , Object > parameters ,
189190 @ Nonnull final QueryOptions options ) {
190- return queryData (query , parameters , options )
191- .flatMap (vector -> IntStream .range (0 , vector .getRowCount ())
192- .mapToObj (rowNumber ->
193- VectorSchemaRootConverter .INSTANCE
194- .getArrayObjectFromVectorSchemaRoot (
195- vector ,
196- rowNumber
197- )));
191+ return queryDataAndProcess (query , parameters , options ,
192+ VectorSchemaRootConverter .INSTANCE ::getArrayObjectFromVectorSchemaRoot );
198193 }
199194
200195 @ Nonnull
@@ -222,14 +217,8 @@ public Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnu
222217 public Stream <Map <String , Object >> queryRows (@ Nonnull final String query ,
223218 @ Nonnull final Map <String , Object > parameters ,
224219 @ Nonnull final QueryOptions options ) {
225- return queryData (query , parameters , options )
226- .flatMap (vector -> IntStream .range (0 , vector .getRowCount ())
227- .mapToObj (rowNumber ->
228- VectorSchemaRootConverter .INSTANCE
229- .getMapFromVectorSchemaRoot (
230- vector ,
231- rowNumber
232- )));
220+ return queryDataAndProcess (query , parameters , options ,
221+ VectorSchemaRootConverter .INSTANCE ::getMapFromVectorSchemaRoot );
233222 }
234223
235224 @ Nonnull
@@ -255,13 +244,10 @@ public Stream<PointValues> queryPoints(@Nonnull final String query, @Nonnull fin
255244 public Stream <PointValues > queryPoints (@ Nonnull final String query ,
256245 @ Nonnull final Map <String , Object > parameters ,
257246 @ Nonnull final QueryOptions options ) {
258- return queryData (query , parameters , options )
259- . flatMap (vector -> {
247+ return queryDataAndProcess (query , parameters , options ,
248+ (vector , rowNumber ) -> {
260249 List <FieldVector > fieldVectors = vector .getFieldVectors ();
261- return IntStream
262- .range (0 , vector .getRowCount ())
263- .mapToObj (row ->
264- VectorSchemaRootConverter .INSTANCE .toPointValues (row , fieldVectors ));
250+ return VectorSchemaRootConverter .INSTANCE .toPointValues (rowNumber , fieldVectors );
265251 });
266252 }
267253
@@ -389,6 +375,19 @@ private <T> void writeData(@Nonnull final List<T> data, @Nonnull final WriteOpti
389375 }
390376 }
391377
378+ @ Nonnull
379+ private <T > Stream <T > queryDataAndProcess (@ Nonnull final String query ,
380+ @ Nonnull final Map <String , Object > parameters ,
381+ @ Nonnull final QueryOptions options ,
382+ @ Nonnull final BiFunction <VectorSchemaRoot , Integer , T > processor ) {
383+ return queryData (query , parameters , options )
384+ .flatMap (vector ->
385+ IntStream .range (0 , vector .getRowCount ())
386+ .mapToObj (rowNumber -> processor .apply (vector , rowNumber ))
387+ .onClose (vector ::close )
388+ );
389+ }
390+
392391 @ Nonnull
393392 private Stream <VectorSchemaRoot > queryData (@ Nonnull final String query ,
394393 @ Nonnull final Map <String , Object > parameters ,
0 commit comments