diff --git a/build.gradle.kts b/build.gradle.kts index 13d606e..15b9940 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -8,7 +8,7 @@ plugins { } group = "org.eclipse.dataplane-core" -version = "0.0.10-SNAPSHOT" +version = "0.0.11-SNAPSHOT" repositories { mavenCentral() diff --git a/src/main/java/org/eclipse/dataplane/Dataplane.java b/src/main/java/org/eclipse/dataplane/Dataplane.java index 872cf0c..a027ed8 100644 --- a/src/main/java/org/eclipse/dataplane/Dataplane.java +++ b/src/main/java/org/eclipse/dataplane/Dataplane.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.dataplane.domain.DataAddress; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.controlplane.ControlPlane; import org.eclipse.dataplane.domain.dataflow.DataFlow; @@ -210,7 +211,8 @@ public Result resume(String flowId, DataFlowResumeMessage .compose(dataFlow -> { dataFlow.transitionToStarted(); - var response = new DataFlowStatusMessage(flowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null); + var dataAddress = getDataAddressForResume(dataFlow); + var response = new DataFlowStatusMessage(flowId, dataFlow.getState().name(), dataAddress, null); return save(dataFlow).map(it -> response); }); @@ -341,6 +343,16 @@ public Result registerOn(String controlPlaneEndpoint) { }); } + private DataAddress getDataAddressForResume(DataFlow dataFlow) { + if (dataFlow.isPull() && dataFlow.getType() == DataFlow.Type.PROVIDER) { + return dataFlow.getDataAddress(); + } + if (dataFlow.isPush() && dataFlow.getType() == DataFlow.Type.CONSUMER) { + return dataFlow.getDataAddress(); + } + return null; + } + private Result notifyControlPlane(String action, DataFlow dataFlow, Object message) { return toJson(message) .map(body -> { diff --git a/src/test/java/org/eclipse/dataplane/ControlPlane.java b/src/test/java/org/eclipse/dataplane/ControlPlane.java index 305dbf9..e136ac7 100644 --- a/src/test/java/org/eclipse/dataplane/ControlPlane.java +++ b/src/test/java/org/eclipse/dataplane/ControlPlane.java @@ -84,10 +84,18 @@ public ValidatableResponse providerSuspend(String flowId, DataFlowSuspendMessage return providerClient.suspend(flowId, suspendMessage); } + public ValidatableResponse consumerSuspend(String flowId, DataFlowSuspendMessage suspendMessage) { + return consumerClient.suspend(flowId, suspendMessage); + } + public ValidatableResponse providerResume(String flowId, DataFlowResumeMessage resumeMessage) { return providerClient.resume(flowId, resumeMessage); } + public ValidatableResponse consumerResume(String flowId, DataFlowResumeMessage resumeMessage) { + return consumerClient.resume(flowId, resumeMessage); + } + public ValidatableResponse providerStatus(String flowId) { return providerClient.status(flowId); } diff --git a/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java b/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java index 12dec83..6926c8b 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java @@ -134,8 +134,15 @@ void shouldSuspendAndResumeOnProvider() { consumerDataPlane.assertNoMoreDataIsTransferred(); var resumeMessage = resumeMessage(providerProcessId); - var resumeResponse = controlPlane.providerResume(providerProcessId, resumeMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); - controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(resumeResponse.dataAddress())).statusCode(200); + var providerResumeResponse = controlPlane.providerResume(providerProcessId, resumeMessage) + .statusCode(200).extract().as(DataFlowStatusMessage.class); + assertThat(providerResumeResponse.dataAddress()).isNotNull(); + controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(providerResumeResponse.dataAddress())) + .statusCode(200); + + var consumerResumeResponse = controlPlane.consumerResume(consumerProcessId, resumeMessage(consumerProcessId)) + .statusCode(200).extract().as(DataFlowStatusMessage.class); + assertThat(consumerResumeResponse.dataAddress()).isNull(); consumerDataPlane.assertDataIsFlowing(); } @@ -153,6 +160,8 @@ private static class ConsumerDataPlane { .registerAuthorization(new TestAuthorization()) .onPrepare(Result::success) .onStarted(this::onStarted) + .onSuspend(Result::success) + .onResume(Result::success) .build(); diff --git a/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java b/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java index e6f62e3..8a0b97a 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java @@ -22,7 +22,9 @@ import org.eclipse.dataplane.domain.DataAddress; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; +import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -95,6 +97,47 @@ void shouldPushDataToEndpointPreparedByConsumer() { consumerDataPlane.assertDataIsFlowing(consumerProcessId); } + @Test + void shouldSuspendAndResumeByConsumer() { + var transferType = "FileSystemStreaming-PUSH"; + var processId = UUID.randomUUID().toString(); + var consumerProcessId = "consumer_" + processId; + var prepareMessage = MessageFactory.createPrepareMessage(consumerProcessId, URI.create("http://callback"), transferType); + + var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); + assertThat(prepareResponse.state()).isEqualTo(PREPARED.name()); + assertThat(prepareResponse.dataAddress()).isNotNull(); + var destinationDataAddress = prepareResponse.dataAddress(); + + var providerProcessId = "provider_" + processId; + var startMessage = MessageFactory.createStartMessage(providerProcessId, controlPlane.providerCallbackAddress(), transferType, destinationDataAddress); + var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); + + assertThat(startResponse.state()).isEqualTo(STARTED.name()); + consumerDataPlane.assertDataIsFlowing(consumerProcessId); + + controlPlane.consumerSuspend(consumerProcessId, new DataFlowSuspendMessage("a reason")) + .statusCode(200); + + controlPlane.providerSuspend(providerProcessId, new DataFlowSuspendMessage("a reason")) + .statusCode(200); + + consumerDataPlane.assertNoMoreDataIsTransferred(consumerProcessId); + + var resumeMessage = new DataFlowResumeMessage("theMessageId", consumerProcessId, null); + var consumerResumeResult = controlPlane.consumerResume(consumerProcessId, resumeMessage) + .statusCode(200).extract().as(DataFlowStatusMessage.class); + assertThat(consumerResumeResult.state()).isEqualTo("STARTED"); + assertThat(consumerResumeResult.dataAddress()).isNotNull(); + + var providerResumeResult = controlPlane.providerResume(providerProcessId, new DataFlowResumeMessage("theMessageId", providerProcessId, null)) + .statusCode(200).extract().as(DataFlowStatusMessage.class); + assertThat(providerResumeResult.state()).isEqualTo("STARTED"); + assertThat(providerResumeResult.dataAddress()).isNull(); + + consumerDataPlane.assertDataIsFlowing(consumerProcessId); + } + private static class ProviderDataPlane { private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); @@ -104,6 +147,7 @@ private static class ProviderDataPlane { .registerAuthorization(new TestAuthorization()) .onStart(this::onStart) .onSuspend(this::stopFlow) + .onResume(this::onStart) .build(); ProviderDataPlane() { @@ -147,6 +191,7 @@ private static class ConsumerDataPlane { .registerAuthorization(new TestAuthorization()) .onPrepare(this::onPrepare) .onSuspend(Result::success) + .onResume(Result::success) .onCompleted(this::onCompleted) .build(); @@ -205,5 +250,18 @@ public void assertDataIsFlowing(String consumerProcessId) { } + public void assertNoMoreDataIsTransferred(String consumerProcessId) { + var destinationDataAddress = sdk.getById(consumerProcessId).map(DataFlow::getDataAddress) + .orElseThrow(f -> new AssertionError("No DataFlow with id %s found".formatted(consumerProcessId))); + + var folder = Path.of(destinationDataAddress.endpoint()).toFile(); + + await().untilAsserted(() -> { + var filesTransferred = Objects.requireNonNull(folder.listFiles()).length; + Thread.sleep(1000L); + assertThat(folder.listFiles()).hasSize(filesTransferred); + }); + } + } }