Skip to content

Commit 58f1eaf

Browse files
authored
Add support for redis config (#346)
1 parent f9304bc commit 58f1eaf

12 files changed

+181
-28
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ jobs:
1212

1313
strategy:
1414
matrix:
15-
node-version: [16.x, 18.x, 20.x, 22.x]
15+
node-version: [16.x, 18.x, 20.x, 22.x, 23.x]
1616

1717
steps:
1818
- name: Checkout Repository

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,8 @@ const redisCache = new Redis(redisOptions)
273273

274274
const { publisher: notificationPublisher, consumer: notificationConsumer } = createNotificationPair<User>({
275275
channel: 'user-cache-notifications',
276-
consumerRedis: redisConsumer,
277-
publisherRedis: redisPublisher,
276+
consumerRedis: redisConsumer, // you can pass redis config instead
277+
publisherRedis: redisPublisher, // you can pass redis config instead
278278
})
279279

280280
const userLoader = new Loader({

lib/redis/RedisGroupNotificationConsumer.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,13 @@ export class RedisGroupNotificationConsumer<LoadedValue> extends AbstractNotific
2121
this.channel = config.channel
2222
}
2323

24-
async close() {
24+
async close(): Promise<void> {
2525
await this.redis.unsubscribe(this.channel)
26+
return new Promise((resolve) => {
27+
void this.redis.quit((_err, result) => {
28+
return resolve()
29+
})
30+
})
2631
}
2732

2833
subscribe(): Promise<void> {

lib/redis/RedisGroupNotificationFactory.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,26 @@
11
import { randomUUID } from 'node:crypto'
22
import { RedisGroupNotificationConsumer } from './RedisGroupNotificationConsumer'
33
import { RedisGroupNotificationPublisher } from './RedisGroupNotificationPublisher'
4-
import type { RedisNotificationConfig } from './RedisNotificationFactory'
4+
import {isClient, RedisNotificationConfig} from './RedisNotificationFactory'
5+
import {Redis} from "ioredis";
56

67
export function createGroupNotificationPair<T>(config: RedisNotificationConfig) {
8+
const resolvedConsumer = isClient(config.consumerRedis) ? config.consumerRedis : new Redis(config.consumerRedis)
9+
const resolvedPublisher = isClient(config.publisherRedis) ? config.publisherRedis : new Redis(config.publisherRedis)
10+
711
const serverUuid = randomUUID()
8-
if (config.publisherRedis === config.consumerRedis) {
12+
if (resolvedPublisher === resolvedConsumer) {
913
throw new Error(
1014
'Same Redis client instance cannot be used both for publisher and for consumer, please create a separate connection',
1115
)
1216
}
1317

14-
const consumer = new RedisGroupNotificationConsumer<T>(config.consumerRedis, {
18+
const consumer = new RedisGroupNotificationConsumer<T>(resolvedConsumer, {
1519
channel: config.channel,
1620
serverUuid,
1721
})
1822

19-
const publisher = new RedisGroupNotificationPublisher<T>(config.publisherRedis, {
23+
const publisher = new RedisGroupNotificationPublisher<T>(resolvedPublisher, {
2024
channel: config.channel,
2125
errorHandler: config.errorHandler,
2226
serverUuid,

lib/redis/RedisGroupNotificationPublisher.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,13 @@ export class RedisGroupNotificationPublisher<LoadedValue> implements GroupNotifi
6868
)
6969
}
7070

71-
async close() {}
71+
close(): Promise<void> {
72+
return new Promise((resolve) => {
73+
void this.redis.quit((_err, result) => {
74+
return resolve()
75+
})
76+
})
77+
}
7278

7379
async subscribe() {}
7480
}

lib/redis/RedisNotificationConsumer.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,14 @@ export class RedisNotificationConsumer<LoadedValue> extends AbstractNotification
2626
this.channel = config.channel
2727
}
2828

29-
async close() {
29+
async close(): Promise<void> {
3030
await this.redis.unsubscribe(this.channel)
31-
}
31+
return new Promise((resolve) => {
32+
void this.redis.quit((_err, result) => {
33+
return resolve()
34+
})
35+
})
36+
}
3237

3338
subscribe(): Promise<void> {
3439
return this.redis.subscribe(this.channel).then(() => {

lib/redis/RedisNotificationFactory.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,37 @@
11
import { randomUUID } from 'node:crypto'
2-
import type { Redis } from 'ioredis'
2+
import {Redis, RedisOptions} from 'ioredis'
33
import type { PublisherErrorHandler } from '../notifications/NotificationPublisher'
44
import { RedisNotificationConsumer } from './RedisNotificationConsumer'
55
import { RedisNotificationPublisher } from './RedisNotificationPublisher'
66

77
export type RedisNotificationConfig = {
88
channel: string
9-
publisherRedis: Redis
10-
consumerRedis: Redis
9+
publisherRedis: Redis | RedisOptions
10+
consumerRedis: Redis | RedisOptions
1111
errorHandler?: PublisherErrorHandler
1212
}
1313

14+
export function isClient(maybeClient: unknown): maybeClient is Redis {
15+
return 'status' in (maybeClient as Redis)
16+
}
17+
1418
export function createNotificationPair<T>(config: RedisNotificationConfig) {
19+
const resolvedConsumer = isClient(config.consumerRedis) ? config.consumerRedis : new Redis(config.consumerRedis)
20+
const resolvedPublisher = isClient(config.publisherRedis) ? config.publisherRedis : new Redis(config.publisherRedis)
21+
1522
const serverUuid = randomUUID()
16-
if (config.publisherRedis === config.consumerRedis) {
23+
if (resolvedConsumer === resolvedPublisher) {
1724
throw new Error(
1825
'Same Redis client instance cannot be used both for publisher and for consumer, please create a separate connection',
1926
)
2027
}
2128

22-
const consumer = new RedisNotificationConsumer<T>(config.consumerRedis, {
29+
const consumer = new RedisNotificationConsumer<T>(resolvedConsumer, {
2330
channel: config.channel,
2431
serverUuid,
2532
})
2633

27-
const publisher = new RedisNotificationPublisher<T>(config.publisherRedis, {
34+
const publisher = new RedisNotificationPublisher<T>(resolvedPublisher, {
2835
channel: config.channel,
2936
errorHandler: config.errorHandler,
3037
serverUuid,

lib/redis/RedisNotificationPublisher.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,13 @@ export class RedisNotificationPublisher<LoadedValue> implements NotificationPubl
9191
)
9292
}
9393

94-
async close() {}
94+
close(): Promise<void> {
95+
return new Promise((resolve) => {
96+
void this.redis.quit((_err, result) => {
97+
return resolve()
98+
})
99+
})
100+
}
95101

96102
async subscribe() {}
97103
}

package.json

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
"scripts": {
1515
"build": "tsc",
1616
"build:release": "del-cli dist && del-cli coverage && npm run lint && npm run build",
17-
"docker:start": "docker-compose -f docker-compose.yml up --build -d redis && docker-compose -f docker-compose.yml up --build -d wait_for_redis",
18-
"docker:stop": "docker-compose -f docker-compose.yml down",
17+
"docker:start": "docker compose -f docker-compose.yml up --build -d redis && docker compose -f docker-compose.yml up --build -d wait_for_redis",
18+
"docker:stop": "docker compose -f docker-compose.yml down",
1919
"test": "vitest",
2020
"test:everything": "npm run lint && npm run test:coverage",
2121
"test:coverage": "vitest --coverage",
@@ -50,17 +50,17 @@
5050
],
5151
"homepage": "https://github.com/kibertoad/layered-loader",
5252
"dependencies": {
53+
"ioredis": "^5.4.1",
5354
"toad-cache": "^3.7.0"
5455
},
5556
"devDependencies": {
56-
"@biomejs/biome": "^1.8.0",
57-
"@types/node": "^20.14.2",
58-
"@vitest/coverage-v8": "1.6.0",
59-
"del-cli": "^5.1.0",
60-
"ioredis": "^5.4.1",
61-
"rfdc": "^1.3.0",
62-
"vitest": "1.6.0",
63-
"typescript": "5.5.3"
57+
"@biomejs/biome": "^1.9.4",
58+
"@types/node": "^20.17.4",
59+
"@vitest/coverage-v8": "^2.1.4",
60+
"del-cli": "^6.0.0",
61+
"rfdc": "^1.4.1",
62+
"vitest": "^2.1.4",
63+
"typescript": "^5.6.3"
6464
},
6565
"files": [
6666
"README.md",

test/redis/RedisGroupNotificationPublisher.spec.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,65 @@ describe('RedisGroupNotificationPublisher', () => {
126126
await notificationPublisher1.close()
127127
})
128128

129+
it('Propagates invalidation event to remote cache, works with redis config', async () => {
130+
const { publisher: notificationPublisher1, consumer: notificationConsumer1 } =
131+
createGroupNotificationPair({
132+
channel: CHANNEL_ID,
133+
consumerRedis: redisOptions,
134+
publisherRedis: redisOptions,
135+
})
136+
137+
const { publisher: notificationPublisher2, consumer: notificationConsumer2 } =
138+
createGroupNotificationPair({
139+
channel: CHANNEL_ID,
140+
consumerRedis: redisConsumer,
141+
publisherRedis: redisPublisher,
142+
})
143+
const operation = new GroupLoader({
144+
inMemoryCache: IN_MEMORY_CACHE_CONFIG,
145+
asyncCache: new DummyGroupedCache(userValues),
146+
notificationConsumer: notificationConsumer1,
147+
notificationPublisher: notificationPublisher1,
148+
})
149+
150+
const operation2 = new GroupLoader({
151+
inMemoryCache: IN_MEMORY_CACHE_CONFIG,
152+
asyncCache: new DummyGroupedCache(userValues),
153+
notificationConsumer: notificationConsumer2,
154+
notificationPublisher: notificationPublisher2,
155+
})
156+
await operation.init()
157+
await operation2.init()
158+
159+
await operation.getAsyncOnly('key', 'group')
160+
await operation2.getAsyncOnly('key', 'group')
161+
const resultPre1 = operation.getInMemoryOnly('key', 'group')
162+
const resultPre2 = operation2.getInMemoryOnly('key', 'group')
163+
await operation.invalidateCacheFor('key', 'group')
164+
165+
await waitAndRetry(
166+
() => {
167+
const resultPost1 = operation.getInMemoryOnly('key', 'group')
168+
const resultPost2 = operation2.getInMemoryOnly('key', 'group')
169+
return resultPost1 === undefined && resultPost2 === undefined
170+
},
171+
50,
172+
100,
173+
)
174+
175+
const resultPost1 = operation.getInMemoryOnly('key', 'group')
176+
const resultPost2 = operation2.getInMemoryOnly('key', 'group')
177+
178+
expect(resultPre1).toEqual(user1)
179+
expect(resultPre2).toEqual(user1)
180+
181+
expect(resultPost1).toBeUndefined()
182+
expect(resultPost2).toBeUndefined()
183+
184+
await notificationConsumer1.close()
185+
await notificationPublisher1.close()
186+
})
187+
129188
it('Propagates delete group event to remote cache', async () => {
130189
const { publisher: notificationPublisher1, consumer: notificationConsumer1 } =
131190
createGroupNotificationPair({

0 commit comments

Comments
 (0)