Skip to content

Commit 73e524d

Browse files
committed
Refactor multi stream append to use v2 protocol (#439)
* Refactor multi stream append to use v2 protocol
1 parent 6952543 commit 73e524d

32 files changed

+5220
-346
lines changed

docs/api/appending-events.md

Lines changed: 5 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -205,23 +205,12 @@ This feature is only available in KurrentDB 25.1 and later.
205205

206206
You can append events to multiple streams in a single atomic operation. Either all streams are updated, or the entire operation fails.
207207

208-
The `multiStreamAppend` method accepts a collection of `AppendStreamRequest` objects and returns a `MultiAppendResult`. Each `AppendStreamRequest` contains:
209-
210-
- **streamName** - The name of the stream
211-
- **expectedState** - The expected state of the stream for optimistic concurrency control
212-
- **events** - A collection of `EventData` objects to append
213-
214-
The operation returns either:
215-
- `AppendStreamSuccess` - Successful append results for all streams
216-
- `AppendStreamFailure` - Specific exceptions for any failed operations
217-
218208
::: warning
219-
Event metadata in `EventData` must be valid JSON objects. This requirement will
220-
be removed in a future major release.
209+
Currently, metadata must be valid JSON. Binary metadata will not be supported in
210+
this version. This limitation ensures compatibility with KurrentDB's metadata
211+
handling and will be removed in the next major release.
221212
:::
222213

223-
Here's a basic example of appending events to multiple streams:
224-
225214
```ts
226215
import { jsonEvent } from "@kurrent/kurrentdb-client";
227216
import { v4 as uuid } from "uuid";
@@ -265,43 +254,5 @@ const requests = [
265254
}
266255
];
267256

268-
const result = await client.multiStreamAppend(requests);
269-
270-
if (result.success) {
271-
result.output.forEach((success) => {
272-
console.log(`Stream '${success.streamName}' updated at position ${success.position}`);
273-
});
274-
}
275-
```
276-
277-
If the operation doesn't succeed, it can fail with the following exceptions:
278-
279-
```ts
280-
const result = await client.multiStreamAppend(requests);
281-
282-
if (!result.success) {
283-
result.output.forEach((failure) => {
284-
switch (failure.details.type) {
285-
case "wrong_expected_revision":
286-
console.log(`Version conflict in stream '${failure.streamName}': expected revision ${failure.details.revision}`);
287-
break;
288-
289-
case "access_denied":
290-
console.log(`Access denied to stream '${failure.streamName}': ${failure.details.reason}`);
291-
break;
292-
293-
case "stream_deleted":
294-
console.log(`Stream '${failure.streamName}' was deleted`);
295-
break;
296-
297-
case "transaction_max_size_exceeded":
298-
console.log(`Transaction too large for stream '${failure.streamName}': max size is ${failure.details.maxSize} bytes`);
299-
break;
300-
301-
default:
302-
console.log(`Unexpected error for stream '${failure.streamName}': ${failure.details.type}`);
303-
break;
304-
}
305-
});
306-
}
307-
```
257+
await client.multiStreamAppend(requests);
258+
```
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
// GENERATED CODE -- NO SERVICES IN PROTO
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
// package: kurrentdb.protocol.v2.streams.errors
2+
// file: kurrentdb/protocols/v2/streams/errors.proto
3+
4+
/* tslint:disable */
5+
/* eslint-disable */
6+
7+
import * as jspb from "google-protobuf";
8+
import * as kurrentdb_protocols_v2_rpc_pb from "../../../../kurrentdb/protocols/v2/rpc_pb";
9+
10+
export class StreamNotFoundErrorDetails extends jspb.Message {
11+
getStream(): string;
12+
setStream(value: string): StreamNotFoundErrorDetails;
13+
14+
serializeBinary(): Uint8Array;
15+
toObject(includeInstance?: boolean): StreamNotFoundErrorDetails.AsObject;
16+
static toObject(includeInstance: boolean, msg: StreamNotFoundErrorDetails): StreamNotFoundErrorDetails.AsObject;
17+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
18+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
19+
static serializeBinaryToWriter(message: StreamNotFoundErrorDetails, writer: jspb.BinaryWriter): void;
20+
static deserializeBinary(bytes: Uint8Array): StreamNotFoundErrorDetails;
21+
static deserializeBinaryFromReader(message: StreamNotFoundErrorDetails, reader: jspb.BinaryReader): StreamNotFoundErrorDetails;
22+
}
23+
24+
export namespace StreamNotFoundErrorDetails {
25+
export type AsObject = {
26+
stream: string,
27+
}
28+
}
29+
30+
export class StreamAlreadyExistsErrorDetails extends jspb.Message {
31+
getStream(): string;
32+
setStream(value: string): StreamAlreadyExistsErrorDetails;
33+
34+
serializeBinary(): Uint8Array;
35+
toObject(includeInstance?: boolean): StreamAlreadyExistsErrorDetails.AsObject;
36+
static toObject(includeInstance: boolean, msg: StreamAlreadyExistsErrorDetails): StreamAlreadyExistsErrorDetails.AsObject;
37+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
38+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
39+
static serializeBinaryToWriter(message: StreamAlreadyExistsErrorDetails, writer: jspb.BinaryWriter): void;
40+
static deserializeBinary(bytes: Uint8Array): StreamAlreadyExistsErrorDetails;
41+
static deserializeBinaryFromReader(message: StreamAlreadyExistsErrorDetails, reader: jspb.BinaryReader): StreamAlreadyExistsErrorDetails;
42+
}
43+
44+
export namespace StreamAlreadyExistsErrorDetails {
45+
export type AsObject = {
46+
stream: string,
47+
}
48+
}
49+
50+
export class StreamDeletedErrorDetails extends jspb.Message {
51+
getStream(): string;
52+
setStream(value: string): StreamDeletedErrorDetails;
53+
54+
serializeBinary(): Uint8Array;
55+
toObject(includeInstance?: boolean): StreamDeletedErrorDetails.AsObject;
56+
static toObject(includeInstance: boolean, msg: StreamDeletedErrorDetails): StreamDeletedErrorDetails.AsObject;
57+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
58+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
59+
static serializeBinaryToWriter(message: StreamDeletedErrorDetails, writer: jspb.BinaryWriter): void;
60+
static deserializeBinary(bytes: Uint8Array): StreamDeletedErrorDetails;
61+
static deserializeBinaryFromReader(message: StreamDeletedErrorDetails, reader: jspb.BinaryReader): StreamDeletedErrorDetails;
62+
}
63+
64+
export namespace StreamDeletedErrorDetails {
65+
export type AsObject = {
66+
stream: string,
67+
}
68+
}
69+
70+
export class StreamTombstonedErrorDetails extends jspb.Message {
71+
getStream(): string;
72+
setStream(value: string): StreamTombstonedErrorDetails;
73+
74+
serializeBinary(): Uint8Array;
75+
toObject(includeInstance?: boolean): StreamTombstonedErrorDetails.AsObject;
76+
static toObject(includeInstance: boolean, msg: StreamTombstonedErrorDetails): StreamTombstonedErrorDetails.AsObject;
77+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
78+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
79+
static serializeBinaryToWriter(message: StreamTombstonedErrorDetails, writer: jspb.BinaryWriter): void;
80+
static deserializeBinary(bytes: Uint8Array): StreamTombstonedErrorDetails;
81+
static deserializeBinaryFromReader(message: StreamTombstonedErrorDetails, reader: jspb.BinaryReader): StreamTombstonedErrorDetails;
82+
}
83+
84+
export namespace StreamTombstonedErrorDetails {
85+
export type AsObject = {
86+
stream: string,
87+
}
88+
}
89+
90+
export class StreamRevisionConflictErrorDetails extends jspb.Message {
91+
getStream(): string;
92+
setStream(value: string): StreamRevisionConflictErrorDetails;
93+
getExpectedRevision(): string;
94+
setExpectedRevision(value: string): StreamRevisionConflictErrorDetails;
95+
getActualRevision(): string;
96+
setActualRevision(value: string): StreamRevisionConflictErrorDetails;
97+
98+
serializeBinary(): Uint8Array;
99+
toObject(includeInstance?: boolean): StreamRevisionConflictErrorDetails.AsObject;
100+
static toObject(includeInstance: boolean, msg: StreamRevisionConflictErrorDetails): StreamRevisionConflictErrorDetails.AsObject;
101+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
102+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
103+
static serializeBinaryToWriter(message: StreamRevisionConflictErrorDetails, writer: jspb.BinaryWriter): void;
104+
static deserializeBinary(bytes: Uint8Array): StreamRevisionConflictErrorDetails;
105+
static deserializeBinaryFromReader(message: StreamRevisionConflictErrorDetails, reader: jspb.BinaryReader): StreamRevisionConflictErrorDetails;
106+
}
107+
108+
export namespace StreamRevisionConflictErrorDetails {
109+
export type AsObject = {
110+
stream: string,
111+
expectedRevision: string,
112+
actualRevision: string,
113+
}
114+
}
115+
116+
export class AppendRecordSizeExceededErrorDetails extends jspb.Message {
117+
getStream(): string;
118+
setStream(value: string): AppendRecordSizeExceededErrorDetails;
119+
getRecordId(): string;
120+
setRecordId(value: string): AppendRecordSizeExceededErrorDetails;
121+
getSize(): number;
122+
setSize(value: number): AppendRecordSizeExceededErrorDetails;
123+
getMaxSize(): number;
124+
setMaxSize(value: number): AppendRecordSizeExceededErrorDetails;
125+
126+
serializeBinary(): Uint8Array;
127+
toObject(includeInstance?: boolean): AppendRecordSizeExceededErrorDetails.AsObject;
128+
static toObject(includeInstance: boolean, msg: AppendRecordSizeExceededErrorDetails): AppendRecordSizeExceededErrorDetails.AsObject;
129+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
130+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
131+
static serializeBinaryToWriter(message: AppendRecordSizeExceededErrorDetails, writer: jspb.BinaryWriter): void;
132+
static deserializeBinary(bytes: Uint8Array): AppendRecordSizeExceededErrorDetails;
133+
static deserializeBinaryFromReader(message: AppendRecordSizeExceededErrorDetails, reader: jspb.BinaryReader): AppendRecordSizeExceededErrorDetails;
134+
}
135+
136+
export namespace AppendRecordSizeExceededErrorDetails {
137+
export type AsObject = {
138+
stream: string,
139+
recordId: string,
140+
size: number,
141+
maxSize: number,
142+
}
143+
}
144+
145+
export class AppendTransactionSizeExceededErrorDetails extends jspb.Message {
146+
getSize(): number;
147+
setSize(value: number): AppendTransactionSizeExceededErrorDetails;
148+
getMaxSize(): number;
149+
setMaxSize(value: number): AppendTransactionSizeExceededErrorDetails;
150+
151+
serializeBinary(): Uint8Array;
152+
toObject(includeInstance?: boolean): AppendTransactionSizeExceededErrorDetails.AsObject;
153+
static toObject(includeInstance: boolean, msg: AppendTransactionSizeExceededErrorDetails): AppendTransactionSizeExceededErrorDetails.AsObject;
154+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
155+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
156+
static serializeBinaryToWriter(message: AppendTransactionSizeExceededErrorDetails, writer: jspb.BinaryWriter): void;
157+
static deserializeBinary(bytes: Uint8Array): AppendTransactionSizeExceededErrorDetails;
158+
static deserializeBinaryFromReader(message: AppendTransactionSizeExceededErrorDetails, reader: jspb.BinaryReader): AppendTransactionSizeExceededErrorDetails;
159+
}
160+
161+
export namespace AppendTransactionSizeExceededErrorDetails {
162+
export type AsObject = {
163+
size: number,
164+
maxSize: number,
165+
}
166+
}
167+
168+
export class StreamAlreadyInAppendSessionErrorDetails extends jspb.Message {
169+
getStream(): string;
170+
setStream(value: string): StreamAlreadyInAppendSessionErrorDetails;
171+
172+
serializeBinary(): Uint8Array;
173+
toObject(includeInstance?: boolean): StreamAlreadyInAppendSessionErrorDetails.AsObject;
174+
static toObject(includeInstance: boolean, msg: StreamAlreadyInAppendSessionErrorDetails): StreamAlreadyInAppendSessionErrorDetails.AsObject;
175+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
176+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
177+
static serializeBinaryToWriter(message: StreamAlreadyInAppendSessionErrorDetails, writer: jspb.BinaryWriter): void;
178+
static deserializeBinary(bytes: Uint8Array): StreamAlreadyInAppendSessionErrorDetails;
179+
static deserializeBinaryFromReader(message: StreamAlreadyInAppendSessionErrorDetails, reader: jspb.BinaryReader): StreamAlreadyInAppendSessionErrorDetails;
180+
}
181+
182+
export namespace StreamAlreadyInAppendSessionErrorDetails {
183+
export type AsObject = {
184+
stream: string,
185+
}
186+
}
187+
188+
export enum StreamsError {
189+
STREAMS_ERROR_UNSPECIFIED = 0,
190+
STREAMS_ERROR_STREAM_NOT_FOUND = 1,
191+
STREAMS_ERROR_STREAM_ALREADY_EXISTS = 2,
192+
STREAMS_ERROR_STREAM_DELETED = 3,
193+
STREAMS_ERROR_STREAM_TOMBSTONED = 4,
194+
STREAMS_ERROR_STREAM_REVISION_CONFLICT = 5,
195+
STREAMS_ERROR_APPEND_RECORD_SIZE_EXCEEDED = 6,
196+
STREAMS_ERROR_APPEND_TRANSACTION_SIZE_EXCEEDED = 7,
197+
STREAMS_ERROR_STREAM_ALREADY_IN_APPEND_SESSION = 8,
198+
STREAMS_ERROR_APPEND_SESSION_NO_REQUESTS = 9,
199+
}

0 commit comments

Comments
 (0)