diff --git a/src/chains/ethereum/ethereum/src/forking/handlers/ws-handler.ts b/src/chains/ethereum/ethereum/src/forking/handlers/ws-handler.ts index eca6f9e01a..4dc81c9b3f 100644 --- a/src/chains/ethereum/ethereum/src/forking/handlers/ws-handler.ts +++ b/src/chains/ethereum/ethereum/src/forking/handlers/ws-handler.ts @@ -9,6 +9,11 @@ import { JsonRpcResponse, JsonRpcError } from "@ganache/utils"; const { JSONRPC_PREFIX } = BaseHandler; +export type RetryConfiguration = { + retryIntervalBaseInSeconds: number; + retryCounter: number; +}; + export class WsHandler extends BaseHandler implements Handler { private open: Promise; private connection: WebSocket; @@ -20,42 +25,65 @@ export class WsHandler extends BaseHandler implements Handler { }> >(); - constructor(options: EthereumInternalOptions, abortSignal: AbortSignal) { + // queue requests when connection is closed. + private delayedRequestsQueue = []; + // flag to identify if adhoc reconnection attempt. + private adhocReconnectionRequest = false; + + // retry configuration + private retryCounter: number = 3; + private retryIntervalBaseInSeconds: number = 2; + private initialRetryCounter: number; + private retryTimeoutId: NodeJS.Timeout; + + // socket configuration + private url: string; + private origin: string; + private logging: EthereumInternalOptions["logging"]; + + constructor( + options: EthereumInternalOptions, + abortSignal: AbortSignal, + retryConfiguration?: RetryConfiguration | undefined + ) { super(options, abortSignal); const { fork: { url, origin }, logging } = options; + this.url = url.toString(); + this.origin = origin; + this.logging = logging; + + // set retry configuration values + if (retryConfiguration) { + this.retryCounter = retryConfiguration.retryCounter; + this.initialRetryCounter = retryConfiguration.retryIntervalBaseInSeconds; + } + this.initialRetryCounter = this.retryCounter; - this.connection = new WebSocket(url.toString(), { - origin, - headers: this.headers - }); - // `nodebuffer` is already the default, but I just wanted to be explicit - // here because when `nodebuffer` is the binaryType the `message` event's - // data type is guaranteed to be a `Buffer`. We don't need to check for - // different types of data. - // I mention all this because if `arraybuffer` or `fragment` is used for the - // binaryType the `"message"` event's `data` may end up being - // `ArrayBuffer | Buffer`, or `Buffer[] | Buffer`, respectively. - // If you need to change this, you probably need to change our `onMessage` - // handler too. - this.connection.binaryType = "nodebuffer"; - - this.open = this.connect(this.connection, logging); - this.connection.onclose = () => { + const onCloseEvent = () => { // try to connect again... - // Issue: https://github.com/trufflesuite/ganache/issues/3476 - // TODO: backoff and eventually fail - // Issue: https://github.com/trufflesuite/ganache/issues/3477 - this.open = this.connect(this.connection, logging); + // backoff and eventually fail + // do not schedule reconnection for adhoc reconnection requests + if (this.retryCounter === 0) { + this.logging.logger.log("Connection to Infura has failed. Try again"); + } else { + if (!this.adhocReconnectionRequest) { + this.retryCounter--; + clearTimeout(this.retryTimeoutId); + this.retryTimeoutId = setTimeout(async () => { + this.reconnect(this.url, this.origin, false); + }, Math.pow(this.retryIntervalBaseInSeconds, this.initialRetryCounter - this.retryCounter) * 1000); + } + } }; + this.open = this.connect(this.url, this.origin, onCloseEvent); this.abortSignal.addEventListener("abort", () => { this.connection.onclose = null; this.connection.close(1000); }); - this.connection.onmessage = this.onMessage.bind(this); } public async request( @@ -63,7 +91,14 @@ export class WsHandler extends BaseHandler implements Handler { params: unknown[], options = { disableCache: false } ) { - await this.open; + try { + await this.open; + } catch (er) { + this.logging.logger.log("Connection to Infura has failed"); + // skip the reconnection if connection is being made + if (this.connection.readyState !== this.connection.CONNECTING) + this.reconnect(this.url, this.origin, true); + } if (this.abortSignal.aborted) return Promise.reject(new AbortError()); const key = JSON.stringify({ method, params }); @@ -81,7 +116,13 @@ export class WsHandler extends BaseHandler implements Handler { // Issue: https://github.com/trufflesuite/ganache/issues/3478 this.inFlightRequests.set(messageId, deferred); - this.connection.send(`${JSONRPC_PREFIX}${messageId},${key.slice(1)}`); + // if connection is alive send request else delay the request + const data = `${JSONRPC_PREFIX}${messageId},${key.slice(1)}`; + if (this.connection && this.connection.readyState === 1) { + this.connection.send(data); + } else { + this.delayRequest(data); + } return deferred.promise.finally(() => this.requestCache.delete(key)); }; return await this.queueRequest(method, params, key, send, options); @@ -105,26 +146,62 @@ export class WsHandler extends BaseHandler implements Handler { } } - private connect( - connection: WebSocket, - logging: EthereumInternalOptions["logging"] - ) { + private connect(url: string, origin: string, onCloseEvent: any) { + this.connection = new WebSocket(url, { + origin, + headers: this.headers + }); + // `nodebuffer` is already the default, but I just wanted to be explicit + // here because when `nodebuffer` is the binaryType the `message` event's + // data type is guaranteed to be a `Buffer`. We don't need to check for + // different types of data. + // I mention all this because if `arraybuffer` or `fragment` is used for the + // binaryType the `"message"` event's `data` may end up being + // `ArrayBuffer | Buffer`, or `Buffer[] | Buffer`, respectively. + // If you need to change this, you probably need to change our `onMessage` + // handler too. + this.connection.binaryType = "nodebuffer"; + this.connection.onclose = onCloseEvent; + this.connection.onmessage = this.onMessage.bind(this); let open = new Promise((resolve, reject) => { - connection.onopen = resolve; - connection.onerror = reject; + this.connection.onopen = resolve; + this.connection.onerror = reject; + }); + open.then(() => { + this.connection.onopen = null; + this.connection.onerror = null; + // reset the retry counter and any timeouts scheduled for retries + this.retryCounter = this.initialRetryCounter; + clearTimeout(this.retryTimeoutId); + + this.adhocReconnectionRequest = false; + // process delayed requests which were queued at the time of connection failure + this.sendDelayedRequests(); }); - open.then( - () => { - connection.onopen = null; - connection.onerror = null; - }, - err => { - logging.logger.log(err); - } - ); return open; } + private reconnect( + url: string, + origin: string, + adhocReconnectionRequest: boolean = false + ) { + this.adhocReconnectionRequest = adhocReconnectionRequest; + const onCloseEvent = this.connection.onclose; + this.open = this.connect(url, origin, onCloseEvent); + } + + private delayRequest(request: any) { + this.delayedRequestsQueue.push(request); + } + + private sendDelayedRequests() { + while (this.delayedRequestsQueue.length > 0) { + const request = this.delayedRequestsQueue.pop(); + this.connection.send(request); + } + } + public async close() { await super.close(); this.connection.close(); diff --git a/src/chains/ethereum/ethereum/tests/forking/handlers/ws-handler.test.ts b/src/chains/ethereum/ethereum/tests/forking/handlers/ws-handler.test.ts new file mode 100644 index 0000000000..6008b06cf0 --- /dev/null +++ b/src/chains/ethereum/ethereum/tests/forking/handlers/ws-handler.test.ts @@ -0,0 +1,79 @@ +import assert from "assert"; +import AbortController from "abort-controller"; +import { WsHandler } from "../../../src/forking/handlers/ws-handler"; +import { + EthereumOptionsConfig, + EthereumProviderOptions +} from "@ganache/ethereum-options"; +import WebSocket from "ws"; + +const createWebSocketServer = (port: number): WebSocket.Server => { + let wsServer = new WebSocket.Server({ port }); + wsServer.on("connection", async ws => { + ws.on("message", data => { + const message = JSON.parse(data.toString()); + ws.send( + Buffer.from( + JSON.stringify({ + id: message.id, + jsonrpc: "2.0", + result: "0x0" + }), + "utf-8" + ) + ); + if (message.method === "client-disconnect") { + setTimeout(() => { + ws.terminate(); + }, 10); + } + }); + }); + return wsServer; +}; + +// create test server +const URL = "ws://localhost:8888/"; +let wsServer: WebSocket.Server; +let wsHandler: WsHandler; +wsServer = createWebSocketServer(8888); + +describe("ws-handler", function () { + describe("retries", function () { + before(() => { + const providerOptions = EthereumOptionsConfig.normalize({ + fork: { + url: URL, + origin: "test" + } + } as EthereumProviderOptions); + const abortController: AbortController = new AbortController(); + wsHandler = new WsHandler(providerOptions, abortController.signal, { + retryCounter: 4, + retryIntervalBaseInSeconds: 3 + }); + }); + + after(() => { + wsHandler.close(); + wsServer.close(); + }); + + it("should attempt to reconnect the server when connection is terminated", async () => { + // send a request to websocket server to get connection termination. + await wsHandler.request("client-disconnect", [], { + disableCache: true + }); + await new Promise(resolve => setTimeout(resolve, 100)); + + // send request after connection is terminated + const retryPromise = wsHandler.request("retry", [], { + disableCache: true + }); + + // assert the result + const response = await retryPromise; + assert.equal(response, "0x0"); + }).timeout(10000); + }); +});