Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 74 additions & 33 deletions singer_sdk/streams/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import decimal
import logging
import typing as t
from contextlib import contextmanager
from functools import cached_property
from http import HTTPStatus
from urllib.parse import urlparse
Expand Down Expand Up @@ -105,8 +106,44 @@ def __init__(
self._http_headers: dict[str, str] = {}
self._http_method = http_method
self._requests_session = requests.Session()
self._current_paginator: BaseAPIPaginator | None = None
super().__init__(name=name, schema=schema, tap=tap)

@property
def paginator(self) -> BaseAPIPaginator:
    """Return the paginator bound to the request cycle currently in progress.

    The value is read from ``self._current_paginator``; it is only usable
    while that attribute holds a paginator (i.e. is not ``None``), which lets
    hooks such as ``get_url_params`` read pagination configuration.

    Returns:
        The paginator currently stored on the stream.

    Raises:
        RuntimeError: If no paginator is currently stored on the stream.
    """
    active = self._current_paginator
    if active is not None:
        return active

    # Nothing is bound: fail loudly rather than hand back ``None``.
    msg = (
        "Paginator is only available during active stream processing. "
        "Access it from methods called within the request_records lifecycle "
        "(get_url_params, prepare_request_payload, etc.)."
    )
    raise RuntimeError(msg)

@contextmanager
def _paginator_context(self) -> t.Iterator[BaseAPIPaginator]:
    """Bind a fresh paginator to the stream for the duration of the block.

    A new paginator is obtained from ``get_new_paginator()``; when that
    returns a falsy value a ``SinglePagePaginator`` is used instead. The
    paginator is stored in ``self._current_paginator`` while the ``with``
    block runs.

    On exit (normal or via an exception) the value that was stored *before*
    entering is put back. Restoring the previous value rather than
    unconditionally writing ``None`` keeps an enclosing context's paginator
    intact when contexts are nested on the same stream instance.

    Yields:
        The paginator instance created for this context.
    """
    # Remember whatever was bound before so nested use does not clobber it.
    previous = self._current_paginator
    current = self.get_new_paginator() or SinglePagePaginator()
    self._current_paginator = current
    try:
        yield current
    finally:
        self._current_paginator = previous

@staticmethod
def _url_encode(val: str | datetime | bool | int | list[str]) -> str: # noqa: FBT001
"""Encode the val argument as url-compatible string.
Expand Down Expand Up @@ -350,6 +387,10 @@ def get_url_params( # noqa: PLR6301

If paging is supported, developers may override with specific paging logic.

The paginator instance is available as ``self.paginator`` during request
processing, allowing access to pagination configuration without duplicating
logic across methods.

If your source needs special handling and, for example, parentheses should not
be encoded, you can return a string constructed with
:py:func:`urllib.parse.urlencode`:
Expand Down Expand Up @@ -447,40 +488,40 @@ def request_records(self, context: Context | None) -> t.Iterable[dict]:
Yields:
An item for every record in the response.
"""
paginator = self.get_new_paginator() or SinglePagePaginator()
decorated_request = self.request_decorator(self._request)
pages = 0

with metrics.http_request_counter(self.name, self.path) as request_counter:
request_counter.context = context

while not paginator.finished:
prepared_request = self.prepare_request(
context,
next_page_token=paginator.current_value,
)
resp = decorated_request(prepared_request, context)
request_counter.increment()
self.update_sync_costs(prepared_request, resp, context)
records = iter(self.parse_response(resp))
try:
first_record = next(records)
except StopIteration:
if paginator.continue_if_empty(resp):
paginator.advance(resp)
continue

self.logger.info(
"Pagination stopped after %d pages because no records were "
"found in the last response",
pages,
)
break
yield first_record
yield from records
pages += 1
with self._paginator_context() as paginator:
decorated_request = self.request_decorator(self._request)
pages = 0

paginator.advance(resp)
with metrics.http_request_counter(self.name, self.path) as request_counter:
request_counter.context = context

while not paginator.finished:
prepared_request = self.prepare_request(
context,
next_page_token=paginator.current_value,
)
resp = decorated_request(prepared_request, context)
request_counter.increment()
self.update_sync_costs(prepared_request, resp, context)
records = iter(self.parse_response(resp))
try:
first_record = next(records)
except StopIteration:
if paginator.continue_if_empty(resp):
paginator.advance(resp)
continue

self.logger.info(
"Pagination stopped after %d pages because no records were "
"found in the last response",
pages,
)
break
yield first_record
yield from records
pages += 1

paginator.advance(resp)

def _write_request_duration_log(
self,
Expand Down
Loading
Loading