|
21 | 21 | from synapse.api.errors import ShadowBanError, SynapseError, cs_error |
22 | 22 | from synapse.api.ratelimiting import Ratelimiter |
23 | 23 | from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME |
| 24 | +from synapse.http.site import SynapseRequest |
24 | 25 | from synapse.logging.context import make_deferred_yieldable |
25 | 26 | from synapse.logging.opentracing import set_tag |
26 | 27 | from synapse.metrics import SERVER_NAME_LABEL, event_processing_positions |
|
30 | 31 | ) |
31 | 32 | from synapse.storage.databases.main.delayed_events import ( |
32 | 33 | DelayedEventDetails, |
33 | | - DelayID, |
34 | 34 | EventType, |
35 | 35 | StateKey, |
36 | 36 | Timestamp, |
37 | | - UserLocalpart, |
38 | 37 | ) |
39 | 38 | from synapse.storage.databases.main.state_deltas import StateDelta |
40 | 39 | from synapse.types import ( |
@@ -416,98 +415,66 @@ def on_added(self, next_send_ts: int) -> None: |
416 | 415 | if self._next_send_ts_changed(next_send_ts): |
417 | 416 | self._schedule_next_at(next_send_ts) |
418 | 417 |
|
419 | | - async def cancel(self, requester: Requester, delay_id: str) -> None: |
| 418 | + async def cancel(self, request: SynapseRequest, delay_id: str) -> None: |
420 | 419 | """ |
421 | 420 | Cancels the scheduled delivery of the matching delayed event. |
422 | 421 |
|
423 | | - Args: |
424 | | - requester: The owner of the delayed event to act on. |
425 | | - delay_id: The ID of the delayed event to act on. |
426 | | -
|
427 | 422 | Raises: |
428 | 423 | NotFoundError: if no matching delayed event could be found. |
429 | 424 | """ |
430 | 425 | assert self._is_master |
431 | 426 | await self._delayed_event_mgmt_ratelimiter.ratelimit( |
432 | | - requester, |
433 | | - (requester.user.to_string(), requester.device_id), |
| 427 | + None, request.getClientAddress().host |
434 | 428 | ) |
435 | 429 | await make_deferred_yieldable(self._initialized_from_db) |
436 | 430 |
|
437 | 431 | next_send_ts = await self._store.cancel_delayed_event( |
438 | | - delay_id=delay_id, |
439 | | - user_localpart=requester.user.localpart, |
440 | | - finalised_ts=self._get_current_ts(), |
| 432 | + delay_id, self._get_current_ts() |
441 | 433 | ) |
442 | 434 |
|
443 | 435 | if self._next_send_ts_changed(next_send_ts): |
444 | 436 | self._schedule_next_at_or_none(next_send_ts) |
445 | 437 |
|
446 | | - async def restart(self, requester: Requester, delay_id: str) -> None: |
| 438 | + async def restart(self, request: SynapseRequest, delay_id: str) -> None: |
447 | 439 | """ |
448 | 440 | Restarts the scheduled delivery of the matching delayed event. |
449 | 441 |
|
450 | | - Args: |
451 | | - requester: The owner of the delayed event to act on. |
452 | | - delay_id: The ID of the delayed event to act on. |
453 | | -
|
454 | 442 | Raises: |
455 | 443 | NotFoundError: if no matching delayed event could be found. |
456 | 444 | """ |
457 | 445 | assert self._is_master |
458 | 446 | await self._delayed_event_mgmt_ratelimiter.ratelimit( |
459 | | - requester, |
460 | | - (requester.user.to_string(), requester.device_id), |
| 447 | + None, request.getClientAddress().host |
461 | 448 | ) |
462 | 449 | await make_deferred_yieldable(self._initialized_from_db) |
463 | 450 |
|
464 | 451 | next_send_ts = await self._store.restart_delayed_event( |
465 | | - delay_id=delay_id, |
466 | | - user_localpart=requester.user.localpart, |
467 | | - current_ts=self._get_current_ts(), |
| 452 | + delay_id, self._get_current_ts() |
468 | 453 | ) |
469 | 454 |
|
470 | 455 | if self._next_send_ts_changed(next_send_ts): |
471 | 456 | self._schedule_next_at(next_send_ts) |
472 | 457 |
|
473 | | - async def send(self, requester: Requester, delay_id: str) -> None: |
| 458 | + async def send(self, request: SynapseRequest, delay_id: str) -> None: |
474 | 459 | """ |
475 | 460 | Immediately sends the matching delayed event, instead of waiting for its scheduled delivery. |
476 | 461 |
|
477 | | - Args: |
478 | | - requester: The owner of the delayed event to act on. |
479 | | - delay_id: The ID of the delayed event to act on. |
480 | | -
|
481 | 462 | Raises: |
482 | 463 | NotFoundError: if no matching delayed event could be found. |
483 | 464 | """ |
484 | 465 | assert self._is_master |
485 | | - # Use standard request limiter for sending delayed events on-demand, |
486 | | - # as an on-demand send is similar to sending a regular event. |
487 | | - await self._request_ratelimiter.ratelimit(requester) |
| 466 | + await self._delayed_event_mgmt_ratelimiter.ratelimit( |
| 467 | + None, request.getClientAddress().host |
| 468 | + ) |
488 | 469 | await make_deferred_yieldable(self._initialized_from_db) |
489 | 470 |
|
490 | | - event, next_send_ts = await self._store.process_target_delayed_event( |
491 | | - delay_id=delay_id, |
492 | | - user_localpart=requester.user.localpart, |
493 | | - ) |
| 471 | + event, next_send_ts = await self._store.process_target_delayed_event(delay_id) |
494 | 472 |
|
495 | 473 | if self._next_send_ts_changed(next_send_ts): |
496 | 474 | self._schedule_next_at_or_none(next_send_ts) |
497 | 475 |
|
498 | 476 | if event: |
499 | | - await self._send_event( |
500 | | - DelayedEventDetails( |
501 | | - delay_id=DelayID(delay_id), |
502 | | - user_localpart=UserLocalpart(requester.user.localpart), |
503 | | - room_id=event.room_id, |
504 | | - type=event.type, |
505 | | - state_key=event.state_key, |
506 | | - origin_server_ts=event.origin_server_ts, |
507 | | - content=event.content, |
508 | | - device_id=event.device_id, |
509 | | - ) |
510 | | - ) |
| 477 | + await self._send_event(event) |
511 | 478 |
|
512 | 479 | async def _send_on_timeout(self) -> None: |
513 | 480 | self._next_delayed_event_call = None |
@@ -674,7 +641,6 @@ async def _send_event( |
674 | 641 | try: |
675 | 642 | await self._store.finalise_processed_delayed_event( |
676 | 643 | event.delay_id, |
677 | | - event.user_localpart, |
678 | 644 | send_error or event_id, |
679 | 645 | finalised_ts, |
680 | 646 | ) |
|
0 commit comments