diff --git a/agent-flow/src/components/base/jadeNode.jsx b/agent-flow/src/components/base/jadeNode.jsx index 813fc0496e..567e8a6db1 100644 --- a/agent-flow/src/components/base/jadeNode.jsx +++ b/agent-flow/src/components/base/jadeNode.jsx @@ -469,7 +469,7 @@ export const jadeNode = (id, x, y, width, height, parent, drawer) => { * @returns {number} 连接数。 */ self.maxNumToLink = () => { - return 1; + return self.graph?.connectionLimitDisabled ? 100 : 1; }; /** diff --git a/agent-flow/src/components/base/validator.js b/agent-flow/src/components/base/validator.js index 861d4f14e8..3301c7641d 100644 --- a/agent-flow/src/components/base/validator.js +++ b/agent-flow/src/components/base/validator.js @@ -35,7 +35,9 @@ export class NormalNodeConnectorValidator extends Validator { validate() { const nextEvents = this.node.getNextRunnableEvents(); const i18n = this.node.graph.i18n; - if (nextEvents.length !== 1) { + const isConnectionLimitDisabled = Boolean(this.node.graph?.connectionLimitDisabled); + const isValid = isConnectionLimitDisabled ? nextEvents.length >= 1 : nextEvents.length === 1; + if (!isValid) { return Promise.reject({ errorFields: [{ errors: [`${i18n?.t('node') ?? 'node'} ${this.node.text} ${i18n?.t('problemWithConnection') ?? 'problemWithConnection'}`], diff --git a/agent-flow/src/components/code/codeNodeState.jsx b/agent-flow/src/components/code/codeNodeState.jsx index b3dfd55968..7faf063c8a 100644 --- a/agent-flow/src/components/code/codeNodeState.jsx +++ b/agent-flow/src/components/code/codeNodeState.jsx @@ -93,7 +93,7 @@ export const codeNodeState = (id, x, y, width, height, parent, drawer) => { * @override */ self.maxNumToLink = () => { - return 10; + return self.graph?.connectionLimitDisabled ? 100 : 10; }; return self; diff --git a/agent-flow/src/flow/jadeFlowEntry.jsx b/agent-flow/src/flow/jadeFlowEntry.jsx index 155fa45365..72225011ca 100644 --- a/agent-flow/src/flow/jadeFlowEntry.jsx +++ b/agent-flow/src/flow/jadeFlowEntry.jsx @@ -378,6 +378,10 @@ const jadeFlowAgent = (graph) => { graph.destroy(); }; + self.setConnectionLimitDisabled = (disabled) => { + graph.connectionLimitDisabled = Boolean(disabled); + }; + return self; }; @@ -432,6 +436,7 @@ export const JadeFlow = (() => { div, tenant, appId, + connectionLimitDisabled = false, flowConfigData, configs, i18n, @@ -440,7 +445,7 @@ export const JadeFlow = (() => { }) => { const graphDom = getGraphDom(div); const g = jadeFlowGraph(div, 'jadeFlow'); - await configGraph(g, tenant, appId, flowConfigData, configs, i18n, importStatements); + await configGraph(g, tenant, appId, flowConfigData, configs, i18n, importStatements, connectionLimitDisabled); g.flowType = flowType; const pageData = g.getPageData(0); await g.editFlow(0, graphDom, pageData.id); @@ -470,8 +475,9 @@ export const JadeFlow = (() => { return jadeFlowAgent(g); }; - const configGraph = async (g, tenant, appId, flowConfigData, configs, i18n, importStatements) => { + const configGraph = async (g, tenant, appId, flowConfigData, configs, i18n, importStatements, connectionLimitDisabled = false) => { g.collaboration.mute = true; + g.connectionLimitDisabled = Boolean(connectionLimitDisabled); g.configs = configs; g.i18n = i18n; for (let i = 0; i < importStatements.length; i++) { diff --git a/agent-flow/src/flow/jadeFlowGraph.js b/agent-flow/src/flow/jadeFlowGraph.js index 4ce925df9f..2b5f88db72 100644 --- a/agent-flow/src/flow/jadeFlowGraph.js +++ b/agent-flow/src/flow/jadeFlowGraph.js @@ -76,6 +76,7 @@ export const jadeFlowGraph = (div, title) => { const self = defaultGraph(div, title); self.type = 'jadeFlowGraph'; self.pageType = 'jadeFlowPage'; + self.connectionLimitDisabled = false; self.enableText = false; self.flowMeta = { exceptionFitables: ['modelengine.fit.jober.aipp.fitable.AippFlowExceptionHandler'], diff --git a/agent-flow/src/flow/jadeFlowPage.js b/agent-flow/src/flow/jadeFlowPage.js index f8397cd8ee..4897358347 100644 --- a/agent-flow/src/flow/jadeFlowPage.js +++ b/agent-flow/src/flow/jadeFlowPage.js @@ -40,6 +40,8 @@ export const jadeFlowPage = (div, graph, name, id) => { self.addEventListener('COPY_SHAPE', shapeChangeListener); self.addEventListener('DELETE_SHAPE', shapeChangeListener); + const isConnectionLimitDisabled = () => Boolean(self.graph?.connectionLimitDisabled); + /** * @override */ @@ -305,7 +307,7 @@ export const jadeFlowPage = (div, graph, name, id) => { */ self.canDragOut = (node, connector) => { const lines = self.getEvents().filter(s => s.fromShape === node.id && s.getDefinedFromConnector() === connector); - return lines && lines.length < 1; + return lines.length < (isConnectionLimitDisabled() ? 10 : 1); }; /** @@ -330,7 +332,9 @@ export const jadeFlowPage = (div, graph, name, id) => { } }; - return jadeEvent.fromShape !== jadeEvent.toShape && isConnectorAllowToLink() && isConnectorWithinLimit(); + return jadeEvent.fromShape !== jadeEvent.toShape + && isConnectorAllowToLink() + && (isConnectionLimitDisabled() || isConnectorWithinLimit()); }; /** diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/FlowContext.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/FlowContext.java index 52b343fc44..170aa4ebb4 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/FlowContext.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/FlowContext.java @@ -265,6 +265,29 @@ public List> generate(List data, String position) { return data.stream().map(d -> this.generate(d, position, LocalDateTime.now())).collect(Collectors.toList()); } + /** + * fork一个新的context用于一拖多分支,继承当前context的运行元数据,但生成新的contextId。 + * + * @return 新的分支context + */ + public FlowContext fork() { + return this.convertData(this.data); + } + + /** + * convertData + * + * @param 转换后的数据类型 + * @param data 转换后的数据 + * @return 转换后的context + */ + public FlowContext convertData(R data) { + FlowContext context = this.copyContextWithoutID(data); + context.previous = this.previous; + context.nextPositionId = this.nextPositionId; + return context; + } + /** * 用于when.convert数据时候的转换context,除了包裹的数据类型不一样,所有其他信息都一样 * @@ -274,12 +297,17 @@ public List> generate(List data, String position) { * @return 转换后的context */ public FlowContext convertData(R data, String id) { + FlowContext context = this.copyContextWithoutID(data); + context.previous = this.previous; + context.id = id; + return context; + } + + private FlowContext copyContextWithoutID(R data) { FlowContext context = new FlowContext<>(this.streamId, this.rootId, data, this.traceId, this.position, this.parallel, this.parallelMode, LocalDateTime.now()); - context.previous = this.previous; context.status = this.status; context.trans = this.trans; - context.id = id; context.batchId = this.batchId; context.toBatch = this.toBatch; context.createAt = this.createAt; diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextRepo.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextRepo.java index d49466dfd5..2960a94d80 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextRepo.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextRepo.java @@ -92,16 +92,16 @@ default void update(List> contexts) { } /** - * updateToSent + * 更新context状态为SENT * - * @param contexts contexts + * @param contexts 上下文列表 */ void updateToSent(List> contexts); /** - * updateToReady + * 更新context状态为READY * - * @param contexts contexts + * @param contexts 上下文列表 */ void updateToReady(List> contexts); diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/FlowDefinition.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/FlowDefinition.java index 4e8c0fb312..b0731280ab 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/FlowDefinition.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/FlowDefinition.java @@ -179,6 +179,7 @@ public FitStream.Publisher convertToFlow(FlowContextRepo rep // startNode不能出现在event的to属性, endNode不能出现在event的from属性 FlowNode toNode = nodeMap.get(event.getTo()); fromNode.subscribe(streamId, flowEnv, toNode, event); + }); }); return getFlowNode(FlowNodeType.START).getPublisher(streamId, repo, messenger, locks); diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowForkNode.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowForkNode.java index 8693234858..2695a298e9 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowForkNode.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowForkNode.java @@ -46,7 +46,8 @@ public class FlowForkNode extends FlowNode { public FitStream.Processor getProcessor(String streamId, FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks) { if (!Optional.ofNullable(processor).isPresent()) { - this.processor = new Node<>(streamId, this.metaId, this::forkJuster, repo, messenger, locks, this.type); + this.processor = new Node<>(streamId, this.metaId, this::forkJuster, repo, messenger, locks, this.type, + FlowData.class); this.processor.onError(errorHandler(streamId)); } return this.processor; diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowStateNode.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowStateNode.java index e0266c8d33..7b9c173680 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowStateNode.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowStateNode.java @@ -20,8 +20,13 @@ import modelengine.fit.waterflow.flowsengine.domain.flows.streams.FitStream; import modelengine.fit.waterflow.flowsengine.domain.flows.streams.nodes.Blocks; import modelengine.fit.waterflow.flowsengine.domain.flows.streams.nodes.Node; +import modelengine.fit.waterflow.flowsengine.utils.FlowUtil; import modelengine.fitframework.log.Logger; +import modelengine.fitframework.util.CollectionUtils; +import modelengine.fitframework.util.ObjectUtils; +import modelengine.fitframework.util.StringUtils; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -54,10 +59,11 @@ public class FlowStateNode extends FlowNode { */ @Override public FitStream.Processor getProcessor(String streamId, FlowContextRepo repo, - FlowContextMessenger messenger, FlowLocks locks) { + FlowContextMessenger messenger, FlowLocks locks) { if (!Optional.ofNullable(this.processor).isPresent()) { Node node = new Node<>(streamId, this.metaId, this::stateProduce, repo, messenger, - locks, this.type); + locks, this.type, FlowData.class); + if (!Objects.isNull(this.jober)) { node.setIsAsyncJob(this.jober.isAsync()); } @@ -74,6 +80,50 @@ public FitStream.Processor getProcessor(String streamId, Flo return this.processor; } + private List> mergeProcessInputs(List> pre) { + if (CollectionUtils.isEmpty(pre) || pre.size() <= 1) { + return pre; + } + if (pre.stream().anyMatch(context -> !(context.getData() instanceof FlowData))) { + return pre; + } + if (pre.stream().map(FlowContext::getPosition).filter(StringUtils::isNotEmpty).distinct().count() <= 1) { + return pre; + } + FlowContext baseContext = pre.get(0); + FlowData mergedFlowData = mergeFlowData(pre, baseContext.getId()); + return Collections.singletonList( + baseContext.convertData(ObjectUtils.cast(mergedFlowData), baseContext.getId())); + } + + private FlowData mergeFlowData(List> pre, String baseContextId) { + FlowData first = pre.get(0).getData(); + Map businessData = new HashMap<>( + Optional.ofNullable(first.getBusinessData()).orElseGet(HashMap::new)); + Map contextData = new HashMap<>( + Optional.ofNullable(first.getContextData()).orElseGet(HashMap::new)); + Map passData = new HashMap<>(Optional.ofNullable(first.getPassData()).orElseGet(HashMap::new)); + + pre.stream().skip(1).map(FlowContext::getData).forEach(flowData -> { + businessData.putAll(FlowUtil.mergeMaps(businessData, + Optional.ofNullable(flowData.getBusinessData()).orElseGet(HashMap::new))); + contextData.putAll(FlowUtil.mergeMaps(contextData, + Optional.ofNullable(flowData.getContextData()).orElseGet(HashMap::new))); + passData.putAll(FlowUtil.mergeMaps(passData, + Optional.ofNullable(flowData.getPassData()).orElseGet(HashMap::new))); + }); + contextData.put(Constant.CONTEXT_ID, baseContextId); + return FlowData.builder() + .operator(first.getOperator()) + .startTime(first.getStartTime()) + .businessData(businessData) + .contextData(contextData) + .passData(passData) + .errorMessage(first.getErrorMessage()) + .errorInfo(first.getErrorInfo()) + .build(); + } + private List stateProduce(List> inputs) { addContextData(inputs); List flowDataList = inputs.stream().map(FlowContext::getData).collect(Collectors.toList()); diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/FlowDataMerger.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/FlowDataMerger.java new file mode 100644 index 0000000000..e947254018 --- /dev/null +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/FlowDataMerger.java @@ -0,0 +1,64 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fit.waterflow.flowsengine.domain.flows.streams; + +import modelengine.fit.waterflow.flowsengine.domain.flows.context.FlowContext; +import modelengine.fit.waterflow.flowsengine.domain.flows.context.FlowData; +import modelengine.fit.waterflow.flowsengine.utils.FlowUtil; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * FlowData 类型数据的多输入合并器 + * 用于 fan-in 场景下将多条 FlowData 输入合并为单条处理 + * + * @author 高诗意 + * @since 2023/08/14 + */ +public class FlowDataMerger implements Processors.Merger { + + @Override + public FlowContext merge(List> contexts) { + if (contexts == null || contexts.isEmpty()) { + return null; + } + FlowContext baseContext = contexts.get(0); + FlowData mergedFlowData = mergeFlowData(contexts); + return baseContext.convertData(mergedFlowData, baseContext.getId()); + } + + private FlowData mergeFlowData(List> contexts) { + FlowData first = contexts.get(0).getData(); + Map businessData = new HashMap<>( + Optional.ofNullable(first.getBusinessData()).orElseGet(HashMap::new)); + Map contextData = new HashMap<>( + Optional.ofNullable(first.getContextData()).orElseGet(HashMap::new)); + Map passData = new HashMap<>( + Optional.ofNullable(first.getPassData()).orElseGet(HashMap::new)); + + contexts.stream().skip(1).map(FlowContext::getData).forEach(flowData -> { + businessData.putAll(FlowUtil.mergeMaps(businessData, + Optional.ofNullable(flowData.getBusinessData()).orElseGet(HashMap::new))); + contextData.putAll(FlowUtil.mergeMaps(contextData, + Optional.ofNullable(flowData.getContextData()).orElseGet(HashMap::new))); + passData.putAll(FlowUtil.mergeMaps(passData, + Optional.ofNullable(flowData.getPassData()).orElseGet(HashMap::new))); + }); + return FlowData.builder() + .operator(first.getOperator()) + .startTime(first.getStartTime()) + .businessData(businessData) + .contextData(contextData) + .passData(passData) + .errorMessage(first.getErrorMessage()) + .errorInfo(first.getErrorInfo()) + .build(); + } +} diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/From.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/From.java index 8afe281bad..829d73823d 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/From.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/From.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.locks.Lock; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -322,26 +323,55 @@ public void offer(List> contexts, Consumer // qualifiedWhens表示的与from节点连接的所有事件,条件节点符合条件的事件在这里筛选,在事件上处理需要下发的context java.util.Map, List>> matchedContexts = new LinkedHashMap<>(); Set> matchedContextSet = new HashSet<>(); - qualifiedWhens.forEach( - w -> { - List> afterContexts = contexts - .stream() - .filter(c -> w.getWhether().is(c)) - .peek(c -> c.setNextPositionId(w.getId())) - .collect(Collectors.toList()); - matchedContexts.put(w, afterContexts); - matchedContextSet.addAll(afterContexts); + List> forkedContexts = new ArrayList<>(); + for (FlowContext contextItem : contexts) { + List> matchedSubscriptions = qualifiedWhens.stream() + .filter(w -> w.getWhether().is(contextItem)) + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(matchedSubscriptions)) { + continue; + } + matchedContextSet.add(contextItem); + for (int index = 0; index < matchedSubscriptions.size(); index++) { + FitStream.Subscription matchedSubscription = matchedSubscriptions.get(index); + FlowContext branchContext = index == 0 ? contextItem : contextItem.fork(); + branchContext.setNextPositionId(matchedSubscription.getId()); + matchedContexts.computeIfAbsent(matchedSubscription, key -> new ArrayList<>()).add(branchContext); + if (index > 0) { + forkedContexts.add(branchContext); } - ); + } + } + qualifiedWhens.forEach(w -> matchedContexts.computeIfAbsent(w, key -> new ArrayList<>())); List> unMatchedContexts = contexts .stream() .filter(c -> !matchedContextSet.contains(c)) .collect(Collectors.toList()); PreSendCallbackInfo callbackInfo = new PreSendCallbackInfo<>(matchedContexts, unMatchedContexts); preSendCallback.accept(callbackInfo); + persistForkedContexts(forkedContexts); matchedContexts.forEach(FitStream.Subscription::cache); } + // 依赖约束:preSendCallback 仅做只读回调,不改写 matchedContexts,因此 forkedContexts 可直接持久化。 + private void persistForkedContexts(List> forkedContexts) { + if (CollectionUtils.isEmpty(forkedContexts)) { + return; + } + Set traces = forkedContexts.stream() + .flatMap(context -> context.getTraceId().stream()) + .collect(Collectors.toSet()); + Lock lock = this.locks.getDistributedLock(this.locks.streamNodeLockKey(this.streamId, this.id, + "ForkContextPool")); + lock.lock(); + try { + this.repo.updateContextPool(forkedContexts, traces); + this.repo.save(forkedContexts); + } finally { + lock.unlock(); + } + } + /** * 是否有publisher目标 * 用于stream闭环时将没有subscribed的publisher关闭到close subscriber diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/MergerRegistry.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/MergerRegistry.java new file mode 100644 index 0000000000..20329fae31 --- /dev/null +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/MergerRegistry.java @@ -0,0 +1,72 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fit.waterflow.flowsengine.domain.flows.streams; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * Merger 注册中心,用于管理数据类型到 Merger 的映射关系 + * 支持插件式扩展,无需修改核心代码即可添加新的 Merger 实现 + * + * @author 高诗意 + * @since 2023/08/14 + */ +public class MergerRegistry { + private static final MergerRegistry INSTANCE = new MergerRegistry(); + + private final Map, Supplier>> registry = new ConcurrentHashMap<>(); + + private MergerRegistry() { + registerDefaults(); + } + + public static MergerRegistry getInstance() { + return INSTANCE; + } + + /** + * 注册指定类型对应的 Merger 工厂 + * + * @param type 数据类型 + * @param mergerFactory Merger 工厂方法 + * @param 输入数据类型 + * @param Merger 实现类型 + */ + @SuppressWarnings("unchecked") + public > void register(Class type, Supplier mergerFactory) { + registry.put(type, (Supplier>) mergerFactory); + } + + /** + * 根据类型获取对应的 Merger + * + * @param type 数据类型 + * @param 泛型类型 + * @return Merger 实例,如果未注册则返回 null + */ + @SuppressWarnings("unchecked") + public Processors.Merger getMerger(Class type) { + Supplier> factory = registry.get(type); + if (factory != null) { + return (Processors.Merger) factory.get(); + } + // 尝试查找父类型或接口 + for (Map.Entry, Supplier>> entry : registry.entrySet()) { + if (entry.getKey().isAssignableFrom(type)) { + return (Processors.Merger) entry.getValue().get(); + } + } + return null; + } + + private void registerDefaults() { + register(modelengine.fit.waterflow.flowsengine.domain.flows.context.FlowData.class, + FlowDataMerger::new); + } +} diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/Processors.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/Processors.java index 7c9f37d696..df3a4d6246 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/Processors.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/Processors.java @@ -198,5 +198,21 @@ public interface Validator { */ boolean check(FlowContext input, List> inputs); } + + /** + * 多输入数据合并器,用于 fan-in 场景下将多条输入合并为单条处理 + * + * @param 输入数据类型 + */ + @FunctionalInterface + public interface Merger { + /** + * 将多条输入上下文合并为单条输出上下文 + * + * @param contexts 待合并的上下文列表 + * @return 合并后的单个上下文,如果无需合并则返回 null + */ + FlowContext merge(List> contexts); + } } diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/To.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/To.java index c7e783a117..5b3c513537 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/To.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/To.java @@ -27,6 +27,7 @@ import modelengine.fit.waterflow.flowsengine.domain.flows.streams.nodes.Blocks; import modelengine.fit.waterflow.flowsengine.domain.flows.streams.nodes.Retryable; import modelengine.fit.waterflow.flowsengine.utils.FlowExecutors; +import modelengine.fit.waterflow.flowsengine.utils.FlowUtil; import modelengine.fit.waterflow.flowsengine.utils.PriorityThreadPool; import modelengine.fitframework.log.Logger; import modelengine.fitframework.util.CollectionUtils; @@ -34,13 +35,7 @@ import modelengine.fitframework.util.StringUtils; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.concurrent.locks.Lock; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -131,13 +126,13 @@ public class To extends IdGenerator implements FitStream.Subscriber private Boolean isAsyncJob = false; private Processors.Validator validator = (i, all) -> true; + private FanInMode fanInMode = FanInMode.ANY; + private Processors.Map, String> mergeKeyGenerator = this::defaultMergeKey; + private Processors.Merger merger; private Blocks.Block block = null; - private Processors.Filter preFilter = null; - private Processors.Filter postFilter = null; - /** * 该节点只做单数据处理,理解为一条数据一条数据处理,是一个mapping操作 */ @@ -389,6 +384,7 @@ public synchronized void accept(ProcessType type, List> contexts) if (type == ProcessType.PROCESS && (processT == null || !processRunning)) { processRunning = true; String threadName = getThreadName(PROCESS_T_NAME_PREFIX); + processT = new Thread(this::process, threadName); processT.setUncaughtExceptionHandler((tr, ex) -> LOG.error(tr.getName() + " : " + ex.getMessage())); processT.start(); @@ -546,6 +542,13 @@ public Processors.Filter postFilter() { return Optional.ofNullable(this.postFilter).orElseGet(this::defaultFilter); } + private Processors.Filter requestFilter(Processors.Filter fallbackFilter) { + if (!FanInMode.ALL.equals(this.fanInMode)) { + return fallbackFilter; + } + return this::selectReadyMergeGroup; + } + /** * defaultFilter * @@ -567,6 +570,93 @@ public void setValidator(Processors.Validator validator) { } } + public void setFanInMode(FanInMode fanInMode) { + this.fanInMode = Optional.ofNullable(fanInMode).orElse(FanInMode.ANY); + } + + public void setMergeKeyGenerator(Processors.Map, String> mergeKeyGenerator) { + this.mergeKeyGenerator = Optional.ofNullable(mergeKeyGenerator).orElse(this::defaultMergeKey); + } + + public void setMerger(Processors.Merger merger) { + this.merger = merger; + } + + private String defaultMergeKey(FlowContext context) { + String rootId = Optional.ofNullable(context.getRootId()).orElse(""); + String transId = Optional.ofNullable(context.getTrans()).map(trans -> trans.getId()).orElse(""); + String traceIds = context.getTraceId().stream().sorted().collect(Collectors.joining(",")); + return StringUtils.join("|", rootId, transId, traceIds); + } + + private String buildMergeKey(FlowContext context) { + try { + String mergeKey = this.mergeKeyGenerator.process(ObjectUtils.cast(context)); + if (StringUtils.isNotEmpty(mergeKey)) { + return mergeKey; + } + } catch (Exception exception) { + LOG.warn("build merge key failed for context: {}", context.getId(), exception); + } + return defaultMergeKey(context); + } + + private List> filterReadyByFanIn(List> candidates) { + if (CollectionUtils.isEmpty(candidates)) { + return Collections.emptyList(); + } + if (FanInMode.ANY.equals(this.fanInMode)) { + return candidates; + } + + long expectedInputs = this.froms.stream().map(Identity::getId).distinct().count(); + if (expectedInputs <= 1) { + return candidates; + } + + Map>> grouped = candidates.stream().collect(Collectors.groupingBy(this::buildMergeKey)); + Set qualifiedMergeKeys = grouped.entrySet() + .stream() + .filter(entry -> entry.getValue() + .stream() + .map(FlowContext::getPosition) + .filter(StringUtils::isNotEmpty) + .distinct() + .count() >= expectedInputs) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + return candidates.stream().filter(context -> qualifiedMergeKeys.contains(buildMergeKey(context))).collect( + Collectors.toList()); + } + + private List> selectReadyMergeGroup(List> candidates) { + if (CollectionUtils.isEmpty(candidates)) { + return Collections.emptyList(); + } + long expectedInputs = this.froms.stream().map(Identity::getId).distinct().count(); + if (expectedInputs <= 1) { + return candidates; + } + Map>> grouped = new LinkedHashMap<>(); + candidates.forEach(context -> grouped.computeIfAbsent(buildMergeKey(context), key -> new ArrayList<>()).add( + context)); + return grouped.values() + .stream() + .filter(group -> group.stream() + .map(FlowContext::getPosition) + .filter(StringUtils::isNotEmpty) + .distinct() + .count() >= expectedInputs) + .findFirst() + .orElseGet(Collections::emptyList); + } + + private List> markReady(List> contexts) { + this.introduceToProcess(contexts); + return contexts.stream().filter(context -> context.getStatus() == FlowNodeStatus.READY).collect( + Collectors.toList()); + } + public ProcessMode getProcessMode() { return this.processMode; } @@ -574,6 +664,8 @@ public ProcessMode getProcessMode() { @Override public void onSubscribe(FitStream.Subscription subscription) { this.froms.add(subscription); // 将该节点的from的event加入 + long fromCount = this.froms.stream().map(Identity::getId).distinct().count(); + this.fanInMode = fromCount > 1 ? FanInMode.ALL : FanInMode.ANY; } @Override @@ -592,13 +684,14 @@ public void onProcess(List> pre) { this.afterProcess(pre, new ArrayList<>()); return; } + List> processInputs = mergeProcessInputs(pre); if (this.isAsyncJob) { beforeAsyncProcess(pre); - this.getProcessMode().process(this, pre); + this.getProcessMode().process(this, processInputs); return; } logFileTest(this, "before", pre); - List> after = this.getProcessMode().process(this, pre); + List> after = this.getProcessMode().process(this, processInputs); logFileTest(this, "after", pre); if (!isOwnTrace(pre)) { LOG.warn("[AfterProcess] The trace is not belong to this node, traceId={}.", @@ -631,6 +724,33 @@ public void setFailed(List> pre, Exception ex) { Optional.ofNullable(this.globalErrorHandler).ifPresent(handler -> handler.handle(ex, retryable, pre)); } + private List> mergeProcessInputs(List> pre) { + if (!FanInMode.ALL.equals(this.fanInMode) || pre.size() <= 1) { + return pre; + } + if (!(ProcessMode.MAPPING.equals(this.processMode) + || ProcessMode.FLATMAPPING.equals(this.processMode) + || ProcessMode.PRODUCING.equals(this.processMode))) { + return pre; + } +// if (this.merger == null) { +// this.merger = detectMerger(pre); +// } + if (this.merger == null) { + return pre; + } + FlowContext merged = this.merger.merge(pre); + return merged != null ? Collections.singletonList(merged) : pre; + } + + private Processors.Merger detectMerger(List> pre) { + if (pre == null || pre.isEmpty()) { + return null; + } + Class inputType = pre.get(0).getData().getClass(); + return MergerRegistry.getInstance().getMerger(inputType); + } + private boolean isOwnTrace(List> pre) { return pre.get(0).getTraceId().stream().allMatch(traceId -> { if (!repo.getTraceOwnerService().isOwn(traceId)) { @@ -907,7 +1027,8 @@ private List processData(To to, List> conte @Override protected List> requestAll(To to) { return to.repo.requestProducingContext(to.streamId, - to.froms.stream().map(Identity::getId).collect(Collectors.toList()), to.postFilter()); + to.froms.stream().map(Identity::getId).collect(Collectors.toList()), + to.requestFilter(to.postFilter())); } }, REDUCING { @@ -935,7 +1056,8 @@ public List> process(To to, List List> requestAll(To to) { return to.repo.requestMappingContext(to.streamId, - to.froms.stream().map(Identity::getId).collect(Collectors.toList()), to.defaultFilter(), + to.froms.stream().map(Identity::getId).collect(Collectors.toList()), + to.requestFilter(to.defaultFilter()), to.validator); } }, @@ -1063,10 +1185,8 @@ private List> requestReady(To to) { * @return List */ private List> filterReady(To to, List> pre) { - to.introduceToProcess(pre); - return pre.stream() - .filter(context -> context.getStatus() == FlowNodeStatus.READY) - .collect(Collectors.toList()); + List> grouped = to.filterReadyByFanIn(pre); + return to.markReady(grouped); } /** @@ -1098,9 +1218,24 @@ private void handleProcessConcurrentConflict(To to) { if (CollectionUtils.isEmpty(pending) || to.inParallelMode(pending)) { return; } + List> ready = filterReady(to, pending); + if (CollectionUtils.isEmpty(ready)) { + return; + } LOG.info("[{}] process thread conflict happens for stream-id: {}, node-id: {}", to.getThreadName(To.PROCESS_T_NAME_PREFIX), to.streamId, to.id); to.accept(ProcessType.PROCESS, pending); } } + + + /* + 多个数据到达后采用的处理方式 + * */ + public enum FanInMode { +// 表示即到即用 + ANY, +// 表示所有数据全部到达之后才可以使用 + ALL + } } diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/When.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/When.java index 0aa62253e2..161c5e19d1 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/When.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/When.java @@ -61,7 +61,7 @@ public class When extends IdGenerator implements FitStream.Subscription When(String streamId, FitStream.Subscriber to, Processors.Map converter, Processors.Whether whether, FlowContextRepo repo, - FlowContextMessenger messenger) { + FlowContextMessenger messenger) { this.streamId = streamId; this.converter = converter == null ? input -> (O) input : converter; this.whether = whether == null ? i -> true : whether; @@ -83,7 +83,7 @@ public When(String streamId, FitStream.Subscriber to, Processors.Map When(String streamId, String eventId, FitStream.Subscriber to, Processors.Map converter, Processors.Whether whether, - FlowContextRepo repo, FlowContextMessenger messenger) { + FlowContextRepo repo, FlowContextMessenger messenger) { this(streamId, to, converter, whether, repo, messenger); this.id = eventId; } diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/nodes/Node.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/nodes/Node.java index a59fe15b3d..9e81f339d9 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/nodes/Node.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/nodes/Node.java @@ -18,7 +18,9 @@ import modelengine.fit.waterflow.flowsengine.domain.flows.streams.From; import modelengine.fit.waterflow.flowsengine.domain.flows.streams.Identity; import modelengine.fit.waterflow.flowsengine.domain.flows.streams.Processors; +import modelengine.fit.waterflow.flowsengine.domain.flows.streams.MergerRegistry; import modelengine.fit.waterflow.flowsengine.domain.flows.streams.To; +import modelengine.fitframework.util.ObjectUtils; import modelengine.fit.waterflow.flowsengine.domain.flows.streams.callbacks.PreSendCallbackInfo; import java.util.List; @@ -52,12 +54,46 @@ public Node(String streamId, Processors.Map, R> processor, FlowCo this.publisher = this.initFrom(repo, messenger, locks); } + /** + * 1->1处理节点,自动注入 Merger + * + * @param streamId stream流程ID + * @param processor 对应处理器 + * @param repo 上下文持久化repo,默认在内存 + * @param messenger 上下文事件发送器,默认在内存 + * @param locks 流程锁 + * @param inputType 输入数据类型,用于从 MergerRegistry 获取对应的 Merger + */ + public Node(String streamId, Processors.Map, R> processor, FlowContextRepo repo, + FlowContextMessenger messenger, FlowLocks locks, Class inputType) { + super(streamId, processor, repo, messenger, locks); + this.publisher = this.initFrom(repo, messenger, locks); + this.autoInjectMerger(inputType); + } + public Node(String streamId, Processors.FlatMap, R> processor, FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks) { super(streamId, processor, repo, messenger, locks); this.publisher = this.initFrom(repo, messenger, locks); } + /** + * 1->N处理节点,自动注入 Merger + * + * @param streamId stream流程ID + * @param processor 对应处理器 + * @param repo 上下文持久化repo,默认在内存 + * @param messenger 上下文事件发送器,默认在内存 + * @param locks 流程锁 + * @param inputType 输入数据类型,用于从 MergerRegistry 获取对应的 Merger + */ + public Node(String streamId, Processors.FlatMap, R> processor, FlowContextRepo repo, + FlowContextMessenger messenger, FlowLocks locks, Class inputType) { + super(streamId, processor, repo, messenger, locks); + this.publisher = this.initFrom(repo, messenger, locks); + this.autoInjectMerger(inputType); + } + /** * 1->1处理节点 * @@ -75,6 +111,26 @@ public Node(String streamId, String nodeId, Processors.Map, R> pr this.publisher = this.initFrom(repo, messenger, locks); } + /** + * 1->1处理节点,自动注入 Merger + * + * @param streamId stream流程ID + * @param nodeId stream流程节点ID + * @param processor 对应处理器 + * @param repo 上下文持久化repo,默认在内存 + * @param messenger 上下文事件发送器,默认在内存 + * @param locks 流程锁 + * @param nodeType 节点类型 + * @param inputType 输入数据类型,用于从 MergerRegistry 获取对应的 Merger + */ + public Node(String streamId, String nodeId, Processors.Map, R> processor, + FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks, FlowNodeType nodeType, + Class inputType) { + super(streamId, nodeId, processor, repo, messenger, locks, nodeType); + this.publisher = this.initFrom(repo, messenger, locks); + this.autoInjectMerger(inputType); + } + /** * m->n处理节点 * @@ -90,6 +146,23 @@ public Node(String streamId, Processors.Produce, R> processor, Fl this.publisher = this.initFrom(repo, messenger, locks); } + /** + * m->n处理节点,自动注入 Merger + * + * @param streamId stream流程ID + * @param processor 对应处理器 + * @param repo 上下文持久化repo,默认在内存 + * @param messenger 上下文发送器,默认在内存 + * @param locks 流程锁 + * @param inputType 输入数据类型,用于从 MergerRegistry 获取对应的 Merger + */ + public Node(String streamId, Processors.Produce, R> processor, FlowContextRepo repo, + FlowContextMessenger messenger, FlowLocks locks, Class inputType) { + super(streamId, processor, repo, messenger, locks); + this.publisher = this.initFrom(repo, messenger, locks); + this.autoInjectMerger(inputType); + } + /** * m->n处理节点 * @@ -107,6 +180,26 @@ public Node(String streamId, String nodeId, Processors.Produce, R this.publisher = this.initFrom(repo, messenger, locks); } + /** + * m->n处理节点,自动注入 Merger + * + * @param streamId stream流程ID + * @param nodeId stream流程节点ID + * @param processor 对应处理器 + * @param repo 上下文持久化repo,默认在内存 + * @param messenger 上下文发送器,默认在内存 + * @param locks 流程锁 + * @param nodeType 节点类型 + * @param inputType 输入数据类型,用于从 MergerRegistry 获取对应的 Merger + */ + public Node(String streamId, String nodeId, Processors.Produce, R> processor, + FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks, FlowNodeType nodeType, + Class inputType) { + super(streamId, nodeId, processor, repo, messenger, locks, nodeType); + this.publisher = this.initFrom(repo, messenger, locks); + this.autoInjectMerger(inputType); + } + /** * n->1 处理节点 * @@ -122,6 +215,23 @@ public Node(String streamId, Processors.Reduce, R> processor, Flo this.publisher = this.initFrom(repo, messenger, locks); } + /** + * n->1 处理节点,自动注入 Merger + * + * @param streamId stream流程ID + * @param processor 对应处理器 + * @param repo 上下文持久化repo,默认在内存 + * @param messenger 上下文发送器,默认在内存 + * @param locks 流程锁 + * @param inputType 输入数据类型,用于从 MergerRegistry 获取对应的 Merger + */ + public Node(String streamId, Processors.Reduce, R> processor, FlowContextRepo repo, + FlowContextMessenger messenger, FlowLocks locks, Class inputType) { + super(streamId, processor, repo, messenger, locks); + this.publisher = this.initFrom(repo, messenger, locks); + this.autoInjectMerger(inputType); + } + /** * n->1 处理节点 * @@ -163,12 +273,25 @@ protected Node(String streamId, Processors.Map, R> processor, Flo * @param locks 流程锁 * @return From */ - private From initFrom(FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks) { + protected From initFrom(FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks) { From from = new From<>(this.getStreamId(), repo, messenger, locks); // node里的from跟随subscriber的streamId from.setId(this.getId()); return from; } + /** + * 自动从 MergerRegistry 注入 Merger + * 仅在用户未手动设置 Merger 时注入 + * + * @param inputType 输入数据类型,用于从 Registry 获取对应的 Merger + */ + protected void autoInjectMerger(Class inputType) { + Processors.Merger registered = ObjectUtils.cast(MergerRegistry.getInstance().getMerger(inputType)); + if (registered != null) { + this.setMerger(registered); + } + } + @Override public FitStream.Processor conditions( Processors.Just> processor, Processors.Map convert, Processors.Whether whether) { diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/ParallelNodeRule.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/ParallelNodeRule.java index 0cf287c7b7..a767b0858b 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/ParallelNodeRule.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/ParallelNodeRule.java @@ -25,6 +25,6 @@ public class ParallelNodeRule implements NodeRule { */ @Override public void apply(FlowNode flowNode) { - Validation.same(flowNode.getEvents().size(), EXPECT_EVENT_SIZE, exception("parallel node event size")); + Validation.greaterThanOrEquals(flowNode.getEvents().size(), EXPECT_EVENT_SIZE, exception("parallel node event size")); } } diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StartNodeRule.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StartNodeRule.java index 53052941fb..fc4309cbeb 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StartNodeRule.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StartNodeRule.java @@ -27,7 +27,7 @@ public class StartNodeRule implements NodeRule { */ @Override public void apply(FlowNode flowNode) { - Validation.same(flowNode.getEvents().size(), EXPECT_EVENT_SIZE, + Validation.greaterThanOrEquals(flowNode.getEvents().size(), EXPECT_EVENT_SIZE, () -> new WaterflowParamException(INVALID_START_NODE_EVENT_SIZE)); validateNull(flowNode.getJober(), "start node jober should be null"); validateTriggerMode(flowNode, "start node trigger mode"); diff --git a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StateNodeRule.java b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StateNodeRule.java index 513b418795..ab2c3baf11 100644 --- a/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StateNodeRule.java +++ b/app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/validators/rules/nodes/StateNodeRule.java @@ -27,7 +27,7 @@ public class StateNodeRule implements NodeRule { */ @Override public void apply(FlowNode flowNode) { - Validation.same(flowNode.getEvents().size(), EXPECT_EVENT_SIZE, + Validation.greaterThanOrEquals(flowNode.getEvents().size(), MINIMUM_EVENT_SIZE, () -> new WaterflowParamException(INVALID_STATE_NODE_EVENT_SIZE)); if (!flowNode.getTriggerMode().isAuto()) { Validation.notNull(flowNode.getTask(), exception("Flow node task error")); diff --git a/docker/dev-app-builder.sh b/docker/dev-app-builder.sh index 8e93e5b7f3..41f0fad53f 100644 --- a/docker/dev-app-builder.sh +++ b/docker/dev-app-builder.sh @@ -44,6 +44,7 @@ docker cp "$PLUGINS_DIR"/. app-builder-tmp:/opt/fit-framework/plugins/ echo "Copying shared libraries..." docker cp "$SHARED_DIR"/. app-builder-tmp:/opt/fit-framework/shared/ +docker exec app-builder-tmp bash -c "rm -f /opt/fit-framework/plugins/authentication-oauth2-client-1.0.0-SNAPSHOT.jar" # Commit as development version echo "Committing development version image: ${DEV_VERSION}" docker commit --change='ENTRYPOINT ["/opt/fit-framework/bin/start.sh"]' app-builder-tmp ${REPO}/app-builder:${DEV_VERSION} diff --git a/docker/docker-compose.dev.yml b/docker/docker-compose.dev.yml index c6e37e5f19..a3630f7025 100644 --- a/docker/docker-compose.dev.yml +++ b/docker/docker-compose.dev.yml @@ -118,6 +118,7 @@ services: - "./app-platform-tmp/app-builder:/var/share" ports: - "8004:8004" + - "5005:5005" fit-runtime-java: container_name: fit-runtime-java diff --git a/frontend/src/pages/addFlow/components/addflow-header.tsx b/frontend/src/pages/addFlow/components/addflow-header.tsx index 2a161db9ab..b033eb3d90 100644 --- a/frontend/src/pages/addFlow/components/addflow-header.tsx +++ b/frontend/src/pages/addFlow/components/addflow-header.tsx @@ -37,7 +37,13 @@ import timeImg from '@/assets/images/ai/time.png'; const AddHeader = (props) => { const dispatch = useAppDispatch(); const { t } = useTranslation(); - const { handleDebugClick, workFlow, types, saveTime, updateAippCallBack } = props; + const { + handleDebugClick, + workFlow, + types, + saveTime, + updateAippCallBack, + } = props; const { appInfo, setFlowInfo } = useContext(FlowContext); const [open, setOpen] = useState(false); const [imgPath, setImgPath] = useState(''); diff --git a/frontend/src/pages/addFlow/components/elsa-stage.tsx b/frontend/src/pages/addFlow/components/elsa-stage.tsx index d7685c954e..a535d2e29e 100644 --- a/frontend/src/pages/addFlow/components/elsa-stage.tsx +++ b/frontend/src/pages/addFlow/components/elsa-stage.tsx @@ -157,6 +157,7 @@ const Stage = (props) => { div: stageDom, tenant: tenantId, appId: realAppId, + connectionLimitDisabled: false, flowConfigData: data, configs: CONFIGS, i18n, diff --git a/frontend/src/pages/addFlow/index.tsx b/frontend/src/pages/addFlow/index.tsx index 689e79c1ef..6e8bcfce66 100644 --- a/frontend/src/pages/addFlow/index.tsx +++ b/frontend/src/pages/addFlow/index.tsx @@ -147,7 +147,6 @@ const AddFlow = (props) => { { updateAippCallBack, mashupClick, openDebug, - saveTime + saveTime, } = props; const { aippId } = useParams(); const testStatus = useAppSelector((state) => state.flowTestStore.testStatus); diff --git a/frontend/src/pages/components/styles/header.scss b/frontend/src/pages/components/styles/header.scss index 065ae83bcf..d8021ed3c2 100644 --- a/frontend/src/pages/components/styles/header.scss +++ b/frontend/src/pages/components/styles/header.scss @@ -103,6 +103,10 @@ margin-right: 5px; } } + .link-limit-btn-active { + color: #047bfc; + border-color: #047bfc; + } .publish-btn { color: #fff; background-color: #047bfc; diff --git a/frontend/src/styles/workSpace.scss b/frontend/src/styles/workSpace.scss index 1c9198edd9..c36b697144 100644 --- a/frontend/src/styles/workSpace.scss +++ b/frontend/src/styles/workSpace.scss @@ -74,4 +74,9 @@ color: rgba(0, 0, 0, 0.25); border-color: #d9d9d9; } + .link-limit-btn-active { + color: #047bfc; + background-color: #e6f4ff; + border-color: #91caff; + } }