diff --git a/lib/adapter.ts b/lib/adapter.ts index 17f082a..ceeffdc 100644 --- a/lib/adapter.ts +++ b/lib/adapter.ts @@ -289,12 +289,14 @@ class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat { for (const entry of entries) { if (entry.message.nsp === this.nsp.name && entry.message.type === "3") { - const message = RedisStreamsAdapter.decode(entry.message); - - // @ts-ignore - if (shouldIncludePacket(session.rooms, message.data.opts)) { - // @ts-ignore - session.missedPackets.push(message.data.packet.data); + const message = RedisStreamsAdapter.decode(entry.message) as { + data: any; + }; + const { packet, opts } = message.data; + + if (shouldIncludePacket(session.rooms, opts)) { + packet.data.push(entry.id); + session.missedPackets.push(packet.data); } } offset = entry.id; diff --git a/test/connection-state-recovery.ts b/test/connection-state-recovery.ts index ee9eecc..6f9b643 100644 --- a/test/connection-state-recovery.ts +++ b/test/connection-state-recovery.ts @@ -1,10 +1,10 @@ import { Server } from "socket.io"; import { io as ioc } from "socket.io-client"; -import { setup, sleep } from "./util"; +import { setup } from "./util"; import expect = require("expect.js"); describe("connection state recovery", () => { - let servers: Server[], ports: number[], cleanup; + let servers: Server[], ports: number[], cleanup: () => void; beforeEach(async () => { const testContext = await setup({ @@ -82,10 +82,14 @@ describe("connection state recovery", () => { socket.io.engine.close(); socket.on("connect", () => { + // @ts-expect-error _lastOffset is private + offsets.add(socket._lastOffset); + expect(socket.recovered).to.eql(true); setTimeout(() => { expect(events).to.eql([1, 2, 3]); + expect(offsets.size).to.eql(4); socket.disconnect(); done(); @@ -94,9 +98,13 @@ describe("connection state recovery", () => { }); const events: number[] = []; + const offsets = new Set(); socket.on("myEvent", (val) => { events.push(val); + // note: the offset is updated after the callback execution + // @ts-expect-error _lastOffset is private + offsets.add(socket._lastOffset); }); });