diff --git a/.github/workflows/test_and_build.yml b/.github/workflows/test_and_build.yml index bc0efbe0..465fa711 100644 --- a/.github/workflows/test_and_build.yml +++ b/.github/workflows/test_and_build.yml @@ -46,7 +46,7 @@ jobs: COGNITE_PROJECT: extractor-tests COGNITE_BASE_URL: https://greenfield.cognitedata.com COGNITE_DEV_PROJECT: extractor-aws-dub-dev-testing - COGNITE_DEV_BASE_URL: https://aws-dub-dev.cognitedata.com/ + COGNITE_DEV_BASE_URL: https://aws-dub-dev.cognitedata.com COGNITE_DEV_TOKEN_SCOPES: https://aws-dub-dev.cognitedata.com/.default COGNITE_INTEGRATION: pythonutils-test run: | diff --git a/cognite/extractorutils/unstable/core/checkin_worker.py b/cognite/extractorutils/unstable/core/checkin_worker.py index 2b83ecf7..a65a9c31 100644 --- a/cognite/extractorutils/unstable/core/checkin_worker.py +++ b/cognite/extractorutils/unstable/core/checkin_worker.py @@ -96,6 +96,18 @@ def active_revision(self, value: ConfigRevision) -> None: with self._lock: self._active_revision = value + def reset_startup(self) -> None: + """ + Reset startup. + + This will reset startup if and when the extractor restarts either + due to changes in the config or the extractor just starting for the first time. + + """ + with self._lock: + self._is_running = False + self._has_reported_startup = False + def set_on_revision_change_handler(self, on_revision_change: Callable[[int], None]) -> None: """ Set the handler for when the configuration revision changes. diff --git a/cognite/extractorutils/unstable/core/runtime.py b/cognite/extractorutils/unstable/core/runtime.py index 30d7d2fa..da0a2068 100644 --- a/cognite/extractorutils/unstable/core/runtime.py +++ b/cognite/extractorutils/unstable/core/runtime.py @@ -525,6 +525,8 @@ def _main_runtime(self, args: Namespace) -> None: message = self._message_queue.get_nowait() match message: case RuntimeMessage.RESTART: + self.logger.info("Extractor restart detected. Restarting extractor.") + checkin_worker.reset_startup() continue case _: diff --git a/pyproject.toml b/pyproject.toml index 7566ee58..37c4db12 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -123,5 +123,8 @@ only-include = ["cognite"] requires = ["hatchling"] build-backend = "hatchling.build" +[dependency-groups] +dev = [] + [project.scripts] simple-extractor = "cognite.examples.unstable.extractors.simple_extractor.main:main" diff --git a/tests/test_unstable/conftest.py b/tests/test_unstable/conftest.py index bd5001b5..d143f0ff 100644 --- a/tests/test_unstable/conftest.py +++ b/tests/test_unstable/conftest.py @@ -151,7 +151,7 @@ def json_callback(request: Any, context: Any) -> dict: requests_mock.register_uri( method="POST", - url=f"{connection_config.base_url}api/v1/projects/{connection_config.project}/integrations/startup", + url=f"{connection_config.base_url}/api/v1/projects/{connection_config.project}/integrations/startup", json=json_callback, status_code=status_code, ) @@ -177,7 +177,7 @@ def json_callback(request: Any, context: Any) -> dict: requests_mock.register_uri( method="POST", - url=f"{connection_config.base_url}api/v1/projects/{connection_config.project}/integrations/checkin", + url=f"{connection_config.base_url}/api/v1/projects/{connection_config.project}/integrations/checkin", json=json_callback, status_code=status_code, ) diff --git a/tests/test_unstable/test_checkin_worker.py b/tests/test_unstable/test_checkin_worker.py index 9bc484d3..2711aca8 100644 --- a/tests/test_unstable/test_checkin_worker.py +++ b/tests/test_unstable/test_checkin_worker.py @@ -157,6 +157,71 @@ def test_run_report_periodic( assert "taskEvents" in checkin_bag[1] +def test_run_report_periodic_reset_startup( + connection_config: ConnectionConfig, + application_config: TestConfig, + requests_mock: requests_mock.Mocker, + mock_checkin_request: Callable[[requests_mock.Mocker], None], + mock_startup_request: Callable[[requests_mock.Mocker], None], + faker: faker.Faker, + checkin_bag: list, +) -> None: + requests_mock.real_http = True + mock_startup_request(requests_mock) + mock_checkin_request(requests_mock) + cognite_client = connection_config.get_cognite_client("test_checkin") + cancellation_token = CancellationToken() + worker = CheckinWorker( + cognite_client, + connection_config.integration.external_id, + logging.getLogger(__name__), + ) + + test_extractor = TestExtractor( + FullConfig( + connection_config=connection_config, application_config=application_config, current_config_revision=1 + ), + worker, + ) + test_extractor._start_time = datetime.fromtimestamp(int(now() / 1000), tz=timezone.utc) + message_queue: Queue = Queue() + mp_cancel_event = Event() + test_extractor._attach_runtime_controls(cancel_event=mp_cancel_event, message_queue=message_queue) + + worker.report_task_end("task1", faker.sentence()) + worker.report_task_start("task1", faker.sentence()) + worker.report_error( + Error( + level=ErrorLevel.error, + description=faker.sentence(), + task_name="task1", + extractor=test_extractor, + details=None, + ) + ) + + process = Thread( + target=worker.run_periodic_checkin, + args=(cancellation_token, test_extractor._get_startup_request(), 2.0), + ) + process.start() + worker.reset_startup() + process.join(timeout=3) + cancellation_token.cancel() + + cancellation_token = CancellationToken() + process = Thread( + target=worker.run_periodic_checkin, + args=(cancellation_token, test_extractor._get_startup_request(), 2.0), + ) + process.start() + worker.reset_startup() + process.join(timeout=3) + cancellation_token.cancel() + + assert len(checkin_bag) >= 3 + + def test_run_report_periodic_ensure_reorder( connection_config: ConnectionConfig, application_config: TestConfig,