Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/sip/features.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package sip

const signalLoggingFeatureFlag = "sip.signal_logging"
5 changes: 5 additions & 0 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math"
"net/netip"
"slices"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -864,6 +865,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip
disp.RingingTimeout = defaultRingingTimeout
}
disp.Room.JitterBuf = c.jitterBuf
disp.Room.LogSignalChanges, _ = strconv.ParseBool(disp.FeatureFlags[signalLoggingFeatureFlag])
ctx, cancel := context.WithTimeout(ctx, disp.MaxCallDuration)
defer cancel()
status := CallRinging
Expand Down Expand Up @@ -956,12 +958,15 @@ func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit
return nil, err
}

logSignalChanges := false
logSignalChanges, _ = strconv.ParseBool(featureFlags[signalLoggingFeatureFlag])
mp, err := NewMediaPort(tid, c.log(), c.mon, &MediaOptions{
IP: c.s.sconf.MediaIP,
Ports: conf.RTPPort,
MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial,
MediaTimeout: c.s.conf.MediaTimeout,
EnableJitterBuffer: c.jitterBuf,
LogSignalChanges: logSignalChanges,
Stats: &c.stats.Port,
NoInputResample: !RoomResample,
}, RoomSampleRate)
Expand Down
18 changes: 18 additions & 0 deletions pkg/sip/media_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ type MediaOptions struct {
EnableJitterBuffer bool
NoInputResample bool
IgnorePreanswerData bool
LogSignalChanges bool
}

func NewMediaPort(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, opts *MediaOptions, sampleRate int) (*MediaPort, error) {
Expand Down Expand Up @@ -321,6 +322,7 @@ func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor,
mediaTimeout: mediaTimeout,
timeoutResetTick: make(chan time.Duration, 1),
jitterEnabled: opts.EnableJitterBuffer,
logSignalChanges: opts.LogSignalChanges,
port: newUDPConn(log, conn),
audioOut: msdk.NewSwitchWriter(sampleRate),
audioIn: msdk.NewSwitchWriter(inSampleRate),
Expand Down Expand Up @@ -357,6 +359,7 @@ type MediaPort struct {
stats *PortStats
dtmfAudioEnabled bool
jitterEnabled bool
logSignalChanges bool

mu sync.Mutex
conf *MediaConf
Expand Down Expand Up @@ -769,6 +772,13 @@ func (p *MediaPort) setupOutput(tid traceid.ID) error {
if p.stats != nil {
audioOut = newMediaWriterCount(audioOut, &p.stats.AudioOutFrames, &p.stats.AudioOutSamples)
}
if p.logSignalChanges {
audioOut, err = NewSignalLogger(p.log, "mixed", audioOut)
if err != nil {
audioOut.Close() // need to close since it's not linked to the port yet
return err
}
}

if p.conf.Audio.DTMFType != 0 {
p.dtmfOutRTP = s.NewStream(p.conf.Audio.DTMFType, dtmf.SampleRate)
Expand Down Expand Up @@ -802,6 +812,14 @@ func (p *MediaPort) setupInput() {
if p.stats != nil {
audioWriter = newMediaWriterCount(audioWriter, &p.stats.AudioInFrames, &p.stats.AudioInSamples)
}
if p.logSignalChanges {
signalLogger, err := NewSignalLogger(p.log, "input", audioWriter)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious.. why don't we return error for the setupInput path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didn't want to change too much, and the function currently doesn't support erroring out. Using the original audioWriter in that case doesn't actually degrade anything.

if err != nil {
p.log.Errorw("failed to create signal logger", err)
} else {
audioWriter = signalLogger
}
}
audioHandler := p.conf.Audio.Codec.DecodeRTP(audioWriter, p.conf.Audio.Type)
// Wrap the decoder with silence suppression handler to fill gaps during silence suppression
audioHandler = newSilenceFiller(audioHandler, audioWriter, codecInfo.RTPClockRate, codecInfo.SampleRate, p.log)
Expand Down
4 changes: 4 additions & 0 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math"
"net"
"sort"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -90,6 +91,7 @@ type outboundCall struct {
}

func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Config, log logger.Logger, id LocalTag, room RoomConfig, sipConf sipOutboundConfig, state *CallState, projectID string) (*outboundCall, error) {
signalLoggingEnabled, _ := strconv.ParseBool(sipConf.featureFlags[signalLoggingFeatureFlag])
if sipConf.maxCallDuration <= 0 || sipConf.maxCallDuration > maxCallDuration {
sipConf.maxCallDuration = maxCallDuration
}
Expand All @@ -98,6 +100,7 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi
}
jitterBuf := SelectValueBool(conf.EnableJitterBuffer, conf.EnableJitterBufferProb)
room.JitterBuf = jitterBuf
room.LogSignalChanges = signalLoggingEnabled

tr := TransportFrom(sipConf.transport)
contact := c.ContactURI(tr)
Expand Down Expand Up @@ -142,6 +145,7 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi
MediaTimeoutInitial: c.conf.MediaTimeoutInitial,
MediaTimeout: c.conf.MediaTimeout,
EnableJitterBuffer: call.jitterBuf,
LogSignalChanges: signalLoggingEnabled,
Stats: &call.stats.Port,
NoInputResample: !RoomResample,
IgnorePreanswerData: true,
Expand Down
27 changes: 19 additions & 8 deletions pkg/sip/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,14 @@ type ParticipantConfig struct {
}

type RoomConfig struct {
WsUrl string
Token string
RoomName string
Participant ParticipantConfig
RoomPreset string
RoomConfig *livekit.RoomConfiguration
JitterBuf bool
WsUrl string
Token string
RoomName string
Participant ParticipantConfig
RoomPreset string
RoomConfig *livekit.RoomConfiguration
JitterBuf bool
LogSignalChanges bool
}

func NewRoom(log logger.Logger, st *RoomStats) *Room {
Expand Down Expand Up @@ -321,7 +322,17 @@ func (r *Room) Connect(conf *config.Config, rconf RoomConfig) error {
defer log.Infow("track closed")
defer mTrack.Close()

codec, err := opus.Decode(mTrack, channels, log)
var out msdk.PCM16Writer = mTrack
if rconf.LogSignalChanges {
var err error
out, err = NewSignalLogger(log, track.ID(), out)
if err != nil {
log.Errorw("cannot create signal logger", err)
return
}
}

codec, err := opus.Decode(out, channels, log)
if err != nil {
log.Errorw("cannot create opus decoder", err)
return
Expand Down
195 changes: 195 additions & 0 deletions pkg/sip/signal_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright 2024 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sip

import (
"fmt"
"math"
"time"

msdk "github.com/livekit/media-sdk"
"github.com/livekit/protocol/logger"
)

// Default tuning for SignalLogger. All levels are in dBFS, i.e. decibels
// relative to a full-scale 16-bit sample, so practical values are negative.
const (
// DefaultInitialNoiseFloorDB is the default noise floor in dBFS.
// Both detection thresholds are expressed as offsets above this value.
DefaultInitialNoiseFloorDB = -50
// DefaultHangoverDuration is the default hangover. The logger only leaves the
// "signal" state when a frame is below the exit threshold and at least this
// much time has passed since the last frame that was above the enter threshold.
DefaultHangoverDuration = 1 * time.Second
// DefaultEnterVoiceOffsetDB is the default offset above noise floor to enter voice (hysteresis high).
// With the default noise floor this puts the enter threshold at -40 dBFS.
DefaultEnterVoiceOffsetDB = 10
// DefaultExitVoiceOffsetDB is the default offset above noise floor to exit voice (hysteresis low).
// With the default noise floor this puts the exit threshold at -45 dBFS.
DefaultExitVoiceOffsetDB = 5

// minDBFS is the lowest level reported for a frame; very quiet or empty frames
// are clamped to it to avoid -inf in dBFS. It is also the lowest noise floor
// accepted by WithNoiseFloor.
minDBFS = -100
)

// SignalLogger keeps internal state of whether we're in voice or silence, using RMS → dBFS
// and a fixed noise floor with hysteresis. It implements msdk.PCM16Writer and logs state changes.
type SignalLogger struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider moving it to media-sdk, since it's not SIP-specific.

// Configuration
log logger.Logger
next msdk.PCM16Writer
name string
hangoverDuration time.Duration
noiseFloor float64 // Noise floor in dBFS (fixed, not adaptive).
enterVoiceOffsetDB float64 // Offset above noise floor to enter voice (hysteresis high).
exitVoiceOffsetDB float64 // Offset above noise floor to exit voice (hysteresis low).

// State
lastSignalTime time.Time
lastIsSignal bool

// Stats
framesProcessed uint64
stateChanges uint64
}

// SignalLoggerOption customizes a SignalLogger while it is being built by
// NewSignalLogger. An option returns a non-nil error to reject the value it was
// given, in which case NewSignalLogger stops and reports that error.
type SignalLoggerOption func(*SignalLogger) error

// WithNoiseFloor sets the fixed noise floor in dBFS (e.g. -40). The enter and
// exit thresholds are derived by adding their offsets to this value.
//
// The value must be a number >= minDBFS; anything lower, or NaN, makes the
// returned option fail with an error and leaves the logger untouched.
func WithNoiseFloor(noiseFloorDB float64) SignalLoggerOption {
	return func(s *SignalLogger) error {
		// NaN compares false against every bound, so it has to be rejected
		// explicitly. A NaN floor would turn both thresholds into NaN and the
		// logger would never classify a frame as signal or silence.
		if math.IsNaN(noiseFloorDB) || noiseFloorDB < minDBFS {
			return fmt.Errorf("noise floor must be >= %g dBFS, got %g", float64(minDBFS), noiseFloorDB)
		}
		s.noiseFloor = noiseFloorDB
		return nil
	}
}

// WithHangoverDuration overrides the hangover used when leaving the "signal"
// state (default DefaultHangoverDuration). The duration must be strictly
// positive; zero or negative values make the returned option fail.
func WithHangoverDuration(hangoverDuration time.Duration) SignalLoggerOption {
	apply := func(s *SignalLogger) error {
		if hangoverDuration > 0 {
			s.hangoverDuration = hangoverDuration
			return nil
		}
		return fmt.Errorf("hangover duration must be positive, got %s", hangoverDuration)
	}
	return apply
}

// WithEnterVoiceOffsetDB sets the offset (dB) above noise floor to enter voice
// (the high side of the hysteresis). Default is DefaultEnterVoiceOffsetDB.
//
// The offset must be a positive number; zero, negative values and NaN make the
// returned option fail with an error and leave the logger untouched.
func WithEnterVoiceOffsetDB(db float64) SignalLoggerOption {
	return func(s *SignalLogger) error {
		// NaN is not <= 0, so without the explicit check it would be accepted
		// and produce a NaN enter threshold that no frame can ever exceed.
		if math.IsNaN(db) || db <= 0 {
			return fmt.Errorf("enterVoiceOffsetDB must be positive, got %g", db)
		}
		s.enterVoiceOffsetDB = db
		return nil
	}
}

// WithExitVoiceOffsetDB sets the offset (dB) above noise floor to exit voice
// (the low side of the hysteresis). Default is DefaultExitVoiceOffsetDB.
//
// The offset must be a positive number; zero, negative values and NaN make the
// returned option fail with an error and leave the logger untouched.
func WithExitVoiceOffsetDB(db float64) SignalLoggerOption {
	return func(s *SignalLogger) error {
		// NaN is not <= 0, so without the explicit check it would be accepted
		// and produce a NaN exit threshold, after which the logger could never
		// leave the "signal" state.
		if math.IsNaN(db) || db <= 0 {
			return fmt.Errorf("exitVoiceOffsetDB must be positive, got %g", db)
		}
		s.exitVoiceOffsetDB = db
		return nil
	}
}

// NewSignalLogger wraps next in a SignalLogger that reports signal/silence
// transitions to log under the given name and forwards every sample to next.
//
// The logger starts from the package defaults and then applies options in
// order. If an option fails, construction stops and the original, unwrapped
// next writer is returned together with the error, so the caller still holds a
// usable writer (for example to close it).
func NewSignalLogger(log logger.Logger, name string, next msdk.PCM16Writer, options ...SignalLoggerOption) (msdk.PCM16Writer, error) {
	sl := new(SignalLogger)
	sl.log, sl.next, sl.name = log, next, name

	// Defaults; any of these may be overridden by the options below.
	sl.hangoverDuration = DefaultHangoverDuration
	sl.noiseFloor = DefaultInitialNoiseFloorDB
	sl.enterVoiceOffsetDB = DefaultEnterVoiceOffsetDB
	sl.exitVoiceOffsetDB = DefaultExitVoiceOffsetDB

	for i := range options {
		if err := options[i](sl); err != nil {
			return next, err
		}
	}
	return sl, nil
}

// String describes this logger followed by the description of the writer it
// forwards to, e.g. "SignalLogger(input) -> <next>".
func (s *SignalLogger) String() string {
	downstream := s.next.String()
	return fmt.Sprintf("SignalLogger(%s) -> %s", s.name, downstream)
}

// SampleRate reports the sample rate of the wrapped writer; the logger itself
// does not resample.
func (s *SignalLogger) SampleRate() int {
	rate := s.next.SampleRate()
	return rate
}

// Close logs a one-line summary if at least one state change was recorded, then
// closes the wrapped writer and returns its error.
func (s *SignalLogger) Close() error {
	if changes := s.stateChanges; changes > 0 {
		s.log.Infow("signal logger closing", "name", s.name, "stateChanges", changes)
	}
	return s.next.Close()
}

// rmsToDBFS returns the level of one frame in dBFS, computed from the mean of
// the squared samples as 10*log10(mean(x^2) / MAX^2) with MAX = math.MaxInt16.
//
// The minDBFS parameter (which shadows the package constant of the same name
// inside this method) is the lower clamp: it is returned for an empty frame,
// for an all-zero frame (which would otherwise be -inf), and for any frame
// whose level falls below it.
//
// The result is normally <= 0. Because the reference is 32767, a frame
// dominated by the most negative 16-bit value can come out marginally above 0.
func (s *SignalLogger) rmsToDBFS(sample msdk.PCM16Sample, minDBFS float64) float64 {
	count := len(sample)
	if count == 0 {
		return minDBFS
	}

	// Accumulate in int64 so the squares cannot overflow for realistic frame sizes.
	var energy int64
	for _, v := range sample {
		wide := int64(v)
		energy += wide * wide
	}

	mean := float64(energy) / float64(count)
	if mean <= 0 {
		return minDBFS
	}

	fullScaleSq := float64(math.MaxInt16) * float64(math.MaxInt16)
	level := 10 * math.Log10(mean/fullScaleSq)
	if level < minDBFS {
		return minDBFS
	}
	return level
}

// WriteSample measures the level of the frame, updates the signal/silence state
// and logs a "signal changed" line on every transition. The sample is always
// forwarded unchanged to the wrapped writer, whose error is returned.
//
// State rules:
//   - The first 10 frames only seed the state (signal if above the enter
//     threshold) and never log.
//   - Silence -> signal happens as soon as a frame is above the enter threshold.
//   - Signal -> silence requires a frame below the exit threshold and at least
//     hangoverDuration of wall-clock time since the last frame that was above
//     the enter threshold.
//   - Frames between the two thresholds leave the state as it is.
func (s *SignalLogger) WriteSample(sample msdk.PCM16Sample) error {
	// Number of initial frames used to seed the state without logging.
	const warmupFrames = 10

	level := s.rmsToDBFS(sample, minDBFS)
	ts := time.Now()

	loud := level > s.noiseFloor+s.enterVoiceOffsetDB
	quiet := level < s.noiseFloor+s.exitVoiceOffsetDB

	// The hangover clock is refreshed only by frames above the enter threshold.
	if loud {
		s.lastSignalTime = ts
	}

	s.framesProcessed++
	if s.framesProcessed <= warmupFrames {
		s.lastIsSignal = loud
		return s.next.WriteSample(sample)
	}

	// transition records the new state and logs it together with the level
	// that triggered the change.
	transition := func(isSignal bool) {
		s.lastIsSignal = isSignal
		s.stateChanges++
		s.log.Infow("signal changed", "name", s.name, "signal", isSignal, "stateChanges", s.stateChanges, "dBFS", level, "noiseFloor", s.noiseFloor)
	}

	switch {
	case loud && !s.lastIsSignal:
		transition(true)
	case quiet && s.lastIsSignal && ts.Sub(s.lastSignalTime) >= s.hangoverDuration:
		transition(false)
	}

	return s.next.WriteSample(sample)
}
Loading