Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sentry-kafka/api/sentry-kafka.api
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ public final class io/sentry/kafka/BuildConfig {

public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/kafka/clients/consumer/ConsumerInterceptor {
public static final field TRACE_ORIGIN Ljava/lang/String;
public fun <init> ()V
public fun <init> (Lio/sentry/IScopes;)V
public fun close ()V
public fun configure (Ljava/util/Map;)V
Expand All @@ -15,6 +16,7 @@ public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/k
public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor {
public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String;
public static final field TRACE_ORIGIN Ljava/lang/String;
public fun <init> ()V
public fun <init> (Lio/sentry/IScopes;)V
public fun <init> (Lio/sentry/IScopes;Ljava/lang/String;)V
public fun close ()V
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.sentry.BaggageHeader;
import io.sentry.IScopes;
import io.sentry.ITransaction;
import io.sentry.ScopesAdapter;
import io.sentry.SentryTraceHeader;
import io.sentry.SpanDataConvention;
import io.sentry.SpanStatus;
Expand All @@ -29,6 +30,10 @@ public final class SentryKafkaConsumerInterceptor<K, V> implements ConsumerInter

private final @NotNull IScopes scopes;

  /**
   * Creates the interceptor using the globally configured Sentry scopes.
   *
   * <p>Kafka instantiates interceptors reflectively when they are registered via the
   * {@code interceptor.classes} consumer config, so a public no-arg constructor is required.
   */
  public SentryKafkaConsumerInterceptor() {
    this(ScopesAdapter.getInstance());
  }

  /**
   * Creates the interceptor with an explicit scopes instance, e.g. for testing.
   *
   * @param scopes the Sentry scopes this interceptor operates on
   */
  public SentryKafkaConsumerInterceptor(final @NotNull IScopes scopes) {
    this.scopes = scopes;
  }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.sentry.DateUtils;
import io.sentry.IScopes;
import io.sentry.ISpan;
import io.sentry.ScopesAdapter;
import io.sentry.SentryTraceHeader;
import io.sentry.SpanDataConvention;
import io.sentry.SpanOptions;
Expand All @@ -28,6 +29,10 @@ public final class SentryKafkaProducerInterceptor<K, V> implements ProducerInter
private final @NotNull IScopes scopes;
private final @NotNull String traceOrigin;

  /**
   * Creates the interceptor using the globally configured Sentry scopes and the default
   * {@code TRACE_ORIGIN}.
   *
   * <p>Kafka instantiates interceptors reflectively when they are registered via the
   * {@code interceptor.classes} producer config, so a public no-arg constructor is required.
   */
  public SentryKafkaProducerInterceptor() {
    this(ScopesAdapter.getInstance(), TRACE_ORIGIN);
  }

  /**
   * Creates the interceptor with an explicit scopes instance and the default
   * {@code TRACE_ORIGIN}.
   *
   * @param scopes the Sentry scopes this interceptor operates on
   */
  public SentryKafkaProducerInterceptor(final @NotNull IScopes scopes) {
    this(scopes, TRACE_ORIGIN);
  }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package io.sentry.kafka

import io.sentry.IScopes
import io.sentry.ITransaction
import io.sentry.Sentry
import io.sentry.SentryOptions
import io.sentry.TransactionContext
import io.sentry.TransactionOptions
import io.sentry.test.initForTest
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertSame
import org.apache.kafka.clients.consumer.ConsumerRecord
Expand All @@ -19,6 +23,20 @@ import org.mockito.kotlin.whenever

class SentryKafkaConsumerInterceptorTest {

@BeforeTest
fun setup() {
initForTest {
it.dsn = "https://key@sentry.io/proj"
it.isEnableQueueTracing = true
it.tracesSampleRate = 1.0
}
}

  /** Closes the global Sentry instance after each test so state does not leak between tests. */
  @AfterTest
  fun teardown() {
    Sentry.close()
  }

@Test
fun `does nothing when queue tracing is disabled`() {
val scopes = mock<IScopes>()
Expand Down Expand Up @@ -64,6 +82,16 @@ class SentryKafkaConsumerInterceptorTest {
interceptor.onCommit(mapOf(TopicPartition("my-topic", 0) to OffsetAndMetadata(1)))
}

@Test
fun `no-arg constructor uses current scopes`() {
val interceptor = SentryKafkaConsumerInterceptor<String, String>()
val records = singleRecordBatch()

val result = interceptor.onConsume(records)

assertSame(records, result)
}

private fun singleRecordBatch(): ConsumerRecords<String, String> {
val partition = TopicPartition("my-topic", 0)
val record = ConsumerRecord("my-topic", 0, 0L, "key", "value")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.sentry.kafka

import io.sentry.IScopes
import io.sentry.ISentryLifecycleToken
import io.sentry.Sentry
import io.sentry.SentryOptions
import io.sentry.SentryTraceHeader
Expand All @@ -26,7 +27,11 @@ class SentryKafkaProducerInterceptorTest {

@BeforeTest
fun setup() {
initForTest { it.dsn = "https://key@sentry.io/proj" }
initForTest {
it.dsn = "https://key@sentry.io/proj"
it.isEnableQueueTracing = true
it.tracesSampleRate = 1.0
}
scopes = mock()
options =
SentryOptions().apply {
Expand Down Expand Up @@ -95,4 +100,27 @@ class SentryKafkaProducerInterceptorTest {

assertSame(record, result)
}

@Test
fun `no-arg constructor uses current scopes`() {
val transaction = Sentry.startTransaction("tx", "op")
val record = ProducerRecord("my-topic", "key", "value")

try {
val token: ISentryLifecycleToken = transaction.makeCurrent()
try {
val interceptor = SentryKafkaProducerInterceptor<String, String>()
interceptor.onSend(record)
} finally {
token.close()
}
} finally {
transaction.finish()
}

assertNotNull(record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER))
assertNotNull(
record.headers().lastHeader(SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER)
)
}
}
2 changes: 2 additions & 0 deletions sentry-samples/sentry-samples-console/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ dependencies {
implementation(projects.sentry)
implementation(projects.sentryAsyncProfiler)
implementation(projects.sentryJcache)
implementation(projects.sentryKafka)
implementation(libs.jcache)
implementation(libs.caffeine.jcache)
implementation(libs.kafka.clients)

testImplementation(kotlin(Config.kotlinStdLib))
testImplementation(projects.sentry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.sentry.jcache.SentryJCacheWrapper;
import io.sentry.protocol.Message;
import io.sentry.protocol.User;
import io.sentry.samples.console.kafka.KafkaShowcase;
import java.util.Collections;
import javax.cache.Cache;
import javax.cache.CacheManager;
Expand All @@ -16,6 +17,10 @@ public class Main {
private static long numberOfDiscardedSpansDueToOverflow = 0;

public static void main(String[] args) throws InterruptedException {
final String kafkaBootstrapServers = System.getenv("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS");
final boolean kafkaEnabled =
kafkaBootstrapServers != null && !kafkaBootstrapServers.trim().isEmpty();

Sentry.init(
options -> {
// NOTE: Replace the test DSN below with YOUR OWN DSN to see the events from this app in
Expand Down Expand Up @@ -95,6 +100,7 @@ public static void main(String[] args) throws InterruptedException {

// Enable cache tracing to create spans for cache operations
options.setEnableCacheTracing(true);
options.setEnableQueueTracing(kafkaEnabled);

// Determine traces sample rate based on the sampling context
// options.setTracesSampler(
Expand Down Expand Up @@ -178,6 +184,13 @@ public static void main(String[] args) throws InterruptedException {
// cache.remove, and cache.flush spans as children of the active transaction.
demonstrateCacheTracing();

// Kafka queue tracing with kafka-clients interceptors.
//
// Enable with: SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS=localhost:9092
if (kafkaEnabled) {
KafkaShowcase.runKafkaWithSentryInterceptors(kafkaBootstrapServers);
}

// Performance feature
//
// Transactions collect execution time of the piece of code that's executed between the start
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package io.sentry.samples.console.kafka;

import io.sentry.ISentryLifecycleToken;
import io.sentry.ITransaction;
import io.sentry.Sentry;
import io.sentry.kafka.SentryKafkaConsumerInterceptor;
import io.sentry.kafka.SentryKafkaProducerInterceptor;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * Demonstrates Sentry queue tracing with Apache Kafka via the kafka-clients interceptor
 * mechanism ({@link SentryKafkaProducerInterceptor} and {@link SentryKafkaConsumerInterceptor}).
 *
 * <p>A reachable broker is expected at the given bootstrap servers; broker connectivity
 * failures are deliberately swallowed so the console sample still completes when no Kafka
 * broker is running locally.
 */
public final class KafkaShowcase {

  /** Topic shared by the sample producer and consumer. */
  public static final String TOPIC = "sentry-topic-console-sample";

  // Utility class; not meant to be instantiated.
  private KafkaShowcase() {}

  /**
   * Runs the end-to-end showcase: starts a background consumer thread, produces one message
   * inside a Sentry transaction (so the producer interceptor can pick up the current trace),
   * waits up to 5 seconds for the message to be consumed, then stops the consumer and
   * finishes the transaction.
   *
   * @param bootstrapServers Kafka bootstrap servers, e.g. {@code localhost:9092}
   */
  public static void runKafkaWithSentryInterceptors(final String bootstrapServers) {
    // Counted down by the consumer thread once it has polled at least one record.
    final CountDownLatch consumedLatch = new CountDownLatch(1);
    final Thread consumerThread =
        startConsumerWithSentryInterceptor(bootstrapServers, consumedLatch);
    final Properties producerProperties = createProducerPropertiesWithSentry(bootstrapServers);

    final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo");
    // Make the transaction current on this thread so the producer interceptor can see it.
    try (ISentryLifecycleToken ignored = transaction.makeCurrent()) {
      try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
        // Give the consumer a moment to subscribe before producing.
        Thread.sleep(500);
        // get() blocks until the send completes (bounded by the producer timeouts below).
        producer.send(new ProducerRecord<>(TOPIC, "sentry-kafka sample message")).get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (Exception ignoredException) {
        // local broker may not be available when running the sample
      }

      try {
        // Best-effort wait; cleanup below runs whether or not the message was consumed.
        consumedLatch.await(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } finally {
      // Stop the consumer loop and give it a bounded time to exit, then finish the span.
      consumerThread.interrupt();
      try {
        consumerThread.join(1000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      transaction.finish();
    }
  }

  /**
   * Builds producer properties with the Sentry producer interceptor registered.
   *
   * @param bootstrapServers Kafka bootstrap servers
   * @return properties for constructing a {@code KafkaProducer<String, String>}
   */
  public static Properties createProducerPropertiesWithSentry(final String bootstrapServers) {
    final Properties producerProperties = new Properties();
    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    producerProperties.put(
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProperties.put(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // Required for Sentry queue tracing in kafka-clients producer setup.
    producerProperties.put(
        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName());

    // Optional tuning for sample stability in CI/local runs.
    producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000);
    producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000);
    producerProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3000);

    return producerProperties;
  }

  /**
   * Builds consumer properties with the Sentry consumer interceptor registered. A random
   * group id plus {@code auto.offset.reset=earliest} makes each sample run read the topic
   * from the beginning.
   *
   * @param bootstrapServers Kafka bootstrap servers
   * @return properties for constructing a {@code KafkaConsumer<String, String>}
   */
  public static Properties createConsumerPropertiesWithSentry(final String bootstrapServers) {
    final Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProperties.put(
        ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID());
    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProperties.put(
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // Required for Sentry queue tracing in kafka-clients consumer setup.
    consumerProperties.put(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaConsumerInterceptor.class.getName());

    // Optional tuning for sample stability in CI/local runs.
    consumerProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000);
    consumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000);

    return consumerProperties;
  }

  /**
   * Starts a consumer thread that polls {@link #TOPIC} until the first record arrives
   * (then counts down the latch) or until the thread is interrupted.
   *
   * @param bootstrapServers Kafka bootstrap servers
   * @param consumedLatch latch released when the first non-empty poll arrives
   * @return the started consumer thread; callers stop it via {@link Thread#interrupt()}
   */
  private static Thread startConsumerWithSentryInterceptor(
      final String bootstrapServers, final CountDownLatch consumedLatch) {
    final Thread consumerThread =
        new Thread(
            () -> {
              final Properties consumerProperties =
                  createConsumerPropertiesWithSentry(bootstrapServers);

              try (KafkaConsumer<String, String> consumer =
                  new KafkaConsumer<>(consumerProperties)) {
                consumer.subscribe(Collections.singletonList(TOPIC));

                // Poll until interrupted or the first batch is received.
                while (!Thread.currentThread().isInterrupted() && consumedLatch.getCount() > 0) {
                  final ConsumerRecords<String, String> records =
                      consumer.poll(Duration.ofMillis(500));
                  if (!records.isEmpty()) {
                    consumedLatch.countDown();
                    break;
                  }
                }
              } catch (Exception ignored) {
                // local broker may not be available when running the sample
              }
            },
            "sentry-kafka-sample-consumer");
    consumerThread.start();
    return consumerThread;
  }
}
Loading
Loading