diff --git a/sentry-kafka/api/sentry-kafka.api b/sentry-kafka/api/sentry-kafka.api index 30faaa1256..6fe7f41222 100644 --- a/sentry-kafka/api/sentry-kafka.api +++ b/sentry-kafka/api/sentry-kafka.api @@ -5,6 +5,7 @@ public final class io/sentry/kafka/BuildConfig { public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/kafka/clients/consumer/ConsumerInterceptor { public static final field TRACE_ORIGIN Ljava/lang/String; + public fun ()V public fun (Lio/sentry/IScopes;)V public fun close ()V public fun configure (Ljava/util/Map;)V @@ -15,6 +16,7 @@ public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/k public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor { public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String; public static final field TRACE_ORIGIN Ljava/lang/String; + public fun ()V public fun (Lio/sentry/IScopes;)V public fun (Lio/sentry/IScopes;Ljava/lang/String;)V public fun close ()V diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java index caa773352e..a37d01cd90 100644 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java @@ -3,6 +3,7 @@ import io.sentry.BaggageHeader; import io.sentry.IScopes; import io.sentry.ITransaction; +import io.sentry.ScopesAdapter; import io.sentry.SentryTraceHeader; import io.sentry.SpanDataConvention; import io.sentry.SpanStatus; @@ -29,6 +30,10 @@ public final class SentryKafkaConsumerInterceptor implements ConsumerInter private final @NotNull IScopes scopes; + public SentryKafkaConsumerInterceptor() { + this(ScopesAdapter.getInstance()); + } + public SentryKafkaConsumerInterceptor(final @NotNull IScopes scopes) { this.scopes = scopes; } diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java index c6b3184b39..923104427e 100644 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java @@ -4,6 +4,7 @@ import io.sentry.DateUtils; import io.sentry.IScopes; import io.sentry.ISpan; +import io.sentry.ScopesAdapter; import io.sentry.SentryTraceHeader; import io.sentry.SpanDataConvention; import io.sentry.SpanOptions; @@ -28,6 +29,10 @@ public final class SentryKafkaProducerInterceptor implements ProducerInter private final @NotNull IScopes scopes; private final @NotNull String traceOrigin; + public SentryKafkaProducerInterceptor() { + this(ScopesAdapter.getInstance(), TRACE_ORIGIN); + } + public SentryKafkaProducerInterceptor(final @NotNull IScopes scopes) { this(scopes, TRACE_ORIGIN); } diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt index daee640793..f6786bc8f5 100644 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt @@ -2,9 +2,13 @@ package io.sentry.kafka import io.sentry.IScopes import io.sentry.ITransaction +import io.sentry.Sentry import io.sentry.SentryOptions import io.sentry.TransactionContext import io.sentry.TransactionOptions +import io.sentry.test.initForTest +import kotlin.test.AfterTest +import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertSame import org.apache.kafka.clients.consumer.ConsumerRecord @@ -19,6 +23,20 @@ import org.mockito.kotlin.whenever class SentryKafkaConsumerInterceptorTest { + @BeforeTest + fun setup() { + initForTest { + it.dsn = "https://key@sentry.io/proj" + it.isEnableQueueTracing = true + it.tracesSampleRate = 1.0 + } + } + + @AfterTest + fun teardown() { + Sentry.close() + } + @Test fun `does nothing when queue tracing is disabled`() { val scopes = mock() @@ -64,6 +82,16 @@ class SentryKafkaConsumerInterceptorTest { interceptor.onCommit(mapOf(TopicPartition("my-topic", 0) to OffsetAndMetadata(1))) } + @Test + fun `no-arg constructor uses current scopes`() { + val interceptor = SentryKafkaConsumerInterceptor() + val records = singleRecordBatch() + + val result = interceptor.onConsume(records) + + assertSame(records, result) + } + private fun singleRecordBatch(): ConsumerRecords { val partition = TopicPartition("my-topic", 0) val record = ConsumerRecord("my-topic", 0, 0L, "key", "value") diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt index 99b487c1c0..61ac1ab20e 100644 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt @@ -1,6 +1,7 @@ package io.sentry.kafka import io.sentry.IScopes +import io.sentry.ISentryLifecycleToken import io.sentry.Sentry import io.sentry.SentryOptions import io.sentry.SentryTraceHeader @@ -26,7 +27,11 @@ class SentryKafkaProducerInterceptorTest { @BeforeTest fun setup() { - initForTest { it.dsn = "https://key@sentry.io/proj" } + initForTest { + it.dsn = "https://key@sentry.io/proj" + it.isEnableQueueTracing = true + it.tracesSampleRate = 1.0 + } scopes = mock() options = SentryOptions().apply { @@ -95,4 +100,27 @@ class SentryKafkaProducerInterceptorTest { assertSame(record, result) } + + @Test + fun `no-arg constructor uses current scopes`() { + val transaction = Sentry.startTransaction("tx", "op") + val record = ProducerRecord("my-topic", "key", "value") + + try { + val token: ISentryLifecycleToken = transaction.makeCurrent() + try { + val interceptor = SentryKafkaProducerInterceptor() + interceptor.onSend(record) + } finally { + token.close() + } + } finally { + transaction.finish() + } + + assertNotNull(record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) + assertNotNull( + record.headers().lastHeader(SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER) + ) + } } diff --git a/sentry-samples/sentry-samples-console/build.gradle.kts b/sentry-samples/sentry-samples-console/build.gradle.kts index 0dc6183b4f..010195c677 100644 --- a/sentry-samples/sentry-samples-console/build.gradle.kts +++ b/sentry-samples/sentry-samples-console/build.gradle.kts @@ -36,8 +36,10 @@ dependencies { implementation(projects.sentry) implementation(projects.sentryAsyncProfiler) implementation(projects.sentryJcache) + implementation(projects.sentryKafka) implementation(libs.jcache) implementation(libs.caffeine.jcache) + implementation(libs.kafka.clients) testImplementation(kotlin(Config.kotlinStdLib)) testImplementation(projects.sentry) diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java index 0ed0646c7b..4fee0a8374 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java @@ -5,6 +5,7 @@ import io.sentry.jcache.SentryJCacheWrapper; import io.sentry.protocol.Message; import io.sentry.protocol.User; +import io.sentry.samples.console.kafka.KafkaShowcase; import java.util.Collections; import javax.cache.Cache; import javax.cache.CacheManager; @@ -16,6 +17,10 @@ public class Main { private static long numberOfDiscardedSpansDueToOverflow = 0; public static void main(String[] args) throws InterruptedException { + final String kafkaBootstrapServers = System.getenv("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS"); + final boolean kafkaEnabled = + kafkaBootstrapServers != null && !kafkaBootstrapServers.trim().isEmpty(); + Sentry.init( options -> { // NOTE: Replace the test DSN below with YOUR OWN DSN to see the events from this app in @@ -95,6 +100,7 @@ public static void main(String[] args) throws InterruptedException { // Enable cache tracing to create spans for cache operations options.setEnableCacheTracing(true); + options.setEnableQueueTracing(kafkaEnabled); // Determine traces sample rate based on the sampling context // options.setTracesSampler( @@ -178,6 +184,13 @@ public static void main(String[] args) throws InterruptedException { // cache.remove, and cache.flush spans as children of the active transaction. demonstrateCacheTracing(); + // Kafka queue tracing with kafka-clients interceptors. + // + // Enable with: SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS=localhost:9092 + if (kafkaEnabled) { + KafkaShowcase.runKafkaWithSentryInterceptors(kafkaBootstrapServers); + } + // Performance feature // // Transactions collect execution time of the piece of code that's executed between the start diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java new file mode 100644 index 0000000000..9c84b6f004 --- /dev/null +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java @@ -0,0 +1,132 @@ +package io.sentry.samples.console.kafka; + +import io.sentry.ISentryLifecycleToken; +import io.sentry.ITransaction; +import io.sentry.Sentry; +import io.sentry.kafka.SentryKafkaConsumerInterceptor; +import io.sentry.kafka.SentryKafkaProducerInterceptor; +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; + +public final class KafkaShowcase { + + public static final String TOPIC = "sentry-topic-console-sample"; + + private KafkaShowcase() {} + + public static void runKafkaWithSentryInterceptors(final String bootstrapServers) { + final CountDownLatch consumedLatch = new CountDownLatch(1); + final Thread consumerThread = + startConsumerWithSentryInterceptor(bootstrapServers, consumedLatch); + final Properties producerProperties = createProducerPropertiesWithSentry(bootstrapServers); + + final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo"); + try (ISentryLifecycleToken ignored = transaction.makeCurrent()) { + try (KafkaProducer producer = new KafkaProducer<>(producerProperties)) { + Thread.sleep(500); + producer.send(new ProducerRecord<>(TOPIC, "sentry-kafka sample message")).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception ignoredException) { + // local broker may not be available when running the sample + } + + try { + consumedLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } finally { + consumerThread.interrupt(); + try { + consumerThread.join(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + transaction.finish(); + } + } + + public static Properties createProducerPropertiesWithSentry(final String bootstrapServers) { + final Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + producerProperties.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + // Required for Sentry queue tracing in kafka-clients producer setup. + producerProperties.put( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); + + // Optional tuning for sample stability in CI/local runs. + producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000); + producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); + producerProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3000); + + return producerProperties; + } + + public static Properties createConsumerPropertiesWithSentry(final String bootstrapServers) { + final Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + consumerProperties.put( + ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + // Required for Sentry queue tracing in kafka-clients consumer setup. + consumerProperties.put( + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaConsumerInterceptor.class.getName()); + + // Optional tuning for sample stability in CI/local runs. + consumerProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000); + consumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); + + return consumerProperties; + } + + private static Thread startConsumerWithSentryInterceptor( + final String bootstrapServers, final CountDownLatch consumedLatch) { + final Thread consumerThread = + new Thread( + () -> { + final Properties consumerProperties = + createConsumerPropertiesWithSentry(bootstrapServers); + + try (KafkaConsumer consumer = + new KafkaConsumer<>(consumerProperties)) { + consumer.subscribe(Collections.singletonList(TOPIC)); + + while (!Thread.currentThread().isInterrupted() && consumedLatch.getCount() > 0) { + final ConsumerRecords records = + consumer.poll(Duration.ofMillis(500)); + if (!records.isEmpty()) { + consumedLatch.countDown(); + break; + } + } + } catch (Exception ignored) { + // local broker may not be available when running the sample + } + }, + "sentry-kafka-sample-consumer"); + consumerThread.start(); + return consumerThread; + } +} diff --git a/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt b/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt index 2b009167ac..1b512fdc48 100644 --- a/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt +++ b/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt @@ -19,19 +19,7 @@ class ConsoleApplicationSystemTest { @Test fun `console application sends expected events when run as JAR`() { - val jarFile = testHelper.findJar("sentry-samples-console") - val process = - testHelper.launch( - jarFile, - mapOf( - "SENTRY_DSN" to testHelper.dsn, - "SENTRY_TRACES_SAMPLE_RATE" to "1.0", - "SENTRY_ENABLE_PRETTY_SERIALIZATION_OUTPUT" to "false", - "SENTRY_DEBUG" to "true", - "SENTRY_PROFILE_SESSION_SAMPLE_RATE" to "1.0", - "SENTRY_PROFILE_LIFECYCLE" to "TRACE", - ), - ) + val process = launchConsoleProcess() process.waitFor(30, TimeUnit.SECONDS) assertEquals(0, process.exitValue()) @@ -40,6 +28,40 @@ class ConsoleApplicationSystemTest { verifyExpectedEvents() } + @Test + fun `console application sends kafka producer and consumer tracing when kafka is enabled`() { + val process = + launchConsoleProcess(mapOf("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS" to "localhost:9092")) + + process.waitFor(30, TimeUnit.SECONDS) + assertEquals(0, process.exitValue()) + + testHelper.ensureTransactionReceived { transaction, _ -> + transaction.transaction == "kafka-demo" && + testHelper.doesTransactionContainSpanWithOp(transaction, "queue.publish") + } + + testHelper.ensureTransactionReceived { transaction, _ -> + testHelper.doesTransactionHaveOp(transaction, "queue.receive") && + transaction.contexts.trace?.data?.get("messaging.system") == "kafka" + } + } + + private fun launchConsoleProcess(overrides: Map = emptyMap()): Process { + val jarFile = testHelper.findJar("sentry-samples-console") + val env = + mutableMapOf( + "SENTRY_DSN" to testHelper.dsn, + "SENTRY_TRACES_SAMPLE_RATE" to "1.0", + "SENTRY_ENABLE_PRETTY_SERIALIZATION_OUTPUT" to "false", + "SENTRY_DEBUG" to "true", + "SENTRY_PROFILE_SESSION_SAMPLE_RATE" to "1.0", + "SENTRY_PROFILE_LIFECYCLE" to "TRACE", + ) + env.putAll(overrides) + return testHelper.launch(jarFile, env) + } + private fun verifyExpectedEvents() { var profilerId: SentryId? = null // Verify we received a "Fatal message!" event diff --git a/test/system-test-runner.py b/test/system-test-runner.py index 70489c580a..64979b3e0e 100644 --- a/test/system-test-runner.py +++ b/test/system-test-runner.py @@ -42,6 +42,7 @@ import argparse import requests import threading +import socket from pathlib import Path from typing import Optional, List, Tuple from dataclasses import dataclass @@ -65,6 +66,9 @@ "SENTRY_ENABLE_CACHE_TRACING": "true" } +KAFKA_CONTAINER_NAME = "sentry-java-system-test-kafka" +KAFKA_BOOTSTRAP_SERVERS = "localhost:9092" + class ServerType(Enum): TOMCAT = 0 SPRING = 1 @@ -155,6 +159,7 @@ def __init__(self): self.mock_server = Server(name="Mock", pid_filepath="sentry-mock-server.pid") self.tomcat_server = Server(name="Tomcat", pid_filepath="tomcat-server.pid") self.spring_server = Server(name="Spring", pid_filepath="spring-server.pid") + self.kafka_started_by_runner = False # Load existing PIDs if available for server in (self.mock_server, self.tomcat_server, self.spring_server): @@ -196,7 +201,78 @@ def kill_process(self, pid: int, name: str) -> None: except (OSError, ProcessLookupError): print(f"Process {pid} was already dead") + def module_requires_kafka(self, sample_module: str) -> bool: + return sample_module == "sentry-samples-console" + + def wait_for_port(self, host: str, port: int, max_attempts: int = 20) -> bool: + for _ in range(max_attempts): + try: + with socket.create_connection((host, port), timeout=1): + return True + except OSError: + time.sleep(1) + return False + def start_kafka_broker(self) -> None: + if self.wait_for_port("localhost", 9092, max_attempts=1): + print("Kafka broker already running on localhost:9092, reusing it.") + self.kafka_started_by_runner = False + return + + self.stop_kafka_broker() + + print("Starting Kafka broker (Redpanda) for system tests...") + run_result = subprocess.run( + [ + "docker", + "run", + "-d", + "--name", + KAFKA_CONTAINER_NAME, + "-p", + "9092:9092", + "docker.redpanda.com/redpandadata/redpanda:v24.1.9", + "redpanda", + "start", + "--overprovisioned", + "--smp", + "1", + "--memory", + "1G", + "--reserve-memory", + "0M", + "--node-id", + "0", + "--check=false", + "--kafka-addr", + "PLAINTEXT://0.0.0.0:9092", + "--advertise-kafka-addr", + "PLAINTEXT://localhost:9092", + ], + check=False, + capture_output=True, + text=True, + ) + + if run_result.returncode != 0: + raise RuntimeError(f"Failed to start Kafka container: {run_result.stderr}") + + if not self.wait_for_port("localhost", 9092, max_attempts=30): + raise RuntimeError("Kafka broker did not become ready on localhost:9092") + + self.kafka_started_by_runner = True + + def stop_kafka_broker(self) -> None: + if not self.kafka_started_by_runner: + return + + subprocess.run( + ["docker", "rm", "-f", KAFKA_CONTAINER_NAME], + check=False, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + self.kafka_started_by_runner = False def start_sentry_mock_server(self) -> None: """Start the Sentry mock server.""" @@ -557,6 +633,12 @@ def setup_test_infrastructure(self, sample_module: str, java_agent: str, java_agent_auto_init: str, build_before_run: str, server_type: Optional[ServerType]) -> int: """Set up test infrastructure. Returns 0 on success, error code on failure.""" + if self.module_requires_kafka(sample_module): + self.start_kafka_broker() + os.environ["SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS"] = KAFKA_BOOTSTRAP_SERVERS + else: + os.environ.pop("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS", None) + # Build if requested if build_before_run == "1": print("Building before test run") @@ -624,6 +706,8 @@ def run_single_test(self, sample_module: str, java_agent: str, elif server_type == ServerType.SPRING: self.stop_spring_server() self.stop_sentry_mock_server() + self.stop_kafka_broker() + os.environ.pop("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS", None) def run_all_tests(self) -> int: """Run all system tests.""" @@ -954,6 +1038,8 @@ def cleanup_on_exit(self, signum, frame): self.stop_spring_server() self.stop_sentry_mock_server() self.stop_tomcat_server() + self.stop_kafka_broker() + os.environ.pop("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS", None) sys.exit(1) def main(): @@ -1152,6 +1238,8 @@ def main(): runner.stop_spring_server() runner.stop_sentry_mock_server() runner.stop_tomcat_server() + runner.stop_kafka_broker() + os.environ.pop("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS", None) if __name__ == "__main__": sys.exit(main())