Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions _test/system_stations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,33 +47,30 @@ func (ts *systemTest) testStations(t *testing.T) {

stations, err := cli.GetStations()
assert.NoError(t, err)
assert.Equal(t, 11, len(stations))
assert.True(t, len(stations) == 11 || len(stations) == 10)

st := ts.StationManager.Get("station-009")
assert.NotNil(t, st)

var wg sync.WaitGroup
go func() {
wg.Add(1)
defer wg.Done()

<-time.After(10 * time.Second)
// stop a couple stations
t.Logf("Stopping station-09")
st := ts.StationManager.Get("station-009")
assert.NotNil(t, st)
if st != nil {
st.Stop()
}

st.Stop()
<-time.After(5 * time.Minute)
t.Logf("Checking to insure station-009 has been expired")
assert.Equal(t, 11, len(stations))
}()
wg.Wait()

// reststations, err := cli.GetStations()
// assert.NoError(t, err)
// assert.Equal(t, 10, len(reststations))
t.Logf("Checking to insure station-009 has been expired")
assert.Equal(t, 10, len(stations))
Comment on lines +65 to +68
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test logic is incorrect. The stations variable is obtained before the goroutine waits and stops station-009. The assertion on line 68 checks the length of stations which was captured earlier and won't reflect the state after the station is stopped. The assertion should check the length of a freshly fetched stations list after the goroutine completes, not the stale stations variable from line 48.

Copilot uses AI. Check for mistakes.

st = ts.StationManager.Get("station-009")
assert.Nil(t, st)
}()
// reststations, err := cli.GetStations()
// assert.NoError(t, err)
// assert.Equal(t, 10, len(reststations))

wg.Wait()
st = ts.StationManager.Get("station-009")
assert.Nil(t, st)
}
8 changes: 4 additions & 4 deletions cmd/ottoctl/cmd_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func runLog(cmd *cobra.Command, args []string) error {
fmt.Fprintln(cmdOutput, "otto client failed to retrieve log config", err)
return err
}
fmt.Fprintf(cmdOutput, "Output: %s\n", lc.Output)
fmt.Fprintf(cmdOutput, "Format: %s\n", lc.Format)
fmt.Fprintf(cmdOutput, "FilePath: %s\n", lc.FilePath)
fmt.Fprintf(cmdOutput, "Buffer: %s\n", lc.Buffer)
fmt.Fprintf(cmdOutput, "\t Output: %s\n", lc.Output)
fmt.Fprintf(cmdOutput, "\t Format: %s\n", lc.Format)
fmt.Fprintf(cmdOutput, "\tFilePath: %s\n", lc.FilePath)
fmt.Fprintf(cmdOutput, "\t Buffer: %s\n", lc.Buffer)
return nil
}
22 changes: 11 additions & 11 deletions messenger/messenger_nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,17 @@ func TestServeHTTPUnknownPathStillHandled(t *testing.T) {
}

func TestConnMQTTConnectEmptyBroker(t *testing.T) {
c := &connMQTT{}
err := c.Connect("", "", "")
assert.Error(t, err, "expected error when connecting with empty broker address")
c := &connMQTT{}
err := c.Connect("", "", "")
assert.Error(t, err, "expected error when connecting with empty broker address")
}

func TestConnMQTTConnectUnreachableHost(t *testing.T) {
c := &connMQTT{}
broker := "127.0.0.1:65535"
err := c.Connect(broker, "user", "pass")
if err == nil {
t.Skipf("unexpectedly connected to broker at %s in test environment; skipping", broker)
}
assert.Error(t, err, "expected error when connecting to unreachable broker")
}
c := &connMQTT{}
broker := "127.0.0.1:65535"
err := c.Connect(broker, "user", "pass")
if err == nil {
t.Skipf("unexpectedly connected to broker at %s in test environment; skipping", broker)
}
assert.Error(t, err, "expected error when connecting to unreachable broker")
}
6 changes: 4 additions & 2 deletions otto.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,12 @@ func (o *OttO) Init() {
}

o.Server.Register("/version", o)
o.Server.Register("/api/log", o.LogConfig)
o.Server.Register("/api/shutdown", o)
o.Server.Register("/api/topics", messenger.GetTopics())
o.Server.Register("/api/stations", o.StationManager)
o.Server.Register("/api/stats", &utils.Stats{})
o.Server.Register("/api/log", o.LogConfig)
o.Server.Register("/api/timers", utils.GetTickers())
o.Server.Register("/api/topics", messenger.GetTopics())
}

// Start the OttO process, TODO return a stop channel or context?
Expand Down
5 changes: 0 additions & 5 deletions station/station_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/rustyeddy/otto/messenger"
"github.com/rustyeddy/otto/server"
"github.com/rustyeddy/otto/utils"
)

Expand Down Expand Up @@ -69,10 +68,6 @@ func (sm *StationManager) HandleMsg(msg *messenger.Msg) error {
}

func (sm *StationManager) Start() {

srv := server.GetServer()
srv.Register("/api/stations", sm)

msgr := messenger.GetMessenger()
msgr.Sub("o/d/+/hello", sm.HandleMsg)

Expand Down
90 changes: 85 additions & 5 deletions utils/timers.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,36 @@
package utils

import "time"
import (
"encoding/json"
"log/slog"
"net/http"
"sync"
"time"
)

// Ticker is a wrapper around time.Ticker it is given a name, it hold
// Ticker is a wrapper around time.Ticker it is given a name, it holds
// the duration and kept in a map indexed by name such that it is easy
// to lookup to shutdown or reset
type Ticker struct {
Name string
*time.Ticker
Func func(t time.Time)

mu sync.RWMutex
lastTick time.Time
ticks int
active bool
}

// Tickers is a map of all active tickers indexed by name
type Tickers map[string]*Ticker

var (
// Start time is the time otto started
StartTime time.Time

// the map with all our tickers
tickers = make(map[string]*Ticker)
tickers = make(Tickers)
)

func init() {
Expand All @@ -34,28 +45,34 @@ func Timestamp() time.Duration {

// NewTicker creates a time.Ticker with the name n that will fire
// every d time.Duration. The function f will be called every time
// ticker goes off. The ticker can be stoped, restarted and reset
// ticker goes off. The ticker can be stopped, restarted and reset
// with a different duration
func NewTicker(n string, d time.Duration, f func(t time.Time)) *Ticker {
t := &Ticker{
Name: n,
Ticker: time.NewTicker(d),
Func: f,
active: true,
}

tickers[n] = t
go func() {
for tick := range t.Ticker.C {
t.mu.Lock()
t.lastTick = time.Now()
t.ticks++
t.mu.Unlock()
f(tick)
}
t.mu.Lock()
t.active = false
t.mu.Unlock()
}()
return t
}

// GetTickers will return the map of all ticker values.
func GetTickers() map[string]*Ticker {
func GetTickers() Tickers {
return tickers
}

Expand All @@ -64,3 +81,66 @@ func GetTicker(n string) *Ticker {
t, _ := tickers[n]
return t
}

// TickerInfo holds the JSON-serializable ticker information
type TickerInfo struct {
Name string `json:"name"`
LastTick time.Time `json:"last_tick"`
Ticks int `json:"ticks"`
Active bool `json:"active"`
}

// ServeHTTP implements http.Handler to return ticker information as JSON.
// It returns the ticker's name, last tick time, total tick count, and active status.
func (t *Ticker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

t.mu.RLock()
info := TickerInfo{
Name: t.Name,
LastTick: t.lastTick,
Ticks: t.ticks,
Active: t.active,
}
t.mu.RUnlock()

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(info); err != nil {
slog.Error("Failed to encode ticker info", "error", err, "ticker", t.Name)
Comment on lines +111 to +113
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The header and status code should be set after successful encoding, not before. If json.NewEncoder().Encode() fails, the response will have a 200 OK status even though an error occurred and no valid JSON was written. Consider setting the header but deferring the WriteHeader call until after successful encoding, or handle the error by writing an error response.

Suggested change
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(info); err != nil {
slog.Error("Failed to encode ticker info", "error", err, "ticker", t.Name)
data, err := json.Marshal(info)
if err != nil {
slog.Error("Failed to marshal ticker info", "error", err, "ticker", t.Name)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
if _, err := w.Write(data); err != nil {
slog.Error("Failed to write ticker info response", "error", err, "ticker", t.Name)

Copilot uses AI. Check for mistakes.
return
}
Comment on lines +112 to +115
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error handling after json.NewEncoder().Encode() fails is incomplete. The error is logged but no HTTP error response is sent to the client. The client will receive a 200 OK status with an incomplete or malformed JSON body. Consider using http.Error() to send a proper error response with status 500 after logging the error.

Copilot uses AI. Check for mistakes.
}

// ServeHTTP implements http.Handler to return a list of all tickers as JSON.
// It returns an array of ticker information for all registered tickers.
func (ts Tickers) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

// Collect all ticker information
tickerList := make([]TickerInfo, 0, len(ts))
for _, t := range ts {
t.mu.RLock()
info := TickerInfo{
Name: t.Name,
LastTick: t.lastTick,
Ticks: t.ticks,
Active: t.active,
}
t.mu.RUnlock()
tickerList = append(tickerList, info)
}
Comment on lines +126 to +138
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Tickers.ServeHTTP method iterates over the map without any synchronization of the map itself. While each individual Ticker's mutex protects its fields, the map iteration is not protected. If a new ticker is added or removed while this map is being iterated, it can cause a race condition or panic. The map should be protected with a mutex during iteration, or a copy/snapshot of the map should be made under lock before iteration.

Copilot uses AI. Check for mistakes.

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
Comment on lines +140 to +141
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The header and status code should be set after successful encoding, not before. If json.NewEncoder().Encode() fails, the response will have a 200 OK status even though an error occurred and no valid JSON was written. Consider setting the header but deferring the WriteHeader call until after successful encoding, or handle the error by writing an error response.

Copilot uses AI. Check for mistakes.
if err := json.NewEncoder(w).Encode(tickerList); err != nil {
slog.Error("Failed to encode tickers list", "error", err)
return
}
Comment on lines +142 to +145
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error handling after json.NewEncoder().Encode() fails is incomplete. The error is logged but no HTTP error response is sent to the client. The client will receive a 200 OK status with an incomplete or malformed JSON body. Consider using http.Error() to send a proper error response with status 500 after logging the error.

Copilot uses AI. Check for mistakes.
}
Loading