Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ols/app/endpoints/ols.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,10 @@ def check_tokens_available(

try:
for quota_limiter in quota_limiters:
# Reconcile quota limits with config before checking availability
# This ensures config changes take effect immediately on each request
if hasattr(quota_limiter, "reconcile_quota_limits"):
quota_limiter.reconcile_quota_limits()
quota_limiter.ensure_available_quota(subject_id=user_id)
except psycopg2.Error as pg_error:
message = "Error communicating with quota database backend"
Expand Down
3 changes: 2 additions & 1 deletion ols/runners/quota_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ def quota_scheduler(config: Optional[QuotaHandlersConfig]) -> bool:
logger.info("Quota scheduler sync started")
for name, limiter in config.limiters.limiters.items():
try:
# Handle periodic quota revocation
quota_revocation(connection, name, limiter)
except Exception as e:
logger.error("Quota revoke error: %s", e)
logger.error("Quota scheduler error for '%s': %s", name, e)
logger.info("Quota scheduler sync finished")
sleep(period)
# unreachable code
Expand Down
3 changes: 3 additions & 0 deletions ols/src/quota/quota_limiter_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ols.app.models.config import PostgresConfig, QuotaHandlersConfig
from ols.src.quota.cluster_quota_limiter import ClusterQuotaLimiter
from ols.src.quota.quota_limiter import QuotaLimiter
from ols.src.quota.revokable_quota_limiter import RevokableQuotaLimiter
from ols.src.quota.user_quota_limiter import UserQuotaLimiter

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -42,6 +43,8 @@ def quota_limiters(config: QuotaHandlersConfig) -> list[QuotaLimiter]:
limiter = QuotaLimiterFactory.create_limiter(
storage_config, limiter_type, initial_quota, increase_by
)
if isinstance(limiter, RevokableQuotaLimiter):
limiter.reconcile_quota_limits()
limiters.append(limiter)
logger.info("Set up quota limiter '%s'", name)
return limiters
Expand Down
98 changes: 98 additions & 0 deletions ols/src/quota/revokable_quota_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,28 @@ class RevokableQuotaLimiter(QuotaLimiter):
WHERE id=%s and subject=%s
"""

# Reconciliation SQL for user quotas - updates all rows with subject = 'u'.
# The statement takes three %s parameters, and the caller passes the newly
# configured quota limit for every one of them:
#   1. the new limit used to shift "available" by (new limit - old limit),
#      so tokens that were already consumed stay consumed,
#   2. the new value stored in "quota_limit",
#   3. the value compared in the WHERE clause, so rows whose limit already
#      matches the configuration are left untouched.
# Each updated row is returned as (id, quota_limit, available).
RECONCILE_USER_QUOTA_LIMITS = """
UPDATE quota_limits
SET available = available + (%s - quota_limit),
quota_limit = %s,
updated_at = NOW()
WHERE subject = 'u'
AND quota_limit != %s
RETURNING id, quota_limit, available
"""

# Reconciliation SQL for cluster quotas - updates rows with subject = 'c'.
# Same three parameters and same RETURNING columns as the user variant
# above; only the subject filter differs.
RECONCILE_CLUSTER_QUOTA_LIMITS = """
UPDATE quota_limits
SET available = available + (%s - quota_limit),
quota_limit = %s,
updated_at = NOW()
WHERE subject = 'c'
AND quota_limit != %s
RETURNING id, quota_limit, available
"""

def __init__(
self,
initial_quota: int,
Expand All @@ -62,6 +84,20 @@ def __init__(
self.increase_by = increase_by
self.connection_config = connection_config

def reconcile_quota_limits(self) -> None:
    """Synchronize stored quota limits with the configured quota (best effort).

    Public entry point that delegates to ``_reconcile_quota_limits``. Any
    exception raised by the reconciliation is logged as a warning and is
    NOT propagated, so a failure here never prevents the caller from
    continuing.
    """
    try:
        self._reconcile_quota_limits()
    except Exception as exc:
        # Deliberately swallowed: reconciliation is an optional step.
        logger.warning(
            "Quota reconciliation failed (will retry on scheduler): %s", exc
        )

@connection
def available_quota(self, subject_id: str = "") -> int:
"""Retrieve available quota for given subject."""
Expand Down Expand Up @@ -173,3 +209,65 @@ def _init_quota(self, subject_id: str = "") -> None:
),
)
self.connection.commit()

def _reconcile_quota_limits(self) -> None:
    """Bring stored quota limits in line with the configured initial quota.

    Runs a single UPDATE over every ``quota_limits`` row of this limiter's
    subject type whose ``quota_limit`` differs from ``self.initial_quota``.
    Each such row receives the new limit and has ``available`` shifted by
    the difference between the new and the old limit, so tokens that were
    already consumed remain consumed. Every updated row is logged.

    An unknown ``self.subject_type`` is logged as an error and skipped
    without touching the database.

    Raises:
        Exception: any error raised while executing or committing the
            update. The error is logged, the open transaction is rolled
            back, and the original exception is re-raised.
    """
    # Map the subject type onto the matching reconciliation statement.
    sql_by_subject_type = {
        "u": RevokableQuotaLimiter.RECONCILE_USER_QUOTA_LIMITS,
        "c": RevokableQuotaLimiter.RECONCILE_CLUSTER_QUOTA_LIMITS,
    }
    reconcile_sql = sql_by_subject_type.get(self.subject_type)
    if reconcile_sql is None:
        logger.error(
            "Unknown subject type '%s', skipping reconciliation", self.subject_type
        )
        return

    try:
        with self.connection.cursor() as cursor:
            cursor.execute(
                reconcile_sql,
                (
                    self.initial_quota,  # used in calculation: available + (new - old)
                    self.initial_quota,  # new quota_limit to set
                    self.initial_quota,  # WHERE quota_limit != ?
                ),
            )
            updated_rows = cursor.fetchall()
            if updated_rows:
                for subject_id, new_limit, new_available in updated_rows:
                    logger.info(
                        "Reconciled quota for subject='%s' type='%s': "
                        "quota_limit=%d, available=%d",
                        subject_id if subject_id else "(cluster)",
                        self.subject_type,
                        new_limit,
                        new_available,
                    )
                logger.info(
                    "Quota reconciliation complete for subject type '%s': "
                    "updated %d record(s)",
                    self.subject_type,
                    len(updated_rows),
                )
            else:
                logger.debug(
                    "No quota updates needed for subject type '%s' "
                    "(no records or all match current configuration)",
                    self.subject_type,
                )

        self.connection.commit()

    except Exception as e:
        logger.error(
            "Failed to reconcile quota limits for subject type '%s': %s",
            self.subject_type,
            e,
        )
        # The public wrapper swallows this error and the connection is reused
        # afterwards; without a rollback a failed transaction would make
        # every later statement on this connection fail as well.
        try:
            self.connection.rollback()
        except Exception as rollback_error:
            # Never let a failing rollback hide the original error.
            logger.warning(
                "Rollback after failed quota reconciliation also failed: %s",
                rollback_error,
            )
        raise
13 changes: 10 additions & 3 deletions ols/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,12 @@ def conversation_cache(self) -> Cache:
def quota_limiters(self) -> list[QuotaLimiter]:
    """Return all quota limiters, building them on first access.

    The result is cached in ``self._quota_limiters``; setting that attribute
    back to ``None`` forces a rebuild on the next access.

    Returns:
        The limiters produced by ``QuotaLimiterFactory.quota_limiters`` for
        the configured quota handlers, or an empty list when
        ``self.ols_config.quota_handlers`` is ``None``.
    """
    if self._quota_limiters is None:
        handlers = self.ols_config.quota_handlers
        if handlers is None:
            # No quota handlers configured: never hand None to the factory.
            self._quota_limiters = []
        else:
            self._quota_limiters = QuotaLimiterFactory.quota_limiters(handlers)
    return self._quota_limiters

@property
Expand Down Expand Up @@ -159,6 +162,10 @@ def reload_from_yaml_file(
# values
self._query_filters = None
self._rag_index_loader = None
# reset quota limiters and token usage history to pick up
# new configuration values
self._quota_limiters = None
self._token_usage_history = None
except Exception as e:
print(f"Failed to load config file {config_file}: {e!s}")
print(traceback.format_exc())
Expand Down