diff --git a/bin/automq-cost-estimator.sh b/bin/automq-cost-estimator.sh new file mode 100755 index 0000000000..16f7e8aa77 --- /dev/null +++ b/bin/automq-cost-estimator.sh @@ -0,0 +1,38 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# AutoMQ S3 Cost Estimator +# +# Estimates the monthly S3 cost of a running AutoMQ cluster and compares it +# to an equivalent Apache Kafka (EBS-backed) deployment cost. +# +# Usage: +# bin/automq-cost-estimator.sh --bootstrap-server [OPTIONS] +# +# Options: +# -B, --bootstrap-server HOST:PORT Bootstrap server (required) +# -c, --config FILE Admin client properties file +# --storage-price USD S3 storage price per GiB/month (default: 0.023) +# --put-price USD S3 PUT price per request (default: 0.000005) +# --get-price USD S3 GET price per request (default: 0.0000004) +# --ebs-price USD EBS gp3 price per GiB/month (default: 0.08) +# --replication-factor N Kafka replication factor for comparison (default: 3) +# --overprovision-factor MULT Kafka EBS over-provision multiplier (default: 2.0) +# --output text|json Output format (default: text) +# --per-topic Show per-topic storage cost breakdown + +exec "$(dirname "$0")/kafka-run-class.sh" \ + org.apache.kafka.tools.automq.S3CostEstimatorCommand "$@" diff --git a/gradle.properties b/gradle.properties index 88d63be550..dfcec42845 100644 --- a/gradle.properties +++ b/gradle.properties @@ -31,3 +31,6 @@ swaggerVersion=2.2.8 task=build org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC org.gradle.parallel=true +# AutoMQ inject start +scalaOptimizerMode=none +# AutoMQ inject end diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/S3CostEstimatorCommand.java b/tools/src/main/java/org/apache/kafka/tools/automq/S3CostEstimatorCommand.java new file mode 100644 index 0000000000..e047e725c1 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/automq/S3CostEstimatorCommand.java @@ -0,0 +1,307 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.automq; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.DescribeLogDirsResult; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.LogDirDescription; +import org.apache.kafka.clients.admin.ReplicaInfo; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.tools.automq.cost.ClusterStorageInfo; +import org.apache.kafka.tools.automq.cost.CostCalculator; +import org.apache.kafka.tools.automq.cost.CostEstimatorConfig; +import org.apache.kafka.tools.automq.cost.CostReportPrinter; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.internal.HelpScreenException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; + +import static net.sourceforge.argparse4j.impl.Arguments.store; +import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; + +/** + * AutoMQ S3 Cost Estimator CLI tool. + *

+ * Connects to a running AutoMQ cluster via the Admin API, collects topic and + * storage metadata, and prints an estimated monthly cost breakdown for: + *

+ * It also computes the equivalent Apache Kafka (EBS-backed) monthly cost so + * users can immediately see their cloud cost savings. + *

+ * Entry point: {@code bin/automq-cost-estimator.sh} + */ +public class S3CostEstimatorCommand { + + public static void main(String[] args) { + Exit.exit(mainNoExit(args)); + } + + static int mainNoExit(String... args) { + try { + execute(args); + return 0; + } catch (ArgumentParserException e) { + System.err.println(e.getMessage()); + return 1; + } catch (Throwable e) { + System.err.println("ERROR: " + e.getMessage()); + System.err.println(Utils.stackTrace(e)); + return 1; + } + } + + static void execute(String... args) throws Exception { + ArgumentParser parser = buildParser(); + Namespace ns; + try { + ns = parser.parseArgs(args); + } catch (HelpScreenException e) { + Exit.exit(0); + return; + } + + CostEstimatorConfig config = buildConfig(ns); + Properties adminProps = buildAdminProperties(config); + + try (Admin admin = Admin.create(adminProps)) { + ClusterStorageInfo info = collectClusterInfo(admin, config); + CostCalculator.CostReport report = new CostCalculator().calculate(info, config); + new CostReportPrinter(System.out).print(info, report, config); + } + } + + // ----------------------------------------------------------------------- + // Cluster data collection + // ----------------------------------------------------------------------- + + /** + * Collects cluster metadata and storage information from the Admin API. + * + * @param admin a connected Admin client + * @param config tool configuration controlling which data to collect + * @return an immutable {@link ClusterStorageInfo} snapshot + */ + static ClusterStorageInfo collectClusterInfo(Admin admin, + CostEstimatorConfig config) throws Exception { + + // 1. Cluster identity and broker list + String clusterId = admin.describeCluster().clusterId().get(); + Set brokerIds = admin.describeCluster().nodes().get() + .stream().map(Node::id).collect(Collectors.toSet()); + + // 2. Topic list (exclude internal topics like __consumer_offsets) + Set topicNames = admin.listTopics().names().get(); + + // 3. Describe topics to count total partitions + DescribeTopicsResult describeTopics = admin.describeTopics(new ArrayList<>(topicNames)); + Map topicDescriptions = describeTopics.allTopicNames().get(); + + int partitionCount = topicDescriptions.values().stream() + .mapToInt(td -> td.partitions().size()) + .sum(); + + // 4. Describe log dirs to estimate storage sizes + DescribeLogDirsResult logDirsResult = admin.describeLogDirs(brokerIds); + Map> logDirsByBroker = + logDirsResult.allDescriptions().get(); + + // Aggregate per-topic bytes (de-duplicate replicas by keeping the first seen per partition) + Map partitionBytesMap = new HashMap<>(); + for (Map logDirs : logDirsByBroker.values()) { + for (LogDirDescription logDir : logDirs.values()) { + for (Map.Entry entry + : logDir.replicaInfos().entrySet()) { + // Only count each partition once (avoid triple-counting replicas) + partitionBytesMap.putIfAbsent(entry.getKey(), entry.getValue().size()); + } + } + } + + long totalLogBytes = partitionBytesMap.values().stream() + .mapToLong(Long::longValue).sum(); + + // Per-topic aggregation (only computed when --per-topic is set) + Map topicBytes = Collections.emptyMap(); + if (config.perTopic) { + Map topicBytesMap = new HashMap<>(); + for (Map.Entry entry : partitionBytesMap.entrySet()) { + topicBytesMap.merge(entry.getKey().topic(), entry.getValue(), Long::sum); + } + topicBytes = topicBytesMap; + } + + return ClusterStorageInfo.builder() + .clusterId(clusterId) + .brokerCount(brokerIds.size()) + .topicCount(topicNames.size()) + .partitionCount(partitionCount) + .totalLogBytes(totalLogBytes) + .topicBytes(topicBytes) + .build(); + } + + // ----------------------------------------------------------------------- + // CLI argument parsing + // ----------------------------------------------------------------------- + + private static ArgumentParser buildParser() { + ArgumentParser parser = ArgumentParsers + .newArgumentParser("automq-cost-estimator") + .defaultHelp(true) + .description("Estimates the monthly S3 cost of your AutoMQ cluster and compares it\n" + + "to the equivalent Apache Kafka (EBS-backed) deployment cost.\n\n" + + "Storage sizes are approximated from WAL log-dir data reported by each broker.\n" + + "In a fully diskless deployment, bulk data lives in S3; treat the output as\n" + + "a lower-bound estimate and see the disclaimer in the report footer."); + + // Connection + parser.addArgument("-B", "--bootstrap-server") + .action(store()) + .required(true) + .dest("bootstrapServer") + .metavar("HOST:PORT") + .help("Bootstrap server address of the AutoMQ cluster."); + + parser.addArgument("--config", "-c") + .action(store()) + .dest("configFile") + .metavar("FILE") + .help("Path to a properties file with additional Admin client configuration."); + + // S3 pricing + parser.addArgument("--storage-price") + .action(store()) + .type(Double.class) + .setDefault(0.023) + .dest("storagePricePerGib") + .metavar("USD") + .help("S3 storage price per GiB per month (default: $0.023, AWS us-east-1)."); + + parser.addArgument("--put-price") + .action(store()) + .type(Double.class) + .setDefault(0.000005) + .dest("putPricePerRequest") + .metavar("USD") + .help("S3 PUT request price per request (default: $0.000005)."); + + parser.addArgument("--get-price") + .action(store()) + .type(Double.class) + .setDefault(0.0000004) + .dest("getPricePerRequest") + .metavar("USD") + .help("S3 GET request price per request (default: $0.0000004)."); + + // Kafka comparison + parser.addArgument("--ebs-price") + .action(store()) + .type(Double.class) + .setDefault(0.08) + .dest("ebsPricePerGib") + .metavar("USD") + .help("EBS gp3 price per GiB per month used for Kafka comparison (default: $0.08)."); + + parser.addArgument("--replication-factor") + .action(store()) + .type(Integer.class) + .setDefault(3) + .dest("kafkaReplicationFactor") + .metavar("N") + .help("Replication factor to simulate for the Kafka EBS cost (default: 3)."); + + parser.addArgument("--overprovision-factor") + .action(store()) + .type(Double.class) + .setDefault(2.0) + .dest("overprovisionFactor") + .metavar("MULTIPLIER") + .help("Over-provisioning multiplier for the Kafka EBS size (default: 2.0)."); + + // Output + parser.addArgument("--output") + .action(store()) + .choices("text", "json") + .setDefault("text") + .dest("outputFormat") + .help("Output format: 'text' (default) or 'json'."); + + parser.addArgument("--per-topic") + .action(storeTrue()) + .dest("perTopic") + .help("Print a per-topic storage cost breakdown in addition to the cluster total."); + + return parser; + } + + private static CostEstimatorConfig buildConfig(Namespace ns) { + return CostEstimatorConfig.builder(ns.getString("bootstrapServer")) + .configFile(ns.getString("configFile")) + .storagePricePerGib(ns.getDouble("storagePricePerGib")) + .putPricePerRequest(ns.getDouble("putPricePerRequest")) + .getPricePerRequest(ns.getDouble("getPricePerRequest")) + .ebsPricePerGib(ns.getDouble("ebsPricePerGib")) + .kafkaReplicationFactor(ns.getInt("kafkaReplicationFactor")) + .overprovisionFactor(ns.getDouble("overprovisionFactor")) + .outputFormat(ns.getString("outputFormat")) + .perTopic(ns.getBoolean("perTopic")) + .build(); + } + + private static Properties buildAdminProperties(CostEstimatorConfig config) throws IOException { + Properties props = new Properties(); + if (config.configFile != null) { + props.putAll(Utils.loadProps(config.configFile)); + } + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer); + props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "automq-cost-estimator"); + props.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"); + return props; + } + + // Expose for testing + static List filterInternalTopics(Set topics) { + return topics.stream() + .filter(t -> !t.startsWith("__")) + .collect(Collectors.toList()); + } +} diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/cost/ClusterStorageInfo.java b/tools/src/main/java/org/apache/kafka/tools/automq/cost/ClusterStorageInfo.java new file mode 100644 index 0000000000..0889e453c3 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/automq/cost/ClusterStorageInfo.java @@ -0,0 +1,123 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.automq.cost; + +import java.util.Collections; +import java.util.Map; + +/** + * Holds the storage and metadata facts collected from a live AutoMQ cluster. + *

+ * Instances are created by {@code S3CostEstimatorCommand} after querying the + * Admin API, and are then handed to {@link CostCalculator} for cost computation. + *

+ * Note: {@code totalLogBytes} is derived from {@code describeLogDirs()} which + * reflects data currently held in local WAL directories. In a fully diskless + * AutoMQ deployment the bulk of data lives in S3, so this value is used as an + * approximation. The printed report includes a disclaimer about this limitation. + */ +public final class ClusterStorageInfo { + + /** Cluster identifier returned by the Admin API. */ + public final String clusterId; + + /** Number of brokers currently registered in the cluster. */ + public final int brokerCount; + + /** Total number of user-visible topics (internal topics excluded). */ + public final int topicCount; + + /** Total number of partitions across all user-visible topics. */ + public final int partitionCount; + + /** + * Sum of replica log sizes in bytes reported by {@code describeLogDirs()}. + * This is an approximation of the logical data volume. + */ + public final long totalLogBytes; + + /** + * Per-topic sum of replica log sizes in bytes. + * Key: topic name. Value: total bytes across all partitions of that topic. + * Empty when per-topic breakdown was not requested. + */ + public final Map topicBytes; + + private ClusterStorageInfo(Builder builder) { + this.clusterId = builder.clusterId; + this.brokerCount = builder.brokerCount; + this.topicCount = builder.topicCount; + this.partitionCount = builder.partitionCount; + this.totalLogBytes = builder.totalLogBytes; + this.topicBytes = Collections.unmodifiableMap(builder.topicBytes); + } + + /** Returns a new builder for {@link ClusterStorageInfo}. */ + public static Builder builder() { + return new Builder(); + } + + /** Builder for {@link ClusterStorageInfo}. */ + public static final class Builder { + private String clusterId = "unknown"; + private int brokerCount; + private int topicCount; + private int partitionCount; + private long totalLogBytes; + private Map topicBytes = Collections.emptyMap(); + + private Builder() { + } + + public Builder clusterId(String clusterId) { + this.clusterId = clusterId; + return this; + } + + public Builder brokerCount(int brokerCount) { + this.brokerCount = brokerCount; + return this; + } + + public Builder topicCount(int topicCount) { + this.topicCount = topicCount; + return this; + } + + public Builder partitionCount(int partitionCount) { + this.partitionCount = partitionCount; + return this; + } + + public Builder totalLogBytes(long totalLogBytes) { + this.totalLogBytes = totalLogBytes; + return this; + } + + public Builder topicBytes(Map topicBytes) { + this.topicBytes = topicBytes; + return this; + } + + public ClusterStorageInfo build() { + return new ClusterStorageInfo(this); + } + } +} diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/cost/CostCalculator.java b/tools/src/main/java/org/apache/kafka/tools/automq/cost/CostCalculator.java new file mode 100644 index 0000000000..7616173938 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/automq/cost/CostCalculator.java @@ -0,0 +1,143 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.automq.cost; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Pure cost calculation logic for the AutoMQ S3 Cost Estimator. + *

+ * This class is stateless and has no I/O dependencies, making it straightforward + * to unit-test in isolation. + *

+ * Cost model: + *

+ */ +public class CostCalculator { + + private static final double BYTES_PER_GIB = 1024.0 * 1024.0 * 1024.0; + private static final int DAYS_PER_MONTH = 30; + + /** + * Immutable result produced by {@link #calculate}. + * All monetary values are in USD per month. + */ + public static final class CostReport { + + /** Total logical data volume in GiB. */ + public final double totalGib; + + /** Monthly cost of S3 storage in USD. */ + public final double storageCostUsd; + + /** Estimated monthly cost of S3 PUT requests in USD. */ + public final double putCostUsd; + + /** Estimated monthly cost of S3 GET requests in USD. */ + public final double getCostUsd; + + /** Total estimated monthly AutoMQ cost in USD (storage + PUT + GET). */ + public final double automqTotalUsd; + + /** EBS storage volume (in GiB) required for an equivalent Kafka cluster. */ + public final double kafkaEbsGib; + + /** Monthly EBS cost for the equivalent Kafka cluster in USD. */ + public final double kafkaEbsCostUsd; + + /** Monthly savings of AutoMQ vs Kafka in USD. */ + public final double savingsUsd; + + /** Savings as a percentage of the Kafka EBS cost. */ + public final double savingsPct; + + /** + * Per-topic cost breakdown in USD per month. + * Key: topic name. Value: estimated monthly storage cost. + * Empty when no per-topic data was provided. + */ + public final Map perTopicStorageCostUsd; + + CostReport(double totalGib, double storageCostUsd, double putCostUsd, double getCostUsd, + double kafkaEbsGib, double kafkaEbsCostUsd, Map perTopicStorageCostUsd) { + this.totalGib = totalGib; + this.storageCostUsd = storageCostUsd; + this.putCostUsd = putCostUsd; + this.getCostUsd = getCostUsd; + this.automqTotalUsd = storageCostUsd + putCostUsd + getCostUsd; + this.kafkaEbsGib = kafkaEbsGib; + this.kafkaEbsCostUsd = kafkaEbsCostUsd; + this.savingsUsd = Math.max(0.0, kafkaEbsCostUsd - this.automqTotalUsd); + this.savingsPct = kafkaEbsCostUsd > 0 + ? (this.savingsUsd / kafkaEbsCostUsd) * 100.0 + : 0.0; + this.perTopicStorageCostUsd = Collections.unmodifiableMap(perTopicStorageCostUsd); + } + } + + /** + * Computes the monthly cost estimate for an AutoMQ cluster and the equivalent + * Apache Kafka (EBS-backed) cluster. + * + * @param info cluster storage facts collected from the Admin API + * @param config pricing parameters and Kafka comparison factors + * @return an immutable {@link CostReport} containing all computed values + */ + public CostReport calculate(ClusterStorageInfo info, CostEstimatorConfig config) { + double totalGib = info.totalLogBytes / BYTES_PER_GIB; + + // S3 storage cost + double storageCost = totalGib * config.storagePricePerGib; + + // S3 PUT cost: each partition uploads WAL objects periodically + double totalPuts = (double) info.partitionCount + * config.estimatedPutsPerPartitionPerDay + * DAYS_PER_MONTH; + double putCost = totalPuts * config.putPricePerRequest; + + // S3 GET cost: consumers fetch from S3 + double totalGets = (double) info.partitionCount + * config.estimatedGetsPerPartitionPerDay + * DAYS_PER_MONTH; + double getCost = totalGets * config.getPricePerRequest; + + // Kafka EBS equivalent: replicated + over-provisioned + double kafkaEbsGib = totalGib * config.kafkaReplicationFactor * config.overprovisionFactor; + double kafkaEbsCost = kafkaEbsGib * config.ebsPricePerGib; + + // Per-topic storage breakdown + Map perTopicCosts = new LinkedHashMap<>(); + for (Map.Entry entry : info.topicBytes.entrySet()) { + double topicGib = entry.getValue() / BYTES_PER_GIB; + perTopicCosts.put(entry.getKey(), topicGib * config.storagePricePerGib); + } + + return new CostReport(totalGib, storageCost, putCost, getCost, + kafkaEbsGib, kafkaEbsCost, perTopicCosts); + } +} diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/cost/CostEstimatorConfig.java b/tools/src/main/java/org/apache/kafka/tools/automq/cost/CostEstimatorConfig.java new file mode 100644 index 0000000000..6f976a2a9c --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/automq/cost/CostEstimatorConfig.java @@ -0,0 +1,190 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.automq.cost; + +/** + * Immutable configuration for the S3 Cost Estimator tool. + *

+ * Holds the bootstrap server address, optional Admin client property file path, + * S3/EBS pricing parameters, and Kafka comparison factors. All prices are in + * USD. Sizes are in GiB. + */ +public final class CostEstimatorConfig { + + /** Bootstrap server connection string for the Admin client. */ + public final String bootstrapServer; + + /** Optional path to an Admin client properties file. May be null. */ + public final String configFile; + + /** + * S3 storage price in USD per GiB per month. + * Default: AWS us-east-1 S3 Standard storage ($0.023/GiB). + */ + public final double storagePricePerGib; + + /** + * S3 PUT request price in USD per request. + * Default: AWS us-east-1 ($0.000005 per request = $5 per million). + */ + public final double putPricePerRequest; + + /** + * S3 GET request price in USD per request. + * Default: AWS us-east-1 ($0.0000004 per request = $0.40 per million). + */ + public final double getPricePerRequest; + + /** + * EBS gp3 price in USD per GiB per month used for the Kafka cost comparison. + * Default: AWS us-east-1 gp3 ($0.08/GiB). + */ + public final double ebsPricePerGib; + + /** + * Replication factor to assume when simulating an equivalent Apache Kafka cluster. + * Default: 3 (standard production setup). + */ + public final int kafkaReplicationFactor; + + /** + * Over-provisioning multiplier applied on top of the replicated storage size + * to account for typical Kafka disk headroom requirements. + * Default: 2.0. + */ + public final double overprovisionFactor; + + /** + * Estimated number of S3 PUT requests (WAL object uploads) per partition per day. + * One WAL upload happens roughly every minute under moderate load. + */ + public final int estimatedPutsPerPartitionPerDay; + + /** + * Estimated number of S3 GET requests (consumer fetches) per partition per day. + * Approximately 5 fetches per minute per partition under moderate load. + */ + public final int estimatedGetsPerPartitionPerDay; + + /** Output format: "text" (default) or "json". */ + public final String outputFormat; + + /** Whether to print a per-topic cost breakdown. */ + public final boolean perTopic; + + private CostEstimatorConfig(Builder builder) { + this.bootstrapServer = builder.bootstrapServer; + this.configFile = builder.configFile; + this.storagePricePerGib = builder.storagePricePerGib; + this.putPricePerRequest = builder.putPricePerRequest; + this.getPricePerRequest = builder.getPricePerRequest; + this.ebsPricePerGib = builder.ebsPricePerGib; + this.kafkaReplicationFactor = builder.kafkaReplicationFactor; + this.overprovisionFactor = builder.overprovisionFactor; + this.estimatedPutsPerPartitionPerDay = builder.estimatedPutsPerPartitionPerDay; + this.estimatedGetsPerPartitionPerDay = builder.estimatedGetsPerPartitionPerDay; + this.outputFormat = builder.outputFormat; + this.perTopic = builder.perTopic; + } + + /** Returns a new builder with AWS us-east-1 pricing defaults. */ + public static Builder builder(String bootstrapServer) { + return new Builder(bootstrapServer); + } + + /** Builder for {@link CostEstimatorConfig}. */ + public static final class Builder { + private final String bootstrapServer; + private String configFile = null; + private double storagePricePerGib = 0.023; + private double putPricePerRequest = 0.000005; + private double getPricePerRequest = 0.0000004; + private double ebsPricePerGib = 0.08; + private int kafkaReplicationFactor = 3; + private double overprovisionFactor = 2.0; + private int estimatedPutsPerPartitionPerDay = 1440; // ~1 per minute + private int estimatedGetsPerPartitionPerDay = 7200; // ~5 per minute + private String outputFormat = "text"; + private boolean perTopic = false; + + private Builder(String bootstrapServer) { + this.bootstrapServer = bootstrapServer; + } + + public Builder configFile(String configFile) { + this.configFile = configFile; + return this; + } + + public Builder storagePricePerGib(double price) { + this.storagePricePerGib = price; + return this; + } + + public Builder putPricePerRequest(double price) { + this.putPricePerRequest = price; + return this; + } + + public Builder getPricePerRequest(double price) { + this.getPricePerRequest = price; + return this; + } + + public Builder ebsPricePerGib(double price) { + this.ebsPricePerGib = price; + return this; + } + + public Builder kafkaReplicationFactor(int factor) { + this.kafkaReplicationFactor = factor; + return this; + } + + public Builder overprovisionFactor(double factor) { + this.overprovisionFactor = factor; + return this; + } + + public Builder estimatedPutsPerPartitionPerDay(int puts) { + this.estimatedPutsPerPartitionPerDay = puts; + return this; + } + + public Builder estimatedGetsPerPartitionPerDay(int gets) { + this.estimatedGetsPerPartitionPerDay = gets; + return this; + } + + public Builder outputFormat(String format) { + this.outputFormat = format; + return this; + } + + public Builder perTopic(boolean perTopic) { + this.perTopic = perTopic; + return this; + } + + public CostEstimatorConfig build() { + return new CostEstimatorConfig(this); + } + } +} diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/cost/CostReportPrinter.java b/tools/src/main/java/org/apache/kafka/tools/automq/cost/CostReportPrinter.java new file mode 100644 index 0000000000..c84304c912 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/automq/cost/CostReportPrinter.java @@ -0,0 +1,197 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.automq.cost; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; + +import java.io.PrintStream; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Formats and prints a {@link CostCalculator.CostReport} to a {@link PrintStream}. + *

+ * Two output modes are supported: + *

    + *
  • text – human-readable table (default)
  • + *
  • json – machine-readable JSON object
  • + *
+ */ +public class CostReportPrinter { + + private static final String SEPARATOR = "─".repeat(60); + private static final ObjectMapper MAPPER = new ObjectMapper() + .enable(SerializationFeature.INDENT_OUTPUT); + + private final PrintStream out; + + /** + * Creates a printer that writes to the given stream. + * + * @param out the target print stream (e.g. {@code System.out}) + */ + public CostReportPrinter(PrintStream out) { + this.out = out; + } + + /** + * Prints the cost report in the format specified by {@code config.outputFormat}. + * + * @param info cluster facts used as report header context + * @param report computed cost figures + * @param config tool configuration (pricing and output format) + */ + public void print(ClusterStorageInfo info, CostCalculator.CostReport report, + CostEstimatorConfig config) { + if ("json".equalsIgnoreCase(config.outputFormat)) { + printJson(info, report, config); + } else { + printText(info, report, config); + } + } + + private void printText(ClusterStorageInfo info, CostCalculator.CostReport report, + CostEstimatorConfig config) { + out.println(); + out.println("AutoMQ S3 Cost Estimator"); + out.println(SEPARATOR); + out.printf(" Cluster ID : %s%n", info.clusterId); + out.printf(" Broker count : %d%n", info.brokerCount); + out.printf(" Topics : %d%n", info.topicCount); + out.printf(" Partitions : %d%n", info.partitionCount); + out.println(); + + out.println("Storage Breakdown"); + out.println(SEPARATOR); + out.printf(" Total data (approx.) : %s%n", formatGib(report.totalGib)); + out.printf(" S3 storage cost : %s/month (@ $%.4f/GiB)%n", + formatUsd(report.storageCostUsd), config.storagePricePerGib); + out.printf(" S3 PUT cost : %s/month (~%,d PUTs/day, $%.7f/req)%n", + formatUsd(report.putCostUsd), + (long) info.partitionCount * config.estimatedPutsPerPartitionPerDay, + config.putPricePerRequest); + out.printf(" S3 GET cost : %s/month (~%,d GETs/day, $%.7f/req)%n", + formatUsd(report.getCostUsd), + (long) info.partitionCount * config.estimatedGetsPerPartitionPerDay, + config.getPricePerRequest); + out.println(); + + out.printf(" AutoMQ Estimated Total: %s / month%n", formatUsd(report.automqTotalUsd)); + out.println(); + + out.println("Equivalent Apache Kafka (EBS) Cost"); + out.println(SEPARATOR); + out.printf(" EBS size (RF=%d, %.1fx over-provision): %s%n", + config.kafkaReplicationFactor, config.overprovisionFactor, + formatGib(report.kafkaEbsGib)); + out.printf(" EBS cost : %s/month (@ $%.4f/GiB)%n", + formatUsd(report.kafkaEbsCostUsd), config.ebsPricePerGib); + out.println(); + + out.println(SEPARATOR); + out.printf(" Savings vs Kafka : %s/month (%.1f%% cheaper)%n", + formatUsd(report.savingsUsd), report.savingsPct); + out.println(SEPARATOR); + out.println(); + + if (config.perTopic && !report.perTopicStorageCostUsd.isEmpty()) { + out.println("Per-Topic Storage Cost Breakdown"); + out.println(SEPARATOR); + out.printf(" %-45s %12s%n", "Topic", "Cost/month"); + out.printf(" %-45s %12s%n", "-".repeat(45), "-".repeat(12)); + report.perTopicStorageCostUsd.entrySet().stream() + .sorted(Map.Entry.comparingByValue().reversed()) + .forEach(e -> out.printf(" %-45s %12s%n", + truncate(e.getKey(), 45), formatUsd(e.getValue()))); + out.println(); + } + + out.println("Note: Prices estimated using AWS us-east-1 defaults."); + out.println(" Storage size is approximated from WAL log-dir data."); + out.println(" Use --storage-price, --put-price, --get-price, --ebs-price to override."); + out.println(); + } + + private void printJson(ClusterStorageInfo info, CostCalculator.CostReport report, + CostEstimatorConfig config) { + Map root = new LinkedHashMap<>(); + + Map cluster = new LinkedHashMap<>(); + cluster.put("clusterId", info.clusterId); + cluster.put("brokerCount", info.brokerCount); + cluster.put("topicCount", info.topicCount); + cluster.put("partitionCount", info.partitionCount); + root.put("cluster", cluster); + + Map automq = new LinkedHashMap<>(); + automq.put("totalGib", round(report.totalGib)); + automq.put("storageCostUsd", round(report.storageCostUsd)); + automq.put("putCostUsd", round(report.putCostUsd)); + automq.put("getCostUsd", round(report.getCostUsd)); + automq.put("totalCostUsd", round(report.automqTotalUsd)); + root.put("automq", automq); + + Map kafka = new LinkedHashMap<>(); + kafka.put("ebsGib", round(report.kafkaEbsGib)); + kafka.put("ebsCostUsd", round(report.kafkaEbsCostUsd)); + kafka.put("replicationFactor", config.kafkaReplicationFactor); + kafka.put("overprovisionFactor", config.overprovisionFactor); + root.put("kafka", kafka); + + Map savings = new LinkedHashMap<>(); + savings.put("savingsUsd", round(report.savingsUsd)); + savings.put("savingsPct", round(report.savingsPct)); + root.put("savings", savings); + + if (config.perTopic && !report.perTopicStorageCostUsd.isEmpty()) { + Map perTopic = new LinkedHashMap<>(); + report.perTopicStorageCostUsd.forEach((topic, cost) -> perTopic.put(topic, round(cost))); + root.put("perTopicStorageCostUsd", perTopic); + } + + try { + out.println(MAPPER.writeValueAsString(root)); + } catch (Exception e) { + throw new RuntimeException("Failed to serialize cost report to JSON", e); + } + } + + // --- formatting helpers --- + + private static String formatGib(double gib) { + return String.format("%.2f GiB", gib); + } + + private static String formatUsd(double usd) { + return String.format("$%,.2f", usd); + } + + private static double round(double value) { + return Math.round(value * 10000.0) / 10000.0; + } + + private static String truncate(String s, int maxLen) { + if (s.length() <= maxLen) { + return s; + } + return s.substring(0, maxLen - 1) + "…"; + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/automq/cost/CostCalculatorTest.java b/tools/src/test/java/org/apache/kafka/tools/automq/cost/CostCalculatorTest.java new file mode 100644 index 0000000000..65bbe69436 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/automq/cost/CostCalculatorTest.java @@ -0,0 +1,215 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.automq.cost; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests for {@link CostCalculator}. + *

+ * Tests cover zero-data edge cases, known-value computations, savings percentage + * accuracy, and the effect of different replication and over-provision factors. + */ +@Tag("S3Unit") +public class CostCalculatorTest { + + private static final double DELTA = 0.0001; + + /** Default AWS us-east-1 pricing config for test convenience. */ + private static CostEstimatorConfig defaultConfig() { + return CostEstimatorConfig.builder("localhost:9092").build(); + } + + /** Helper: build a ClusterStorageInfo with the given byte total and 10 partitions. */ + private static ClusterStorageInfo infoWithBytes(long bytes) { + return ClusterStorageInfo.builder() + .clusterId("test-cluster") + .brokerCount(3) + .topicCount(5) + .partitionCount(10) + .totalLogBytes(bytes) + .topicBytes(Collections.emptyMap()) + .build(); + } + + // ----------------------------------------------------------------------- + // Edge cases + // ----------------------------------------------------------------------- + + @Test + public void zeroDataProducesZeroStorageCost() { + // Given a cluster with no data + ClusterStorageInfo info = infoWithBytes(0L); + CostEstimatorConfig config = defaultConfig(); + + // When costs are computed + CostCalculator.CostReport report = new CostCalculator().calculate(info, config); + + // Then storage cost must be zero; only request costs remain + assertEquals(0.0, report.storageCostUsd, DELTA); + assertEquals(0.0, report.totalGib, DELTA); + assertEquals(0.0, report.kafkaEbsGib, DELTA); + assertEquals(0.0, report.kafkaEbsCostUsd, DELTA); + } + + @Test + public void zeroDataStillIncursRequestCosts() { + // Given a cluster with 10 partitions but no persisted data + ClusterStorageInfo info = infoWithBytes(0L); + + // When costs are computed + CostCalculator.CostReport report = new CostCalculator().calculate(info, defaultConfig()); + + // Then PUT and GET costs are non-zero because partitions still generate API calls + assertTrue(report.putCostUsd > 0.0, + "PUT cost should be non-zero even with 0 data bytes"); + assertTrue(report.getCostUsd > 0.0, + "GET cost should be non-zero even with 0 data bytes"); + } + + // ----------------------------------------------------------------------- + // Known-value computations + // ----------------------------------------------------------------------- + + @Test + public void storageCalculationMatchesExpectedValue() { + // Given exactly 1 GiB of data + long oneGiB = 1024L * 1024L * 1024L; + ClusterStorageInfo info = infoWithBytes(oneGiB); + CostEstimatorConfig config = defaultConfig(); // $0.023/GiB + + CostCalculator.CostReport report = new CostCalculator().calculate(info, config); + + // Then storage cost = 1 GiB × $0.023 = $0.023 + assertEquals(1.0, report.totalGib, DELTA); + assertEquals(0.023, report.storageCostUsd, DELTA); + } + + @Test + public void putCostCalculationMatchesExpectedValue() { + // Given 10 partitions, 1440 PUTs/partition/day, 30 days, $0.000005/req + // Expected: 10 × 1440 × 30 × 0.000005 = $2.16 + ClusterStorageInfo info = infoWithBytes(0L); // storage cost is 0; only PUTs matter + CostEstimatorConfig config = defaultConfig(); + + CostCalculator.CostReport report = new CostCalculator().calculate(info, config); + + double expected = 10 * 1440 * 30 * 0.000005; + assertEquals(expected, report.putCostUsd, DELTA); + } + + @Test + public void getCostCalculationMatchesExpectedValue() { + // Given 10 partitions, 7200 GETs/partition/day, 30 days, $0.0000004/req + // Expected: 10 × 7200 × 30 × 0.0000004 = $0.864 + ClusterStorageInfo info = infoWithBytes(0L); + CostEstimatorConfig config = defaultConfig(); + + CostCalculator.CostReport report = new CostCalculator().calculate(info, config); + + double expected = 10 * 7200 * 30 * 0.0000004; + assertEquals(expected, report.getCostUsd, DELTA); + } + + @Test + public void kafkaEbsCostReflectsReplicationAndOverprovision() { + // Given 10 GiB of data, RF=3, 2x over-provision → 60 GiB EBS × $0.08 = $4.80 + long tenGiB = 10L * 1024L * 1024L * 1024L; + ClusterStorageInfo info = infoWithBytes(tenGiB); + CostEstimatorConfig config = defaultConfig(); // RF=3, factor=2.0, $0.08/GiB + + CostCalculator.CostReport report = new CostCalculator().calculate(info, config); + + assertEquals(60.0, report.kafkaEbsGib, DELTA); + assertEquals(4.80, report.kafkaEbsCostUsd, DELTA); + } + + // ----------------------------------------------------------------------- + // Savings calculation + // ----------------------------------------------------------------------- + + @Test + public void savingsArePositiveWhenAutomqIsCheaper() { + long tenGiB = 10L * 1024L * 1024L * 1024L; + ClusterStorageInfo info = infoWithBytes(tenGiB); + + CostCalculator.CostReport report = new CostCalculator().calculate(info, defaultConfig()); + + assertTrue(report.savingsUsd >= 0.0, "Savings must be non-negative"); + assertTrue(report.savingsPct >= 0.0 && report.savingsPct <= 100.0, + "Savings percentage must be in [0, 100]"); + } + + @Test + public void savingsPercentageIsZeroWhenKafkaCostIsZero() { + // Given zero data and a config with zero EBS price + ClusterStorageInfo info = infoWithBytes(0L); + CostEstimatorConfig config = CostEstimatorConfig.builder("localhost:9092") + .ebsPricePerGib(0.0) + .build(); + + CostCalculator.CostReport report = new CostCalculator().calculate(info, config); + + assertEquals(0.0, report.savingsPct, DELTA, + "Savings percentage must be 0 when Kafka EBS cost is 0"); + } + + // ----------------------------------------------------------------------- + // Replication factor variations + // ----------------------------------------------------------------------- + + @Test + public void higherReplicationFactorIncreasesKafkaCost() { + long oneGiB = 1024L * 1024L * 1024L; + ClusterStorageInfo info = infoWithBytes(oneGiB); + + CostEstimatorConfig rf3 = CostEstimatorConfig.builder("localhost:9092") + .kafkaReplicationFactor(3).overprovisionFactor(1.0).build(); + CostEstimatorConfig rf5 = CostEstimatorConfig.builder("localhost:9092") + .kafkaReplicationFactor(5).overprovisionFactor(1.0).build(); + + CostCalculator.CostReport reportRf3 = new CostCalculator().calculate(info, rf3); + CostCalculator.CostReport reportRf5 = new CostCalculator().calculate(info, rf5); + + assertTrue(reportRf5.kafkaEbsCostUsd > reportRf3.kafkaEbsCostUsd, + "Higher RF should result in higher Kafka EBS cost"); + } + + // ----------------------------------------------------------------------- + // automqTotal is sum of parts + // ----------------------------------------------------------------------- + + @Test + public void automqTotalIsExactSumOfComponents() { + long twoGiB = 2L * 1024L * 1024L * 1024L; + ClusterStorageInfo info = infoWithBytes(twoGiB); + + CostCalculator.CostReport report = new CostCalculator().calculate(info, defaultConfig()); + + double expectedTotal = report.storageCostUsd + report.putCostUsd + report.getCostUsd; + assertEquals(expectedTotal, report.automqTotalUsd, DELTA); + } +}