Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions faust/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ def enumerate(self, start: int = 0) -> AsyncIterable[Tuple[int, T_co]]:

async def noack_take(
self, max_: int, within: Seconds
) -> AsyncIterable[Sequence[T_co]]:
) -> AsyncIterable[Sequence[EventT[T_co]]]:
"""
Buffer n values at a time and yield a list of buffered values.
:param max_: Max number of messages to receive. When more than this
Expand All @@ -606,7 +606,7 @@ async def noack_take(
the agent is likely to stall and block buffered events for an
unreasonable length of time(!).
"""
buffer: List[T_co] = []
buffer: List[EventT[T_co]] = []
events: List[EventT] = []
buffer_add = buffer.append
event_add = events.append
Expand Down Expand Up @@ -635,7 +635,7 @@ async def add_to_buffer(value: T) -> T:

# We want to save events instead of values to allow for manual ack
event = self.current_event
buffer_add(cast(T_co, event))
buffer_add(event)
if event is None:
raise RuntimeError("Take buffer found current_event is None")

Expand Down
2 changes: 1 addition & 1 deletion faust/types/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class _SchemaT: ... # noqa
class EventT(Generic[T], AsyncContextManager):
app: _AppT
key: K
value: V
value: T
headers: Mapping
message: Message
acked: bool
Expand Down
35 changes: 22 additions & 13 deletions faust/types/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
Tuple,
TypeVar,
Union,
no_type_check,
)

from mode import Seconds, ServiceT
Expand Down Expand Up @@ -149,22 +148,35 @@ def info(self) -> Mapping[str, Any]: ...
def clone(self, **kwargs: Any) -> "StreamT": ...

@abc.abstractmethod
@no_type_check
async def items(self) -> AsyncIterator[Tuple[K, T_co]]: ...
def noack(self) -> "StreamT": ...

@abc.abstractmethod
@no_type_check
async def events(self) -> AsyncIterable[EventT]: ...
def items(self) -> AsyncIterator[Tuple[K, T_co]]: ...

@abc.abstractmethod
@no_type_check
async def take(
def events(self) -> AsyncIterable[EventT]: ...

@abc.abstractmethod
def take(self, max_: int, within: Seconds) -> AsyncIterable[Sequence[T_co]]: ...

@abc.abstractmethod
def take_events(
self, max_: int, within: Seconds
) -> AsyncIterable[Sequence[EventT[T_co]]]: ...

@abc.abstractmethod
def take_with_timestamp(
self, max_: int, within: Seconds, timestamp_field_name: str
) -> AsyncIterable[Sequence[T_co]]: ...

@abc.abstractmethod
def enumerate(self, start: int = 0) -> AsyncIterable[Tuple[int, T_co]]: ...

@abc.abstractmethod
def noack_take(
self, max_: int, within: Seconds
) -> AsyncIterable[Sequence[EventT[T_co]]]: ...

@abc.abstractmethod
def through(self, channel: Union[str, ChannelT]) -> "StreamT": ...

Expand All @@ -180,6 +192,9 @@ def group_by(
topic: Optional[TopicT] = None,
) -> "StreamT": ...

@abc.abstractmethod
def filter(self, fun: Processor[T]) -> "StreamT": ...

@abc.abstractmethod
def derive_topic(
self,
Expand All @@ -198,14 +213,8 @@ async def throw(self, exc: BaseException) -> None: ...
@abc.abstractmethod
def __copy__(self) -> "StreamT": ...

@abc.abstractmethod
def __iter__(self) -> Any: ...

@abc.abstractmethod
def __next__(self) -> T: ...

@abc.abstractmethod
def __aiter__(self) -> AsyncIterator[T_co]: ...

@abc.abstractmethod
async def ack(self, event: EventT) -> bool: ...
2 changes: 1 addition & 1 deletion faust/types/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class _AppT: ... # noqa

#: Argument to Consumer.commit to specify topics/tps to commit.
TPorTopic = Union[str, TP]
TPorTopicSet = AbstractSet[TPorTopic]
TPorTopicSet = Optional[AbstractSet[TPorTopic]]

#: Callback (:keyword:`async def`) called when consumer partitions are revoked.
PartitionsRevokedCallback = Callable[[Set[TP]], Awaitable[None]]
Expand Down
Loading