Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions crates/sdk-axum/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ use dataplane_sdk::{
db::tx::TransactionalContext,
model::{
messages::{
DataFlowPrepareMessage, DataFlowResponseMessage, DataFlowStartMessage,
DataFlowStartedNotificationMessage, DataFlowSuspendMessage,
DataFlowTerminateMessage,
DataFlowPrepareMessage, DataFlowStartMessage, DataFlowStartedNotificationMessage,
DataFlowStatusMessage, DataFlowSuspendMessage, DataFlowTerminateMessage,
},
participant::ParticipantContext,
},
Expand All @@ -36,7 +35,7 @@ pub async fn start_flow<C>(
State(sdk): State<DataPlaneSdk<C>>,
Extension(participant): Extension<ParticipantContext>,
Json(msg): Json<DataFlowStartMessage>,
) -> SignalingResult<Json<DataFlowResponseMessage>>
) -> SignalingResult<Json<DataFlowStatusMessage>>
where
C: TransactionalContext,
{
Expand All @@ -48,7 +47,7 @@ pub async fn prepare_flow<C>(
State(sdk): State<DataPlaneSdk<C>>,
Extension(participant): Extension<ParticipantContext>,
Json(msg): Json<DataFlowPrepareMessage>,
) -> SignalingResult<Json<DataFlowResponseMessage>>
) -> SignalingResult<Json<DataFlowStatusMessage>>
where
C: TransactionalContext,
{
Expand Down
13 changes: 6 additions & 7 deletions crates/sdk-axum/src/api_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
use async_trait::async_trait;
use axum::Router;
use dataplane_sdk::core::error::HandlerResult;
use dataplane_sdk::core::model::messages::DataFlowResponseMessage;
use dataplane_sdk::core::model::messages::DataFlowStatusMessage;
use dataplane_sdk::core::model::participant::ParticipantContext;
use dataplane_sdk::sdk;
use dataplane_sdk::{
Expand Down Expand Up @@ -91,7 +91,7 @@ mock! {
&self,
tx: &mut MockTx,
flow: &DataFlow,
) -> HandlerResult<DataFlowResponseMessage>;
) -> HandlerResult<DataFlowStatusMessage>;

async fn on_terminate(
&self,
Expand All @@ -103,7 +103,7 @@ mock! {
&self,
tx: &mut MockTx,
flow: &DataFlow,
) -> HandlerResult<DataFlowResponseMessage>;
) -> HandlerResult<DataFlowStatusMessage>;

async fn on_suspend(
&self,
Expand Down Expand Up @@ -172,7 +172,7 @@ mod start {
use dataplane_sdk::core::model::{
data_address::DataAddress,
data_flow::DataFlowState,
messages::{DataFlowResponseMessage, DataFlowStartMessage},
messages::{DataFlowStartMessage, DataFlowStatusMessage},
};
use http_body_util::BodyExt;
use rstest::rstest;
Expand All @@ -197,9 +197,8 @@ mod start {

handler.expect_can_handle().returning(|_| Ok(true));
handler.expect_on_start().returning(|_, _flow| {
Ok(DataFlowResponseMessage::builder()
Ok(DataFlowStatusMessage::builder()
.state(DataFlowState::Started)
.dataplane_id("dataplane_id")
.build())
});

Expand Down Expand Up @@ -239,7 +238,7 @@ mod start {

let body = response.into_body().collect().await.unwrap().to_bytes();

let body: DataFlowResponseMessage = serde_json::from_slice(&body).unwrap();
let body: DataFlowStatusMessage = serde_json::from_slice(&body).unwrap();

assert!(body.data_address.is_none());
}
Expand Down
6 changes: 3 additions & 3 deletions crates/sdk/src/core/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

use super::{
error::HandlerResult,
model::{data_flow::DataFlow, messages::DataFlowResponseMessage},
model::{data_flow::DataFlow, messages::DataFlowStatusMessage},
};

#[cfg(test)]
Expand All @@ -29,13 +29,13 @@ pub trait DataFlowHandler: Send + Sync {
&self,
tx: &mut Self::Transaction,
flow: &DataFlow,
) -> HandlerResult<DataFlowResponseMessage>;
) -> HandlerResult<DataFlowStatusMessage>;

async fn on_prepare(
&self,
tx: &mut Self::Transaction,
flow: &DataFlow,
) -> HandlerResult<DataFlowResponseMessage>;
) -> HandlerResult<DataFlowStatusMessage>;

async fn on_terminate(&self, tx: &mut Self::Transaction, flow: &DataFlow) -> HandlerResult<()>;
async fn on_started(&self, tx: &mut Self::Transaction, flow: &DataFlow) -> HandlerResult<()>;
Expand Down
3 changes: 1 addition & 2 deletions crates/sdk/src/core/model/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ pub struct DataFlowPrepareMessage {
#[derive(Debug, Builder, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
#[builder(on(String, into))]
pub struct DataFlowResponseMessage {
pub dataplane_id: String,
pub struct DataFlowStatusMessage {
pub data_address: Option<DataAddress>,
pub state: DataFlowState,
pub error: Option<String>,
Expand Down
9 changes: 4 additions & 5 deletions crates/sdk/src/sdk/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use crate::{
model::{
data_flow::{DataFlow, DataFlowState, DataFlowType},
messages::{
DataFlowPrepareMessage, DataFlowResponseMessage, DataFlowStartMessage,
DataFlowStartedNotificationMessage,
DataFlowPrepareMessage, DataFlowStartMessage, DataFlowStartedNotificationMessage,
DataFlowStatusMessage,
},
},
},
Expand All @@ -46,7 +46,7 @@ where
&self,
participant_context_id: &str,
req: DataFlowStartMessage,
) -> SdkResult<DataFlowResponseMessage> {
) -> SdkResult<DataFlowStatusMessage> {
let flow = DataFlow::builder()
.id(req.process_id)
.counter_party_id(req.counter_party_id)
Expand Down Expand Up @@ -82,7 +82,7 @@ where
&self,
participant_context_id: &str,
req: DataFlowPrepareMessage,
) -> SdkResult<DataFlowResponseMessage> {
) -> SdkResult<DataFlowStatusMessage> {
let mut flow = DataFlow::builder()
.id(req.process_id)
.counter_party_id(req.counter_party_id)
Expand Down Expand Up @@ -120,7 +120,6 @@ where
flow_id: &str,
reason: Option<String>,
) -> SdkResult<()> {
dbg!("Terminating");
let mut tx = self.ctx.begin().await?;
let mut flow = self
.repo
Expand Down
13 changes: 5 additions & 8 deletions crates/sdk/src/sdk_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ mod prepare {
core::{
db::tx::MockTransaction,
error::{DbError, HandlerError},
model::{data_flow::DataFlowState, messages::DataFlowResponseMessage},
model::{data_flow::DataFlowState, messages::DataFlowStatusMessage},
},
error::SdkError,
sdk::DataPlaneSdk,
Expand All @@ -54,9 +54,8 @@ mod prepare {
.returning(|_| Box::pin(future::ready(Ok(true))));

handler.expect_on_prepare().returning(|_, _| {
Box::pin(future::ready(Ok(DataFlowResponseMessage::builder()
Box::pin(future::ready(Ok(DataFlowStatusMessage::builder()
.state(DataFlowState::Started)
.dataplane_id("dataplane-id")
.build())))
});

Expand Down Expand Up @@ -110,9 +109,8 @@ mod prepare {
});

handler.expect_on_prepare().returning(|_, _| {
Box::pin(future::ready(Ok(DataFlowResponseMessage::builder()
Box::pin(future::ready(Ok(DataFlowStatusMessage::builder()
.state(DataFlowState::Started)
.dataplane_id("dataplane-id")
.build())))
});

Expand Down Expand Up @@ -203,7 +201,7 @@ mod start {
core::{
db::tx::MockTransaction,
error::{DbError, HandlerError},
model::{data_flow::DataFlowState, messages::DataFlowResponseMessage},
model::{data_flow::DataFlowState, messages::DataFlowStatusMessage},
},
error::SdkError,
sdk::DataPlaneSdk,
Expand All @@ -229,9 +227,8 @@ mod start {
.returning(|_| Box::pin(future::ready(Ok(true))));

handler.expect_on_start().returning(|_, _| {
Box::pin(future::ready(Ok(DataFlowResponseMessage::builder()
Box::pin(future::ready(Ok(DataFlowStatusMessage::builder()
.state(DataFlowState::Started)
.dataplane_id("dataplane-id")
.build())))
});

Expand Down
6 changes: 3 additions & 3 deletions examples/example-common/src/controlplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
use bon::Builder;
use dataplane_sdk::core::model::{
data_address::DataAddress,
messages::{DataFlowResponseMessage, DataFlowStartMessage, DataFlowStartedNotificationMessage},
messages::{DataFlowStartMessage, DataFlowStartedNotificationMessage, DataFlowStatusMessage},
};
use uuid::Uuid;

Expand Down Expand Up @@ -62,7 +62,7 @@ impl ControlPlaneSimulator {
);
}

let body = response.json::<DataFlowResponseMessage>().await?;
let body = response.json::<DataFlowStatusMessage>().await?;

body.data_address
.ok_or_else(|| anyhow::anyhow!("No data address returned from provider"))
Expand Down Expand Up @@ -150,7 +150,7 @@ impl ControlPlaneSimulator {
);
}

let body = response.json::<DataFlowResponseMessage>().await?;
let body = response.json::<DataFlowStatusMessage>().await?;

Ok(body.data_address)
}
Expand Down
12 changes: 5 additions & 7 deletions examples/sync-pull-dataplane/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use dataplane_sdk::core::{
handler::DataFlowHandler,
model::{
data_flow::{DataFlow, DataFlowState},
messages::DataFlowResponseMessage,
messages::DataFlowStatusMessage,
},
};

Expand Down Expand Up @@ -48,7 +48,7 @@ where
&self,
tx: &mut Self::Transaction,
flow: &DataFlow,
) -> HandlerResult<DataFlowResponseMessage> {
) -> HandlerResult<DataFlowStatusMessage> {
let (token_id, endpoint, data_address) = self
.0
.create_token()
Expand All @@ -68,9 +68,8 @@ where
.await
.map_err(|err| HandlerError::Generic(err.into()))?;

Ok(DataFlowResponseMessage::builder()
Ok(DataFlowStatusMessage::builder()
.data_address(data_address)
.dataplane_id("dataplane-tokens")
.state(DataFlowState::Started)
.build())
}
Expand Down Expand Up @@ -117,9 +116,8 @@ where
&self,
_tx: &mut Self::Transaction,
_flow: &DataFlow,
) -> HandlerResult<DataFlowResponseMessage> {
Ok(DataFlowResponseMessage::builder()
.dataplane_id("dataplane-tokens")
) -> HandlerResult<DataFlowStatusMessage> {
Ok(DataFlowStatusMessage::builder()
.state(DataFlowState::Prepared)
.build())
}
Expand Down