11from __future__ import annotations
22
3+ from opensearchpy .helpers import async_bulk
4+
35__all__ = ("BaseOSDB" ,)
46
57import contextlib
@@ -197,13 +199,35 @@ async def upsert(self, vo: str, doc_id: int, document: Any) -> None:
197199 response ,
198200 )
199201
202+ async def bulk_insert (self , index_name : str , docs : list [dict [str , Any ]]) -> None :
203+ """Bulk inserting to database."""
204+ n_inserted , failed = await async_bulk (
205+ self .client , actions = [doc | {"_index" : index_name } for doc in docs ]
206+ )
207+ logger .info ("Inserted %d documents to %s" , n_inserted , index_name )
208+
209+ if failed :
210+ logger .error ("Fail to insert %d documents to %s" , failed , index_name )
211+
200212 async def search (
201- self , parameters , search , sorts , * , per_page : int = 100 , page : int | None = None
202- ) -> list [dict [str , Any ]]:
213+ self ,
214+ parameters ,
215+ search ,
216+ sorts ,
217+ * ,
218+ per_page : int = 10000 ,
219+ page : int | None = None ,
220+ ) -> tuple [int , list [dict [str , Any ]]]:
203221 """Search the database for matching results.
204222
205223 See the DiracX search API documentation for details.
206224 """
225+ if page :
226+ if page < 1 :
227+ raise InvalidQueryError ("Page must be a positive integer" )
228+ if per_page < 1 :
229+ raise InvalidQueryError ("Per page must be a positive integer" )
230+
207231 body = {}
208232 if parameters :
209233 body ["_source" ] = parameters
@@ -213,7 +237,12 @@ async def search(
213237 for sort in sorts :
214238 field_name = sort ["parameter" ]
215239 field_type = self .fields .get (field_name , {}).get ("type" )
216- require_type ("sort" , field_name , field_type , {"keyword" , "long" , "date" })
240+ require_type (
241+ "sort" ,
242+ field_name ,
243+ field_type ,
244+ {"keyword" , "long" , "date" , "date_nanos" },
245+ )
217246 body ["sort" ].append ({field_name : {"order" : sort ["direction" ]}})
218247
219248 params = {}
@@ -226,17 +255,19 @@ async def search(
226255 )
227256 hits = [hit ["_source" ] for hit in response ["hits" ]["hits" ]]
228257
258+ total_hits = response ["hits" ]["total" ]["value" ]
259+
229260 # Dates are returned as strings, convert them to Python datetimes
230261 for hit in hits :
231262 for field_name in hit :
232263 if field_name not in self .fields :
233264 continue
234- if self .fields [field_name ]["type" ] == "date" :
265+ if self .fields [field_name ]["type" ] in [ "date" , "date_nanos" ] :
235266 hit [field_name ] = datetime .strptime (
236267 hit [field_name ], "%Y-%m-%dT%H:%M:%S.%f%z"
237268 )
238269
239- return hits
270+ return total_hits , hits
240271
241272
242273def require_type (operator , field_name , field_type , allowed_types ):
0 commit comments