Skip to content

Commit 412986e

Browse files
authored
feat: non-conflicting fetch instrumentation (#554)
* feat: new approach for fetch instrumentation * fix: restore extras and prepareHeaders * fix: rename request handler methods * fix: avoid duplicate subscriptions * fix: initialize _channelSubs array * test: update request/response header format * chore: no need for new variable * test: no longer patching fetch * test: fix typo in test case
1 parent a666b89 commit 412986e

File tree

2 files changed

+141
-141
lines changed

2 files changed

+141
-141
lines changed
Lines changed: 7 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
import { http, HttpResponse } from 'msw'
2-
import { setupServer } from 'msw/node'
3-
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, test } from 'vitest'
4-
import { createTracerProvider } from '../bootstrap/main.ts'
5-
import { shutdownTracers } from '../main.ts'
1+
import { describe, expect, test } from 'vitest'
62
import { FetchInstrumentation } from './fetch.ts'
73

84
describe('header exclusion', () => {
@@ -14,11 +10,7 @@ describe('header exclusion', () => {
1410
// eslint-disable-next-line @typescript-eslint/dot-notation
1511
const attributes = instrumentation['prepareHeaders'](
1612
'request',
17-
new Headers({
18-
a: 'a',
19-
b: 'b',
20-
authorization: 'secret',
21-
}),
13+
['a', 'a', 'b', 'b', 'authorization', 'secret'].map((value) => Buffer.from(value)),
2214
)
2315
expect(attributes).toEqual({
2416
'http.request.header.a': 'a',
@@ -33,11 +25,7 @@ describe('header exclusion', () => {
3325
// eslint-disable-next-line @typescript-eslint/dot-notation
3426
const empty = everything['prepareHeaders'](
3527
'request',
36-
new Headers({
37-
a: 'a',
38-
b: 'b',
39-
authorization: 'secret',
40-
}),
28+
['a', 'a', 'b', 'b', 'authorization', 'secret'].map((value) => Buffer.from(value)),
4129
)
4230
expect(empty).toEqual({})
4331
})
@@ -50,13 +38,9 @@ describe('header exclusion', () => {
5038
// eslint-disable-next-line @typescript-eslint/dot-notation
5139
const attributes = instrumentation['prepareHeaders'](
5240
'request',
53-
new Headers({
54-
a: 'a',
55-
b: 'b',
56-
authorization: 'a secret',
57-
}),
41+
['a', 'a', 'b', 'b', 'authorization', 'secret'].map((value) => Buffer.from(value)),
5842
)
59-
expect(attributes['http.request.header.authorization']).not.toBe('a secret')
43+
expect(attributes['http.request.header.authorization']).not.toBe('secret')
6044
expect(attributes['http.request.header.authorization']).toBeTypeOf('string')
6145
expect(attributes['http.request.header.a']).toBe('a')
6246
expect(attributes['http.request.header.b']).toBe('b')
@@ -70,91 +54,13 @@ describe('header exclusion', () => {
7054
// eslint-disable-next-line @typescript-eslint/dot-notation
7155
const attributes = instrumentation['prepareHeaders'](
7256
'request',
73-
new Headers({
74-
a: 'a',
75-
b: 'b',
76-
authorization: 'a secret',
77-
}),
57+
['a', 'a', 'b', 'b', 'authorization', 'secret'].map((value) => Buffer.from(value)),
7858
)
79-
expect(attributes['http.request.header.authorization']).not.toBe('a secret')
59+
expect(attributes['http.request.header.authorization']).not.toBe('secret')
8060
expect(attributes['http.request.header.a']).not.toBe('a')
8161
expect(attributes['http.request.header.b']).not.toBe('b')
8262
expect(attributes['http.request.header.authorization']).toBeTypeOf('string')
8363
expect(attributes['http.request.header.a']).toBeTypeOf('string')
8464
expect(attributes['http.request.header.b']).toBeTypeOf('string')
8565
})
8666
})
87-
88-
describe('patched fetch', () => {
89-
const server = setupServer(
90-
http.get('http://localhost:3000/ok', () => HttpResponse.json({ message: 'ok' })),
91-
http.post('http://localhost:3000/ok', () => HttpResponse.json({ message: 'ok' })),
92-
)
93-
94-
beforeAll(() => {
95-
server.listen({ onUnhandledRequest: 'error' })
96-
})
97-
98-
beforeEach(() => {
99-
createTracerProvider({
100-
serviceName: 'test-service',
101-
serviceVersion: '1.0.0',
102-
deploymentEnvironment: 'test',
103-
siteUrl: 'https://example.com',
104-
siteId: '12345',
105-
siteName: 'example',
106-
instrumentations: [new FetchInstrumentation()],
107-
})
108-
})
109-
110-
afterEach(async () => {
111-
server.resetHandlers()
112-
await shutdownTracers()
113-
})
114-
115-
afterAll(() => {
116-
server.close()
117-
})
118-
119-
it('can GET url', async () => {
120-
createTracerProvider({
121-
serviceName: 'test-service',
122-
serviceVersion: '1.0.0',
123-
deploymentEnvironment: 'test',
124-
siteUrl: 'https://example.com',
125-
siteId: '12345',
126-
siteName: 'example',
127-
instrumentations: [new FetchInstrumentation()],
128-
})
129-
130-
await expect(fetch('http://localhost:3000/ok').then((r) => r.json())).resolves.toEqual({ message: 'ok' })
131-
})
132-
133-
it('can POST url', async () => {
134-
await expect(
135-
fetch('http://localhost:3000/ok', {
136-
method: 'POST',
137-
body: JSON.stringify({ hello: 'rabbit' }),
138-
headers: {
139-
'Content-Type': 'application/json',
140-
},
141-
}).then((r) => r.json()),
142-
).resolves.toEqual({ message: 'ok' })
143-
})
144-
145-
it('can GET request', async () => {
146-
const req = new Request('http://localhost:3000/ok')
147-
await expect(fetch(req).then((r) => r.json())).resolves.toEqual({ message: 'ok' })
148-
})
149-
150-
it('can POST request', async () => {
151-
const req = new Request('http://localhost:3000/ok', {
152-
method: 'POST',
153-
body: JSON.stringify({ hello: 'rabbit' }),
154-
headers: {
155-
'Content-Type': 'application/json',
156-
},
157-
})
158-
await expect(fetch(req).then((r) => r.json())).resolves.toEqual({ message: 'ok' })
159-
})
160-
})
Lines changed: 134 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1+
import * as diagnosticsChannel from 'diagnostics_channel'
2+
13
import * as api from '@opentelemetry/api'
24
import { SugaredTracer } from '@opentelemetry/api/experimental'
35
import { _globalThis } from '@opentelemetry/core'
46
import { InstrumentationConfig, type Instrumentation } from '@opentelemetry/instrumentation'
57

68
export interface FetchInstrumentationConfig extends InstrumentationConfig {
7-
getRequestAttributes?(headers: Request): api.Attributes
8-
getResponseAttributes?(response: Response): api.Attributes
9+
getRequestAttributes?(request: FetchRequest): api.Attributes
10+
getResponseAttributes?(response: FetchResponse): api.Attributes
911
skipURLs?: (string | RegExp)[]
1012
skipHeaders?: (string | RegExp)[] | true
1113
redactHeaders?: (string | RegExp)[] | true
@@ -14,12 +16,15 @@ export interface FetchInstrumentationConfig extends InstrumentationConfig {
1416
export class FetchInstrumentation implements Instrumentation {
1517
instrumentationName = '@netlify/otel/instrumentation-fetch'
1618
instrumentationVersion = '1.0.0'
17-
private originalFetch: typeof fetch | null = null
1819
private config: FetchInstrumentationConfig
1920
private provider?: api.TracerProvider
2021

22+
declare private _channelSubs: ListenerRecord[]
23+
private _recordFromReq = new WeakMap<FetchRequest, api.Span>()
24+
2125
constructor(config: FetchInstrumentationConfig = {}) {
2226
this.config = config
27+
this._channelSubs = []
2328
}
2429

2530
getConfig(): FetchInstrumentationConfig {
@@ -36,9 +41,10 @@ export class FetchInstrumentation implements Instrumentation {
3641
return this.provider
3742
}
3843

39-
private annotateFromRequest(span: api.Span, request: Request): void {
44+
private annotateFromRequest(span: api.Span, request: FetchRequest): void {
4045
const extras = this.config.getRequestAttributes?.(request) ?? {}
41-
const url = new URL(request.url)
46+
const url = new URL(request.path, request.origin)
47+
4248
// these are based on @opentelemetry/semantic-convention 1.36
4349
span.setAttributes({
4450
...extras,
@@ -52,19 +58,25 @@ export class FetchInstrumentation implements Instrumentation {
5258
})
5359
}
5460

55-
private annotateFromResponse(span: api.Span, response: Response): void {
61+
private annotateFromResponse(span: api.Span, response: FetchResponse): void {
5662
const extras = this.config.getResponseAttributes?.(response) ?? {}
5763

5864
// these are based on @opentelemetry/semantic-convention 1.36
5965
span.setAttributes({
6066
...extras,
61-
'http.response.status_code': response.status,
67+
'http.response.status_code': response.statusCode,
6268
...this.prepareHeaders('response', response.headers),
6369
})
64-
span.setStatus({ code: response.status >= 400 ? api.SpanStatusCode.ERROR : api.SpanStatusCode.UNSET })
70+
71+
span.setStatus({
72+
code: response.statusCode >= 400 ? api.SpanStatusCode.ERROR : api.SpanStatusCode.UNSET,
73+
})
6574
}
6675

67-
private prepareHeaders(type: 'request' | 'response', headers: Headers): api.Attributes {
76+
private prepareHeaders(
77+
type: 'request' | 'response',
78+
headers: FetchRequest['headers'] | FetchResponse['headers'],
79+
): api.Attributes {
6880
if (this.config.skipHeaders === true) {
6981
return {}
7082
}
@@ -74,8 +86,9 @@ export class FetchInstrumentation implements Instrumentation {
7486
const everythingSkipped = skips.some((skip) => everything.includes(skip.toString()))
7587
const attributes: api.Attributes = {}
7688
if (everythingSkipped) return attributes
77-
const entries = headers.entries()
78-
for (const [key, value] of entries) {
89+
for (let idx = 0; idx < headers.length; idx = idx + 2) {
90+
const key = headers[idx].toString().toLowerCase()
91+
const value = headers[idx + 1].toString()
7992
if (skips.some((skip) => (typeof skip == 'string' ? skip == key : skip.test(key)))) {
8093
continue
8194
}
@@ -92,6 +105,16 @@ export class FetchInstrumentation implements Instrumentation {
92105
return attributes
93106
}
94107

108+
private getRequestMethod(original: string): string {
109+
const acceptedMethods = ['HEAD', 'GET', 'POST', 'PUT', 'PATCH', 'DELETE']
110+
111+
if (acceptedMethods.includes(original.toUpperCase())) {
112+
return original.toUpperCase()
113+
}
114+
115+
return '_OTHER'
116+
}
117+
95118
private getTracer(): SugaredTracer | undefined {
96119
if (!this.provider) {
97120
return undefined
@@ -105,39 +128,110 @@ export class FetchInstrumentation implements Instrumentation {
105128
return new SugaredTracer(tracer)
106129
}
107130

108-
/**
109-
* patch global fetch
110-
*/
111131
enable(): void {
112-
const originalFetch = _globalThis.fetch
113-
this.originalFetch = originalFetch
114-
_globalThis.fetch = async (resource: RequestInfo | URL, options?: RequestInit): Promise<Response> => {
115-
const url = typeof resource === 'string' ? resource : resource instanceof URL ? resource.href : resource.url
116-
const tracer = this.getTracer()
117-
if (
118-
!tracer ||
119-
this.config.skipURLs?.some((skip) => (typeof skip == 'string' ? url.startsWith(skip) : skip.test(url)))
120-
) {
121-
return await originalFetch(resource, options)
122-
}
132+
// Avoid to duplicate subscriptions
133+
if (this._channelSubs.length > 0) return
134+
135+
// https://undici.nodejs.org/#/docs/api/DiagnosticsChannel?id=diagnostics-channel-support
136+
this.subscribe('undici:request:create', this.onRequestCreate.bind(this))
137+
this.subscribe('undici:request:headers', this.onRequestHeaders.bind(this))
138+
this.subscribe('undici:request:trailers', this.onRequestEnd.bind(this))
139+
this.subscribe('undici:request:error', this.onRequestError.bind(this))
140+
}
123141

124-
return tracer.withActiveSpan('fetch', async (span) => {
125-
const request = new Request(resource, options)
126-
this.annotateFromRequest(span, request)
127-
const response = await originalFetch(request, options)
128-
this.annotateFromResponse(span, response)
129-
return response
130-
})
131-
}
142+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
143+
private subscribe(channelName: string, onMessage: (message: any, name: string | symbol) => void) {
144+
diagnosticsChannel.subscribe(channelName, onMessage)
145+
146+
const unsubscribe = () => diagnosticsChannel.unsubscribe(channelName, onMessage)
147+
this._channelSubs.push({ name: channelName, unsubscribe })
132148
}
133149

134-
/**
135-
* unpatch global fetch
136-
*/
137-
disable(): void {
138-
if (this.originalFetch) {
139-
_globalThis.fetch = this.originalFetch
140-
this.originalFetch = null
150+
disable() {
151+
this._channelSubs.forEach((sub) => {
152+
sub.unsubscribe()
153+
})
154+
this._channelSubs.length = 0
155+
}
156+
157+
private onRequestCreate({ request }: { request: FetchRequest }): void {
158+
const tracer = this.getTracer()
159+
const url = new URL(request.path, request.origin)
160+
161+
if (
162+
!tracer ||
163+
request.method === 'CONNECT' ||
164+
this.config.skipURLs?.some((skip) => (typeof skip == 'string' ? url.href.startsWith(skip) : skip.test(url.href)))
165+
) {
166+
return
141167
}
168+
169+
const span = tracer.startSpan(
170+
this.getRequestMethod(request.method),
171+
{
172+
kind: api.SpanKind.CLIENT,
173+
},
174+
api.context.active(),
175+
)
176+
177+
this.annotateFromRequest(span, request)
178+
179+
this._recordFromReq.set(request, span)
180+
}
181+
182+
private onRequestHeaders({ request, response }: { request: FetchRequest; response: FetchResponse }): void {
183+
const span = this._recordFromReq.get(request)
184+
if (!span) return
185+
186+
this.annotateFromResponse(span, response)
187+
}
188+
189+
private onRequestError({ request, error }: { request: FetchRequest; error: Error }): void {
190+
const span = this._recordFromReq.get(request)
191+
if (!span) return
192+
193+
span.recordException(error)
194+
span.setStatus({
195+
code: api.SpanStatusCode.ERROR,
196+
message: error.message,
197+
})
198+
199+
span.end()
200+
this._recordFromReq.delete(request)
142201
}
202+
203+
private onRequestEnd({ request }: { request: FetchRequest; response: FetchResponse }): void {
204+
const span = this._recordFromReq.get(request)
205+
if (!span) return
206+
207+
span.end()
208+
this._recordFromReq.delete(request)
209+
}
210+
}
211+
212+
interface ListenerRecord {
213+
name: string
214+
unsubscribe: () => void
215+
}
216+
217+
// https://github.com/open-telemetry/opentelemetry-js-contrib/blob/main/packages/instrumentation-undici/src/types.ts
218+
interface FetchRequest {
219+
origin: string
220+
method: string
221+
path: string
222+
headers: string | (string | string[])[]
223+
addHeader: (name: string, value: string) => void
224+
throwOnError: boolean
225+
completed: boolean
226+
aborted: boolean
227+
idempotent: boolean
228+
contentLength: number | null
229+
contentType: string | null
230+
body: unknown
231+
}
232+
233+
interface FetchResponse {
234+
headers: Buffer[]
235+
statusCode: number
236+
statusText: string
143237
}

0 commit comments

Comments
 (0)