Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions config.ini.example
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,21 @@ THROTTLE_INTERVAL = 0
#OBIS_POWER_L1 = 0100240700ff
#OBIS_POWER_L2 = 0100380700ff
#OBIS_POWER_L3 = 01004c0700ff

#[ENVOY]
## Enphase Envoy local API powermeter
## Reads grid power from /production.json?details=1 (net-consumption).
## Requires grid CTs installed on the Envoy.
#HOST = envoy.local
## Static JWT token (optional if username/password/serial are provided)
#TOKEN =
## Enlighten cloud credentials for automatic token refresh (optional)
#USERNAME =
#PASSWORD =
#SERIAL =
## Number of phases (1 or 3)
#PHASES = 1
## Set False only if you get certificate errors on a trusted LAN (insecure).
#VERIFY_SSL = False
## Per-powermeter throttling override (optional)
#THROTTLE_INTERVAL = 0
19 changes: 19 additions & 0 deletions src/b2500_meter/config/config_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
SHELLY_SECTION = "SHELLY"
TASMOTA_SECTION = "TASMOTA"
SHRDZM_SECTION = "SHRDZM"
ENVOY_SECTION = "ENVOY"
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The __init__.py export is missing from this PR. Needs from .envoy import Envoy and an "Envoy" entry in __all__. Also missing a README configuration subsection.

EMLOG_SECTION = "EMLOG"
IOBROKER_SECTION = "IOBROKER"
HOMEASSISTANT_SECTION = "HOMEASSISTANT"
Expand Down Expand Up @@ -149,6 +150,8 @@ def create_powermeter(
return create_tasmota_powermeter(section, config)
elif section.startswith(SHRDZM_SECTION):
return create_shrdzm_powermeter(section, config)
elif section.startswith(ENVOY_SECTION):
return create_envoy_powermeter(section, config)
elif section.startswith(EMLOG_SECTION):
return create_emlog_powermeter(section, config)
elif section.startswith(IOBROKER_SECTION):
Expand Down Expand Up @@ -448,3 +451,19 @@ def create_sma_energy_meter_powermeter(
config.getint(section, "SERIAL_NUMBER", fallback=0),
config.get(section, "INTERFACE", fallback=""),
)


def create_envoy_powermeter(
section: str, config: configparser.ConfigParser
) -> Powermeter:
from b2500_meter.powermeter.envoy import Envoy

return Envoy(
host=config.get(section, "HOST", fallback=""),
token=config.get(section, "TOKEN", fallback=""),
phases=config.getint(section, "PHASES", fallback=1),
verify_ssl=config.getboolean(section, "VERIFY_SSL", fallback=False),
username=config.get(section, "USERNAME", fallback=""),
password=config.get(section, "PASSWORD", fallback=""),
serial=config.get(section, "SERIAL", fallback=""),
)
152 changes: 152 additions & 0 deletions src/b2500_meter/powermeter/envoy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
from .base import Powermeter
import requests
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The codebase migrated to asyncio after this PR was opened. All powermeter I/O now uses aiohttp.ClientSession. synchronous requests will block the event loop. This affects both the local Envoy calls and the Enlighten token flow. See shelly.py for the pattern.

import urllib3
import logging

logger = logging.getLogger("b2500-meter")


def _find_measurement(entries, measurement_type):
"""Find a measurement block by type in the Envoy production/consumption array."""
for entry in entries:
if (
entry.get("measurementType") == measurement_type
or entry.get("type") == measurement_type
):
return entry
return None


ENLIGHTEN_LOGIN_URL = "https://enlighten.enphaseenergy.com/login/login.json"
ENLIGHTEN_TOKEN_URL = "https://entrez.enphaseenergy.com/tokens"


def obtain_token(username, password, serial):
"""Obtain a JWT token from the Enphase Enlighten cloud API.

1. Authenticate with username/password to get a session_id
2. Use session_id + serial to obtain a JWT token for local Envoy access
"""
# Step 1: Login to Enlighten
login_resp = requests.post(
ENLIGHTEN_LOGIN_URL,
data={"user[email]": username, "user[password]": password},
timeout=30,
)
login_resp.raise_for_status()
login_data = login_resp.json()
session_id = login_data.get("session_id")
if not session_id:
raise ValueError(
f"Enlighten login failed: no session_id in response "
f"(message: {login_data.get('message', 'unknown')})"
)

# Step 2: Get token
token_resp = requests.post(
ENLIGHTEN_TOKEN_URL,
json={
"session_id": session_id,
"serial_num": serial,
"username": username,
},
timeout=30,
)
token_resp.raise_for_status()
token = token_resp.text.strip()
if not token or token.startswith("{"):
raise ValueError(f"Enlighten token request failed: {token[:200]}")

logger.info("Obtained new Envoy token from Enlighten cloud")
return token


class Envoy(Powermeter):
"""Powermeter backend that reads grid power from an Enphase Envoy.

Connects to the Envoy's local API at /production.json?details=1 and
extracts net consumption (grid) power. Supports both single-phase
(total only) and three-phase (per-line) reporting.

Authentication uses a Bearer token. If username/password/serial are
provided, the token is automatically refreshed via the Enphase Enlighten
cloud API when a 401 is received.
"""

def __init__(
self,
host,
token="",
phases=1,
verify_ssl=False,
username="",
password="",
serial="",
):
if not isinstance(phases, int) or phases < 1:
raise ValueError("PHASES must be a positive integer")
self.host = host
self.token = token
self.phases = phases
self.verify_ssl = verify_ssl
self.username = username
self.password = password
self.serial = serial
self.session = requests.Session()
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HTTP-based powermeters need async def start() / async def stop() lifecycle methods now. The session should be created in start() and closed in stop(). The main loop calls these. Creating it in __init__ skips that contract.

if not verify_ssl:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This suppresses SSL warnings globally for the entire process, affecting all other HTTP clients. Needs to be scoped to this powermeter only.


@property
def _has_credentials(self):
return all([self.username, self.password, self.serial])

def _refresh_token(self):
"""Refresh the token using Enlighten cloud credentials."""
self.token = obtain_token(self.username, self.password, self.serial)

def _fetch(self):
"""Fetch production.json from the Envoy, refreshing token on 401."""
url = f"https://{self.host}/production.json?details=1"

if not self.token and self._has_credentials:
self._refresh_token()

headers = {"Authorization": f"Bearer {self.token}"}
response = self.session.get(
url, headers=headers, verify=self.verify_ssl, timeout=10
)

if response.status_code == 401 and self._has_credentials:
logger.warning("Envoy returned 401, refreshing token via Enlighten")
self._refresh_token()
headers = {"Authorization": f"Bearer {self.token}"}
response = self.session.get(
url, headers=headers, verify=self.verify_ssl, timeout=10
)

response.raise_for_status()
return response.json()

def get_powermeter_watts(self):
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be async def to match the base class signature (async def get_powermeter_watts(self) -> list[float]).

"""Return grid power in watts as a list (one entry per phase)."""
data = self._fetch()
consumption_list = data.get("consumption") or []
if not isinstance(consumption_list, list):
raise ValueError("Invalid Envoy response: consumption must be a list")

net_meter = _find_measurement(consumption_list, "net-consumption")
if net_meter is None:
logger.error("Envoy response does not expose net-consumption")
raise ValueError(
"Envoy response does not expose net-consumption; "
"grid CTs are required"
)

if self.phases == 1:
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we infer this from the response? For example, check if three lines are available; if not, fallback to single phase. I'm not sure if lines can be unavailable, but I see no reason to use the net power if individual phases are accessible.

return [int(net_meter.get("wNow", 0))]
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: base class return type is list[float]. Other powermeters return floats. Consider using float() for consistency.

else:
lines = net_meter.get("lines", [])
values = [int(line.get("wNow", 0)) for line in lines[: self.phases]]
while len(values) < self.phases:
values.append(0)
return values
Loading
Loading