Skip to content

NoTIPswe/notip-data-consumer

Repository files navigation

notip-data-consumer

Quality Gate Status Coverage

Go microservice in the NoTIP platform. Consumes encrypted telemetry from NATS JetStream, persists it to TimescaleDB, and tracks gateway liveness to fire offline alerts.

Responsibilities

  1. Subscribe to telemetry.data.> via a JetStream durable consumer and write every message verbatim to TimescaleDB (Rule Zero — encrypted blobs are never decoded server-side).
  2. Maintain an in-memory heartbeat map; on each periodic tick detect gateways that have exceeded their configured offline timeout.
  3. On offline transition: query the Management API for the gateway's lifecycle state (paused gateways are silenced); otherwise publish a gw_offline alert to JetStream and call the Management API via NATS Request-Reply.
  4. On online recovery (first-seen or Offline→Online): call the Management API to mark the gateway online.
  5. Subscribe to gateway.decommissioned.> and remove decommissioned gateways from the heartbeat map.
  6. Periodically refresh per-tenant alert configurations from the Management API.

NATS Subjects

Subject Direction Description
telemetry.data.{tenantId}.{gwId} Gateway → consumer Encrypted telemetry envelope
alert.{tenantId}.gw_offline Consumer → JetStream Gateway-offline alert
gateway.decommissioned.{tenantId}.{gwId} Management API → consumer Decommission broadcast
internal.mgmt.alert-configs.list Consumer → Mgmt API (RR) Fetch alert configurations
internal.mgmt.gateway.get-status Consumer → Mgmt API (RR) Query gateway lifecycle state
internal.mgmt.gateway.update-status Consumer → Mgmt API (RR) Report gateway online/offline

Configuration

All configuration is loaded from environment variables at startup. Missing required variables cause an immediate crash.

Variable Required Default Description
NATS_URL yes NATS server URL (tls://…)
NATS_TLS_CA yes Path to CA certificate
NATS_TLS_CERT yes Path to client certificate
NATS_TLS_KEY yes Path to client private key
DB_HOST yes TimescaleDB host
DB_NAME yes Database name
DB_USER yes Database user
DB_PASSWORD_FILE yes Path to file containing the DB password (Docker secret)
DB_PORT no 5432
DB_MAX_CONNS no 10 pgxpool max connections
DB_MIN_CONNS no 2 pgxpool min connections
DB_SSL_MODE no require PostgreSQL TLS mode (disable, require, verify-ca, verify-full)
DB_SSL_ROOT_CERT no* Path to CA certificate for DB TLS (required when DB_SSL_MODE is verify-ca or verify-full)
NATS_CONSUMER_DURABLE_NAME no data-consumer-telemetry JetStream durable consumer name
NATS_CONNECT_TIMEOUT_SECONDS no 10
GATEWAY_BUFFER_SIZE no 1000 Status update dispatch buffer
HEARTBEAT_TICK_MS no 10000 Liveness check interval
HEARTBEAT_GRACE_PERIOD_MS no 120000 Startup window before offline alerts fire
ALERT_CONFIG_REFRESH_MS no 120000 Alert config cache refresh interval
ALERT_CONFIG_DEFAULT_TIMEOUT_MS no 60000 Fallback offline timeout
ALERT_CONFIG_MAX_RETRIES no 10 Max retries for initial alert config fetch
ALERT_CONFIG_INITIAL_BACKOFF_MS no 1000 Initial backoff interval for alert config retries
ALERT_CONFIG_MAX_BACKOFF_MS no 30000 Maximum backoff interval for alert config retries
METRICS_ADDR no :9090 Prometheus /metrics endpoint

Running tests

# Unit tests
go test ./internal/...

# Integration tests (requires Docker)
go test -tags integration -timeout 5m ./tests/integration/...

# Coverage (unit + integration)
go test -tags integration -coverprofile=cover.out -coverpkg=./internal/... ./internal/... ./tests/integration/...
go tool cover -html=cover.out

The integration suite spins up ephemeral NATS JetStream (mTLS, verify_and_map) and TimescaleDB containers via testcontainers-go. No external infrastructure is required.

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Packages

 
 
 

Contributors