Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
346 changes: 337 additions & 9 deletions mmm_audio/Filters.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,11 @@ struct SVF[num_chans: Int = 1](Representable, Movable, Copyable):

@doc_private
@always_inline
fn _compute_coeficients[filter_type: Int64](self, frequency: SIMD[DType.float64, self.num_chans], q: SIMD[DType.float64, self.num_chans], gain_db: SIMD[DType.float64, self.num_chans]) -> (SIMD[DType.float64, self.num_chans], SIMD[DType.float64, self.num_chans], SIMD[DType.float64, self.num_chans], SIMD[DType.float64, self.num_chans], SIMD[DType.float64, self.num_chans]):
"""Compute filter coeficients based on type and parameters.
fn _compute_coefficients[filter_type: Int64](self, frequency: SIMD[DType.float64, self.num_chans], q: SIMD[DType.float64, self.num_chans], gain_db: SIMD[DType.float64, self.num_chans]) -> (SIMD[DType.float64, self.num_chans], SIMD[DType.float64, self.num_chans], SIMD[DType.float64, self.num_chans], SIMD[DType.float64, self.num_chans], SIMD[DType.float64, self.num_chans]):
"""Compute filter coefficients based on type and parameters.

Parameters:
filter_type: The type of filter to compute coeficients for.
filter_type: The type of filter to compute coefficients for.

Args:
frequency: The cutoff/center frequency of the filter.
Expand Down Expand Up @@ -219,15 +219,15 @@ struct SVF[num_chans: Int = 1](Representable, Movable, Copyable):
else:
k = 1.0 / q

# Get mix coeficients based on filter type
var mix_coefs = self._get_mix_coeficients[filter_type](k, A)
# Get mix coefficients based on filter type
var mix_coefs = self._get_mix_coefficients[filter_type](k, A)

return (g, k, mix_coefs[0], mix_coefs[1], mix_coefs[2])

@doc_private
@always_inline
fn _get_mix_coeficients[filter_type: Int64](self, k: SIMD[DType.float64, num_chans], A: SIMD[DType.float64, self.num_chans]) -> (SIMD[DType.float64, self.num_chans], SIMD[DType.float64, self.num_chans], SIMD[DType.float64, self.num_chans]):
"""Get mixing coeficients for different filter types"""
fn _get_mix_coefficients[filter_type: Int64](self, k: SIMD[DType.float64, num_chans], A: SIMD[DType.float64, self.num_chans]) -> (SIMD[DType.float64, self.num_chans], SIMD[DType.float64, self.num_chans], SIMD[DType.float64, self.num_chans]):
"""Get mixing coefficients for different filter types"""

mc0 = SIMD[DType.float64, self.num_chans](1.0)
mc1 = SIMD[DType.float64, self.num_chans](0.0)
Expand Down Expand Up @@ -277,7 +277,7 @@ struct SVF[num_chans: Int = 1](Representable, Movable, Copyable):
The next sample of the filtered output.
"""

var coefs = self._compute_coeficients[filter_type](frequency, q, gain_db)
var coefs = self._compute_coefficients[filter_type](frequency, q, gain_db)
var g = coefs[0]
var k = coefs[1]
var mix_a = coefs[2]
Expand Down Expand Up @@ -1068,4 +1068,332 @@ fn tf2s[num_chans: Int = 1](coeffs: List[SIMD[DType.float64, num_chans]], mut co
coeffs_out[1] = b1d
coeffs_out[2] = b2d
coeffs_out[3] = a1d
coeffs_out[4] = a2d
coeffs_out[4] = a2d


@doc_private
struct BiquadModes:
"""Enumeration of different Biquad Filter modes.

This makes specifying a filter type more readable. For example,
to specify a lowpass filter, use `BiquadModes.lowpass`.

| Mode | Value |
|----------|-------|
| lowpass | 0 |
| bandpass | 1 |
| highpass | 2 |
| notch | 3 |
| peak | 4 |
| allpass | 5 |
| bell | 6 |
| lowshelf | 7 |
| highshelf| 8 |
"""
alias lowpass: Int64 = 0
alias bandpass: Int64 = 1
alias highpass: Int64 = 2
alias notch: Int64 = 3
alias peak: Int64 = 4
alias allpass: Int64 = 5
alias bell: Int64 = 6
alias lowshelf: Int64 = 7
alias highshelf: Int64 = 8

struct Biquad[num_chans: Int = 1](Representable, Movable, Copyable):
"""A Biquad filter struct.

To use the different modes, see the mode-specific methods.

Based on [Robert Bristow-Johnson's Audio EQ Cookbook](https://webaudio.github.io/Audio-EQ-Cookbook/audio-eq-cookbook.html).

Parameters:
num_chans: Number of SIMD channels to process in parallel.
"""

# Direct Form I state
var x1: SIMD[DType.float64, num_chans]
var x2: SIMD[DType.float64, num_chans]
var y1: SIMD[DType.float64, num_chans]
var y2: SIMD[DType.float64, num_chans]

var sample_rate: Float64

fn __init__(out self, world: UnsafePointer[MMMWorld]):
"""Initialize the Biquad.

Args:
world: Pointer to the MMMWorld.
"""
self.x1 = SIMD[DType.float64, num_chans](0.0)
self.x2 = SIMD[DType.float64, num_chans](0.0)
self.y1 = SIMD[DType.float64, num_chans](0.0)
self.y2 = SIMD[DType.float64, num_chans](0.0)
self.sample_rate = world[].sample_rate

fn __repr__(self) -> String:
return String("Biquad")

fn reset(mut self):
"""Reset internal state of the filter."""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be good to say a bit more about this. Why might a user need to use this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reset function matches SVF description... should both be improved in a separate PR?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think only Biquad should be improved right here in this PR and an issue should be opened to also improve SVF.

self.x1 = SIMD[DType.float64, num_chans](0.0)
self.x2 = SIMD[DType.float64, num_chans](0.0)
self.y1 = SIMD[DType.float64, num_chans](0.0)
self.y2 = SIMD[DType.float64, num_chans](0.0)

@doc_private
@always_inline
fn _compute_coefficients[filter_type: Int64](self, frequency: SIMD[DType.float64, self.num_chans], q: SIMD[DType.float64, self.num_chans], gain_db: SIMD[DType.float64, self.num_chans]) -> (
SIMD[DType.float64, self.num_chans], #b0
SIMD[DType.float64, self.num_chans], #b1
SIMD[DType.float64, self.num_chans], #b2
SIMD[DType.float64, self.num_chans], #a1
SIMD[DType.float64, self.num_chans] #a2
):
"""Compute filter coefficients based on type and parameters.

Parameters:
filter_type: The type of filter to compute coefficients for.

Args:
frequency: The cutoff/center frequency of the filter.
q: The resonance (Q factor) of the filter.
gain_db: The gain in decibels for filters that use it.

Returns:
A tuple containing (b0, b1, b2, a1, a2).
"""

# Compute A (gain factor)
var A: SIMD[DType.float64, self.num_chans] = pow(SIMD[DType.float64, self.num_chans](10.0), gain_db / 40.0)

# Compute normalized digital frequency
var w0: SIMD[DType.float64, self.num_chans] = 2.0 * pi * frequency / self.sample_rate
var cosw0: SIMD[DType.float64, self.num_chans] = cos(w0)
var sinw0: SIMD[DType.float64, self.num_chans] = sin(w0)

# Alpha term
var alpha: SIMD[DType.float64, self.num_chans] = sinw0 / (2.0 * q)

# Unnormalized coefficients
var b0 = SIMD[DType.float64, self.num_chans](0.0)
var b1 = SIMD[DType.float64, self.num_chans](0.0)
var b2 = SIMD[DType.float64, self.num_chans](0.0)
var a0 = SIMD[DType.float64, self.num_chans](0.0)
var a1 = SIMD[DType.float64, self.num_chans](0.0)
var a2 = SIMD[DType.float64, self.num_chans](0.0)

@parameter
if filter_type == BiquadModes.lowpass:
b1 = (1.0 - cosw0) # doing this first saves some calculations
b0 = b1 * 0.5
b2 = b0
a0 = 1 + alpha
a1 = -2.0 * cosw0
a2 = 1 - alpha
elif filter_type == BiquadModes.highpass:
b0 = (1.0 + cosw0) * 0.5
b1 = -(1.0 + cosw0)
b2 = b0
a0 = 1 + alpha
a1 = -2.0 * cosw0
a2 = 1 - alpha
elif filter_type == BiquadModes.bandpass:
b0 = sinw0 * 0.5
b1 = 0.0
b2 = -b0
a0 = 1 + alpha
a1 = -2.0 * cosw0
a2 = 1.0 - alpha
elif filter_type == BiquadModes.peak:
b0 = alpha
b1 = 0.0
b2 = -alpha
a0 = 1 + alpha
a1 = -2.0 * cosw0
a2 = 1 - alpha
elif filter_type == BiquadModes.notch:
b0 = 1.0
b1 = -2.0 * cosw0
b2 = 1.0
a0 = 1.0 + alpha
a1 = b1
a2 = 1 - alpha
elif filter_type == BiquadModes.allpass:
b0 = 1.0 - alpha
b1 = -2.0 * cosw0
b2 = 1.0 + alpha
a0 = b2
a1 = b1
a2 = b0
elif filter_type == BiquadModes.bell:
b0 = 1.0 + (alpha * A)
b1 = -2.0 * cosw0
b2 = 1.0 - (alpha * A)
a0 = 1.0 + (alpha / A)
a1 = -2.0 * cosw0
a2 = 1.0 - (alpha / A)
elif filter_type == BiquadModes.lowshelf:
var Ap1 = A + 1.0
var Am1 = A - 1.0
var twoSqrtA = 2.0 * sqrt(A) * alpha

b0 = A * (Ap1 - Am1 * cosw0 + twoSqrtA)
b1 = 2.0 * A * (Am1 - Ap1 * cosw0)
b2 = A * (Ap1 - Am1 * cosw0 - twoSqrtA)
a0 = (Ap1 + Am1 * cosw0 + twoSqrtA)
a1 = -2.0 * (Am1 + Ap1 * cosw0)
a2 = (Ap1 + Am1 * cosw0 - twoSqrtA)
elif filter_type == BiquadModes.highshelf:
var Ap1 = A + 1.0
var Am1 = A - 1.0
var twoSqrtA = 2.0 * sqrt(A) * alpha

b0 = A * (Ap1 + Am1 * cosw0 + twoSqrtA)
b1 = -2.0 * A * (Am1 + Ap1 * cosw0)
b2 = A * (Ap1 + Am1 * cosw0 - twoSqrtA)
a0 = (Ap1 - Am1 * cosw0 + twoSqrtA)
a1 = 2.0 * (Am1 - Ap1 * cosw0)
a2 = (Ap1 - Am1 * cosw0 - twoSqrtA)

# Normalize so a0 == 1
b0 /= a0
b1 /= a0
b2 /= a0
a1 /= a0
a2 /= a0

return (b0, b1, b2, a1, a2)

@doc_private
@always_inline
fn next[filter_type: Int64](
mut self,
input: SIMD[DType.float64, self.num_chans],
frequency: SIMD[DType.float64, self.num_chans],
q: SIMD[DType.float64, self.num_chans],
gain_db: SIMD[DType.float64, self.num_chans] = SIMD[DType.float64, self.num_chans](0.0)
) -> SIMD[DType.float64, self.num_chans]:
"""Process one sample through the biquad filter of the given type.

Args:
input: The next input value to process.
frequency: The cutoff/center frequency of the filter.
q: The resonance (Q factor) of the filter.
gain_db: The gain in decibels for filters that use it.

Returns:
The next SIMD sample of the filtered output.
"""
var coefs = self._compute_coefficients[filter_type](frequency, q, gain_db)
var b0 = coefs[0]
var b1 = coefs[1]
var b2 = coefs[2]
var a1 = coefs[3]
var a2 = coefs[4]

# Direct Form I:
# y[n] = b0*x[n] + b1*x[n-1] + b2*x[n-2] - a1*y[n-1] - a2*y[n-2]
var y = b0 * input + b1 * self.x1 + b2 * self.x2 - a1 * self.y1 - a2 * self.y2

# Update state
self.x2 = self.x1
self.x1 = input
self.y2 = self.y1
self.y1 = y

return sanitize(y)


@always_inline
fn lpf(
mut self,
input: SIMD[DType.float64, self.num_chans],
frequency: SIMD[DType.float64, self.num_chans],
q: SIMD[DType.float64, self.num_chans]
) -> SIMD[DType.float64, self.num_chans]:
"""Lowpass biquad"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each one of these (lpf, hpf, etc) is going to need "Args:" and "Return:" in the docstring.

return self.next[BiquadModes.lowpass](input, frequency, q)

@always_inline
fn hpf(
mut self,
input: SIMD[DType.float64, self.num_chans],
frequency: SIMD[DType.float64, self.num_chans],
q: SIMD[DType.float64, self.num_chans]
) -> SIMD[DType.float64, self.num_chans]:
"""Highpass biquad"""
return self.next[BiquadModes.highpass](input, frequency, q)

@always_inline
fn bpf(
mut self,
input: SIMD[DType.float64, self.num_chans],
frequency: SIMD[DType.float64, self.num_chans],
q: SIMD[DType.float64, self.num_chans]
) -> SIMD[DType.float64, self.num_chans]:
"""Bandpass (constant‑skirt gain; q sets peak gain)"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"constant-skirt gain" and "q sets peak gain" can be explained or there can be link to where a user can learn what it means. Or if this information is trivial, then it should just be removed.

The issue is that a user might come to this and see "constant-skirt gain" and think, "hmm do I want that? let me find the 'non'-constant-skirt gain to compare. oh there isn't one?..." and now the user is off on some confusing sidequest rather than making art. So if details like this are important to include, it's good for them to not be dangling pointers.

return self.next[BiquadModes.bandpass](input, frequency, q)

@always_inline
fn peak(
mut self,
input: SIMD[DType.float64, self.num_chans],
frequency: SIMD[DType.float64, self.num_chans],
q: SIMD[DType.float64, self.num_chans]
) -> SIMD[DType.float64, self.num_chans]:
"""Bandpass (constant‑peak gain = 0 dB; not a peaking EQ)"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this should not be called "peak" since the docstring says "not a peaking EQ". it is a confusing fn.

return self.next[BiquadModes.peak](input, frequency, q)

@always_inline
fn notch(
mut self,
input: SIMD[DType.float64, self.num_chans],
frequency: SIMD[DType.float64, self.num_chans],
q: SIMD[DType.float64, self.num_chans]
) -> SIMD[DType.float64, self.num_chans]:
"""Notch (band‑reject) biquad"""
return self.next[BiquadModes.notch](input, frequency, q)

@always_inline
fn allpass(
mut self,
input: SIMD[DType.float64, self.num_chans],
frequency: SIMD[DType.float64, self.num_chans],
q: SIMD[DType.float64, self.num_chans]
) -> SIMD[DType.float64, self.num_chans]:
"""Allpass (flat magnitude; phase‑only)"""
return self.next[BiquadModes.allpass](input, frequency, q)

@always_inline
fn bell(
mut self,
input: SIMD[DType.float64, self.num_chans],
frequency: SIMD[DType.float64, self.num_chans],
q: SIMD[DType.float64, self.num_chans],
gain_db: SIMD[DType.float64, self.num_chans]
) -> SIMD[DType.float64, self.num_chans]:
"""Peaking EQ"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See "fn peak" above. Now this one is called "bell" but is explained as "peaking". it will look like a typo to users.

return self.next[BiquadModes.bell](input, frequency, q, gain_db)

@always_inline
fn lowshelf(
mut self,
input: SIMD[DType.float64, self.num_chans],
frequency: SIMD[DType.float64, self.num_chans],
q: SIMD[DType.float64, self.num_chans],
gain_db: SIMD[DType.float64, self.num_chans]
) -> SIMD[DType.float64, self.num_chans]:
"""Low‑shelf (q sets shelf transition)"""
return self.next[BiquadModes.lowshelf](input, frequency, q, gain_db)

@always_inline
fn highshelf(
mut self,
input: SIMD[DType.float64, self.num_chans],
frequency: SIMD[DType.float64, self.num_chans],
q: SIMD[DType.float64, self.num_chans],
gain_db: SIMD[DType.float64, self.num_chans]
) -> SIMD[DType.float64, self.num_chans]:
"""High‑shelf (q sets shelf transition)"""
return self.next[BiquadModes.highshelf](input, frequency, q, gain_db)