Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.a2a.server.events.EventQueueFactory;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.InMemoryQueueManager;
import io.a2a.server.events.MainEventBus;
import io.a2a.server.events.QueueManager;
import io.a2a.server.tasks.TaskStateProvider;
import org.slf4j.Logger;
Expand Down Expand Up @@ -45,10 +46,12 @@ protected ReplicatedQueueManager() {
}

@Inject
public ReplicatedQueueManager(ReplicationStrategy replicationStrategy, TaskStateProvider taskStateProvider) {
public ReplicatedQueueManager(ReplicationStrategy replicationStrategy,
TaskStateProvider taskStateProvider,
MainEventBus mainEventBus) {
this.replicationStrategy = replicationStrategy;
this.taskStateProvider = taskStateProvider;
this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider);
this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider, mainEventBus);
}


Expand Down Expand Up @@ -152,12 +155,11 @@ public EventQueue.EventQueueBuilder builder(String taskId) {
// which sends the QueueClosedEvent after the database transaction commits.
// This ensures proper ordering and transactional guarantees.

// Return the builder with callbacks
return delegate.getEventQueueBuilder(taskId)
.taskId(taskId)
.hook(new ReplicationHook(taskId))
.addOnCloseCallback(delegate.getCleanupCallback(taskId))
.taskStateProvider(taskStateProvider);
// Call createBaseEventQueueBuilder() directly to avoid infinite recursion
// (getEventQueueBuilder() would delegate back to this factory, creating a loop)
// The base builder already includes: taskId, cleanup callback, taskStateProvider, mainEventBus
return delegate.createBaseEventQueueBuilder(taskId)
.hook(new ReplicationHook(taskId));
}
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.a2a.server.events;

public class EventQueueUtil {
public static void start(MainEventBusProcessor processor) {
processor.start();
}

public static void stop(MainEventBusProcessor processor) {
processor.stop();
}
}
28 changes: 28 additions & 0 deletions fix_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env python3
import re
import sys
import glob

test_files = glob.glob('**/src/test/java/**/*Test.java', recursive=True)

for path in test_files:
try:
with open(path, 'r') as f:
content = f.read()

# Pattern to match create() calls with 5 arguments ending with executor or internalExecutor
pattern = r'DefaultRequestHandler\.create\(\s*([^,]+),\s*([^,]+),\s*([^,]+),\s*([^,]+),\s*((?:executor|internalExecutor))\s*\)'

def replacer(match):
args = [match.group(i).strip() for i in range(1, 6)]
executor = args[4]
return f'DefaultRequestHandler.create({args[0]}, {args[1]}, {args[2]}, {args[3]}, {executor}, {executor})'

new_content = re.sub(pattern, replacer, content, flags=re.MULTILINE | re.DOTALL)

if new_content != content:
with open(path, 'w') as f:
f.write(new_content)
print(f'Fixed: {path}')
except Exception as e:
print(f'Error processing {path}: {e}')
101 changes: 101 additions & 0 deletions main-rest-tck.log

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import com.google.gson.JsonSyntaxException;
import io.a2a.common.A2AHeaders;
import io.a2a.server.util.sse.SseFormatter;
import io.a2a.grpc.utils.JSONRPCUtils;
import io.a2a.jsonrpc.common.json.IdJsonMappingException;
import io.a2a.jsonrpc.common.json.InvalidParamsJsonMappingException;
Expand Down Expand Up @@ -65,7 +65,6 @@
import io.a2a.transport.jsonrpc.handler.JSONRPCHandler;
import io.quarkus.security.Authenticated;
import io.quarkus.vertx.web.Body;
import io.quarkus.vertx.web.ReactiveRoutes;
import io.quarkus.vertx.web.Route;
import io.smallrye.mutiny.Multi;
import io.vertx.core.AsyncResult;
Expand All @@ -74,6 +73,8 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.RoutingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class A2AServerRoutes {
Expand Down Expand Up @@ -135,8 +136,12 @@ public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) {
} else if (streaming) {
final Multi<? extends A2AResponse<?>> finalStreamingResponse = streamingResponse;
executor.execute(() -> {
MultiSseSupport.subscribeObject(
finalStreamingResponse.map(i -> (Object) i), rc);
// Convert Multi<A2AResponse> to Multi<String> with SSE formatting
AtomicLong eventIdCounter = new AtomicLong(0);
Multi<String> sseEvents = finalStreamingResponse
.map(response -> SseFormatter.formatResponseAsSSE(response, eventIdCounter.getAndIncrement()));
// Write SSE-formatted strings to HTTP response
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
});

} else {
Expand Down Expand Up @@ -295,41 +300,44 @@ private static com.google.protobuf.MessageOrBuilder convertToProto(A2AResponse<?
}
}

// Port of import io.quarkus.vertx.web.runtime.MultiSseSupport, which is considered internal API
/**
* Simplified SSE support for Vert.x/Quarkus.
* <p>
* This class only handles HTTP-specific concerns (writing to response, backpressure, disconnect).
* SSE formatting and JSON serialization are handled by {@link SseFormatter}.
*/
private static class MultiSseSupport {
private static final Logger logger = LoggerFactory.getLogger(MultiSseSupport.class);

private MultiSseSupport() {
// Avoid direct instantiation.
}

private static void initialize(HttpServerResponse response) {
if (response.bytesWritten() == 0) {
MultiMap headers = response.headers();
if (headers.get(CONTENT_TYPE) == null) {
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
}
response.setChunked(true);
}
}

private static void onWriteDone(Flow.Subscription subscription, AsyncResult<Void> ar, RoutingContext rc) {
if (ar.failed()) {
rc.fail(ar.cause());
} else {
subscription.request(1);
}
}

public static void write(Multi<Buffer> multi, RoutingContext rc) {
/**
* Write SSE-formatted strings to HTTP response.
*
* @param sseStrings Multi stream of SSE-formatted strings (from SseFormatter)
* @param rc Vert.x routing context
* @param context A2A server call context (for EventConsumer cancellation)
*/
public static void writeSseStrings(Multi<String> sseStrings, RoutingContext rc, ServerCallContext context) {
HttpServerResponse response = rc.response();
multi.subscribe().withSubscriber(new Flow.Subscriber<Buffer>() {

sseStrings.subscribe().withSubscriber(new Flow.Subscriber<String>() {
Flow.Subscription upstream;

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.upstream = subscription;
this.upstream.request(1);

// Detect client disconnect and call EventConsumer.cancel() directly
response.closeHandler(v -> {
logger.info("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop");
context.invokeEventConsumerCancelCallback();
subscription.cancel();
});

// Notify tests that we are subscribed
Runnable runnable = streamingMultiSseSupportSubscribedRunnable;
if (runnable != null) {
Expand All @@ -338,54 +346,50 @@ public void onSubscribe(Flow.Subscription subscription) {
}

@Override
public void onNext(Buffer item) {
initialize(response);
response.write(item, new Handler<AsyncResult<Void>>() {
public void onNext(String sseEvent) {
// Set SSE headers on first event
if (response.bytesWritten() == 0) {
MultiMap headers = response.headers();
if (headers.get(CONTENT_TYPE) == null) {
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
}
response.setChunked(true);
}

// Write SSE-formatted string to response
response.write(Buffer.buffer(sseEvent), new Handler<AsyncResult<Void>>() {
@Override
public void handle(AsyncResult<Void> ar) {
onWriteDone(upstream, ar, rc);
if (ar.failed()) {
// Client disconnected or write failed - cancel upstream to stop EventConsumer
upstream.cancel();
rc.fail(ar.cause());
} else {
upstream.request(1);
}
}
});
}

@Override
public void onError(Throwable throwable) {
// Cancel upstream to stop EventConsumer when error occurs
upstream.cancel();
rc.fail(throwable);
}

@Override
public void onComplete() {
endOfStream(response);
}
});
}

public static void subscribeObject(Multi<Object> multi, RoutingContext rc) {
AtomicLong count = new AtomicLong();
write(multi.map(new Function<Object, Buffer>() {
@Override
public Buffer apply(Object o) {
if (o instanceof ReactiveRoutes.ServerSentEvent) {
ReactiveRoutes.ServerSentEvent<?> ev = (ReactiveRoutes.ServerSentEvent<?>) o;
long id = ev.id() != -1 ? ev.id() : count.getAndIncrement();
String e = ev.event() == null ? "" : "event: " + ev.event() + "\n";
String data = serializeResponse((A2AResponse<?>) ev.data());
return Buffer.buffer(e + "data: " + data + "\nid: " + id + "\n\n");
if (response.bytesWritten() == 0) {
// No events written - still set SSE content type
MultiMap headers = response.headers();
if (headers.get(CONTENT_TYPE) == null) {
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
}
}
String data = serializeResponse((A2AResponse<?>) o);
return Buffer.buffer("data: " + data + "\nid: " + count.getAndIncrement() + "\n\n");
}
}), rc);
}

private static void endOfStream(HttpServerResponse response) {
if (response.bytesWritten() == 0) { // No item
MultiMap headers = response.headers();
if (headers.get(CONTENT_TYPE) == null) {
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
response.end();
}
}
response.end();
});
}
}
}
5 changes: 5 additions & 0 deletions reference/jsonrpc/src/test/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
quarkus.arc.selected-alternatives=io.a2a.server.apps.common.TestHttpClient

# Debug logging for event processing and request handling
quarkus.log.category."io.a2a.server.events".level=DEBUG
quarkus.log.category."io.a2a.server.requesthandlers".level=DEBUG
quarkus.log.category."io.a2a.server.tasks".level=DEBUG
Loading
Loading