Skip to content

Commit f781674

Browse files
authored
Changes for multi stream append
Changes for multi stream append
2 parents dbe8881 + 73e524d commit f781674

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+6051
-74
lines changed

docs/api/appending-events.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,4 +195,64 @@ const credentials = {
195195
await client.appendToStream("some-stream", event, {
196196
credentials,
197197
});
198+
```
199+
200+
## Append to multiple streams
201+
202+
::: note
203+
This feature is only available in KurrentDB 25.1 and later.
204+
:::
205+
206+
You can append events to multiple streams in a single atomic operation. Either all streams are updated, or the entire operation fails.
207+
208+
::: warning
209+
Currently, metadata must be valid JSON. Binary metadata will not be supported in
210+
this version. This limitation ensures compatibility with KurrentDB's metadata
211+
handling and will be removed in the next major release.
212+
:::
213+
214+
```ts
215+
import { jsonEvent } from "@kurrent/kurrentdb-client";
216+
import { v4 as uuid } from "uuid";
217+
218+
const metadata = {
219+
timestamp: new Date().toISOString(),
220+
source: "OrderProcessingSystem",
221+
version: 1.0
222+
};
223+
224+
const requests = [
225+
{
226+
streamName: "order-stream-1",
227+
expectedState: "any",
228+
events: [
229+
jsonEvent({
230+
id: uuid(),
231+
type: "OrderCreated",
232+
data: {
233+
orderId: "12345",
234+
amount: 99.99
235+
},
236+
metadata
237+
})
238+
]
239+
},
240+
{
241+
streamName: "inventory-stream-1",
242+
expectedState: "any",
243+
events: [
244+
jsonEvent({
245+
id: uuid(),
246+
type: "ItemReserved",
247+
data: {
248+
itemId: "ABC123",
249+
quantity: 2
250+
},
251+
metadata
252+
})
253+
]
254+
}
255+
];
256+
257+
await client.multiStreamAppend(requests);
198258
```

docs/api/persistent-subscriptions.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,27 @@ The main aim of this strategy is to decrease the likelihood of concurrency and
203203
ordering issues while maintaining load balancing. This is **not a guarantee**,
204204
and you should handle the usual ordering and concurrency issues.
205205

206+
### PinnedByCorrelation
207+
208+
The PinnedByCorrelation strategy is a consumer strategy available for persistent subscriptions
209+
It ensures that events with the same correlation id are consistently delivered to the same
210+
consumer within a subscription group.
211+
212+
:::note
213+
This strategy requires database version 21.10.1 or later. You can only create a persistent subscription
214+
with this strategy. To change the strategy, you must delete the existing subscription and create a
215+
new one with the desired settings.
216+
:::
217+
218+
## Updating a subscription group
219+
220+
You can edit the settings of an existing subscription group while it is running,
221+
you don't need to delete and recreate it to change settings. When you update the
222+
subscription group, it resets itself internally, dropping the connections and
223+
having them reconnect. You must have admin permissions to update a persistent
224+
subscription group.
225+
226+
206227
## Updating a subscription group
207228

208229
You can edit the settings of an existing subscription group while it is running,

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@
2424
"devDependencies": {
2525
"@tsconfig/node18": "^18.2.4",
2626
"@types/node": "18.19.76",
27+
"@types/semver": "^7.7.0",
2728
"@typescript-eslint/eslint-plugin": "^8.10.0",
2829
"@typescript-eslint/parser": "^8.10.0",
2930
"cross-env": "^7.0.3",
3031
"eslint": "^8.56.0",
3132
"eslint-plugin-tsdoc": "^0.2.17",
3233
"nx": "20.1.3",
3334
"prettier": "^2.8.8",
34-
"semver": "^7.6.3",
35+
"semver": "^7.7.2",
3536
"typescript": "^5.6.3"
3637
}
3738
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
// GENERATED CODE -- NO SERVICES IN PROTO
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
// package: kurrentdb.protocol.v2.streams.errors
2+
// file: kurrentdb/protocols/v2/streams/errors.proto
3+
4+
/* tslint:disable */
5+
/* eslint-disable */
6+
7+
import * as jspb from "google-protobuf";
8+
import * as kurrentdb_protocols_v2_rpc_pb from "../../../../kurrentdb/protocols/v2/rpc_pb";
9+
10+
export class StreamNotFoundErrorDetails extends jspb.Message {
11+
getStream(): string;
12+
setStream(value: string): StreamNotFoundErrorDetails;
13+
14+
serializeBinary(): Uint8Array;
15+
toObject(includeInstance?: boolean): StreamNotFoundErrorDetails.AsObject;
16+
static toObject(includeInstance: boolean, msg: StreamNotFoundErrorDetails): StreamNotFoundErrorDetails.AsObject;
17+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
18+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
19+
static serializeBinaryToWriter(message: StreamNotFoundErrorDetails, writer: jspb.BinaryWriter): void;
20+
static deserializeBinary(bytes: Uint8Array): StreamNotFoundErrorDetails;
21+
static deserializeBinaryFromReader(message: StreamNotFoundErrorDetails, reader: jspb.BinaryReader): StreamNotFoundErrorDetails;
22+
}
23+
24+
export namespace StreamNotFoundErrorDetails {
25+
export type AsObject = {
26+
stream: string,
27+
}
28+
}
29+
30+
export class StreamAlreadyExistsErrorDetails extends jspb.Message {
31+
getStream(): string;
32+
setStream(value: string): StreamAlreadyExistsErrorDetails;
33+
34+
serializeBinary(): Uint8Array;
35+
toObject(includeInstance?: boolean): StreamAlreadyExistsErrorDetails.AsObject;
36+
static toObject(includeInstance: boolean, msg: StreamAlreadyExistsErrorDetails): StreamAlreadyExistsErrorDetails.AsObject;
37+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
38+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
39+
static serializeBinaryToWriter(message: StreamAlreadyExistsErrorDetails, writer: jspb.BinaryWriter): void;
40+
static deserializeBinary(bytes: Uint8Array): StreamAlreadyExistsErrorDetails;
41+
static deserializeBinaryFromReader(message: StreamAlreadyExistsErrorDetails, reader: jspb.BinaryReader): StreamAlreadyExistsErrorDetails;
42+
}
43+
44+
export namespace StreamAlreadyExistsErrorDetails {
45+
export type AsObject = {
46+
stream: string,
47+
}
48+
}
49+
50+
export class StreamDeletedErrorDetails extends jspb.Message {
51+
getStream(): string;
52+
setStream(value: string): StreamDeletedErrorDetails;
53+
54+
serializeBinary(): Uint8Array;
55+
toObject(includeInstance?: boolean): StreamDeletedErrorDetails.AsObject;
56+
static toObject(includeInstance: boolean, msg: StreamDeletedErrorDetails): StreamDeletedErrorDetails.AsObject;
57+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
58+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
59+
static serializeBinaryToWriter(message: StreamDeletedErrorDetails, writer: jspb.BinaryWriter): void;
60+
static deserializeBinary(bytes: Uint8Array): StreamDeletedErrorDetails;
61+
static deserializeBinaryFromReader(message: StreamDeletedErrorDetails, reader: jspb.BinaryReader): StreamDeletedErrorDetails;
62+
}
63+
64+
export namespace StreamDeletedErrorDetails {
65+
export type AsObject = {
66+
stream: string,
67+
}
68+
}
69+
70+
export class StreamTombstonedErrorDetails extends jspb.Message {
71+
getStream(): string;
72+
setStream(value: string): StreamTombstonedErrorDetails;
73+
74+
serializeBinary(): Uint8Array;
75+
toObject(includeInstance?: boolean): StreamTombstonedErrorDetails.AsObject;
76+
static toObject(includeInstance: boolean, msg: StreamTombstonedErrorDetails): StreamTombstonedErrorDetails.AsObject;
77+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
78+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
79+
static serializeBinaryToWriter(message: StreamTombstonedErrorDetails, writer: jspb.BinaryWriter): void;
80+
static deserializeBinary(bytes: Uint8Array): StreamTombstonedErrorDetails;
81+
static deserializeBinaryFromReader(message: StreamTombstonedErrorDetails, reader: jspb.BinaryReader): StreamTombstonedErrorDetails;
82+
}
83+
84+
export namespace StreamTombstonedErrorDetails {
85+
export type AsObject = {
86+
stream: string,
87+
}
88+
}
89+
90+
export class StreamRevisionConflictErrorDetails extends jspb.Message {
91+
getStream(): string;
92+
setStream(value: string): StreamRevisionConflictErrorDetails;
93+
getExpectedRevision(): string;
94+
setExpectedRevision(value: string): StreamRevisionConflictErrorDetails;
95+
getActualRevision(): string;
96+
setActualRevision(value: string): StreamRevisionConflictErrorDetails;
97+
98+
serializeBinary(): Uint8Array;
99+
toObject(includeInstance?: boolean): StreamRevisionConflictErrorDetails.AsObject;
100+
static toObject(includeInstance: boolean, msg: StreamRevisionConflictErrorDetails): StreamRevisionConflictErrorDetails.AsObject;
101+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
102+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
103+
static serializeBinaryToWriter(message: StreamRevisionConflictErrorDetails, writer: jspb.BinaryWriter): void;
104+
static deserializeBinary(bytes: Uint8Array): StreamRevisionConflictErrorDetails;
105+
static deserializeBinaryFromReader(message: StreamRevisionConflictErrorDetails, reader: jspb.BinaryReader): StreamRevisionConflictErrorDetails;
106+
}
107+
108+
export namespace StreamRevisionConflictErrorDetails {
109+
export type AsObject = {
110+
stream: string,
111+
expectedRevision: string,
112+
actualRevision: string,
113+
}
114+
}
115+
116+
export class AppendRecordSizeExceededErrorDetails extends jspb.Message {
117+
getStream(): string;
118+
setStream(value: string): AppendRecordSizeExceededErrorDetails;
119+
getRecordId(): string;
120+
setRecordId(value: string): AppendRecordSizeExceededErrorDetails;
121+
getSize(): number;
122+
setSize(value: number): AppendRecordSizeExceededErrorDetails;
123+
getMaxSize(): number;
124+
setMaxSize(value: number): AppendRecordSizeExceededErrorDetails;
125+
126+
serializeBinary(): Uint8Array;
127+
toObject(includeInstance?: boolean): AppendRecordSizeExceededErrorDetails.AsObject;
128+
static toObject(includeInstance: boolean, msg: AppendRecordSizeExceededErrorDetails): AppendRecordSizeExceededErrorDetails.AsObject;
129+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
130+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
131+
static serializeBinaryToWriter(message: AppendRecordSizeExceededErrorDetails, writer: jspb.BinaryWriter): void;
132+
static deserializeBinary(bytes: Uint8Array): AppendRecordSizeExceededErrorDetails;
133+
static deserializeBinaryFromReader(message: AppendRecordSizeExceededErrorDetails, reader: jspb.BinaryReader): AppendRecordSizeExceededErrorDetails;
134+
}
135+
136+
export namespace AppendRecordSizeExceededErrorDetails {
137+
export type AsObject = {
138+
stream: string,
139+
recordId: string,
140+
size: number,
141+
maxSize: number,
142+
}
143+
}
144+
145+
export class AppendTransactionSizeExceededErrorDetails extends jspb.Message {
146+
getSize(): number;
147+
setSize(value: number): AppendTransactionSizeExceededErrorDetails;
148+
getMaxSize(): number;
149+
setMaxSize(value: number): AppendTransactionSizeExceededErrorDetails;
150+
151+
serializeBinary(): Uint8Array;
152+
toObject(includeInstance?: boolean): AppendTransactionSizeExceededErrorDetails.AsObject;
153+
static toObject(includeInstance: boolean, msg: AppendTransactionSizeExceededErrorDetails): AppendTransactionSizeExceededErrorDetails.AsObject;
154+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
155+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
156+
static serializeBinaryToWriter(message: AppendTransactionSizeExceededErrorDetails, writer: jspb.BinaryWriter): void;
157+
static deserializeBinary(bytes: Uint8Array): AppendTransactionSizeExceededErrorDetails;
158+
static deserializeBinaryFromReader(message: AppendTransactionSizeExceededErrorDetails, reader: jspb.BinaryReader): AppendTransactionSizeExceededErrorDetails;
159+
}
160+
161+
export namespace AppendTransactionSizeExceededErrorDetails {
162+
export type AsObject = {
163+
size: number,
164+
maxSize: number,
165+
}
166+
}
167+
168+
export class StreamAlreadyInAppendSessionErrorDetails extends jspb.Message {
169+
getStream(): string;
170+
setStream(value: string): StreamAlreadyInAppendSessionErrorDetails;
171+
172+
serializeBinary(): Uint8Array;
173+
toObject(includeInstance?: boolean): StreamAlreadyInAppendSessionErrorDetails.AsObject;
174+
static toObject(includeInstance: boolean, msg: StreamAlreadyInAppendSessionErrorDetails): StreamAlreadyInAppendSessionErrorDetails.AsObject;
175+
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
176+
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
177+
static serializeBinaryToWriter(message: StreamAlreadyInAppendSessionErrorDetails, writer: jspb.BinaryWriter): void;
178+
static deserializeBinary(bytes: Uint8Array): StreamAlreadyInAppendSessionErrorDetails;
179+
static deserializeBinaryFromReader(message: StreamAlreadyInAppendSessionErrorDetails, reader: jspb.BinaryReader): StreamAlreadyInAppendSessionErrorDetails;
180+
}
181+
182+
export namespace StreamAlreadyInAppendSessionErrorDetails {
183+
export type AsObject = {
184+
stream: string,
185+
}
186+
}
187+
188+
export enum StreamsError {
189+
STREAMS_ERROR_UNSPECIFIED = 0,
190+
STREAMS_ERROR_STREAM_NOT_FOUND = 1,
191+
STREAMS_ERROR_STREAM_ALREADY_EXISTS = 2,
192+
STREAMS_ERROR_STREAM_DELETED = 3,
193+
STREAMS_ERROR_STREAM_TOMBSTONED = 4,
194+
STREAMS_ERROR_STREAM_REVISION_CONFLICT = 5,
195+
STREAMS_ERROR_APPEND_RECORD_SIZE_EXCEEDED = 6,
196+
STREAMS_ERROR_APPEND_TRANSACTION_SIZE_EXCEEDED = 7,
197+
STREAMS_ERROR_STREAM_ALREADY_IN_APPEND_SESSION = 8,
198+
STREAMS_ERROR_APPEND_SESSION_NO_REQUESTS = 9,
199+
}

0 commit comments

Comments
 (0)