Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test_and_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:
COGNITE_PROJECT: extractor-tests
COGNITE_BASE_URL: https://greenfield.cognitedata.com
COGNITE_DEV_PROJECT: extractor-aws-dub-dev-testing
COGNITE_DEV_BASE_URL: https://aws-dub-dev.cognitedata.com/
COGNITE_DEV_BASE_URL: https://aws-dub-dev.cognitedata.com
COGNITE_DEV_TOKEN_SCOPES: https://aws-dub-dev.cognitedata.com/.default
COGNITE_INTEGRATION: pythonutils-test
run: |
Expand Down
12 changes: 12 additions & 0 deletions cognite/extractorutils/unstable/core/checkin_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ def active_revision(self, value: ConfigRevision) -> None:
with self._lock:
self._active_revision = value

def reset_startup(self) -> None:
"""
Reset startup.

This will reset startup if and when the extractor restarts either
due to changes in the config or the extractor just starting for the first time.

"""
with self._lock:
self._is_running = False
self._has_reported_startup = False

def set_on_revision_change_handler(self, on_revision_change: Callable[[int], None]) -> None:
"""
Set the handler for when the configuration revision changes.
Expand Down
2 changes: 2 additions & 0 deletions cognite/extractorutils/unstable/core/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,8 @@ def _main_runtime(self, args: Namespace) -> None:
message = self._message_queue.get_nowait()
match message:
case RuntimeMessage.RESTART:
self.logger.info("Extractor restart detected. Restarting extractor.")
checkin_worker.reset_startup()
continue

case _:
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,8 @@ only-include = ["cognite"]
requires = ["hatchling"]
build-backend = "hatchling.build"

[dependency-groups]
dev = []

[project.scripts]
simple-extractor = "cognite.examples.unstable.extractors.simple_extractor.main:main"
4 changes: 2 additions & 2 deletions tests/test_unstable/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def json_callback(request: Any, context: Any) -> dict:

requests_mock.register_uri(
method="POST",
url=f"{connection_config.base_url}api/v1/projects/{connection_config.project}/integrations/startup",
url=f"{connection_config.base_url}/api/v1/projects/{connection_config.project}/integrations/startup",
json=json_callback,
status_code=status_code,
)
Expand All @@ -177,7 +177,7 @@ def json_callback(request: Any, context: Any) -> dict:

requests_mock.register_uri(
method="POST",
url=f"{connection_config.base_url}api/v1/projects/{connection_config.project}/integrations/checkin",
url=f"{connection_config.base_url}/api/v1/projects/{connection_config.project}/integrations/checkin",
json=json_callback,
status_code=status_code,
)
Expand Down
65 changes: 65 additions & 0 deletions tests/test_unstable/test_checkin_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,71 @@ def test_run_report_periodic(
assert "taskEvents" in checkin_bag[1]


def test_run_report_periodic_reset_startup(
connection_config: ConnectionConfig,
application_config: TestConfig,
requests_mock: requests_mock.Mocker,
mock_checkin_request: Callable[[requests_mock.Mocker], None],
mock_startup_request: Callable[[requests_mock.Mocker], None],
faker: faker.Faker,
checkin_bag: list,
) -> None:
requests_mock.real_http = True
mock_startup_request(requests_mock)
mock_checkin_request(requests_mock)
cognite_client = connection_config.get_cognite_client("test_checkin")
cancellation_token = CancellationToken()
worker = CheckinWorker(
cognite_client,
connection_config.integration.external_id,
logging.getLogger(__name__),
)

test_extractor = TestExtractor(
FullConfig(
connection_config=connection_config, application_config=application_config, current_config_revision=1
),
worker,
)
test_extractor._start_time = datetime.fromtimestamp(int(now() / 1000), tz=timezone.utc)
message_queue: Queue = Queue()
mp_cancel_event = Event()
test_extractor._attach_runtime_controls(cancel_event=mp_cancel_event, message_queue=message_queue)

worker.report_task_end("task1", faker.sentence())
worker.report_task_start("task1", faker.sentence())
worker.report_error(
Error(
level=ErrorLevel.error,
description=faker.sentence(),
task_name="task1",
extractor=test_extractor,
details=None,
)
)

process = Thread(
target=worker.run_periodic_checkin,
args=(cancellation_token, test_extractor._get_startup_request(), 2.0),
)
process.start()
worker.reset_startup()
process.join(timeout=3)
cancellation_token.cancel()

cancellation_token = CancellationToken()
process = Thread(
target=worker.run_periodic_checkin,
args=(cancellation_token, test_extractor._get_startup_request(), 2.0),
)
process.start()
worker.reset_startup()
process.join(timeout=3)
cancellation_token.cancel()

assert len(checkin_bag) >= 3


def test_run_report_periodic_ensure_reorder(
connection_config: ConnectionConfig,
application_config: TestConfig,
Expand Down
Loading