Skip to content

Commit af45ef6

Browse files
committed
fixed StreamT annotations, TPorTopicSet can also be optional
1 parent ff75c0b commit af45ef6

File tree

4 files changed

+27
-15
lines changed

4 files changed

+27
-15
lines changed

faust/streams.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,7 @@ def enumerate(self, start: int = 0) -> AsyncIterable[Tuple[int, T_co]]:
594594

595595
async def noack_take(
596596
self, max_: int, within: Seconds
597-
) -> AsyncIterable[Sequence[T_co]]:
597+
) -> AsyncIterable[Sequence[EventT[T_co]]]:
598598
"""
599599
Buffer n values at a time and yield a list of buffered values.
600600
:param max_: Max number of messages to receive. When more than this
@@ -606,7 +606,7 @@ async def noack_take(
606606
the agent is likely to stall and block buffered events for an
607607
unreasonable length of time(!).
608608
"""
609-
buffer: List[T_co] = []
609+
buffer: List[EventT[T_co]] = []
610610
events: List[EventT] = []
611611
buffer_add = buffer.append
612612
event_add = events.append
@@ -635,7 +635,7 @@ async def add_to_buffer(value: T) -> T:
635635

636636
# We want to save events instead of values to allow for manual ack
637637
event = self.current_event
638-
buffer_add(cast(T_co, event))
638+
buffer_add(event)
639639
if event is None:
640640
raise RuntimeError("Take buffer found current_event is None")
641641

faust/types/events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class _SchemaT: ... # noqa
3434
class EventT(Generic[T], AsyncContextManager):
3535
app: _AppT
3636
key: K
37-
value: V
37+
value: T
3838
headers: Mapping
3939
message: Message
4040
acked: bool

faust/types/streams.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
Tuple,
1717
TypeVar,
1818
Union,
19-
no_type_check,
2019
)
2120

2221
from mode import Seconds, ServiceT
@@ -149,22 +148,35 @@ def info(self) -> Mapping[str, Any]: ...
149148
def clone(self, **kwargs: Any) -> "StreamT": ...
150149

151150
@abc.abstractmethod
152-
@no_type_check
153-
async def items(self) -> AsyncIterator[Tuple[K, T_co]]: ...
151+
def noack(self) -> "StreamT": ...
154152

155153
@abc.abstractmethod
156-
@no_type_check
157-
async def events(self) -> AsyncIterable[EventT]: ...
154+
def items(self) -> AsyncIterator[Tuple[K, T_co]]: ...
158155

159156
@abc.abstractmethod
160-
@no_type_check
161-
async def take(
157+
def events(self) -> AsyncIterable[EventT]: ...
158+
159+
@abc.abstractmethod
160+
def take(self, max_: int, within: Seconds) -> AsyncIterable[Sequence[T_co]]: ...
161+
162+
@abc.abstractmethod
163+
def take_events(
162164
self, max_: int, within: Seconds
165+
) -> AsyncIterable[Sequence[EventT[T_co]]]: ...
166+
167+
@abc.abstractmethod
168+
def take_with_timestamp(
169+
self, max_: int, within: Seconds, timestamp_field_name: str
163170
) -> AsyncIterable[Sequence[T_co]]: ...
164171

165172
@abc.abstractmethod
166173
def enumerate(self, start: int = 0) -> AsyncIterable[Tuple[int, T_co]]: ...
167174

175+
@abc.abstractmethod
176+
def noack_take(
177+
self, max_: int, within: Seconds
178+
) -> AsyncIterable[Sequence[EventT[T_co]]]: ...
179+
168180
@abc.abstractmethod
169181
def through(self, channel: Union[str, ChannelT]) -> "StreamT": ...
170182

@@ -180,6 +192,9 @@ def group_by(
180192
topic: Optional[TopicT] = None,
181193
) -> "StreamT": ...
182194

195+
@abc.abstractmethod
196+
def filter(self, fun: Processor[T]) -> "StreamT": ...
197+
183198
@abc.abstractmethod
184199
def derive_topic(
185200
self,
@@ -198,9 +213,6 @@ async def throw(self, exc: BaseException) -> None: ...
198213
@abc.abstractmethod
199214
def __copy__(self) -> "StreamT": ...
200215

201-
@abc.abstractmethod
202-
def __iter__(self) -> Any: ...
203-
204216
@abc.abstractmethod
205217
def __next__(self) -> T: ...
206218

faust/types/transports.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class _AppT: ... # noqa
5757

5858
#: Argument to Consumer.commit to specify topics/tps to commit.
5959
TPorTopic = Union[str, TP]
60-
TPorTopicSet = AbstractSet[TPorTopic]
60+
TPorTopicSet = Optional[AbstractSet[TPorTopic]]
6161

6262
#: Callback (:keyword:`async def`) called when consumer partitions are revoked.
6363
PartitionsRevokedCallback = Callable[[Set[TP]], Awaitable[None]]

0 commit comments

Comments
 (0)