diff --git a/config.ini.example b/config.ini.example index c2f0c3d1..fde0cd13 100644 --- a/config.ini.example +++ b/config.ini.example @@ -281,3 +281,21 @@ THROTTLE_INTERVAL = 0 #OBIS_POWER_L1 = 0100240700ff #OBIS_POWER_L2 = 0100380700ff #OBIS_POWER_L3 = 01004c0700ff + +#[ENVOY] +## Enphase Envoy local API powermeter +## Reads grid power from /production.json?details=1 (net-consumption). +## Requires grid CTs installed on the Envoy. +#HOST = envoy.local +## Static JWT token (optional if username/password/serial are provided) +#TOKEN = +## Enlighten cloud credentials for automatic token refresh (optional) +#USERNAME = +#PASSWORD = +#SERIAL = +## Number of phases (1 or 3) +#PHASES = 1 +## Set False only if you get certificate errors on a trusted LAN (insecure). +#VERIFY_SSL = False +## Per-powermeter throttling override (optional) +#THROTTLE_INTERVAL = 0 diff --git a/src/b2500_meter/config/config_loader.py b/src/b2500_meter/config/config_loader.py index 282d181e..f43c1a5a 100644 --- a/src/b2500_meter/config/config_loader.py +++ b/src/b2500_meter/config/config_loader.py @@ -32,6 +32,7 @@ SHELLY_SECTION = "SHELLY" TASMOTA_SECTION = "TASMOTA" SHRDZM_SECTION = "SHRDZM" +ENVOY_SECTION = "ENVOY" EMLOG_SECTION = "EMLOG" IOBROKER_SECTION = "IOBROKER" HOMEASSISTANT_SECTION = "HOMEASSISTANT" @@ -149,6 +150,8 @@ def create_powermeter( return create_tasmota_powermeter(section, config) elif section.startswith(SHRDZM_SECTION): return create_shrdzm_powermeter(section, config) + elif section.startswith(ENVOY_SECTION): + return create_envoy_powermeter(section, config) elif section.startswith(EMLOG_SECTION): return create_emlog_powermeter(section, config) elif section.startswith(IOBROKER_SECTION): @@ -448,3 +451,19 @@ def create_sma_energy_meter_powermeter( config.getint(section, "SERIAL_NUMBER", fallback=0), config.get(section, "INTERFACE", fallback=""), ) + + +def create_envoy_powermeter( + section: str, config: configparser.ConfigParser +) -> Powermeter: + from b2500_meter.powermeter.envoy import Envoy + + return Envoy( + host=config.get(section, "HOST", fallback=""), + token=config.get(section, "TOKEN", fallback=""), + phases=config.getint(section, "PHASES", fallback=1), + verify_ssl=config.getboolean(section, "VERIFY_SSL", fallback=False), + username=config.get(section, "USERNAME", fallback=""), + password=config.get(section, "PASSWORD", fallback=""), + serial=config.get(section, "SERIAL", fallback=""), + ) diff --git a/src/b2500_meter/powermeter/envoy.py b/src/b2500_meter/powermeter/envoy.py new file mode 100644 index 00000000..6b9b5ce0 --- /dev/null +++ b/src/b2500_meter/powermeter/envoy.py @@ -0,0 +1,152 @@ +from .base import Powermeter +import requests +import urllib3 +import logging + +logger = logging.getLogger("b2500-meter") + + +def _find_measurement(entries, measurement_type): + """Find a measurement block by type in the Envoy production/consumption array.""" + for entry in entries: + if ( + entry.get("measurementType") == measurement_type + or entry.get("type") == measurement_type + ): + return entry + return None + + +ENLIGHTEN_LOGIN_URL = "https://enlighten.enphaseenergy.com/login/login.json" +ENLIGHTEN_TOKEN_URL = "https://entrez.enphaseenergy.com/tokens" + + +def obtain_token(username, password, serial): + """Obtain a JWT token from the Enphase Enlighten cloud API. + + 1. Authenticate with username/password to get a session_id + 2. Use session_id + serial to obtain a JWT token for local Envoy access + """ + # Step 1: Login to Enlighten + login_resp = requests.post( + ENLIGHTEN_LOGIN_URL, + data={"user[email]": username, "user[password]": password}, + timeout=30, + ) + login_resp.raise_for_status() + login_data = login_resp.json() + session_id = login_data.get("session_id") + if not session_id: + raise ValueError( + f"Enlighten login failed: no session_id in response " + f"(message: {login_data.get('message', 'unknown')})" + ) + + # Step 2: Get token + token_resp = requests.post( + ENLIGHTEN_TOKEN_URL, + json={ + "session_id": session_id, + "serial_num": serial, + "username": username, + }, + timeout=30, + ) + token_resp.raise_for_status() + token = token_resp.text.strip() + if not token or token.startswith("{"): + raise ValueError(f"Enlighten token request failed: {token[:200]}") + + logger.info("Obtained new Envoy token from Enlighten cloud") + return token + + +class Envoy(Powermeter): + """Powermeter backend that reads grid power from an Enphase Envoy. + + Connects to the Envoy's local API at /production.json?details=1 and + extracts net consumption (grid) power. Supports both single-phase + (total only) and three-phase (per-line) reporting. + + Authentication uses a Bearer token. If username/password/serial are + provided, the token is automatically refreshed via the Enphase Enlighten + cloud API when a 401 is received. + """ + + def __init__( + self, + host, + token="", + phases=1, + verify_ssl=False, + username="", + password="", + serial="", + ): + if not isinstance(phases, int) or phases < 1: + raise ValueError("PHASES must be a positive integer") + self.host = host + self.token = token + self.phases = phases + self.verify_ssl = verify_ssl + self.username = username + self.password = password + self.serial = serial + self.session = requests.Session() + if not verify_ssl: + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + @property + def _has_credentials(self): + return all([self.username, self.password, self.serial]) + + def _refresh_token(self): + """Refresh the token using Enlighten cloud credentials.""" + self.token = obtain_token(self.username, self.password, self.serial) + + def _fetch(self): + """Fetch production.json from the Envoy, refreshing token on 401.""" + url = f"https://{self.host}/production.json?details=1" + + if not self.token and self._has_credentials: + self._refresh_token() + + headers = {"Authorization": f"Bearer {self.token}"} + response = self.session.get( + url, headers=headers, verify=self.verify_ssl, timeout=10 + ) + + if response.status_code == 401 and self._has_credentials: + logger.warning("Envoy returned 401, refreshing token via Enlighten") + self._refresh_token() + headers = {"Authorization": f"Bearer {self.token}"} + response = self.session.get( + url, headers=headers, verify=self.verify_ssl, timeout=10 + ) + + response.raise_for_status() + return response.json() + + def get_powermeter_watts(self): + """Return grid power in watts as a list (one entry per phase).""" + data = self._fetch() + consumption_list = data.get("consumption") or [] + if not isinstance(consumption_list, list): + raise ValueError("Invalid Envoy response: consumption must be a list") + + net_meter = _find_measurement(consumption_list, "net-consumption") + if net_meter is None: + logger.error("Envoy response does not expose net-consumption") + raise ValueError( + "Envoy response does not expose net-consumption; " + "grid CTs are required" + ) + + if self.phases == 1: + return [int(net_meter.get("wNow", 0))] + else: + lines = net_meter.get("lines", []) + values = [int(line.get("wNow", 0)) for line in lines[: self.phases]] + while len(values) < self.phases: + values.append(0) + return values diff --git a/src/b2500_meter/powermeter/envoy_test.py b/src/b2500_meter/powermeter/envoy_test.py new file mode 100644 index 00000000..91c54f41 --- /dev/null +++ b/src/b2500_meter/powermeter/envoy_test.py @@ -0,0 +1,227 @@ +import unittest +from unittest.mock import patch, MagicMock +from b2500_meter.powermeter.envoy import Envoy, _find_measurement, obtain_token + + +SAMPLE_RESPONSE = { + "production": [ + { + "type": "inverters", + "activeCount": 12, + "readingTime": 1710500000, + "wNow": 2500, + "whLifetime": 12345678, + } + ], + "consumption": [ + { + "measurementType": "total-consumption", + "wNow": 1200.5, + "whLifetime": 9876543, + "rmsVoltage": 240.0, + "rmsCurrent": 5.0, + "lines": [ + {"wNow": 400.0, "whLifetime": 3000000}, + {"wNow": 350.0, "whLifetime": 3200000}, + {"wNow": 450.5, "whLifetime": 3676543}, + ], + }, + { + "measurementType": "net-consumption", + "wNow": -300.0, + "whLifetime": 5000000, + "rmsVoltage": 240.0, + "rmsCurrent": 1.25, + "lines": [ + {"wNow": -100.0, "whLifetime": 1500000}, + {"wNow": -80.0, "whLifetime": 1700000}, + {"wNow": -120.0, "whLifetime": 1800000}, + ], + }, + ], +} + + +class TestFindMeasurement(unittest.TestCase): + def test_find_by_measurement_type(self): + result = _find_measurement(SAMPLE_RESPONSE["consumption"], "net-consumption") + self.assertIsNotNone(result) + self.assertEqual(result["wNow"], -300.0) + + def test_find_by_type(self): + result = _find_measurement(SAMPLE_RESPONSE["production"], "inverters") + self.assertIsNotNone(result) + self.assertEqual(result["wNow"], 2500) + + def test_not_found(self): + result = _find_measurement(SAMPLE_RESPONSE["consumption"], "nonexistent") + self.assertIsNone(result) + + +class TestEnvoySinglePhase(unittest.TestCase): + @patch("powermeter.envoy.requests.Session") + def test_get_powermeter_watts_single_phase(self, mock_session_class): + mock_response = MagicMock() + mock_response.json.return_value = SAMPLE_RESPONSE + mock_response.status_code = 200 + mock_response.raise_for_status = MagicMock() + mock_session = MagicMock() + mock_session.get.return_value = mock_response + mock_session_class.return_value = mock_session + + envoy = Envoy(host="192.168.1.200", token="test-token", phases=1) + result = envoy.get_powermeter_watts() + + self.assertEqual(result, [-300]) + mock_session.get.assert_called_once() + call_args = mock_session.get.call_args + self.assertIn("Bearer test-token", call_args[1]["headers"]["Authorization"]) + + +class TestEnvoyThreePhase(unittest.TestCase): + @patch("powermeter.envoy.requests.Session") + def test_get_powermeter_watts_three_phase(self, mock_session_class): + mock_response = MagicMock() + mock_response.json.return_value = SAMPLE_RESPONSE + mock_response.status_code = 200 + mock_response.raise_for_status = MagicMock() + mock_session = MagicMock() + mock_session.get.return_value = mock_response + mock_session_class.return_value = mock_session + + envoy = Envoy(host="192.168.1.200", token="test-token", phases=3) + result = envoy.get_powermeter_watts() + + self.assertEqual(result, [-100, -80, -120]) + + +class TestEnvoyNoNetConsumption(unittest.TestCase): + @patch("powermeter.envoy.requests.Session") + def test_raises_when_only_total_consumption(self, mock_session_class): + data = { + "production": [], + "consumption": [ + { + "measurementType": "total-consumption", + "wNow": 800.0, + } + ], + } + mock_response = MagicMock() + mock_response.json.return_value = data + mock_response.status_code = 200 + mock_response.raise_for_status = MagicMock() + mock_session = MagicMock() + mock_session.get.return_value = mock_response + mock_session_class.return_value = mock_session + + envoy = Envoy(host="192.168.1.200", token="test-token", phases=1) + with self.assertRaises(ValueError): + envoy.get_powermeter_watts() + + @patch("powermeter.envoy.requests.Session") + def test_raises_when_no_consumption_data(self, mock_session_class): + data = {"production": [], "consumption": []} + mock_response = MagicMock() + mock_response.json.return_value = data + mock_response.status_code = 200 + mock_response.raise_for_status = MagicMock() + mock_session = MagicMock() + mock_session.get.return_value = mock_response + mock_session_class.return_value = mock_session + + envoy = Envoy(host="192.168.1.200", token="test-token", phases=1) + with self.assertRaises(ValueError): + envoy.get_powermeter_watts() + + +class TestEnvoyTokenRefreshOn401(unittest.TestCase): + @patch("powermeter.envoy.obtain_token", return_value="new-token") + @patch("powermeter.envoy.requests.Session") + def test_refreshes_token_on_401(self, mock_session_class, mock_obtain): + # First call returns 401, second returns 200 + resp_401 = MagicMock() + resp_401.status_code = 401 + resp_200 = MagicMock() + resp_200.status_code = 200 + resp_200.json.return_value = SAMPLE_RESPONSE + resp_200.raise_for_status = MagicMock() + + mock_session = MagicMock() + mock_session.get.side_effect = [resp_401, resp_200] + mock_session_class.return_value = mock_session + + envoy = Envoy( + host="192.168.1.200", + token="expired-token", + phases=1, + username="user@test.com", + password="pass", + serial="123456", + ) + result = envoy.get_powermeter_watts() + + self.assertEqual(result, [-300]) + mock_obtain.assert_called_once_with("user@test.com", "pass", "123456") + self.assertEqual(envoy.token, "new-token") + + +class TestEnvoyAutoObtainToken(unittest.TestCase): + @patch("powermeter.envoy.obtain_token", return_value="fresh-token") + @patch("powermeter.envoy.requests.Session") + def test_obtains_token_when_none_provided(self, mock_session_class, mock_obtain): + resp_200 = MagicMock() + resp_200.status_code = 200 + resp_200.json.return_value = SAMPLE_RESPONSE + resp_200.raise_for_status = MagicMock() + + mock_session = MagicMock() + mock_session.get.return_value = resp_200 + mock_session_class.return_value = mock_session + + envoy = Envoy( + host="192.168.1.200", + token="", + phases=1, + username="user@test.com", + password="pass", + serial="123456", + ) + result = envoy.get_powermeter_watts() + + self.assertEqual(result, [-300]) + mock_obtain.assert_called_once() + self.assertEqual(envoy.token, "fresh-token") + + +class TestObtainToken(unittest.TestCase): + @patch("powermeter.envoy.requests.post") + def test_obtain_token_success(self, mock_post): + login_resp = MagicMock() + login_resp.json.return_value = {"session_id": "abc123"} + login_resp.raise_for_status = MagicMock() + + token_resp = MagicMock() + token_resp.text = "eyJhbGciOiJSUzI1NiJ9.test.token" + token_resp.raise_for_status = MagicMock() + + mock_post.side_effect = [login_resp, token_resp] + + token = obtain_token("user@test.com", "pass", "123456") + self.assertEqual(token, "eyJhbGciOiJSUzI1NiJ9.test.token") + + @patch("powermeter.envoy.requests.post") + def test_obtain_token_login_fails(self, mock_post): + login_resp = MagicMock() + login_resp.json.return_value = {"message": "Invalid credentials"} + login_resp.raise_for_status = MagicMock() + + mock_post.return_value = login_resp + + with self.assertRaises(ValueError) as ctx: + obtain_token("user@test.com", "wrongpass", "123456") + self.assertIn("no session_id", str(ctx.exception)) + + +if __name__ == "__main__": + unittest.main()