Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ plugins {
}

group = "org.eclipse.dataplane-core"
version = "0.0.10-SNAPSHOT"
version = "0.0.11-SNAPSHOT"

repositories {
mavenCentral()
Expand Down
14 changes: 13 additions & 1 deletion src/main/java/org/eclipse/dataplane/Dataplane.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.dataplane.domain.DataAddress;
import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.controlplane.ControlPlane;
import org.eclipse.dataplane.domain.dataflow.DataFlow;
Expand Down Expand Up @@ -210,7 +211,8 @@ public Result<DataFlowStatusMessage> resume(String flowId, DataFlowResumeMessage
.compose(dataFlow -> {
dataFlow.transitionToStarted();

var response = new DataFlowStatusMessage(flowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null);
var dataAddress = getDataAddressForResume(dataFlow);
var response = new DataFlowStatusMessage(flowId, dataFlow.getState().name(), dataAddress, null);

return save(dataFlow).map(it -> response);
});
Expand Down Expand Up @@ -341,6 +343,16 @@ public Result<Void> registerOn(String controlPlaneEndpoint) {
});
}

private DataAddress getDataAddressForResume(DataFlow dataFlow) {
if (dataFlow.isPull() && dataFlow.getType() == DataFlow.Type.PROVIDER) {
return dataFlow.getDataAddress();
}
if (dataFlow.isPush() && dataFlow.getType() == DataFlow.Type.CONSUMER) {
return dataFlow.getDataAddress();
}
return null;
}

private Result<Void> notifyControlPlane(String action, DataFlow dataFlow, Object message) {
return toJson(message)
.map(body -> {
Expand Down
8 changes: 8 additions & 0 deletions src/test/java/org/eclipse/dataplane/ControlPlane.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,18 @@ public ValidatableResponse providerSuspend(String flowId, DataFlowSuspendMessage
return providerClient.suspend(flowId, suspendMessage);
}

public ValidatableResponse consumerSuspend(String flowId, DataFlowSuspendMessage suspendMessage) {
return consumerClient.suspend(flowId, suspendMessage);
}

public ValidatableResponse providerResume(String flowId, DataFlowResumeMessage resumeMessage) {
return providerClient.resume(flowId, resumeMessage);
}

public ValidatableResponse consumerResume(String flowId, DataFlowResumeMessage resumeMessage) {
return consumerClient.resume(flowId, resumeMessage);
}

public ValidatableResponse providerStatus(String flowId) {
return providerClient.status(flowId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,15 @@ void shouldSuspendAndResumeOnProvider() {
consumerDataPlane.assertNoMoreDataIsTransferred();

var resumeMessage = resumeMessage(providerProcessId);
var resumeResponse = controlPlane.providerResume(providerProcessId, resumeMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(resumeResponse.dataAddress())).statusCode(200);
var providerResumeResponse = controlPlane.providerResume(providerProcessId, resumeMessage)
.statusCode(200).extract().as(DataFlowStatusMessage.class);
assertThat(providerResumeResponse.dataAddress()).isNotNull();
controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(providerResumeResponse.dataAddress()))
.statusCode(200);

var consumerResumeResponse = controlPlane.consumerResume(consumerProcessId, resumeMessage(consumerProcessId))
.statusCode(200).extract().as(DataFlowStatusMessage.class);
assertThat(consumerResumeResponse.dataAddress()).isNull();

consumerDataPlane.assertDataIsFlowing();
}
Expand All @@ -153,6 +160,8 @@ private static class ConsumerDataPlane {
.registerAuthorization(new TestAuthorization())
.onPrepare(Result::success)
.onStarted(this::onStarted)
.onSuspend(Result::success)
.onResume(Result::success)
.build();


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.eclipse.dataplane.domain.DataAddress;
import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.dataflow.DataFlow;
import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -95,6 +97,47 @@ void shouldPushDataToEndpointPreparedByConsumer() {
consumerDataPlane.assertDataIsFlowing(consumerProcessId);
}

@Test
void shouldSuspendAndResumeByConsumer() {
var transferType = "FileSystemStreaming-PUSH";
var processId = UUID.randomUUID().toString();
var consumerProcessId = "consumer_" + processId;
var prepareMessage = MessageFactory.createPrepareMessage(consumerProcessId, URI.create("http://callback"), transferType);

var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
assertThat(prepareResponse.state()).isEqualTo(PREPARED.name());
assertThat(prepareResponse.dataAddress()).isNotNull();
var destinationDataAddress = prepareResponse.dataAddress();

var providerProcessId = "provider_" + processId;
var startMessage = MessageFactory.createStartMessage(providerProcessId, controlPlane.providerCallbackAddress(), transferType, destinationDataAddress);
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);

assertThat(startResponse.state()).isEqualTo(STARTED.name());
consumerDataPlane.assertDataIsFlowing(consumerProcessId);

controlPlane.consumerSuspend(consumerProcessId, new DataFlowSuspendMessage("a reason"))
.statusCode(200);

controlPlane.providerSuspend(providerProcessId, new DataFlowSuspendMessage("a reason"))
.statusCode(200);

consumerDataPlane.assertNoMoreDataIsTransferred(consumerProcessId);

var resumeMessage = new DataFlowResumeMessage("theMessageId", consumerProcessId, null);
var consumerResumeResult = controlPlane.consumerResume(consumerProcessId, resumeMessage)
.statusCode(200).extract().as(DataFlowStatusMessage.class);
assertThat(consumerResumeResult.state()).isEqualTo("STARTED");
assertThat(consumerResumeResult.dataAddress()).isNotNull();

var providerResumeResult = controlPlane.providerResume(providerProcessId, new DataFlowResumeMessage("theMessageId", providerProcessId, null))
.statusCode(200).extract().as(DataFlowStatusMessage.class);
assertThat(providerResumeResult.state()).isEqualTo("STARTED");
assertThat(providerResumeResult.dataAddress()).isNull();

consumerDataPlane.assertDataIsFlowing(consumerProcessId);
}

private static class ProviderDataPlane {

private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
Expand All @@ -104,6 +147,7 @@ private static class ProviderDataPlane {
.registerAuthorization(new TestAuthorization())
.onStart(this::onStart)
.onSuspend(this::stopFlow)
.onResume(this::onStart)
.build();

ProviderDataPlane() {
Expand Down Expand Up @@ -147,6 +191,7 @@ private static class ConsumerDataPlane {
.registerAuthorization(new TestAuthorization())
.onPrepare(this::onPrepare)
.onSuspend(Result::success)
.onResume(Result::success)
.onCompleted(this::onCompleted)
.build();

Expand Down Expand Up @@ -205,5 +250,18 @@ public void assertDataIsFlowing(String consumerProcessId) {

}

public void assertNoMoreDataIsTransferred(String consumerProcessId) {
var destinationDataAddress = sdk.getById(consumerProcessId).map(DataFlow::getDataAddress)
.orElseThrow(f -> new AssertionError("No DataFlow with id %s found".formatted(consumerProcessId)));

var folder = Path.of(destinationDataAddress.endpoint()).toFile();

await().untilAsserted(() -> {
var filesTransferred = Objects.requireNonNull(folder.listFiles()).length;
Thread.sleep(1000L);
assertThat(folder.listFiles()).hasSize(filesTransferred);
});
}

}
}