Last updated on

Practical engineering I: Signal processing with lazy lists

package audio

This document is a companion to the November 8 CS-214 lecture titled Practical engineering I: Signal processing with lazy lists.

To follow along, download the archive.

The aim of the lecture was to demonstrate a practical use case for lazy lists: representing and manipulating potentially infinite streams of audio samples. In this model, a signal is a LazyList of audio samples, each represented as 16-bit integers. We’ll discuss some of the pros and cons of this choice at the end of this transcript, but for now let’s jump into the code!

The first ~100 lines or so are setup code, so you may want to jump straight to the signal-processing code!

Imports

We start by importing sound packages from Java’s standard library (they are extensively documented in the Java sound programmer guide). Nothing fancy: we’ll just use the core APIs that let us play signals encoded using linear pulse-code modulation.

import javax.sound.sampled.{AudioSystem, AudioFormat, SourceDataLine}

We also import the duration package, which gives us a Duration type to represent time spans. Using this more precise type protects us from mistakes such as forgetting to convert a duration in milliseconds to a number of samples.

import scala.concurrent.duration.*
import scala.language.implicitConversions

Type definitions

We use the following type aliases to improve readability:

type Sample = Double
type QuantizedSample = Short
type Signal = LazyList[Sample]
type Frequency = Double

Scores

The standard format to represent inputs to electronic musical instruments is called MIDI. MIDI is relatively simple, but not simple enough for a 45-minutes lecture, so let’s use a much-simpler format instead:

An Instruction is either a Note, a Chord of multiple notes, or a Silence. All share a duration; in addition, the Note has a single Pitch, and the Chord has multiple pitches.

enum Instruction[+Pitch, +Duration]:
  val duration: Duration
  case Note(pitch: Pitch, duration: Duration)
  case Chord(pitches: Seq[Pitch], duration: Duration)
  case Silence(duration: Duration)
import Instruction.*

Instructions are polymorphic on the representation of pitch and duration. This allows us to write either Note("A4", 1.0) or Note(440, 75.millis) for an A4 note lasting one-eights of a beat (the first representation assumes that 1.0 is one-eight of a beat; the second representation assumes 100 bpm). We call the former representation a scored instruction and the latter a sampled instruction:

object Instruction:
  type Scored = Instruction[String, Double]
  type Sampled = Instruction[Sample, Duration]

A voice is a sequence of instructions, and a score is a collection of melodies played simultaneously:

type Voice = Iterable[Instruction.Scored]
type Score = Iterable[Voice]

Finally, an extension method on the Scored type lets us easily convert between scored and sampled notes:

extension (note: Instruction.Scored)
  def asSampledNote(bpm: Int): Instruction.Sampled =
    val beat = (1000 * 60 / bpm).millis
    val duration = beat * note.duration / 8
    note match
      case Note(pitch, _) =>
        Note(WellTemperedScale.freq(pitch), duration)
      case Chord(pitches, _) =>
        Chord(pitches.map(WellTemperedScale.freq), duration)
      case Silence(_) =>
        Silence(duration)

Audio pipeline

Java uses the SourceDataLine type to represent an audio line ready to process audio data. It exposes a traditional .write(b: Array[Byte]) interface. Given that we want to feed it infinite streams of samples, so we need to wrap it and perform three adjustments:

  1. Quantized the data into Shorts;
  2. Encode it into pairs of Bytes;
  3. Copy it, one buffer at a time, into the line’s internal buffers, using .write().

The LazySourceDataLine below performs these three functions, and additionally offers a convenient interface to initialize a new SourceDataLine:

class LazyLine(val line: SourceDataLine):

Constructors

  /** Construct a `LazyLine` from a format and a buffer size */
  def this(format: AudioFormat, bufferSize: Int) = this {
    val line = AudioSystem.getSourceDataLine(format)
    line.open(format, bufferSize)
    line.start()
    line
  }

  val format = line.getFormat()
  val samplesPerSecond = format.getSampleRate()

Public API

  /** Play all samples from `samples`. */
  def play(samples: Iterable[Sample]) =
    playAllSamples(samples)
    line.pad() // Make sure that we get at least one full buffer

  /** Release all resources held by this line. */
  def release(): Unit =
    line.drain()
    line.stop()
    line.close()

Internals

Copying

  private def playAllSamples(samples: Iterable[Sample]): Unit =
    if !samples.isEmpty then
      playAllSamples(playSomeSamples(samples))

  require(format.getSampleSizeInBits() == 16)
  val sampleSizeBytes = format.getSampleSizeInBits() / 8
  val buffer = new Array[Byte](
    line.getBufferSize() / 4 / sampleSizeBytes * sampleSizeBytes
  )

  private def playSomeSamples(
      samples: Iterable[Sample],
      offset: Int = 0
  ) : Iterable[Sample] =
    if offset == buffer.length || samples.isEmpty then
      line.write(buffer, 0, offset)
      samples
    else
      writeSample(buffer, samples.head, offset)
      playSomeSamples(samples.tail, offset + sampleSizeBytes)

Quantization

  private def debug(msg: String) = {}
  val quantizedMin: QuantizedSample = Short.MinValue
  val quantizedMax: QuantizedSample = Short.MaxValue

  def quantize(f: Sample): QuantizedSample =
    val bf = f * quantizedMax
    if bf < quantizedMin then
      debug("Underflow"); quantizedMin
    else if bf > quantizedMax then
      debug("Overflow"); quantizedMax
    else bf.round.toShort

  def dequantize(b: QuantizedSample): Sample =
    if b == quantizedMin then -1.0
    else b.toDouble / quantizedMax

Encoding

  // The following code assumes a little-endian representation
  require(!format.isBigEndian())

  private def writeSample(buffer: Array[Byte], s: Sample, offset: Int) =
    require(offset + sampleSizeBytes <= buffer.length)
    val qs = quantize(s)
    buffer(offset) = qs.toByte
    buffer(offset + 1) = (qs >> 8).toByte

  private def readSample(buffer: Array[Byte], offset: Int): Sample =
    val qs = (buffer(offset + 1) << 8 | (buffer(offset) & 0xff)).toShort
    dequantize(qs)

  def writeBuffer(samples: Array[Sample]): Array[Byte] =
    val bytes = new Array[Byte](samples.length * sampleSizeBytes)
    for idx <- 0 until samples.length
    do writeSample(bytes, samples(idx), idx * sampleSizeBytes)
    bytes

  def readBuffer(bytes: Array[Byte]): Array[Sample] =
    val samples = new Array[Sample](bytes.length / sampleSizeBytes)
    for idx <- 0 until samples.length
    do samples(idx) = readSample(bytes, idx * sampleSizeBytes)
    samples

Other extensions

  extension (sdl: SourceDataLine)
    /** Pad `sdl`'s buffer to correctly play short sounds. */
    def pad() =
      val padding = Array.fill(sdl.available)(0.toByte)
      sdl.write(padding, 0, padding.length)

  extension (d: Duration)
    /** Convert a duration into a number of samples */
    def toSamples: Int =
      (d.toMicros * samplesPerSecond / 1_000_000).round.toInt

Companion object

object LazyLine:

The following AudioFormat object provides a reasonable default for newly created LazyLines: the sample rate (samplesPerSecond) and bit depth (sampleSizeInBits) are the same as found on a typical compact disc:

  val DEFAULT_FORMAT: AudioFormat =
    val samplesPerSecond = 44100f
    val sampleSizeInBits = 16
    val channels = 1 // Mono
    val signed = true
    val bigEndian = false
    AudioFormat(samplesPerSecond, sampleSizeInBits,
                channels, signed, bigEndian)

  val DEFAULT_BUFFER_SIZE: Int = 32768

Making some noise

Our LazyLine can now play audio streams, but we don’t have any yet! Station below is where the real action begins. The Station constructor takes three arguments. The first one, skipExamples, can be used to skip all calls to play during initialization (this is useful for experimenting with extensions and additional filters, see def main below). The last two, format and bufferSize are used to configure the LazyLine above. The example function is used throughout the body of this class to play example samples:

class Station(
    skipExamples: Boolean = true,
    format: AudioFormat = LazyLine.DEFAULT_FORMAT,
    bufferSize: Int = LazyLine.DEFAULT_BUFFER_SIZE,
) extends LazyLine(format, bufferSize):

  /** Display `message` before playing all samples from `samples`. */
  def example(message: String)(samples: => Iterable[Sample]) =
    if skipExamples then
      println(f"$message: skipped")
    else
      println(message)
      play(samples)

Finite signals

The simplest way to play audio is to read it from a file; our first function does just that:

  def wav(path: String): Signal =
    val input = AudioSystem.getAudioInputStream(java.io.File(path))
    val converted = AudioSystem.getAudioInputStream(format, input)
    readBuffer(converted.readAllBytes()).to(LazyList)

This function creates a finite Signal; we can use it to play a simple audio file:

  example("wav"):
    wav("rymann.wav")

… or the same in reverse:

  val yodl = wav("rymann.wav")
  example("reverse"):
    yodl.reverse

(The : syntax lets us pass an argument to a single-argument function without parentheses.)

Now that we have signals, we can apply simple transformations to them. The first one is to mix two signals, with configurable relative intensities (from 0 to 1):

  def mix2(samples1: Signal, samples2: Signal, ratio: Double = 0.5) =
    samples1
      .zipAll(samples2, 0.0, 0.0)
      .map(_ * ratio + _ * (1 - ratio))
  example("mix2"):
    mix2(yodl, yodl.reverse)

(Using zipAll ensures that we keep playing until both signals complete.)

The sound up to this point sounds a bit flat, so let’s add some echo for a more genuine mountain feel. We can leverage mix2 to create an echo by mixing the signal with a delayed copy of itself. To do this, we define four small functions:

  def constant(nSamples: Int, level: Sample) =
    LazyList.fill(nSamples)(level)

  def silence(duration: Duration) =
    constant(duration.toSamples, 0.0)

  def delay(offset: Duration)(samples: Signal) =
    silence(offset) ++ samples

  def echo(offset: Duration)(samples: Signal) =
    mix2(samples, delay(offset)(samples), 0.7)

With that we get a much more satisfactory mountain vibe, with a bit more complexity to the sound:

  example("echo"):
    echo(1500.millis):
      yodl

… or as much complexity as we want, in fact:

  example("echos"):
    echo(600.millis):
      echo(1500.millis):
        echo(500.millis):
          echo(650.millis):
            yodl

Speeding up or slowing down a signal is trivial in our model, as long as we don’t intend to maintain the pitch (changing speed without affecting pitch is a much trickier problem, often called time scale modification).

To slow down, we simply duplicate samples:

  def slower(factor: Int)(samples: Signal) =
    samples.flatMap(s => List.fill(factor)(s))
  example("slower"):
    slower(2):
      yodl

Exercise: Interpolation

A higher-quality alternative would be to interpolate between existing points. Implement slower2_sliding using Scala’s sliding function.

Solution
  def slower2_sliding(samples: Signal) =
    samples.take(1) #::: samples.sliding(2).to(LazyList).flatMap {
      case LazyList(w0, w1) => List((w0 + w1) / 2, w1)
    }
  example("slower2"):
    slower2_sliding:
      yodl

We can speed up signals in a similar fashion, this time by dropping signals instead of duplicating them:

  def faster(invFactor: Int)(samples: Signal) =
    samples
      .zipWithIndex.filter((s, idx) => idx % invFactor != 0)
      .map(_._1)
  example("faster"):
    faster(2):
      yodl

… and again here mixing gives us a richer polyphony:

  example("polyphony"):
    mix2(
      yodl ++ yodl,
      mix2(
        slower(2)(yodl),
        { val fast = faster(2)(yodl)
          fast ++ fast ++ fast ++ fast }
      )
    )

Exercise: Additional finite transformations

Take the time to experiment with these and other transformations on finite signals. How would you speed up or slow down a signal by a non-integer factor? What does echo with a very small delay produce? What happens if you produce samples outside of the -1 .. 1 range?

Infinite signals

Lazy lists really shine once we start considering infinite signals. Such signals can be generated in various ways: recursion (which corresponds to a physical feedback loop), repetition of a finite signal (a simple loop), transformations over existing streams, etc. The simplest, of course, is the plain loop, which takes an arbitrary signal and repeats it (loop has no effect on infinite signals):

  def loop(samples: Signal): Signal =
    samples #::: loop(samples)

This function is not tail recursive, yet it will not cause a stack overflow. Can you see why?

Solution

The #::: delays the evaluation of the call to loop(samples), so that it is not evaluated while the initial call to loop executes. This is similar in spirit to a technique called trampolining.

Oscillators

The core primitive of additive synthesis is the oscillator: a period signal that repeats indefinitely. The wavelength of an oscillator is the inverse of its frequency:

  def waveLength(frequency: Frequency): Int =
    (samplesPerSecond / frequency).round.toInt

Lazy lists make it trivial to define various oscillators. To do so, we define a finite signal corresponding to a single period, or a pulse, of the oscillator, then loop it. We can define a sawtooth:

  def linear(nSamples: Int, first: Sample, last: Sample) =
    (0 until nSamples)
      .map(first + _ * (last - first) / (nSamples - 1))
      .to(LazyList)

  def sawtoothPulse(frequency: Frequency): Signal =
    linear(waveLength(frequency), -1.0, 1.0)

  def sawtooth(frequency: Frequency) =
    loop(sawtoothPulse(frequency))

… and play it. The truncate function below is not strictly necessary, but without it the sound would keep playing continuously… and we would never play the other examples!

  def truncate(duration: Duration)(samples: Signal) =
    samples.take(duration.toSamples)
  example("sawtooth"):
    truncate(1.seconds):
      sawtooth(440)

… or a triangle:

  def trianglePulse(frequency: Frequency): Signal =
    val λ = waveLength(frequency)
    linear(λ / 2, -1.0, 1.0) ++ linear(λ - λ / 2 + 1, 1.0, -1.0).tail

  def triangle(frequency: Frequency) =
    loop(trianglePulse(frequency))
  example("triangle"):
    truncate(1.seconds):
      triangle(440)

… or a square:

  def squarePulse(frequency: Frequency): Signal =
    val λ = waveLength(frequency)
    constant(λ / 2, -0.6) ++ constant(λ - λ / 2, 0.6)

  def square(frequency: Frequency) =
    loop(squarePulse(frequency))
  example("square"):
    truncate(1.seconds):
      square(440)

… or a sine:

  def sinePulse(frequency: Frequency): Signal =
    val λ = waveLength(frequency)
    (0 until λ)
      .map(i => math.sin(2 * math.Pi * i / λ))
      .to(LazyList)

  def sine(frequency: Frequency) =
    loop(sinePulse(frequency))
  example("sine"):
    truncate(1.seconds):
      sine(440)

… or even a random pulse, which sounds very metallic:

  def randomSample: Sample =
    (math.random() * 2 - 1)

  def randomPulse(frequency: Frequency): Signal =
    LazyList.fill(waveLength(frequency))(randomSample)

  def random(frequency: Frequency) =
    loop(randomPulse(frequency))
  example("random"):
    truncate(1.seconds):
      random(440)

Note that this random oscillator is not the same as a white noise machine: the periodicity of the signal causes the human hear to perceive a pitch. The following, in contrast, is heard as plain noise:

  def randomNoise: Signal =
    LazyList.continually(randomSample)

Variable pitch

The loop function repeats the same pulse over and over and hence maintains a constant pitch. What happens if we vary the frequency on each iteration instead?

  example("slide-whistle"):
    (880 to 44 by -1).flatMap(f => sinePulse(f))

  def slide(pulse: Frequency => Signal)(pitch: Signal) =
    pitch.flatMap(pulse)
  example("slide"):
    slide(trianglePulse):
      LazyList(144d, 216d, 324d, 486d, 486d, 486d).flatMap: f =>
        sinePulse(f).map(660 + _ * 440)

Envelopes

These notes can be sequenced by truncating them, as done by truncate above, and concatenating them… but the result is not pleasant: going from 0 to a full-amplitude signal, or vice versa, produces an audible distortion that corresponds to the unwanted frequencies generated by the abrupt transition. Instead, synthesizers use envelopes to shape the signal more smoothly. In our setting, an envelope is simply a signal varying between 0 and 1, and applying an envelope to a signal is done by sample-wise multiplication:

  def envelope(envelope: Signal)(samples: Signal) =
    envelope.zip(samples).map(_ * _)

Here is how we may define a fade-in effect using this envelope function:

  def fadeIn(duration: Duration)(samples: Signal) =
    val nSamples = duration.toSamples
    val (faded, full) = samples.splitAt(nSamples)
    envelope(linear(nSamples, 0, 1))(faded) ++ full

Applying a fade-out that cuts the signal off after a certain duration is similarly easy, either linearly like above or with an exponential decay:

  def exponential(nSamples: Int, first: Sample, last: Sample) =
    (0 until nSamples).map(idx =>
      val t = (idx - first) / (nSamples - 1.0)
      val decay = math.pow(2, 1 - t) - 1 // 1 to 0
      last + (first - last) * decay
    ).to(LazyList)

  def fadeOut(duration: Duration)(samples: Signal) =
    val nSamples = duration.toSamples
    envelope(exponential(nSamples, 1, 0))(samples)
Exercise: Fade-out

Defining a fade-in is easy, because we already know when to start fading in: right from the beginning! The fade-out above is similarly easy, since it similarly begins at $t = 0$. More flexible would be a fade-out of duration $d_f$ that begins at time $d_s - d_f$ for a signal of duration $d_s$. Define such a fade-out (and note that it should leave infinite signals unchanged).

Solution
  import scala.collection.immutable.Queue
  def fadeOutAtEnd(duration: Duration)(samples: Signal): Signal =
    val nSamples = duration.toSamples
    def loop(prefix: Queue[Sample], suffix: Signal): Signal =
      suffix match
        case LazyList() =>
          envelope(linear(math.min(nSamples, prefix.size), 1, 0)):
            prefix.to(LazyList)
        case shd #:: stl =>
          prefix :+ shd match // Works even is nSamples == 0
            case phd +: ptl if ptl.length >= nSamples =>
              phd #:: loop(ptl, stl)
            case p => loop(p, stl)
    loop(Queue.empty, samples)

Typical synthesizers usually implement a variant of the ADSR pattern, which stands for “Attack, decay, sustain, release”:

ADSR

The function ADSR below replicates this pattern. Note that the implementation is not a simple concatenation of each of the four phases: the envelope has to be truncated carefully to support total durations shorter than the sum of attack, decay, and sustain (this, in turn, requires care to ensure continuity at the beginning of the release phase):

  def adsrEnvelope(totalDuration: Duration,
                   a: Duration, d: Duration, s: Sample, r: Duration) =
    require(totalDuration >= r)
    val ads = truncate(totalDuration - r):
      linear(a.toSamples, 0, 1) ++
        linear(d.toSamples, 1, s) ++
        constant(totalDuration.toSamples, s)
    ads ++ linear(r.toSamples, ads.last, 0)

  def adsr(totalDuration: Duration,
           a: Duration, d: Duration, s: Sample, r: Duration)
          (samples: Signal) =
    envelope(adsrEnvelope(totalDuration, a, d, s, r)):
      samples

With this envelope, we can now get much more pleasant notes…

  example("adsr-sine"):
    adsr(2.seconds, 600.millis, 400.millis, 0.3, 500.millis):
      sine(440)

  example("adsr-all"):
    Seq(sawtooth, triangle, square, sine, random).flatMap: oscillator =>
      adsr(150.millis, 30.millis, 20.millis, 0.6, 30.millis):
        oscillator(440)

Take the time to experiment with the definition of this synthesizer: how does changing the level of the sustain, or the duration of the attack, decay, and release affect the sound? For example, try to reduce the attack and release to 5.millis, lengthen the decay to 200.millis, and reduce the sustain level to 0.2:

  example("adsr-short"):
    Seq(sawtooth, triangle, square, sine, random).flatMap: oscillator =>
      adsr(150.millis, 5.millis, 200.millis, 0.2, 5.millis):
        oscillator(440)

With these components in hand, we can define a simple synthesizer instrument, parametrized on an oscillator…

  def synth(oscillator: Frequency => Signal)
           (duration: Duration, frequency: Frequency) =
    adsr(duration, 30.millis, 20.millis, 0.7, 30.millis):
      oscillator(frequency)

… and play sequences of notes! At the bottom of this file is the definition of a well-tempered scale, which maps each note name to a frequency (e.g. WellTemperedScale.freq("A4") is 440, the “la” in the middle of an 88-keys piano keyboard):

  example("scale"):
    WellTemperedScale.standardScale
      .map(WellTemperedScale.freq)
      .flatMap(synth(sine)(300.millis, _))

Sequencing

Now that we have a simple instrument, we can introduce a sequencer: a tool that drives one or multiple digital instruments to play a digital music score. In our case, since we called our individual notes, silences, and chords “Instructions”, we’ll call our sequencer an interpreter. To program it, we first need a way to mix more than two voices:

  def sum(signals: Seq[Signal]): Signal =
    val nonEmpty = signals.filter(!_.isEmpty)
    if nonEmpty.isEmpty then LazyList()
    else nonEmpty.view.map(_.head).sum #:: sum(nonEmpty.map(_.tail))

  def mix(signals: Iterable[Signal]): Signal =
    val scale = 1.0 / signals.size
    sum(signals.map(_.map(_ * scale)).toSeq)

  def mix(signals: Signal*): Signal =
    mix(signals)

… then need a way to sequence a single note:

  def interpNote(instrument: (Duration, Frequency) => Signal)
                (n: Instruction.Sampled): Signal =
    n match
      case Note(level, duration) =>
        instrument(duration, level)
      case Chord(levels, duration) =>
        mix(levels.map(level => instrument(duration, level)))
      case Silence(duration) =>
        silence(duration)

… and we can finally sequence complete scores!

  def interpScore(instrument: (Duration, Frequency) => Signal)
                 (tracks: Score, bpm: Int) =
    mix:
      tracks.map: track =>
        track.to(LazyList)
          .map(_.asSampledNote(bpm))
          .flatMap(interpNote(instrument))

Further below in this file is a collection of scores, which we can play through interpScore; I’ve paired them up with experiments on the additional instruments that we defined above:

  example("mario"):
    interpScore(synth(square))(Scores.marioTheme, 100)

Additional instruments

Harmonics

Using a higher-order function for the instrument in interpScore lets us easily try out different instruments. For example, we can give our synthesizer a fuller sound by adding additional harmonics to its base frequencies; adjusting the shape of the envelope gives a more “piano” feel to the sound:

  def piano(oscillator: Frequency => Signal)
           (duration: Duration, frequency: Frequency) =
    val harmonics = 1 to 4
    val amplitudes = (1 to harmonics.size).map(f => Math.pow(2, 1 - f))
    val totalAmplitude = amplitudes.sum
    sum:
      harmonics.zip(amplitudes).map { case (h, a) =>
        adsr(duration, 5.millis, 4.seconds, 0.2, 5.millis):
          oscillator(h * frequency).map(_ * a / totalAmplitude)
      }
  example("furElise"):
    interpScore(piano(sine))(Scores.furElise, 80)

Bells

Continuing on the idea of harmonics, we can model a bell, for example, by combining a collection of slightly dissonant frequencies (proper bell synthesis would require a lot more, but this is a start!):

  def bell(duration: Duration, frequency: Frequency) =
    val harmonics  =
      Seq(.56, .92, 1.19, 1.71, 2.0, 2.74, 3.0, 3.76, 4.07)
    val amplitudes =
      Seq(.41, .54, 0.92, 0.68, 0.85, 0.22, 0.61, 0.33, 0.41)
    val totalAmplitude = amplitudes.sum
    sum:
      harmonics.zip(amplitudes).zipWithIndex.map { case ((h, a), idx) =>
        val decay = duration / (1 + idx)
        val release = duration - decay
        adsr(duration, 0.millis, decay, 0.1, release):
          sine(h * frequency).map(_ * a / totalAmplitude)
      }
  example("bell"):
    bell(4.seconds, 440)

Notice how smoothly the bell function plugs into into the sequencer:

  example("bell-marioTheme"):
    interpScore(bell)(Scores.marioTheme, 100)

Plucked strings

And as a final experiment, here is a Scala implementation of a variant of the Karplus-Strong string synthesis algorithm.

  def ks0(frequency: Frequency)(tone: Signal): Signal =
    tone.take(waveLength(frequency)) #::: ks(frequency):
      tone.zip(tone.tail).map((p, d) => (p + d) / 2)

The decay parameter controls the speed at which the sound decays: lower values cause faster decay (anything below 0.95 or so decays almost instantly; values above 1.0 are not advisable).

The original algorithm uses a feedback loop to average a signal with a delayed copy of itself. In our model, this feedback loop materializes as recursion. When we apply it to a high-energy signal, like random noise, we get a sound very close to a plucked metallic string:

  example("plucked-string"):
    fadeOut(5.seconds):
      ks0(440)(randomNoise)

When we apply it to other signals, we get a more “bouncy” version of them:

  example("karplus-strong"):
    List(randomPulse, square, triangle, sine)
      .flatMap(osc => fadeOut(750.millis)(ks0(440)(osc(440))))

If you look carefully at the implementation (or experiment with it by hand, on paper), you’ll notice that it consumes more and more of the original signal, leading to increasing complexity at each iteration and eventually a stack overflow. An alternative implementation, more faithful to the original algorithm, is as follows:

  def ks(frequency: Frequency)(tone: Signal): Signal =
    def loop(last: Sample)(wt: Vector[Sample]): Signal =
      wt.to(LazyList) #::: loop(wt.last):
        wt.zip(last +: wt).map((p, d) => (p + d) / 2)
    val wavetable = tone.tail.take(waveLength(frequency)).to(Vector)
    loop(tone.head)(wavetable)

  def karplusStrong(oscillator: Frequency => Signal)
                   (duration: Duration, frequency: Frequency) =
    fadeIn(1.millis):
      fadeOut(duration):
        ks(frequency)(oscillator(frequency))

  val harpsicord = karplusStrong(randomPulse)

This version sounds just the same as the one above. One of the delightful things in this algorithm is that playing the same note twice produces very slightly different harmonics, because the initial samples noise varies:

  example("karplus-strong-variation"):
    (1 to 10).flatMap: _ =>
      harpsicord(500.millis, 440)

When combined, the notes produced sound almost realistic!

  example("arpeggio"):
    mix:
      List("C4", "E4", "G4", "C5").zipWithIndex.map: (note, idx) =>
        silence(idx * 500.millis) ++
        harpsicord(3.5.second, WellTemperedScale.freq(note))

… (it gets even more interesting if we stack many notes:)

  example("bach-arpeggio"):
    sum:
      List(
        "C4", "E4", "G4", "C5", "E5", "G4", "C5", "E5",
        "C4", "E4", "G4", "C5", "E5", "G4", "C5", "E5",
        "C4", "D4", "A4", "D5", "F5", "A4", "D5", "F5",
        "C4", "D4", "A4", "D5", "F5", "A4", "D5", "F5",
        "B3", "D4", "G4", "D5", "F5", "G4", "D5", "F5",
        "B3", "D4", "G4", "D5", "F5", "G4", "D5", "F5",
        "C4", "E4", "G4", "C5", "E5", "G4", "C5", "E5",
        "C4", "E4", "G4", "C5", "E5", "G4", "C5", "E5",
      ).zipWithIndex.map: (note, idx) =>
        silence(idx * 200.millis) ++
        harpsicord(4.seconds, WellTemperedScale.freq(note)).map(_ / 20)

Thanks to the flexibility of interpScore, it is again trivial to use this new instrument with our sequencer:

  example("rondo"):
    interpScore(harpsicord)(Scores.rondo, 100)

The algorithm is explained in detail in two excellent papers, Karplus & Strong 1983 and Jaffe & Smith 1983. These papers include details on why the algorithm works, how to chose parameters, how to adjust the duration of the sound, and the math behind the results.

Blending consecutive sounds

The main remaining issue at this point is the fact that our sequencer doesn’t support overlapping notes, except if they come for two separate tracks. While this is a reasonable assumption for, e.g., a piano, it doesn’t work so well for a harpsicord, whose strings should keep resonating while the musician continues to play. What we need instead if a sequencer that allows notes to echo for some time:

  extension (note: Instruction.Sampled)
    def copy(d: Duration) =
      note match
        case Note(pitch, duration) => Note(pitch, d)
        case Chord(pitches, duration) => Chord(pitches, d)
        case Silence(duration) => Silence(0.millis)

  def interpWithEcho(instrument: (Duration, Frequency) => Signal)
                    (tracks: Score, bpm: Int, echo: Duration) =
    def loop(voice: Voice, delay: Int, signals: List[Signal]): Signal =
      if voice.isEmpty then
        sum(signals).map(_ / 4)
      else
        val nonEmpty = signals.filter(!_.isEmpty)
        val sums = sum(nonEmpty) #::: LazyList.continually(0d)
        val prefix = sums.take(delay).map(_ / 4)
        val suffixes = nonEmpty.map(_.drop(delay))
        prefix #::: {
          val note = voice.head.asSampledNote(bpm)
          val lengthened = note.copy(note.duration.max(echo))
          loop(voice.tail,
               note.duration.toSamples,
               interpNote(instrument)(lengthened) :: suffixes)
        }
    sum:
      tracks.to(Seq).map(tr => loop(tr, 0, Nil))

Compare the result with the previous one:

  example("rondo-echo"):
    interpWithEcho(harpsicord)(Scores.rondo, 100, 750.millis)

  example("furElise-echo"):
    interpWithEcho(karplusStrong(triangle))
                  (Scores.furElise, 80, 500.millis)

Further experiments

If you want to experiment further, you can download this code and add your own filters either to the body of Station or in def main below. You’ll probably want to switch skipExamples to true in that case, so that constructing the Station object doesn’t play all the examples above!

@main def entryPoint: Unit =
  val line = Station(skipExamples = false)
  try main(line)
  finally line.release()

def main(line: Station): Unit =
  import line.*

  // Include your own code here after setting `skipExamples` to `true`.
  play:
    interpWithEcho(karplusStrong(sine))
                  (Scores.rondo, 120, 350.millis)

Concluding remarks

Lazy lists are not the way in which audio is typically represented. Instead, one would usually rather use either a function that fills an buffer (usually an array), or a Java-style Stream, or an observer-style Signal (we will see those in more detail later in the class). These approaches beat lazy lists on performance and potentially memory usage, but they have severe downsides:

Appendix I: The well-tempered scale

The calculations below compute the frequencies of each note of the well-tempered scale; they are used by the sequencer defined above.

object WellTemperedScale:
  private val NOTES =
    for note <- "C,C#=Db,D,D#=Eb,E,F,F#=Gb,G,G#=Ab,A,A#=Bb,B".split(",")
    yield note.split("=").toList

  private val aIdx = NOTES.indexOf(List("A"))

  private def scale(a4: Frequency, offset: Int = 4)
    : Map[String, Frequency] =
    (for (names, idx) <- NOTES.zipWithIndex
         expt = (offset - 4) + (idx - aIdx) * 1.0 / NOTES.size
         freq = a4 * math.pow(2.0, expt)
         name <- names
     yield f"$name$offset" -> freq).toMap

  private def scales(a4: Frequency)
    : Map[String, Frequency] =
    (for offset <- 0 to 8
         (k, v) <- scale(a4, offset)
     yield k -> v).toMap

  private val freqs = scales(440)
  def freq(note: String): Frequency =
    freqs(note)

  val standardScale =
    List("C4", "D4", "E4", "F4", "G4", "A4", "B4", "C5")

Appendix II: Scores

The constants below encode a few famous tunes into the simplified input format of our sequencer:

object Scores:
  val N = Note
  val S = Silence
  val C = Chord

  val marioTheme: Score = Seq(
    List(
      N("E5", 2), N("E5", 4), N("E5", 2), S(2),
        N("C5", 2), N("E5", 4), N("G5", 2), S(6), N("G4", 2), S(6),
      N("C5", 4), S(2), N("G4", 2), S(4), N("E4", 4), S(2),
        N("A4", 2), S(2), N("B4", 2), S(2), N("Bb4", 2), N("A4", 4),
      N("G4", 8f/3), N("E5", 8f/3), N("G5", 8f/3),
        N("A5", 4), N("F5", 2), N("G5", 2), S(2),
        N("E5", 2), S(2), N("C5", 2), N("D5", 2), N("B4", 2),
        S(4)
    )
  )

  val furElise: Score = Seq(
    List(
      N("E5", 2), N("D#5", 2),

      N("E5", 2), N("D#5", 2), N("E5", 2),
        N("B4", 2), N("D5", 2), N("C5", 2),
      N("A4", 4), S(2), N("C4", 2), N("E4", 2), N("A4", 2),
      N("B4", 4), S(2), N("E4", 2), N("G#4", 2), N("B4", 2),
      N("C5", 4), S(2), N("E4", 2), N("E5", 2), N("D#5", 2),
      N("E5", 2), N("D#5", 2), N("E5", 2),
        N("B4", 2), N("D5", 2), N("C5", 2),
      N("A4", 4), S(2), N("C4", 2), N("E4", 2), N("A4", 2),
      N("B4", 4), S(2), N("D4", 2), N("C5", 2), N("B4", 2),

      N("A4", 4), S(2), N("B4", 2), N("C5", 2), N("D5", 2),
      N("E5", 6), N("G4", 2), N("F5", 2), N("E5", 2),
      N("D5", 6), N("F4", 2), N("E5", 2), N("D5", 2),
      N("C5", 6), N("E4", 2), N("D5", 2), N("C5", 2),
      N("B4", 4), S(2), N("E4", 2), N("E5", 2), N("E4", 2),
      N("E5", 2), N("E5", 2), N("E6", 2),
        N("D#5", 2), N("E5", 2), N("D#5", 2),
      N("E5", 2), N("D#5", 2), N("E5", 2),
        N("B4", 2), N("D5", 2), N("C5", 2),
      N("A4", 8)
    ),
    List(
      S(4),

      S(12),
      N("A2", 2), N("E3", 2), N("A3", 2), S(2), S(4),
      N("E2", 2), N("E3", 2), N("G#3", 2), S(2), S(4),
      N("A2", 2), N("E3", 2), N("A3", 2), S(2), S(4),
      S(4), S(4), S(4),
      N("A2", 2), N("E3", 2), N("A3", 2), S(2), S(4),
      N("E2", 2), N("E3", 2), N("G#3", 2), S(2), S(4),

      N("A2", 2), N("E3", 2), N("A3", 2), S(2), S(4),
      N("C3", 2), N("G3", 2), N("C4", 2), S(2), S(4),
      N("G2", 2), N("G3", 2), N("B3", 2), S(2), S(4),
      N("A2", 2), N("E3", 2), N("A3", 2), S(2), S(4),
      N("E2", 2), N("E3", 2), N("E4", 2), S(6),
      S(12),
      S(12),
      S(12),
      S(8)
    )
  )

  val rondo: Score = Seq(
    List(
      N("B4", 2), N("A4", 2), N("G#4", 2), N("A4", 2),
      N("C5", 2), S(6), N("D5", 2), N("C5", 2), N("B4", 2), N("C5", 2),
      N("E5", 2), S(6), N("F5", 2), N("E5", 2), N("D#5", 2), N("E5", 2),
      N("B5", 2), N("A5", 2), N("G#5", 2), N("A5", 2),
        N("B5", 2), N("A5", 2), N("G#5", 2), N("A5", 2),
      N("C6", 8), N("A5", 1), S(3), N("C6", 1), S(3),

      N("B5", 1), S(3), C(Seq("A5", "F#5"), 1), S(3),
        C(Seq("G5", "E5"), 1), S(3), C(Seq("A5", "F#5"), 1), S(3),
      N("B5", 1), S(3), C(Seq("A5", "F#5"), 1), S(3),
        C(Seq("G5", "E5"), 1), S(3), C(Seq("A5", "F#5"), 1), S(3),
      N("B5", 1), S(3), C(Seq("A5", "F#5"), 1), S(3),
        C(Seq("G5", "E5"), 1), S(3), C(Seq("F#5", "D#5"), 1), S(3),
      N("E5", 8)
    ),
    List(
      S(8),

      N("A3", 4), C(Seq("C4", "E4"), 1), S(3),
        C(Seq("C4", "E4"), 1), S(3),
        C(Seq("C4", "E4"), 1), S(3),
      N("A3", 4), C(Seq("C4", "E4"), 1), S(3),
        C(Seq("C4", "E4"), 1), S(3),
        C(Seq("C4", "E4"), 1), S(3),
      N("A3", 1), S(3), C(Seq("C4", "E4"), 1), S(3),
        N("A3", 1), S(3), C(Seq("C4", "E4"), 1), S(3),
      N("A3", 4), C(Seq("C4", "E4"), 1), S(3),
        C(Seq("C4", "E4"), 1), S(3),
        C(Seq("C4", "E4"), 1), S(3),

      N("E3", 1), S(3), C(Seq("B3", "E4"), 1), S(3),
        C(Seq("B3", "E4"), 1), S(3),
        C(Seq("B3", "E4"), 1), S(3),
      N("E3", 1), S(3), C(Seq("B3", "E4"), 1), S(3),
        C(Seq("B3", "E4"), 1), S(3),
        C(Seq("B3", "E4"), 1), S(3),
      N("E3", 1), S(3), C(Seq("B3", "E4"), 1), S(3),
        N("B2", 1), S(3),
        N("B3", 1), S(3),
      N("E3", 8)
    )
  )

Copyright (c) SYSTEMF, EPFL, 2023

Not for distribution outside of EPFL.