diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 444e2ceb9..04c7b1147 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -9,6 +9,16 @@ find_package(gRPC CONFIG REQUIRED) find_package(absl REQUIRED) find_package(OpenSSL REQUIRED) +# Thread safety annotation macros +# Map legacy names to ABSL_-prefixed names (newer abseil >= 20230125) +# so that existing code using GUARDED_BY, LOCKS_EXCLUDED, etc. continues to work. +# Function-style macros cannot be passed via -D on the command line, so we +# force-include a compatibility header that defines them instead. +add_compile_options( + "-include" + "${CMAKE_CURRENT_SOURCE_DIR}/source/base/include/thread_annotations_compat.h" +) + if(NOT EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/proto/apache/rocketmq/v2/definition.proto") message(FATAL_ERROR "Proto files not found. Run: git submodule update --init --recursive") endif() diff --git a/cpp/examples/BUILD.bazel b/cpp/examples/BUILD.bazel index 44efb9240..5cb5c8b22 100644 --- a/cpp/examples/BUILD.bazel +++ b/cpp/examples/BUILD.bazel @@ -113,4 +113,26 @@ cc_binary( "//source/rocketmq:rocketmq_library", "@com_github_gflags_gflags//:gflags", ] +) + +cc_binary( + name = "example_lite_push_consumer", + srcs = [ + "ExampleLitePushConsumer.cpp", + ], + deps = [ + "//source/rocketmq:rocketmq_library", + "@com_github_gflags_gflags//:gflags", + ], +) + +cc_binary( + name = "example_lite_producer", + srcs = [ + "ExampleLiteProducer.cpp", + ], + deps = [ + "//source/rocketmq:rocketmq_library", + "@com_github_gflags_gflags//:gflags", + ], ) \ No newline at end of file diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt index 8de1b9dfe..d860b468f 100644 --- a/cpp/examples/CMakeLists.txt +++ b/cpp/examples/CMakeLists.txt @@ -11,4 +11,6 @@ add_example(example_producer_with_timed_message ExampleProducerWithTimedMessage. add_example(example_producer_with_priority_message ExampleProducerWithPriorityMessage.cpp) add_example(example_producer_with_transactional_message ExampleProducerWithTransactionalMessage.cpp) add_example(example_push_consumer ExamplePushConsumer.cpp) -add_example(example_simple_consumer ExampleSimpleConsumer.cpp) \ No newline at end of file +add_example(example_simple_consumer ExampleSimpleConsumer.cpp) +add_example(example_lite_push_consumer ExampleLitePushConsumer.cpp) +add_example(example_lite_producer ExampleLiteProducer.cpp) \ No newline at end of file diff --git a/cpp/examples/ExampleLiteProducer.cpp b/cpp/examples/ExampleLiteProducer.cpp new file mode 100644 index 000000000..de8fce625 --- /dev/null +++ b/cpp/examples/ExampleLiteProducer.cpp @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include + +#include "gflags/gflags.h" +#include "rocketmq/Logger.h" +#include "rocketmq/Producer.h" + +using namespace ROCKETMQ_NAMESPACE; + +DEFINE_string(topic, "LiteTopic", "Parent topic for lite messages"); +DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider"); +DEFINE_string(access_key, "", "Your access key ID"); +DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); +DEFINE_int32(message_count, 5, "Number of lite messages to send"); + +int main(int argc, char* argv[]) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + + auto& logger = getLogger(); + logger.setConsoleLevel(Level::Info); + logger.setLevel(Level::Info); + logger.init(); + + std::cout << "=== Lite Producer Example ===" << std::endl; + + CredentialsProviderPtr credentials_provider; + if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) { + credentials_provider = std::make_shared(FLAGS_access_key, FLAGS_access_secret); + } + + auto producer = Producer::newBuilder() + .withConfiguration(Configuration::newBuilder() + .withEndpoints(FLAGS_access_point) + .withRequestTimeout(std::chrono::seconds(3)) + .withCredentialsProvider(credentials_provider) + .withSsl(FLAGS_tls) + .build()) + .build(); + + std::cout << "Producer started successfully" << std::endl; + + // Send lite topic messages + for (int i = 1; i <= FLAGS_message_count; ++i) { + std::string lite_topic_name = "lite-topic-" + std::to_string(i); + std::string message_body = "This is a lite message for Apache RocketMQ, index=" + std::to_string(i); + + auto message = Message::newBuilder() + .withTopic(FLAGS_topic) + .withBody(message_body) + .withKeys({"key-" + std::to_string(i)}) + .withLiteTopic(lite_topic_name) + .build(); + + std::error_code ec; + auto send_receipt = producer.send(std::move(message), ec); + + if (ec) { + std::cerr << "Failed to send message " << i << ": " << ec.message() << std::endl; + } else { + std::cout << "Message " << i << " sent successfully" << std::endl; + std::cout << " - Topic: " << FLAGS_topic << std::endl; + std::cout << " - Lite Topic: " << lite_topic_name << std::endl; + std::cout << " - Message ID: " << send_receipt.message_id << std::endl; + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + std::cout << "All messages sent!" << std::endl; + + return EXIT_SUCCESS; +} diff --git a/cpp/examples/ExampleLitePushConsumer.cpp b/cpp/examples/ExampleLitePushConsumer.cpp new file mode 100644 index 000000000..55047b2b1 --- /dev/null +++ b/cpp/examples/ExampleLitePushConsumer.cpp @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +#include "gflags/gflags.h" +#include "rocketmq/LitePushConsumer.h" +#include "rocketmq/Logger.h" +#include "rocketmq/OffsetOption.h" + +using namespace ROCKETMQ_NAMESPACE; + +DEFINE_string(topic, "LiteTopic", "Bind topic to which lite topics belong"); +DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider"); +DEFINE_string(group, "LitePushConsumer", "GroupId, created through your instance management console"); +DEFINE_string(access_key, "", "Your access key ID"); +DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); + +int main(int argc, char* argv[]) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + + auto& logger = getLogger(); + logger.setConsoleLevel(Level::Info); + logger.setLevel(Level::Info); + logger.init(); + + auto listener = [](const Message& message) { + std::cout << "Received a message[topic=" << message.topic() + << ", liteTopic=" << message.liteTopic() + << ", MsgId=" << message.id() + << ", body=" << message.body() << "]" << std::endl; + return ConsumeResult::SUCCESS; + }; + + CredentialsProviderPtr credentials_provider; + if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) { + credentials_provider = std::make_shared(FLAGS_access_key, FLAGS_access_secret); + } + + // Build LitePushConsumer + auto lite_consumer = LitePushConsumer::newBuilder() + .bindTopic(FLAGS_topic) + .withGroup(FLAGS_group) + .withConfiguration(Configuration::newBuilder() + .withEndpoints(FLAGS_access_point) + .withRequestTimeout(std::chrono::seconds(3)) + .withCredentialsProvider(credentials_provider) + .withSsl(FLAGS_tls) + .build()) + .withConsumeThreads(4) + .withListener(listener) + .build(); + + std::cout << "LitePushConsumer started, group=" << FLAGS_group + << ", bindTopic=" << FLAGS_topic << std::endl; + + // Subscribe to lite topics + std::error_code ec; + lite_consumer.subscribeLite("lite-topic-1", ec); + if (ec) { + std::cerr << "Failed to subscribe lite-topic-1: " << ec.message() << std::endl; + } else { + std::cout << "Subscribed to lite-topic-1" << std::endl; + } + + lite_consumer.subscribeLite("lite-topic-2", OffsetOption::lastOffset(), ec); + if (ec) { + std::cerr << "Failed to subscribe lite-topic-2: " << ec.message() << std::endl; + } else { + std::cout << "Subscribed to lite-topic-2 with LAST offset" << std::endl; + } + + // Print current lite topic set + auto topic_set = lite_consumer.getLiteTopicSet(); + std::cout << "Current lite topic set size: " << topic_set.size() << std::endl; + for (const auto& topic : topic_set) { + std::cout << " - " << topic << std::endl; + } + + // Keep running + std::this_thread::sleep_for(std::chrono::minutes(30)); + + // Shutdown + lite_consumer.shutdown(); + std::cout << "LitePushConsumer shutdown" << std::endl; + + return EXIT_SUCCESS; +} diff --git a/cpp/include/rocketmq/ErrorCode.h b/cpp/include/rocketmq/ErrorCode.h index eb1119265..a5a9496ea 100644 --- a/cpp/include/rocketmq/ErrorCode.h +++ b/cpp/include/rocketmq/ErrorCode.h @@ -86,6 +86,11 @@ enum class ErrorCode : int { IllegalFilterExpression = 40010, InvalidReceiptHandle = 40011, + /** + * @brief Format of lite topic is illegal. + */ + IllegalLiteTopic = 40020, + /** * @brief Message property conflicts with its type. */ @@ -164,6 +169,16 @@ enum class ErrorCode : int { */ TooManyRequests = 42900, + /** + * @brief Lite topic quota exceeded. + */ + LiteTopicQuotaExceeded = 42901, + + /** + * @brief Lite subscription quota exceeded. + */ + LiteSubscriptionQuotaExceeded = 42902, + /** * @brief The server is unwilling to process the request because either an * individual header field, or all the header fields collectively, are too diff --git a/cpp/include/rocketmq/LitePushConsumer.h b/cpp/include/rocketmq/LitePushConsumer.h new file mode 100644 index 000000000..bcd47e91b --- /dev/null +++ b/cpp/include/rocketmq/LitePushConsumer.h @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include + +#include "Configuration.h" +#include "Executor.h" +#include "FilterExpression.h" +#include "Logger.h" +#include "MessageListener.h" +#include "OffsetOption.h" + +ROCKETMQ_NAMESPACE_BEGIN + +class LitePushConsumerImpl; +class LitePushConsumerBuilder; + +/** + * @brief LitePushConsumer provides a lightweight push-based message consumption model. + * + * Unlike PushConsumer which subscribes to fixed topics at build time, LitePushConsumer + * binds to a parent topic and allows dynamic subscription/unsubscription of lite topics + * at runtime through subscribeLite() and unsubscribeLite(). + * + * Key features: + * - Dynamic lite topic subscription and unsubscription + * - Server-side quota management for lite subscriptions + * - Support for FIFO and standard consumption modes + * - Automatic re-synchronization of lite subscriptions every 30 seconds + */ +class LitePushConsumer { +public: + static LitePushConsumerBuilder newBuilder(); + + /** + * Subscribe to a lite topic with default offset (LAST). + * + * @param lite_topic Name of the lite topic to subscribe to. + * @param ec Error code set on failure (e.g., quota exceeded, network error). + */ + void subscribeLite(const std::string& lite_topic, std::error_code& ec); + + /** + * Subscribe to a lite topic with a specific offset option. + * + * @param lite_topic Name of the lite topic to subscribe to. + * @param offset_option Specifies where to start consuming. + * @param ec Error code set on failure. + */ + void subscribeLite(const std::string& lite_topic, const OffsetOption& offset_option, std::error_code& ec); + + /** + * Unsubscribe from a lite topic. + * + * @param lite_topic Name of the lite topic to unsubscribe from. + * @param ec Error code set on failure. + */ + void unsubscribeLite(const std::string& lite_topic, std::error_code& ec); + + /** + * Get the set of currently subscribed lite topics. + * + * @return Immutable copy of lite topic set. + */ + std::set getLiteTopicSet() const; + + /** + * Get the consumer group name. + * + * @return Consumer group name. + */ + std::string getConsumerGroup() const; + + /** + * Shutdown the consumer and release all resources. + */ + void shutdown(); + +private: + friend class LitePushConsumerBuilder; + + explicit LitePushConsumer(std::shared_ptr impl) + : impl_(std::move(impl)) { + } + + std::shared_ptr impl_; +}; + +/** + * @brief Builder for constructing LitePushConsumer instances. + */ +class LitePushConsumerBuilder { +public: + LitePushConsumerBuilder() : configuration_(Configuration::newBuilder().build()) {} + + /** + * Set the bind topic for the lite push consumer. + * This is the parent topic that all lite topics belong to. + * + * @param topic Parent topic name. + */ + LitePushConsumerBuilder& bindTopic(std::string topic) { + bind_topic_ = std::move(topic); + return *this; + } + + /** + * Set client configuration (endpoints, credentials, etc.). + */ + LitePushConsumerBuilder& withConfiguration(Configuration configuration) { + configuration_ = std::move(configuration); + return *this; + } + + /** + * Set the consumer group name for load balancing. + */ + LitePushConsumerBuilder& withGroup(std::string group) { + group_ = std::move(group); + return *this; + } + + /** + * Register the message listener to process received messages. + */ + LitePushConsumerBuilder& withListener(MessageListener listener) { + listener_ = std::move(listener); + return *this; + } + + /** + * Set the maximum number of messages cached locally. + * + * @param count Maximum cached message count (must be > 0). Default: 1024. + */ + LitePushConsumerBuilder& withMaxCacheMessageCount(int count) { + max_cache_message_count_ = count; + return *this; + } + + /** + * Set the maximum bytes of messages cached locally. + * + * @param bytes Maximum cached message bytes (must be > 0). Default: 64MB. + */ + LitePushConsumerBuilder& withMaxCacheMessageSizeInBytes(int bytes) { + max_cache_message_size_in_bytes_ = bytes; + return *this; + } + + /** + * Set the number of consumption threads. + * + * @param count Thread count (must be > 0). Default: 20. + */ + LitePushConsumerBuilder& withConsumeThreads(int count) { + consume_threads_ = count; + return *this; + } + + /** + * Enable FIFO consume accelerator for parallel processing by lite topic. + * May increase the probability of repeated message consumption. + */ + LitePushConsumerBuilder& withFifoConsumeAccelerator(bool enable) { + fifo_consume_accelerator_ = enable; + return *this; + } + + /** + * Build and start the LitePushConsumer. + * + * This method blocks until the consumer starts successfully. + * + * @return A started LitePushConsumer instance. + * @throws std::invalid_argument if required parameters are missing. + */ + LitePushConsumer build(); + +private: + std::string bind_topic_; + std::string group_; + Configuration configuration_; + MessageListener listener_; + int max_cache_message_count_{1024}; + int max_cache_message_size_in_bytes_{64 * 1024 * 1024}; + int consume_threads_{20}; + bool fifo_consume_accelerator_{false}; +}; + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/include/rocketmq/Message.h b/cpp/include/rocketmq/Message.h index 01bc58ed3..07fac99a9 100644 --- a/cpp/include/rocketmq/Message.h +++ b/cpp/include/rocketmq/Message.h @@ -100,6 +100,10 @@ class Message { return group_; } + const std::string& liteTopic() const { + return lite_topic_; + } + std::int32_t priority() const { return priority_; } @@ -135,6 +139,7 @@ class Message { std::string body_; std::unordered_map properties_; std::string group_; + std::string lite_topic_; std::int32_t priority_{-1}; Extension extension_; }; @@ -160,6 +165,8 @@ class MessageBuilder { MessageBuilder& withGroup(std::string group); + MessageBuilder& withLiteTopic(std::string lite_topic); + MessageBuilder& withPriority(std::int32_t priority); MessageBuilder& withProperties(std::unordered_map properties); diff --git a/cpp/include/rocketmq/OffsetOption.h b/cpp/include/rocketmq/OffsetOption.h new file mode 100644 index 000000000..d37b03e75 --- /dev/null +++ b/cpp/include/rocketmq/OffsetOption.h @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include "RocketMQ.h" + +ROCKETMQ_NAMESPACE_BEGIN + +/** + * @brief Specifies where to start consuming messages when subscribing to a lite topic. + * + * OffsetOption provides multiple strategies for controlling the initial consumption offset: + * - Policy-based: LAST (resume from last position), MIN (earliest available), MAX (latest available) + * - Absolute offset: start from a specific queue offset + * - Tail N: consume the last N messages + * - Timestamp: consume messages from a specific point in time + */ +class OffsetOption { +public: + enum class Type : std::uint8_t { + POLICY = 0, + OFFSET = 1, + TAIL_N = 2, + TIMESTAMP = 3, + }; + + enum class Policy : std::int64_t { + LAST = 0, + MIN = 1, + MAX = 2, + }; + + /** + * Resume from the last consumed position. + */ + static OffsetOption lastOffset() { + return OffsetOption(Type::POLICY, static_cast(Policy::LAST)); + } + + /** + * Start from the earliest available message. + */ + static OffsetOption minOffset() { + return OffsetOption(Type::POLICY, static_cast(Policy::MIN)); + } + + /** + * Start from the latest available message. + */ + static OffsetOption maxOffset() { + return OffsetOption(Type::POLICY, static_cast(Policy::MAX)); + } + + /** + * Start from a specific queue offset. + * + * @param offset Absolute offset position (must be >= 0). + */ + static OffsetOption ofOffset(std::int64_t offset) { + if (offset < 0) { + throw std::invalid_argument("offset must be >= 0"); + } + return OffsetOption(Type::OFFSET, offset); + } + + /** + * Consume the last N messages. + * + * @param tail_n Number of trailing messages (must be >= 0). + */ + static OffsetOption ofTailN(std::int64_t tail_n) { + if (tail_n < 0) { + throw std::invalid_argument("tail_n must be >= 0"); + } + return OffsetOption(Type::TAIL_N, tail_n); + } + + /** + * Consume messages from a specific timestamp. + * + * @param timestamp Unix timestamp in milliseconds (must be >= 0). + */ + static OffsetOption ofTimestamp(std::int64_t timestamp) { + if (timestamp < 0) { + throw std::invalid_argument("timestamp must be >= 0"); + } + return OffsetOption(Type::TIMESTAMP, timestamp); + } + + Type type() const { return type_; } + std::int64_t value() const { return value_; } + + bool operator==(const OffsetOption& other) const { + return type_ == other.type_ && value_ == other.value_; + } + + bool operator!=(const OffsetOption& other) const { + return !(*this == other); + } + +private: + OffsetOption(Type type, std::int64_t value) : type_(type), value_(value) {} + + Type type_; + std::int64_t value_; +}; + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/base/CredentialsProvider.cpp b/cpp/source/base/CredentialsProvider.cpp index 5a35bf194..dfb98b856 100644 --- a/cpp/source/base/CredentialsProvider.cpp +++ b/cpp/source/base/CredentialsProvider.cpp @@ -88,7 +88,7 @@ ConfigFileCredentialsProvider::ConfigFileCredentialsProvider() { config_file_stream.read(&content[0], size); config_file_stream.close(); google::protobuf::Struct root; - google::protobuf::util::Status status = google::protobuf::util::JsonStringToMessage(content, &root); + auto status = google::protobuf::util::JsonStringToMessage(content, &root); if (status.ok()) { auto&& fields = root.fields(); if (fields.contains(ACCESS_KEY_FIELD_NAME)) { @@ -100,7 +100,7 @@ ConfigFileCredentialsProvider::ConfigFileCredentialsProvider() { } SPDLOG_DEBUG("Credentials for access_key={} loaded", access_key_); } else { - SPDLOG_WARN("Failed to parse credential JSON config file. Message: {}", status.message().data()); + SPDLOG_WARN("Failed to parse credential JSON config file. Message: {}", status.ToString()); } } else { SPDLOG_WARN("Failed to open file: {}", config_file); diff --git a/cpp/source/base/Message.cpp b/cpp/source/base/Message.cpp index 1fa18136e..04b9c9f26 100644 --- a/cpp/source/base/Message.cpp +++ b/cpp/source/base/Message.cpp @@ -16,13 +16,21 @@ */ #include "rocketmq/Message.h" +#include #include #include +#include #include "UniqueIdGenerator.h" ROCKETMQ_NAMESPACE_BEGIN +namespace { +bool isBlank(const std::string& s) { + return std::all_of(s.begin(), s.end(), [](unsigned char c) { return std::isspace(c); }); +} +} // namespace + Message::Message() { id_ = UniqueIdGenerator::instance().next(); } @@ -60,11 +68,49 @@ MessageBuilder& MessageBuilder::withBody(std::string body) { } MessageBuilder& MessageBuilder::withGroup(std::string group) { + if (message_->delivery_timestamp_.time_since_epoch().count()) { + throw std::invalid_argument("messageGroup and deliveryTimestamp should not be set at same time"); + } + if (!message_->lite_topic_.empty()) { + throw std::invalid_argument("messageGroup and liteTopic should not be set at same time"); + } + if (message_->priority_ >= 0) { + throw std::invalid_argument("messageGroup and priority should not be set at same time"); + } + if (group.empty() || isBlank(group)) { + throw std::invalid_argument("messageGroup should not be blank"); + } message_->group_.swap(group); return *this; } +MessageBuilder& MessageBuilder::withLiteTopic(std::string lite_topic) { + if (message_->delivery_timestamp_.time_since_epoch().count()) { + throw std::invalid_argument("liteTopic and deliveryTimestamp should not be set at same time"); + } + if (!message_->group_.empty()) { + throw std::invalid_argument("liteTopic and messageGroup should not be set at same time"); + } + if (message_->priority_ >= 0) { + throw std::invalid_argument("liteTopic and priority should not be set at same time"); + } + if (lite_topic.empty() || isBlank(lite_topic)) { + throw std::invalid_argument("liteTopic should not be blank"); + } + message_->lite_topic_.swap(lite_topic); + return *this; +} + MessageBuilder& MessageBuilder::withPriority(std::int32_t priority) { + if (message_->delivery_timestamp_.time_since_epoch().count()) { + throw std::invalid_argument("priority and deliveryTimestamp should not be set at same time"); + } + if (!message_->group_.empty()) { + throw std::invalid_argument("priority and messageGroup should not be set at same time"); + } + if (!message_->lite_topic_.empty()) { + throw std::invalid_argument("priority and liteTopic should not be set at same time"); + } message_->priority_ = priority; return *this; } @@ -75,6 +121,15 @@ MessageBuilder& MessageBuilder::withProperties(std::unordered_mapgroup_.empty()) { + throw std::invalid_argument("deliveryTimestamp and messageGroup should not be set at same time"); + } + if (!message_->lite_topic_.empty()) { + throw std::invalid_argument("deliveryTimestamp and liteTopic should not be set at same time"); + } + if (message_->priority_ >= 0) { + throw std::invalid_argument("deliveryTimestamp and priority should not be set at same time"); + } message_->delivery_timestamp_ = delivery_timepoint; return *this; } diff --git a/cpp/source/base/include/Protocol.h b/cpp/source/base/include/Protocol.h index 2c11e70d0..44d707d90 100644 --- a/cpp/source/base/include/Protocol.h +++ b/cpp/source/base/include/Protocol.h @@ -58,6 +58,10 @@ using ForwardMessageToDeadLetterQueueRequest = rmq::ForwardMessageToDeadLetterQu using ForwardMessageToDeadLetterQueueResponse = rmq::ForwardMessageToDeadLetterQueueResponse; using NotifyClientTerminationRequest = rmq::NotifyClientTerminationRequest; using NotifyClientTerminationResponse = rmq::NotifyClientTerminationResponse; +using SyncLiteSubscriptionRequest = rmq::SyncLiteSubscriptionRequest; +using SyncLiteSubscriptionResponse = rmq::SyncLiteSubscriptionResponse; +using NotifyUnsubscribeLiteCommand = rmq::NotifyUnsubscribeLiteCommand; +using LiteSubscriptionAction = rmq::LiteSubscriptionAction; const char* protocolVersion(); diff --git a/cpp/source/base/include/thread_annotations_compat.h b/cpp/source/base/include/thread_annotations_compat.h new file mode 100644 index 000000000..791acc2cc --- /dev/null +++ b/cpp/source/base/include/thread_annotations_compat.h @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +// Self-contained thread safety annotation macros. +// +// Defines ABSL_* macros directly via Clang __attribute__ (no dependency on +// absl/base/thread_annotations.h), then maps legacy names used throughout +// the codebase to the ABSL_-prefixed equivalents. +// +// This file is force-included into every translation unit via the compiler +// -include option (configured in CMakeLists.txt). + +// --------------------------------------------------------------------------- +// Step 1: Define ABSL_* macros using Clang thread-safety attributes. +// Non-Clang compilers get empty expansions. +// --------------------------------------------------------------------------- +#if defined(__clang__) && (!defined(SWIG)) + +#define ABSL_GUARDED_BY(x) __attribute__((guarded_by(x))) +#define ABSL_PT_GUARDED_BY(x) __attribute__((pt_guarded_by(x))) +#define ABSL_LOCKS_EXCLUDED(...) __attribute__((locks_excluded(__VA_ARGS__))) +#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...) __attribute__((exclusive_locks_required(__VA_ARGS__))) +#define ABSL_SHARED_LOCKS_REQUIRED(...) __attribute__((shared_locks_required(__VA_ARGS__))) +#define ABSL_LOCK_RETURNED(x) __attribute__((lock_returned(x))) +#define ABSL_LOCKS_SHARED __attribute__((shared_lock_function)) +#define ABSL_LOCKS_EXCLUSIVE __attribute__((exclusive_lock_function)) +#define ABSL_ACQUIRED_BEFORE(...) __attribute__((acquired_before(__VA_ARGS__))) +#define ABSL_ACQUIRED_AFTER(...) __attribute__((acquired_after(__VA_ARGS__))) +#define ABSL_NO_THREAD_SAFETY_ANALYSIS __attribute__((no_thread_safety_analysis)) + +#define ABSL_ASSERT_EXCLUSIVE_LOCK(...) __attribute__((assert_exclusive_lock(__VA_ARGS__))) +#define ABSL_ASSERT_SHARED_LOCK(...) __attribute__((assert_shared_lock(__VA_ARGS__))) +#define ABSL_EXCLUSIVE_LOCK_FUNCTION(...) __attribute__((exclusive_lock_function(__VA_ARGS__))) +#define ABSL_SHARED_LOCK_FUNCTION(...) __attribute__((shared_lock_function(__VA_ARGS__))) +#define ABSL_UNLOCK_FUNCTION(...) __attribute__((unlock_function(__VA_ARGS__))) +#define ABSL_EXCLUSIVE_TRYLOCK_FUNCTION(...) __attribute__((exclusive_trylock_function(__VA_ARGS__))) +#define ABSL_SHARED_TRYLOCK_FUNCTION(...) __attribute__((shared_trylock_function(__VA_ARGS__))) +#define ABSL_SCOPED_LOCKABLE __attribute__((scoped_lockable)) + +#else // Non-Clang: all annotations expand to nothing. + +#define ABSL_GUARDED_BY(x) +#define ABSL_PT_GUARDED_BY(x) +#define ABSL_LOCKS_EXCLUDED(...) +#define ABSL_EXCLUSIVE_LOCKS_REQUIRED(...) +#define ABSL_SHARED_LOCKS_REQUIRED(...) +#define ABSL_LOCK_RETURNED(x) +#define ABSL_LOCKS_SHARED +#define ABSL_LOCKS_EXCLUSIVE +#define ABSL_ACQUIRED_BEFORE(...) +#define ABSL_ACQUIRED_AFTER(...) +#define ABSL_NO_THREAD_SAFETY_ANALYSIS + +#define ABSL_ASSERT_EXCLUSIVE_LOCK(...) +#define ABSL_ASSERT_SHARED_LOCK(...) +#define ABSL_EXCLUSIVE_LOCK_FUNCTION(...) +#define ABSL_SHARED_LOCK_FUNCTION(...) +#define ABSL_UNLOCK_FUNCTION(...) +#define ABSL_EXCLUSIVE_TRYLOCK_FUNCTION(...) +#define ABSL_SHARED_TRYLOCK_FUNCTION(...) +#define ABSL_SCOPED_LOCKABLE + +#endif // __clang__ + +// --------------------------------------------------------------------------- +// Step 2: Map legacy names (used in the codebase) to ABSL_-prefixed macros. +// --------------------------------------------------------------------------- +#ifndef GUARDED_BY +#define GUARDED_BY(x) ABSL_GUARDED_BY(x) +#endif + +#ifndef PT_GUARDED_BY +#define PT_GUARDED_BY(x) ABSL_PT_GUARDED_BY(x) +#endif + +#ifndef LOCKS_EXCLUDED +#define LOCKS_EXCLUDED(...) ABSL_LOCKS_EXCLUDED(__VA_ARGS__) +#endif + +#ifndef EXCLUSIVE_LOCKS_REQUIRED +#define EXCLUSIVE_LOCKS_REQUIRED(...) ABSL_EXCLUSIVE_LOCKS_REQUIRED(__VA_ARGS__) +#endif + +#ifndef SHARED_LOCKS_REQUIRED +#define SHARED_LOCKS_REQUIRED(...) ABSL_SHARED_LOCKS_REQUIRED(__VA_ARGS__) +#endif + +#ifndef LOCK_RETURNED +#define LOCK_RETURNED(x) ABSL_LOCK_RETURNED(x) +#endif + +#ifndef LOCKS_SHARED +#define LOCKS_SHARED ABSL_LOCKS_SHARED +#endif + +#ifndef LOCKS_EXCLUSIVE +#define LOCKS_EXCLUSIVE ABSL_LOCKS_EXCLUSIVE +#endif + +#ifndef ACQUIRED_BEFORE +#define ACQUIRED_BEFORE(...) ABSL_ACQUIRED_BEFORE(__VA_ARGS__) +#endif + +#ifndef ACQUIRED_AFTER +#define ACQUIRED_AFTER(...) ABSL_ACQUIRED_AFTER(__VA_ARGS__) +#endif + +#ifndef NO_THREAD_SAFETY_ANALYSIS +#define NO_THREAD_SAFETY_ANALYSIS ABSL_NO_THREAD_SAFETY_ANALYSIS +#endif + +#ifndef ASSERT_EXCLUSIVE_LOCK +#define ASSERT_EXCLUSIVE_LOCK(...) ABSL_ASSERT_EXCLUSIVE_LOCK(__VA_ARGS__) +#endif + +#ifndef ASSERT_SHARED_LOCK +#define ASSERT_SHARED_LOCK(...) ABSL_ASSERT_SHARED_LOCK(__VA_ARGS__) +#endif + +#ifndef EXCLUSIVE_LOCK_FUNCTION +#define EXCLUSIVE_LOCK_FUNCTION(...) ABSL_EXCLUSIVE_LOCK_FUNCTION(__VA_ARGS__) +#endif + +#ifndef SHARED_LOCK_FUNCTION +#define SHARED_LOCK_FUNCTION(...) ABSL_SHARED_LOCK_FUNCTION(__VA_ARGS__) +#endif + +#ifndef UNLOCK_FUNCTION +#define UNLOCK_FUNCTION(...) ABSL_UNLOCK_FUNCTION(__VA_ARGS__) +#endif + +#ifndef EXCLUSIVE_TRYLOCK_FUNCTION +#define EXCLUSIVE_TRYLOCK_FUNCTION(...) ABSL_EXCLUSIVE_TRYLOCK_FUNCTION(__VA_ARGS__) +#endif + +#ifndef SHARED_TRYLOCK_FUNCTION +#define SHARED_TRYLOCK_FUNCTION(...) ABSL_SHARED_TRYLOCK_FUNCTION(__VA_ARGS__) +#endif + +#ifndef SCOPED_LOCKABLE +#define SCOPED_LOCKABLE ABSL_SCOPED_LOCKABLE +#endif diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp index 0441c8f23..845a17d3b 100644 --- a/cpp/source/client/ClientManagerImpl.cpp +++ b/cpp/source/client/ClientManagerImpl.cpp @@ -596,6 +596,19 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host, case rmq::Code::OK: { std::vector message_queues; for (const auto& item : invocation_context->response.message_queues()) { + // Filter out MessageQueue whose broker endpoint has no valid host + bool has_valid_host = false; + for (const auto& addr : item.broker().endpoints().addresses()) { + if (!addr.host().empty()) { + has_valid_host = true; + break; + } + } + if (!has_valid_host) { + SPDLOG_WARN("Filtered out MessageQueue with empty broker host: topic={}, queueId={}, brokerName={}", + item.topic().name(), item.id(), item.broker().name()); + continue; + } message_queues.push_back(item); } auto ptr = std::make_shared(std::move(message_queues)); @@ -819,6 +832,11 @@ MessageConstSharedPtr ClientManagerImpl::wrapMessage(const rmq::Message& item) { builder.withPriority(system_properties.priority()); } + // Lite Topic + if (system_properties.has_lite_topic()) { + builder.withLiteTopic(system_properties.lite_topic()); + } + // Message-Id const auto& message_id = system_properties.message_id(); builder.withId(message_id); @@ -1589,4 +1607,90 @@ void ClientManagerImpl::submit(std::function task) { const char* ClientManagerImpl::HEARTBEAT_TASK_NAME = "heartbeat-task"; const char* ClientManagerImpl::STATS_TASK_NAME = "stats-task"; +void ClientManagerImpl::syncLiteSubscription( + const std::string& target_host, + const Metadata& metadata, + const SyncLiteSubscriptionRequest& request, + std::chrono::milliseconds timeout, + const std::function& cb) { + + SPDLOG_DEBUG("SyncLiteSubscription Request: {}", request.ShortDebugString()); + + RpcClientSharedPtr client = getRpcClient(target_host); + if (!client) { + SPDLOG_WARN("No RPC client for {}", target_host); + SyncLiteSubscriptionResponse response; + std::error_code ec = ErrorCode::BadRequest; + cb(ec, response); + return; + } + + auto invocation_context = new InvocationContext(); + invocation_context->task_name = fmt::format("SyncLiteSubscription, topic={}, group={}, action={} to {}", + request.topic().name(), request.group().name(), + static_cast(request.action()), target_host); + invocation_context->remote_address = target_host; + for (const auto& item : metadata) { + invocation_context->context.AddMetadata(item.first, item.second); + } + invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout); + + auto callback = + [target_host, cb](const InvocationContext* invocation_context) { + + std::error_code ec; + if (!invocation_context->status.ok()) { + SPDLOG_WARN("Failed to SyncLiteSubscription. gRPC-code: {}, gRPC-message: {}, host={}", + invocation_context->status.error_code(), invocation_context->status.error_message(), + invocation_context->remote_address); + ec = ErrorCode::BadRequest; + cb(ec, invocation_context->response); + return; + } + + auto&& status = invocation_context->response.status(); + switch (status.code()) { + case rmq::Code::OK: { + SPDLOG_DEBUG("SyncLiteSubscription OK. host={}", target_host); + break; + } + + case rmq::Code::LITE_SUBSCRIPTION_QUOTA_EXCEEDED: { + SPDLOG_WARN("LiteSubscriptionQuotaExceeded: {}, host={}", status.message(), target_host); + ec = ErrorCode::NotSupported; // Reuse NotSupported; can add specific code if needed + break; + } + + case rmq::Code::UNAUTHORIZED: { + SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), target_host); + ec = ErrorCode::Unauthorized; + break; + } + + case rmq::Code::FORBIDDEN: { + SPDLOG_WARN("Forbidden: {}, host={}", status.message(), target_host); + ec = ErrorCode::Forbidden; + break; + } + + case rmq::Code::INTERNAL_SERVER_ERROR: { + SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), target_host); + ec = ErrorCode::InternalServerError; + break; + } + + default: { + SPDLOG_WARN("SyncLiteSubscription failed with code: {}, message: {}, host={}", + static_cast(status.code()), status.message(), target_host); + ec = ErrorCode::NotSupported; + break; + } + } + cb(ec, invocation_context->response); + }; + + invocation_context->callback = callback; + client->asyncSyncLiteSubscription(request, invocation_context); +} + ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/client/RpcClientImpl.cpp b/cpp/source/client/RpcClientImpl.cpp index 94e2c3ad5..70e934d8d 100644 --- a/cpp/source/client/RpcClientImpl.cpp +++ b/cpp/source/client/RpcClientImpl.cpp @@ -129,6 +129,13 @@ void RpcClientImpl::asyncRecallMessage(const RecallMessageRequest& request, stub_->async()->RecallMessage(&invocation_context->context, &request, &invocation_context->response, callback); } +void RpcClientImpl::asyncSyncLiteSubscription(const SyncLiteSubscriptionRequest& request, + InvocationContext* invocation_context) { + std::weak_ptr rpc_client(shared_from_this()); + auto callback = std::bind(&RpcClientImpl::asyncCallback, rpc_client, invocation_context, std::placeholders::_1); + stub_->async()->SyncLiteSubscription(&invocation_context->context, &request, &invocation_context->response, callback); +} + bool RpcClientImpl::ok() const { return channel_ && grpc_connectivity_state::GRPC_CHANNEL_SHUTDOWN != channel_->GetState(false); } diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp index a73efaddd..7e63d52a6 100644 --- a/cpp/source/client/TelemetryBidiReactor.cpp +++ b/cpp/source/client/TelemetryBidiReactor.cpp @@ -164,6 +164,13 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { break; } + case rmq::TelemetryCommand::kNotifyUnsubscribeLiteCommand: { + const auto& lite_topic = read_.notify_unsubscribe_lite_command().lite_topic(); + SPDLOG_INFO("Received NotifyUnsubscribeLiteCommand from {}, liteTopic={}", peer_address_, lite_topic); + client->onNotifyUnsubscribeLiteCommand(lite_topic); + break; + } + default: { SPDLOG_WARN("Telemetry command receive unsupported command"); break; @@ -267,6 +274,14 @@ void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings google::protobuf::util::TimeUtil::DurationToMilliseconds(settings.subscription().long_polling_timeout()); client->config().subscriber.polling_timeout = absl::Milliseconds(polling_timeout); client->config().subscriber.receive_batch_size = settings.subscription().receive_batch_size(); + + // Lite push consumer specific settings + if (settings.subscription().has_lite_subscription_quota()) { + client->config().subscriber.lite_subscription_quota = settings.subscription().lite_subscription_quota(); + } + if (settings.subscription().has_max_lite_topic_size()) { + client->config().subscriber.max_lite_topic_size = settings.subscription().max_lite_topic_size(); + } } void TelemetryBidiReactor::write(TelemetryCommand command) { diff --git a/cpp/source/client/include/Client.h b/cpp/source/client/include/Client.h index db4eb09f3..6e317d848 100644 --- a/cpp/source/client/include/Client.h +++ b/cpp/source/client/include/Client.h @@ -58,6 +58,8 @@ class Client { virtual void notifyClientTermination() = 0; + virtual void onNotifyUnsubscribeLiteCommand(const std::string& lite_topic) {} + virtual void withCredentialsProvider(std::shared_ptr credentials_provider) = 0; virtual std::shared_ptr manager() const = 0; diff --git a/cpp/source/client/include/ClientConfig.h b/cpp/source/client/include/ClientConfig.h index dddcc4bed..e975af6e1 100644 --- a/cpp/source/client/include/ClientConfig.h +++ b/cpp/source/client/include/ClientConfig.h @@ -42,6 +42,9 @@ struct SubscriberConfig { bool fifo_consume_accelerator{false}; std::uint32_t receive_batch_size{32}; absl::Duration polling_timeout{absl::Seconds(30)}; + // Lite push consumer specific fields + std::int32_t lite_subscription_quota{0}; + std::int32_t max_lite_topic_size{64}; }; struct Metric { diff --git a/cpp/source/client/include/ClientManager.h b/cpp/source/client/include/ClientManager.h index d6e20ace8..9be781bc2 100644 --- a/cpp/source/client/include/ClientManager.h +++ b/cpp/source/client/include/ClientManager.h @@ -106,6 +106,12 @@ class ClientManager { virtual State state() const = 0; virtual void submit(std::function task) = 0; + + virtual void syncLiteSubscription(const std::string& target_host, + const Metadata& metadata, + const SyncLiteSubscriptionRequest& request, + std::chrono::milliseconds timeout, + const std::function& cb) = 0; }; using ClientManagerPtr = std::shared_ptr; diff --git a/cpp/source/client/include/ClientManagerImpl.h b/cpp/source/client/include/ClientManagerImpl.h index 08b5afa8d..b98dac0b0 100644 --- a/cpp/source/client/include/ClientManagerImpl.h +++ b/cpp/source/client/include/ClientManagerImpl.h @@ -201,6 +201,12 @@ class ClientManagerImpl : virtual public ClientManager, public std::enable_share void submit(std::function task) override; + void syncLiteSubscription(const std::string& target_host, + const Metadata& metadata, + const SyncLiteSubscriptionRequest& request, + std::chrono::milliseconds timeout, + const std::function& cb) override; + private: void doHeartbeat(); diff --git a/cpp/source/client/include/RpcClient.h b/cpp/source/client/include/RpcClient.h index ff9269570..a75f57b82 100644 --- a/cpp/source/client/include/RpcClient.h +++ b/cpp/source/client/include/RpcClient.h @@ -75,6 +75,9 @@ class RpcClient { virtual void asyncRecallMessage(const RecallMessageRequest& request, InvocationContext* invocation_context) = 0; + virtual void asyncSyncLiteSubscription(const SyncLiteSubscriptionRequest& request, + InvocationContext* invocation_context) = 0; + virtual std::shared_ptr asyncTelemetry(std::weak_ptr client) = 0; virtual void asyncForwardMessageToDeadLetterQueue( diff --git a/cpp/source/client/include/RpcClientImpl.h b/cpp/source/client/include/RpcClientImpl.h index 9d4483e1b..f771d5ac8 100644 --- a/cpp/source/client/include/RpcClientImpl.h +++ b/cpp/source/client/include/RpcClientImpl.h @@ -69,6 +69,9 @@ class RpcClientImpl : public RpcClient, public std::enable_shared_from_this* invocation_context) override; + void asyncSyncLiteSubscription(const SyncLiteSubscriptionRequest& request, + InvocationContext* invocation_context) override; + std::shared_ptr asyncTelemetry(std::weak_ptr client) override; void asyncForwardMessageToDeadLetterQueue( diff --git a/cpp/source/client/mocks/include/ClientManagerMock.h b/cpp/source/client/mocks/include/ClientManagerMock.h index d255b901b..a0e057620 100644 --- a/cpp/source/client/mocks/include/ClientManagerMock.h +++ b/cpp/source/client/mocks/include/ClientManagerMock.h @@ -35,6 +35,8 @@ class ClientManagerMock : public ClientManager { MOCK_METHOD(SchedulerSharedPtr, getScheduler, (), (override)); + MOCK_METHOD(std::shared_ptr, getRpcClient, (const std::string&, bool), (override)); + MOCK_METHOD((std::shared_ptr), createChannel, (const std::string&), (override)); MOCK_METHOD(void, resolveRoute, @@ -47,10 +49,7 @@ class ClientManagerMock : public ClientManager { (const std::function&)), (override)); - MOCK_METHOD(std::shared_ptr, telemetry, (const std::string&, std::weak_ptr), - (override)); - - MOCK_METHOD(bool, wrapMessage, (const rmq::Message&, MessageExt&), (override)); + MOCK_METHOD(MessageConstSharedPtr, wrapMessage, (const rmq::Message&), (override)); MOCK_METHOD(void, ack, (const std::string&, const Metadata&, const AckMessageRequest&, std::chrono::milliseconds, @@ -59,13 +58,18 @@ class ClientManagerMock : public ClientManager { MOCK_METHOD(void, changeInvisibleDuration, (const std::string&, const Metadata&, const ChangeInvisibleDurationRequest&, std::chrono::milliseconds, - (const std::function&)), + (const std::function&)), (override)); MOCK_METHOD(void, forwardMessageToDeadLetterQueue, (const std::string&, const Metadata&, const ForwardMessageToDeadLetterQueueRequest&, std::chrono::milliseconds, - (const std::function*)>&)), + (const std::function&)), + (override)); + + MOCK_METHOD(void, recallMessage, + (const std::string&, const Metadata&, const RecallMessageRequest&, std::chrono::milliseconds, + (const std::function&)), (override)); MOCK_METHOD(void, endTransaction, @@ -85,7 +89,7 @@ class ClientManagerMock : public ClientManager { ReceiveMessageCallback), (override)); - MOCK_METHOD(bool, send, (const std::string&, const Metadata&, SendMessageRequest&, SendCallback), (override)); + MOCK_METHOD(bool, send, (const std::string&, const Metadata&, SendMessageRequest&, SendResultCallback), (override)); MOCK_METHOD(std::error_code, notifyClientTermination, (const std::string&, const Metadata&, const NotifyClientTerminationRequest&, std::chrono::milliseconds), @@ -94,6 +98,11 @@ class ClientManagerMock : public ClientManager { MOCK_METHOD(State, state, (), (const override)); MOCK_METHOD(void, submit, (std::function), (override)); + + MOCK_METHOD(void, syncLiteSubscription, + (const std::string&, const Metadata&, const SyncLiteSubscriptionRequest&, std::chrono::milliseconds, + (const std::function&)), + (override)); }; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/client/mocks/include/ClientMock.h b/cpp/source/client/mocks/include/ClientMock.h index 7fc39f200..21f72e951 100644 --- a/cpp/source/client/mocks/include/ClientMock.h +++ b/cpp/source/client/mocks/include/ClientMock.h @@ -45,6 +45,8 @@ class ClientMock : virtual public Client { MOCK_METHOD(void, notifyClientTermination, (), (override)); + MOCK_METHOD(void, onNotifyUnsubscribeLiteCommand, (const std::string&), (override)); + MOCK_METHOD(void, verify, (MessageConstSharedPtr, (std::function)), (override)); MOCK_METHOD(void, recoverOrphanedTransaction, (MessageConstSharedPtr), (override)); diff --git a/cpp/source/client/mocks/include/RpcClientMock.h b/cpp/source/client/mocks/include/RpcClientMock.h index 898e8453d..178a7c432 100644 --- a/cpp/source/client/mocks/include/RpcClientMock.h +++ b/cpp/source/client/mocks/include/RpcClientMock.h @@ -64,6 +64,9 @@ class RpcClientMock : public RpcClient { InvocationContext*), (override)); + MOCK_METHOD(void, asyncSyncLiteSubscription, + (const SyncLiteSubscriptionRequest&, InvocationContext*), (override)); + MOCK_METHOD(std::shared_ptr, asyncTelemetry, (std::weak_ptr), (override)); MOCK_METHOD(grpc::Status, notifyClientTermination, diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp b/cpp/source/rocketmq/FifoProducerPartition.cpp index 67693f0d5..2df8f57fd 100644 --- a/cpp/source/rocketmq/FifoProducerPartition.cpp +++ b/cpp/source/rocketmq/FifoProducerPartition.cpp @@ -16,8 +16,6 @@ */ #include "FifoProducerPartition.h" -#include "absl/synchronization/mutex.h" - #include #include #include diff --git a/cpp/source/rocketmq/LitePushConsumer.cpp b/cpp/source/rocketmq/LitePushConsumer.cpp new file mode 100644 index 000000000..fb1af8ab1 --- /dev/null +++ b/cpp/source/rocketmq/LitePushConsumer.cpp @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "rocketmq/LitePushConsumer.h" + +#include +#include +#include + +#include "LitePushConsumerImpl.h" +#include "StaticNameServerResolver.h" +#include "rocketmq/Configuration.h" +#include "rocketmq/OffsetOption.h" + +ROCKETMQ_NAMESPACE_BEGIN + +LitePushConsumerBuilder LitePushConsumer::newBuilder() { + return {}; +} + +void LitePushConsumer::subscribeLite(const std::string& lite_topic, std::error_code& ec) { + impl_->subscribeLite(lite_topic, nullptr, ec); +} + +void LitePushConsumer::subscribeLite(const std::string& lite_topic, + const OffsetOption& offset_option, + std::error_code& ec) { + impl_->subscribeLite(lite_topic, &offset_option, ec); +} + +void LitePushConsumer::unsubscribeLite(const std::string& lite_topic, std::error_code& ec) { + impl_->unsubscribeLite(lite_topic, ec); +} + +std::set LitePushConsumer::getLiteTopicSet() const { + return impl_->getLiteTopicSet(); +} + +std::string LitePushConsumer::getConsumerGroup() const { + return impl_->groupName(); +} + +void LitePushConsumer::shutdown() { + impl_->shutdown(); +} + +LitePushConsumer LitePushConsumerBuilder::build() { + if (bind_topic_.empty()) { + throw std::invalid_argument("bindTopic has not been set yet"); + } + if (group_.empty()) { + throw std::invalid_argument("group has not been set yet"); + } + if (!listener_) { + throw std::invalid_argument("messageListener has not been set yet"); + } + + auto impl = std::make_shared(group_, bind_topic_); + + // Register message listener + impl->registerMessageListener(listener_); + + // Configure consumer parameters + impl->consumeThreadPoolSize(consume_threads_); + + // Set name server resolver + impl->withNameServerResolver(std::make_shared(configuration_.endpoints())); + + // Set resource namespace + impl->withResourceNamespace(configuration_.resourceNamespace()); + + // Set credentials provider + impl->withCredentialsProvider(configuration_.credentialsProvider()); + + // Set request timeout + impl->withRequestTimeout(configuration_.requestTimeout()); + + // Set FIFO consume accelerator + impl->withFifoConsumeAccelerator(fifo_consume_accelerator_); + + // Set callback threads + impl->withCallbackThreads(configuration_.callbackThreads()); + + // Set SSL + impl->withSsl(configuration_.withSsl()); + + // Start the consumer + impl->start(); + + return LitePushConsumer(impl); +} + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/rocketmq/LitePushConsumerImpl.cpp b/cpp/source/rocketmq/LitePushConsumerImpl.cpp new file mode 100644 index 000000000..951e6f1d9 --- /dev/null +++ b/cpp/source/rocketmq/LitePushConsumerImpl.cpp @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "LitePushConsumerImpl.h" + +#include +#include +#include +#include +#include + +#include "MixAll.h" +#include "Protocol.h" +#include "rocketmq/ErrorCode.h" +#include "rocketmq/OffsetOption.h" +#include "spdlog/spdlog.h" + +ROCKETMQ_NAMESPACE_BEGIN + +LitePushConsumerImpl::LitePushConsumerImpl(absl::string_view group_name, const std::string& bind_topic) + : ClientImpl(group_name), PushConsumerImpl(group_name), bind_topic_(bind_topic) { + // Subscribe to bind topic with SUB_ALL expression + subscribe(bind_topic_, "*", ExpressionType::TAG); +} + +LitePushConsumerImpl::~LitePushConsumerImpl() { + shutdown(); +} + +void LitePushConsumerImpl::start() { + // Call parent start + PushConsumerImpl::start(); + + // Create LiteSubscriptionManager lazily here (client_manager_ is ready after parent start) + lite_subscription_manager_.reset(new LiteSubscriptionManager( + client_manager_, client_config_, bind_topic_, client_config_.subscriber.group)); + + // Set endpoints provider so LiteSubscriptionManager can get current route endpoints + lite_subscription_manager_->setEndpointsProvider([this]() -> absl::flat_hash_set { + return getCurrentEndpoints(); + }); + + // Start LiteSubscriptionManager (initial full sync + periodic sync) + lite_subscription_manager_->startUp(); + + SPDLOG_INFO("LitePushConsumer started, group={}, bindTopic={}", + client_config_.subscriber.group.name(), bind_topic_); +} + +void LitePushConsumerImpl::shutdown() { + State expecting = State::STARTED; + if (state_.compare_exchange_strong(expecting, State::STOPPING)) { + // Stop LiteSubscriptionManager first + if (lite_subscription_manager_) { + lite_subscription_manager_->shutdown(); + } + + // Then call parent shutdown (this will transition state to STOPPED) + // Note: PushConsumerImpl::shutdown() expects STARTED state, but we already + // changed to STOPPING, so we call ClientImpl::shutdown() directly. + ClientImpl::shutdown(); + + SPDLOG_INFO("LitePushConsumer stopped, group={}, bindTopic={}", + client_config_.subscriber.group.name(), bind_topic_); + } +} + +void LitePushConsumerImpl::buildClientSettings(rmq::Settings& settings) { + // Use LITE_PUSH_CONSUMER type instead of PUSH_CONSUMER + settings.set_client_type(rmq::ClientType::LITE_PUSH_CONSUMER); + + auto subscription = settings.mutable_subscription(); + subscription->mutable_group()->CopyFrom(client_config_.subscriber.group); + + auto polling_timeout_ms = absl::ToInt64Milliseconds(client_config_.subscriber.polling_timeout); + subscription->mutable_long_polling_timeout()->set_seconds(polling_timeout_ms / 1000); + subscription->mutable_long_polling_timeout()->set_nanos((polling_timeout_ms % 1000) * 1000000); + subscription->set_receive_batch_size(client_config_.subscriber.receive_batch_size); + + // Add bind topic subscription + { + absl::MutexLock lk(&topic_filter_expression_table_mtx_); + for (const auto& entry : topic_filter_expression_table_) { + auto subscription_entry = new rmq::SubscriptionEntry; + subscription_entry->mutable_topic()->set_resource_namespace(resourceNamespace()); + subscription_entry->mutable_topic()->set_name(entry.first); + + subscription_entry->mutable_expression()->set_expression(entry.second.content_); + switch (entry.second.type_) { + case ExpressionType::TAG: { + subscription_entry->mutable_expression()->set_type(rmq::FilterType::TAG); + break; + } + case ExpressionType::SQL92: { + subscription_entry->mutable_expression()->set_type(rmq::FilterType::SQL); + break; + } + } + subscription->mutable_subscriptions()->AddAllocated(subscription_entry); + } + } +} + +void LitePushConsumerImpl::prepareHeartbeatData(HeartbeatRequest& request) { + // Use LITE_PUSH_CONSUMER type + request.set_client_type(rmq::ClientType::LITE_PUSH_CONSUMER); + request.mutable_group()->CopyFrom(client_config_.subscriber.group); +} + +void LitePushConsumerImpl::subscribeLite(const std::string& lite_topic, + const OffsetOption* offset_option, + std::error_code& ec) { + if (!lite_subscription_manager_) { + SPDLOG_ERROR("LiteSubscriptionManager is not initialized"); + ec = ErrorCode::IllegalState; + return; + } + lite_subscription_manager_->subscribeLite(lite_topic, offset_option, ec); +} + +void LitePushConsumerImpl::unsubscribeLite(const std::string& lite_topic, std::error_code& ec) { + if (!lite_subscription_manager_) { + SPDLOG_ERROR("LiteSubscriptionManager is not initialized"); + ec = ErrorCode::IllegalState; + return; + } + lite_subscription_manager_->unsubscribeLite(lite_topic, ec); +} + +std::set LitePushConsumerImpl::getLiteTopicSet() const { + if (!lite_subscription_manager_) { + return {}; + } + return lite_subscription_manager_->getLiteTopicSet(); +} + +void LitePushConsumerImpl::onNotifyUnsubscribeLiteCommand(const std::string& lite_topic) { + SPDLOG_INFO("LitePushConsumer received NotifyUnsubscribeLiteCommand: liteTopic={}, group={}, bindTopic={}", + lite_topic, client_config_.subscriber.group.name(), bind_topic_); + if (lite_subscription_manager_) { + lite_subscription_manager_->onNotifyUnsubscribeLiteCommand(lite_topic); + } +} + +void LitePushConsumerImpl::onSettingsUpdate(const rmq::Settings& settings) { + if (lite_subscription_manager_) { + lite_subscription_manager_->syncSettings(settings); + } +} + +absl::flat_hash_set LitePushConsumerImpl::getCurrentEndpoints() { + absl::flat_hash_set endpoints; + endpointsInUse(endpoints); + return endpoints; +} + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/rocketmq/LiteSubscriptionManager.cpp b/cpp/source/rocketmq/LiteSubscriptionManager.cpp new file mode 100644 index 000000000..e93d50653 --- /dev/null +++ b/cpp/source/rocketmq/LiteSubscriptionManager.cpp @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "LiteSubscriptionManager.h" + +#include +#include +#include +#include +#include + +#include "MixAll.h" +#include "Protocol.h" +#include "rocketmq/ErrorCode.h" +#include "rocketmq/OffsetOption.h" +#include "spdlog/spdlog.h" + +ROCKETMQ_NAMESPACE_BEGIN + +const char* LiteSubscriptionManager::SYNC_TASK_NAME = "lite-subscription-sync-task"; + +LiteSubscriptionManager::LiteSubscriptionManager(std::shared_ptr client_manager, + ClientConfig& client_config, + const std::string& bind_topic, + const rmq::Resource& group) + : client_manager_(std::move(client_manager)), + client_config_(client_config), + bind_topic_(bind_topic), + group_(group) { + // Sync quota from client config (may have been set from server telemetry) + lite_subscription_quota_ = client_config_.subscriber.lite_subscription_quota; + max_lite_topic_size_ = client_config_.subscriber.max_lite_topic_size; +} + +LiteSubscriptionManager::~LiteSubscriptionManager() { + shutdown(); +} + +void LiteSubscriptionManager::startUp() { + // Perform initial full sync + syncAllLiteSubscription(); + + // Schedule periodic full sync every 30 seconds + auto weak_self = std::weak_ptr(); + // Note: We cannot use shared_from_this here since LiteSubscriptionManager + // doesn't inherit from enable_shared_from_this. Use raw capture instead. + // The scheduler will be cancelled on shutdown. + auto sync_functor = [this]() { + syncAllLiteSubscription(); + }; + + sync_task_handle_ = client_manager_->getScheduler()->schedule( + sync_functor, SYNC_TASK_NAME, + std::chrono::seconds(30), std::chrono::seconds(30)); + SPDLOG_INFO("LiteSubscriptionManager started for bindTopic={}, group={}", + bind_topic_, group_.name()); +} + +void LiteSubscriptionManager::shutdown() { + if (sync_task_handle_) { + client_manager_->getScheduler()->cancel(sync_task_handle_); + sync_task_handle_ = 0; + SPDLOG_INFO("LiteSubscriptionManager stopped for bindTopic={}, group={}", + bind_topic_, group_.name()); + } +} + +void LiteSubscriptionManager::subscribeLite(const std::string& lite_topic, + const OffsetOption* offset_option, + std::error_code& ec) { + { + absl::MutexLock lk(&lite_topic_set_mtx_); + if (lite_topic_set_.contains(lite_topic)) { + SPDLOG_DEBUG("Lite topic already subscribed: {}, bindTopic={}, group={}", + lite_topic, bind_topic_, group_.name()); + return; + } + } + + validateLiteTopic(lite_topic, max_lite_topic_size_, ec); + if (ec) { + return; + } + + checkLiteSubscriptionQuota(1, ec); + if (ec) { + SPDLOG_ERROR("Failed to subscribeLite {}: quota exceeded, bindTopic={}, group={}", + lite_topic, bind_topic_, group_.name()); + return; + } + + std::set topics = {lite_topic}; + syncLiteSubscription(rmq::LiteSubscriptionAction::PARTIAL_ADD, topics, offset_option, ec); + if (ec) { + SPDLOG_ERROR("Failed to subscribeLite {}, bindTopic={}, group={}", + lite_topic, bind_topic_, group_.name()); + return; + } + + { + absl::MutexLock lk(&lite_topic_set_mtx_); + lite_topic_set_.insert(lite_topic); + } + SPDLOG_INFO("SubscribeLite {}, bindTopic={}, group={}, clientId={}", + lite_topic, bind_topic_, group_.name(), client_config_.client_id); +} + +void LiteSubscriptionManager::unsubscribeLite(const std::string& lite_topic, std::error_code& ec) { + { + absl::MutexLock lk(&lite_topic_set_mtx_); + if (!lite_topic_set_.contains(lite_topic)) { + SPDLOG_DEBUG("Lite topic not subscribed: {}, bindTopic={}, group={}", + lite_topic, bind_topic_, group_.name()); + return; + } + } + + std::set topics = {lite_topic}; + syncLiteSubscription(rmq::LiteSubscriptionAction::PARTIAL_REMOVE, topics, nullptr, ec); + if (ec) { + SPDLOG_ERROR("Failed to unsubscribeLite {}, bindTopic={}, group={}", + lite_topic, bind_topic_, group_.name()); + return; + } + + { + absl::MutexLock lk(&lite_topic_set_mtx_); + lite_topic_set_.erase(lite_topic); + } + SPDLOG_INFO("UnsubscribeLite {}, bindTopic={}, group={}, clientId={}", + lite_topic, bind_topic_, group_.name(), client_config_.client_id); +} + +std::set LiteSubscriptionManager::getLiteTopicSet() const { + absl::MutexLock lk(&lite_topic_set_mtx_); + return std::set(lite_topic_set_.begin(), lite_topic_set_.end()); +} + +void LiteSubscriptionManager::syncSettings(const rmq::Settings& settings) { + if (!settings.has_subscription()) { + return; + } + const auto& subscription = settings.subscription(); + if (subscription.has_lite_subscription_quota()) { + lite_subscription_quota_ = subscription.lite_subscription_quota(); + SPDLOG_INFO("Updated lite_subscription_quota={}, bindTopic={}, group={}", + lite_subscription_quota_, bind_topic_, group_.name()); + } + if (subscription.has_max_lite_topic_size()) { + max_lite_topic_size_ = subscription.max_lite_topic_size(); + SPDLOG_INFO("Updated max_lite_topic_size={}, bindTopic={}, group={}", + max_lite_topic_size_, bind_topic_, group_.name()); + } +} + +void LiteSubscriptionManager::onNotifyUnsubscribeLiteCommand(const std::string& lite_topic) { + SPDLOG_INFO("NotifyUnsubscribeLiteCommand: liteTopic={}, group={}, bindTopic={}", + lite_topic, group_.name(), bind_topic_); + if (!lite_topic.empty()) { + absl::MutexLock lk(&lite_topic_set_mtx_); + lite_topic_set_.erase(lite_topic); + } +} + +void LiteSubscriptionManager::syncAllLiteSubscription() { + std::set topics; + { + absl::MutexLock lk(&lite_topic_set_mtx_); + topics.insert(lite_topic_set_.begin(), lite_topic_set_.end()); + } + + // Check quota with 0 delta (just log warning) + std::error_code ec; + checkLiteSubscriptionQuota(0, ec); + if (ec) { + SPDLOG_WARN("Lite subscription quota exceeded during syncAll, bindTopic={}, group={}", + bind_topic_, group_.name()); + return; + } + + syncLiteSubscription(rmq::LiteSubscriptionAction::COMPLETE_ADD, topics, nullptr, ec); + if (ec) { + SPDLOG_ERROR("Schedule syncAllLiteSubscription failed, clientId={}, ec={}", + client_config_.client_id, ec.message()); + } +} + +void LiteSubscriptionManager::syncLiteSubscription(rmq::LiteSubscriptionAction action, + const std::set& topics, + const OffsetOption* offset_option, + std::error_code& ec) { + SyncLiteSubscriptionRequest request; + request.set_action(action); + request.mutable_topic()->set_resource_namespace(client_config_.resource_namespace); + request.mutable_topic()->set_name(bind_topic_); + request.mutable_group()->CopyFrom(group_); + for (const auto& topic : topics) { + request.add_lite_topic_set(topic); + } + + if (offset_option) { + auto* proto_option = request.mutable_offset_option(); + switch (offset_option->type()) { + case OffsetOption::Type::POLICY: { + auto policy_value = static_cast(offset_option->value()); + switch (policy_value) { + case OffsetOption::Policy::LAST: + proto_option->set_policy(rmq::OffsetOption_Policy_LAST); + break; + case OffsetOption::Policy::MIN: + proto_option->set_policy(rmq::OffsetOption_Policy_MIN); + break; + case OffsetOption::Policy::MAX: + proto_option->set_policy(rmq::OffsetOption_Policy_MAX); + break; + } + break; + } + case OffsetOption::Type::OFFSET: + proto_option->set_offset(offset_option->value()); + break; + case OffsetOption::Type::TAIL_N: + proto_option->set_tail_n(offset_option->value()); + break; + case OffsetOption::Type::TIMESTAMP: + proto_option->set_timestamp(offset_option->value()); + break; + } + } + + SPDLOG_DEBUG("SyncLiteSubscription: action={}, bindTopic={}, group={}, topics-size={}", + static_cast(action), bind_topic_, group_.name(), topics.size()); + + auto timeout = absl::ToChronoMilliseconds(client_config_.request_timeout); + + Metadata metadata; + Signature::sign(client_config_, metadata); + + // Get endpoints from the provider (set by LitePushConsumerImpl) + absl::flat_hash_set endpoints; + if (endpoints_provider_) { + endpoints = endpoints_provider_(); + } + + if (endpoints.empty()) { + SPDLOG_WARN("No endpoints available for SyncLiteSubscription, bindTopic={}, group={}", + bind_topic_, group_.name()); + ec = ErrorCode::NotFound; + return; + } + + for (const auto& target_host : endpoints) { + auto callback = [&ec](const std::error_code& callback_ec, const SyncLiteSubscriptionResponse& /*response*/) { + if (callback_ec) { + ec = callback_ec; + } + }; + client_manager_->syncLiteSubscription(target_host, metadata, request, timeout, callback); + } +} + +void LiteSubscriptionManager::validateLiteTopic(const std::string& lite_topic, int max_length, std::error_code& ec) { + if (lite_topic.empty()) { + SPDLOG_ERROR("liteTopic is blank"); + ec = ErrorCode::IllegalLiteTopic; + return; + } + if (static_cast(lite_topic.length()) > max_length) { + SPDLOG_ERROR("liteTopic length {} exceeded max length {}, liteTopic: {}", + lite_topic.length(), max_length, lite_topic); + ec = ErrorCode::IllegalLiteTopic; + return; + } +} + +void LiteSubscriptionManager::checkLiteSubscriptionQuota(int delta, std::error_code& ec) { + std::size_t current_size = 0; + { + absl::MutexLock lk(&lite_topic_set_mtx_); + current_size = lite_topic_set_.size(); + } + + if (static_cast(current_size) + delta > lite_subscription_quota_) { + SPDLOG_WARN("Lite subscription quota exceeded: current={}, delta={}, quota={}", + current_size, delta, lite_subscription_quota_); + ec = ErrorCode::LiteSubscriptionQuotaExceeded; + } +} + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp index 3450faf24..c6bc5cea2 100644 --- a/cpp/source/rocketmq/ProducerImpl.cpp +++ b/cpp/source/rocketmq/ProducerImpl.cpp @@ -166,6 +166,8 @@ void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageReq system_properties->set_priority(message.priority()); } else if (message.extension().transactional) { system_properties->set_message_type(rmq::MessageType::TRANSACTION); + } else if (!message.liteTopic().empty()) { + system_properties->set_message_type(rmq::MessageType::LITE); } else { system_properties->set_message_type(rmq::MessageType::NORMAL); } @@ -184,6 +186,11 @@ void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageReq system_properties->set_message_group(message.group()); } + // Lite Topic + if (!message.liteTopic().empty()) { + system_properties->set_lite_topic(message.liteTopic()); + } + system_properties->set_message_id(message.id()); system_properties->set_queue_id(message_queue.id()); diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp index bd2a99775..8ab6bd914 100644 --- a/cpp/source/rocketmq/PushConsumerImpl.cpp +++ b/cpp/source/rocketmq/PushConsumerImpl.cpp @@ -31,7 +31,6 @@ #include "RpcClient.h" #include "Signature.h" #include "Tag.h" -#include "google/protobuf/util/time_util.h" #include "opencensus/stats/stats.h" #include "rocketmq/MQClientException.h" #include "rocketmq/MessageListener.h" @@ -412,8 +411,13 @@ void PushConsumerImpl::nack(const Message& message, const std::functionset_name(message.topic()); request.set_receipt_handle(message.extension().receipt_handle); request.set_message_id(message.id()); - request.mutable_invisible_duration()->CopyFrom( - google::protobuf::util::TimeUtil::MillisecondsToDuration(duration.count())); + // Set lite_topic if present + if (!message.liteTopic().empty()) { + request.set_lite_topic(message.liteTopic()); + } + auto duration_ms = duration.count(); + request.mutable_invisible_duration()->set_seconds(duration_ms / 1000); + request.mutable_invisible_duration()->set_nanos((duration_ms % 1000) * 1000000); auto cb = [callback](const std::error_code& ec, const ChangeInvisibleDurationResponse& response) { @@ -442,6 +446,11 @@ void PushConsumerImpl::forwardToDeadLetterQueue(const Message& message, request.set_delivery_attempt(message.extension().delivery_attempt); request.set_max_delivery_attempts(max_delivery_attempts_); + // Set lite_topic if present + if (!message.liteTopic().empty()) { + request.set_lite_topic(message.liteTopic()); + } + client_manager_->forwardMessageToDeadLetterQueue(target_host, metadata, request, absl::ToChronoMilliseconds(client_config_.request_timeout), cb); } @@ -454,6 +463,10 @@ void PushConsumerImpl::wrapAckMessageRequest(const Message& msg, AckMessageReque auto entry = new rmq::AckMessageEntry(); entry->set_message_id(msg.id()); entry->set_receipt_handle(msg.extension().receipt_handle); + // Set lite_topic if present + if (!msg.liteTopic().empty()) { + entry->set_lite_topic(msg.liteTopic()); + } request.mutable_entries()->AddAllocated(entry); } @@ -527,10 +540,9 @@ void PushConsumerImpl::buildClientSettings(rmq::Settings& settings) { settings.set_client_type(rmq::ClientType::PUSH_CONSUMER); auto subscription = settings.mutable_subscription(); subscription->mutable_group()->CopyFrom(client_config_.subscriber.group); - auto polling_timeout = google::protobuf::util::TimeUtil::MillisecondsToDuration( - absl::ToInt64Milliseconds(client_config_.subscriber.polling_timeout)); - subscription->mutable_long_polling_timeout()->set_seconds(polling_timeout.seconds()); - subscription->mutable_long_polling_timeout()->set_nanos(polling_timeout.nanos()); + auto polling_timeout_ms = absl::ToInt64Milliseconds(client_config_.subscriber.polling_timeout); + subscription->mutable_long_polling_timeout()->set_seconds(polling_timeout_ms / 1000); + subscription->mutable_long_polling_timeout()->set_nanos((polling_timeout_ms % 1000) * 1000000); subscription->set_receive_batch_size(client_config_.subscriber.receive_batch_size); { diff --git a/cpp/source/rocketmq/include/FifoProducerPartition.h b/cpp/source/rocketmq/include/FifoProducerPartition.h index 8d0e00d81..22928c7ec 100644 --- a/cpp/source/rocketmq/include/FifoProducerPartition.h +++ b/cpp/source/rocketmq/include/FifoProducerPartition.h @@ -16,8 +16,6 @@ */ #pragma once -#include "absl/base/internal/thread_annotations.h" - #include #include #include diff --git a/cpp/source/rocketmq/include/LitePushConsumerImpl.h b/cpp/source/rocketmq/include/LitePushConsumerImpl.h new file mode 100644 index 000000000..a9d8f9357 --- /dev/null +++ b/cpp/source/rocketmq/include/LitePushConsumerImpl.h @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include + +#include "LiteSubscriptionManager.h" +#include "PushConsumerImpl.h" +#include "absl/strings/string_view.h" +#include "rocketmq/OffsetOption.h" + +ROCKETMQ_NAMESPACE_BEGIN + +/** + * @brief Internal implementation of LitePushConsumer. + * + * Extends PushConsumerImpl with lite subscription management: + * - Uses ClientType::LITE_PUSH_CONSUMER + * - Creates and manages LiteSubscriptionManager for dynamic lite topic subscriptions + * - Handles NotifyUnsubscribeLiteCommand from server telemetry + * - Syncs lite subscription settings from server + */ +class LitePushConsumerImpl : public PushConsumerImpl { +public: + explicit LitePushConsumerImpl(absl::string_view group_name, const std::string& bind_topic); + + ~LitePushConsumerImpl() override; + + void start() override; + + void shutdown() override; + + void buildClientSettings(rmq::Settings& settings) override + LOCKS_EXCLUDED(topic_filter_expression_table_mtx_); + + void prepareHeartbeatData(HeartbeatRequest& request) override; + + /** + * Subscribe to a lite topic with optional offset. + */ + void subscribeLite(const std::string& lite_topic, const OffsetOption* offset_option, std::error_code& ec); + + /** + * Unsubscribe from a lite topic. + */ + void unsubscribeLite(const std::string& lite_topic, std::error_code& ec); + + /** + * Get current lite topic set. + */ + std::set getLiteTopicSet() const; + + /** + * Get the bind topic name. + */ + const std::string& getBindTopicName() const { return bind_topic_; } + + /** + * Handle NotifyUnsubscribeLiteCommand from server. + */ + void onNotifyUnsubscribeLiteCommand(const std::string& lite_topic) override; + + /** + * Handle settings update from server telemetry. + * Called when server pushes new settings via telemetry stream. + */ + void onSettingsUpdate(const rmq::Settings& settings); + + LiteSubscriptionManager& liteSubscriptionManager() { return *lite_subscription_manager_; } + +protected: + std::shared_ptr self() override { + return shared_from_this(); + } + +private: + std::string bind_topic_; + std::unique_ptr lite_subscription_manager_; + + /** + * Provide current route endpoints to LiteSubscriptionManager. + */ + absl::flat_hash_set getCurrentEndpoints(); +}; + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/rocketmq/include/LiteSubscriptionManager.h b/cpp/source/rocketmq/include/LiteSubscriptionManager.h new file mode 100644 index 000000000..404a9bafc --- /dev/null +++ b/cpp/source/rocketmq/include/LiteSubscriptionManager.h @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "ClientConfig.h" +#include "ClientManager.h" +#include "Protocol.h" +#include "Scheduler.h" +#include "Signature.h" +#include "absl/container/flat_hash_set.h" +#include "absl/synchronization/mutex.h" +#include "rocketmq/OffsetOption.h" + +ROCKETMQ_NAMESPACE_BEGIN + +/** + * @brief Manages lite topic subscriptions for LitePushConsumer. + * + * Responsibilities: + * - Maintains the set of subscribed lite topics + * - Synchronizes subscriptions with the server via SyncLiteSubscription RPC + * - Enforces quota limits (lite_subscription_quota, max_lite_topic_size) + * - Handles server-pushed NotifyUnsubscribeLiteCommand + * - Periodically re-syncs all subscriptions (every 30 seconds) + */ +class LiteSubscriptionManager { +public: + /** + * @param client_manager Shared pointer to the ClientManager for RPC calls. + * @param client_config Reference to the ClientConfig for signing and endpoints. + * @param bind_topic The parent topic name. + * @param group The consumer group resource. + */ + LiteSubscriptionManager(std::shared_ptr client_manager, + ClientConfig& client_config, + const std::string& bind_topic, + const rmq::Resource& group); + + ~LiteSubscriptionManager(); + + /** + * Set the endpoints provider function. + * Called by LitePushConsumerImpl to provide current route endpoints. + */ + void setEndpointsProvider(std::function()> provider) { + endpoints_provider_ = std::move(provider); + } + + /** + * Start the periodic sync task. + * Performs initial full sync, then schedules every 30 seconds. + */ + void startUp(); + + /** + * Stop the periodic sync task. + */ + void shutdown(); + + /** + * Subscribe to a lite topic. + * + * @param lite_topic Lite topic name. + * @param offset_option Optional offset option (nullptr for default). + * @param ec Error code set on failure. + */ + void subscribeLite(const std::string& lite_topic, + const OffsetOption* offset_option, + std::error_code& ec); + + /** + * Unsubscribe from a lite topic. + * + * @param lite_topic Lite topic name. + * @param ec Error code set on failure. + */ + void unsubscribeLite(const std::string& lite_topic, std::error_code& ec); + + /** + * Get a snapshot of currently subscribed lite topics. + */ + std::set getLiteTopicSet() const; + + /** + * Sync subscription settings from server telemetry. + * Updates lite_subscription_quota and max_lite_topic_size. + */ + void syncSettings(const rmq::Settings& settings); + + /** + * Handle server-pushed unsubscribe command. + */ + void onNotifyUnsubscribeLiteCommand(const std::string& lite_topic); + + /** + * Get the bind topic name. + */ + const std::string& getBindTopicName() const { return bind_topic_; } + + /** + * Get the consumer group name. + */ + const std::string& getConsumerGroupName() const { return group_.name(); } + +private: + /** + * Sync all lite subscriptions to all endpoints (COMPLETE_ADD). + */ + void syncAllLiteSubscription(); + + /** + * Send SyncLiteSubscription RPC to all route endpoints. + * + * @param action Subscription action (PARTIAL_ADD, PARTIAL_REMOVE, COMPLETE_ADD, etc.) + * @param topics Set of lite topic names to sync. + * @param offset_option Optional offset option. + * @param ec Error code set on failure. + */ + void syncLiteSubscription(rmq::LiteSubscriptionAction action, + const std::set& topics, + const OffsetOption* offset_option, + std::error_code& ec); + + /** + * Validate lite topic name length. + */ + void validateLiteTopic(const std::string& lite_topic, int max_length, std::error_code& ec); + + /** + * Check if lite subscription quota allows adding delta more subscriptions. + */ + void checkLiteSubscriptionQuota(int delta, std::error_code& ec); + + std::shared_ptr client_manager_; + ClientConfig& client_config_; + std::string bind_topic_; + rmq::Resource group_; + + mutable absl::Mutex lite_topic_set_mtx_; + absl::flat_hash_set lite_topic_set_ GUARDED_BY(lite_topic_set_mtx_); + + std::int32_t lite_subscription_quota_{0}; + std::int32_t max_lite_topic_size_{64}; + + std::function()> endpoints_provider_; + + std::uintptr_t sync_task_handle_{0}; + static const char* SYNC_TASK_NAME; +}; + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/rocketmq/include/PushConsumerImpl.h b/cpp/source/rocketmq/include/PushConsumerImpl.h index 42c3fe493..1a1fc0668 100644 --- a/cpp/source/rocketmq/include/PushConsumerImpl.h +++ b/cpp/source/rocketmq/include/PushConsumerImpl.h @@ -177,11 +177,12 @@ class PushConsumerImpl : virtual public ClientImpl, public std::enable_shared_fr void onVerifyMessage(MessageConstSharedPtr, std::function) override; -private: absl::flat_hash_map topic_filter_expression_table_ GUARDED_BY(topic_filter_expression_table_mtx_); mutable absl::Mutex topic_filter_expression_table_mtx_; +private: + /** * Consume message thread pool size. */ diff --git a/cpp/source/rocketmq/tests/BUILD.bazel b/cpp/source/rocketmq/tests/BUILD.bazel index 1c9e2aa2d..6e5bcf41e 100644 --- a/cpp/source/rocketmq/tests/BUILD.bazel +++ b/cpp/source/rocketmq/tests/BUILD.bazel @@ -75,4 +75,22 @@ cc_test( "PriorityMessageTest.cpp", ], deps = base_deps +) + +cc_test( + name = "lite_message_test", + srcs = [ + "LiteMessageTest.cpp", + ], + deps = base_deps +) + +cc_test( + name = "lite_subscription_manager_test", + srcs = [ + "LiteSubscriptionManagerTest.cpp", + ], + deps = base_deps + [ + "//source/client/mocks:client_mocks", + ], ) \ No newline at end of file diff --git a/cpp/source/rocketmq/tests/CMakeLists.txt b/cpp/source/rocketmq/tests/CMakeLists.txt index f8c80eeae..97fd7d371 100644 --- a/cpp/source/rocketmq/tests/CMakeLists.txt +++ b/cpp/source/rocketmq/tests/CMakeLists.txt @@ -12,6 +12,8 @@ endmacro() add_rocketmq_test(client_impl_test ClientImplTest.cpp) add_rocketmq_test(consume_message_service_test ConsumeMessageServiceTest.cpp) +add_rocketmq_test(lite_message_test LiteMessageTest.cpp) +add_rocketmq_test(lite_subscription_manager_test LiteSubscriptionManagerTest.cpp) add_rocketmq_test(optional_test OptionalTest.cpp) add_rocketmq_test(priority_message_test PriorityMessageTest.cpp) add_rocketmq_test(send_context_test SendContextTest.cpp) diff --git a/cpp/source/rocketmq/tests/LiteMessageTest.cpp b/cpp/source/rocketmq/tests/LiteMessageTest.cpp new file mode 100644 index 000000000..8fc93b507 --- /dev/null +++ b/cpp/source/rocketmq/tests/LiteMessageTest.cpp @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include "Protocol.h" +#include "rocketmq/Message.h" +#include "rocketmq/OffsetOption.h" +#include "gtest/gtest.h" + +ROCKETMQ_NAMESPACE_BEGIN + +// ============================================================================ +// Message Lite Topic Tests +// ============================================================================ + +TEST(LiteMessageTest, testLiteTopicMessageBuilder) { + auto message = Message::newBuilder() + .withTopic("LiteTopic") + .withBody("Test lite message") + .withTag("LiteTag") + .withKeys({"key1"}) + .withLiteTopic("lite-topic-1") + .build(); + + ASSERT_NE(message, nullptr); + EXPECT_EQ(message->topic(), "LiteTopic"); + EXPECT_EQ(message->body(), "Test lite message"); + EXPECT_EQ(message->tag(), "LiteTag"); + EXPECT_EQ(message->liteTopic(), "lite-topic-1"); +} + +TEST(LiteMessageTest, testDefaultLiteTopicIsEmpty) { + auto message = Message::newBuilder() + .withTopic("NormalTopic") + .withBody("Normal message") + .build(); + + ASSERT_NE(message, nullptr); + EXPECT_TRUE(message->liteTopic().empty()); +} + +TEST(LiteMessageTest, testLiteTopicWithDifferentNames) { + std::vector lite_topic_names = { + "lite-topic-1", + "lite_topic_2", + "lite.topic.3", + "a", + std::string(64, 'x'), + }; + + for (const auto& name : lite_topic_names) { + auto message = Message::newBuilder() + .withTopic("ParentTopic") + .withBody("body") + .withLiteTopic(name) + .build(); + ASSERT_NE(message, nullptr); + EXPECT_EQ(message->liteTopic(), name); + } +} + +TEST(LiteMessageTest, testLiteTopicSetterWithSpaces) { + // Blank lite topic should throw, mirroring Java's testLiteTopicSetterWithSpaces + EXPECT_THROW( + Message::newBuilder().withLiteTopic(" "), + std::invalid_argument); +} + +TEST(LiteMessageTest, testLiteTopicSetterWithTab) { + EXPECT_THROW( + Message::newBuilder().withLiteTopic("\t"), + std::invalid_argument); +} + +TEST(LiteMessageTest, testLiteTopicSetterWithEmpty) { + EXPECT_THROW( + Message::newBuilder().withLiteTopic(""), + std::invalid_argument); +} + +TEST(LiteMessageTest, testLiteTopicSetter) { + // Basic lite topic setter, mirroring Java's testLiteTopicSetter + auto message = Message::newBuilder() + .withLiteTopic("liteTopicA") + .withTopic("TestTopic") + .withBody("Test body") + .build(); + EXPECT_FALSE(message->liteTopic().empty()); + EXPECT_EQ(message->liteTopic(), "liteTopicA"); +} + +TEST(LiteMessageTest, testBuildDefaults) { + // Default message should have no liteTopic, no group, no deliveryTimestamp, no priority + // Mirroring Java's testBuild + auto message = Message::newBuilder() + .withTopic("TestTopic") + .withBody("Test body") + .build(); + EXPECT_TRUE(message->liteTopic().empty()); + EXPECT_TRUE(message->group().empty()); + EXPECT_EQ(message->deliveryTimestamp().time_since_epoch().count(), 0); + EXPECT_EQ(message->priority(), -1); +} + +TEST(LiteMessageTest, testMessageTypeConflict_DeliveryAndLite) { + // deliveryTimestamp + liteTopic conflict + auto now = std::chrono::system_clock::now(); + EXPECT_THROW( + Message::newBuilder().availableAfter(now).withLiteTopic("HW"), + std::invalid_argument); +} + +TEST(LiteMessageTest, testMessageTypeConflict_DeliveryAndGroup) { + auto now = std::chrono::system_clock::now(); + EXPECT_THROW( + Message::newBuilder().availableAfter(now).withGroup("HW"), + std::invalid_argument); +} + +TEST(LiteMessageTest, testMessageTypeConflict_DeliveryAndPriority) { + auto now = std::chrono::system_clock::now(); + EXPECT_THROW( + Message::newBuilder().availableAfter(now).withPriority(1), + std::invalid_argument); +} + +TEST(LiteMessageTest, testMessageTypeConflict_GroupAndLite) { + EXPECT_THROW( + Message::newBuilder().withGroup("HW").withLiteTopic("HW"), + std::invalid_argument); +} + +TEST(LiteMessageTest, testMessageTypeConflict_GroupAndDelivery) { + auto now = std::chrono::system_clock::now(); + EXPECT_THROW( + Message::newBuilder().withGroup("HW").availableAfter(now), + std::invalid_argument); +} + +TEST(LiteMessageTest, testMessageTypeConflict_GroupAndPriority) { + EXPECT_THROW( + Message::newBuilder().withGroup("HW").withPriority(1), + std::invalid_argument); +} + +TEST(LiteMessageTest, testMessageTypeConflict_LiteAndGroup) { + EXPECT_THROW( + Message::newBuilder().withLiteTopic("HW").withGroup("HW"), + std::invalid_argument); +} + +TEST(LiteMessageTest, testMessageTypeConflict_LiteAndDelivery) { + auto now = std::chrono::system_clock::now(); + EXPECT_THROW( + Message::newBuilder().withLiteTopic("HW").availableAfter(now), + std::invalid_argument); +} + +TEST(LiteMessageTest, testMessageTypeConflict_LiteAndPriority) { + EXPECT_THROW( + Message::newBuilder().withLiteTopic("HW").withPriority(1), + std::invalid_argument); +} + +TEST(LiteMessageTest, testMessageTypeConflict_PriorityAndDelivery) { + auto now = std::chrono::system_clock::now(); + EXPECT_THROW( + Message::newBuilder().withPriority(1).availableAfter(now), + std::invalid_argument); +} + +TEST(LiteMessageTest, testMessageTypeConflict_PriorityAndLite) { + EXPECT_THROW( + Message::newBuilder().withPriority(1).withLiteTopic("HW"), + std::invalid_argument); +} + +TEST(LiteMessageTest, testMessageTypeConflict_PriorityAndGroup) { + EXPECT_THROW( + Message::newBuilder().withPriority(1).withGroup("HW"), + std::invalid_argument); +} + +TEST(LiteMessageTest, testLiteTopicWithProperties) { + std::unordered_map properties; + properties["custom_key"] = "custom_value"; + + auto message = Message::newBuilder() + .withTopic("LiteTopic") + .withBody("Lite message with properties") + .withLiteTopic("my-lite-topic") + .withProperties(properties) + .build(); + + ASSERT_NE(message, nullptr); + EXPECT_EQ(message->liteTopic(), "my-lite-topic"); + EXPECT_EQ(message->properties().size(), 1); + EXPECT_EQ(message->properties().at("custom_key"), "custom_value"); +} + +// ============================================================================ +// OffsetOption Tests +// ============================================================================ + +TEST(OffsetOptionTest, testLastOffset) { + auto option = OffsetOption::lastOffset(); + EXPECT_EQ(option.type(), OffsetOption::Type::POLICY); + EXPECT_EQ(option.value(), static_cast(OffsetOption::Policy::LAST)); +} + +TEST(OffsetOptionTest, testMinOffset) { + auto option = OffsetOption::minOffset(); + EXPECT_EQ(option.type(), OffsetOption::Type::POLICY); + EXPECT_EQ(option.value(), static_cast(OffsetOption::Policy::MIN)); +} + +TEST(OffsetOptionTest, testMaxOffset) { + auto option = OffsetOption::maxOffset(); + EXPECT_EQ(option.type(), OffsetOption::Type::POLICY); + EXPECT_EQ(option.value(), static_cast(OffsetOption::Policy::MAX)); +} + +TEST(OffsetOptionTest, testOfOffset) { + auto option = OffsetOption::ofOffset(100); + EXPECT_EQ(option.type(), OffsetOption::Type::OFFSET); + EXPECT_EQ(option.value(), 100); +} + +TEST(OffsetOptionTest, testOfTailN) { + auto option = OffsetOption::ofTailN(50); + EXPECT_EQ(option.type(), OffsetOption::Type::TAIL_N); + EXPECT_EQ(option.value(), 50); +} + +TEST(OffsetOptionTest, testOfTimestamp) { + auto option = OffsetOption::ofTimestamp(1700000000000); + EXPECT_EQ(option.type(), OffsetOption::Type::TIMESTAMP); + EXPECT_EQ(option.value(), 1700000000000); +} + +TEST(OffsetOptionTest, testEquality) { + auto opt1 = OffsetOption::lastOffset(); + auto opt2 = OffsetOption::lastOffset(); + auto opt3 = OffsetOption::minOffset(); + + EXPECT_EQ(opt1, opt2); + EXPECT_NE(opt1, opt3); +} + +TEST(OffsetOptionTest, testNegativeOffsetThrows) { + EXPECT_THROW(OffsetOption::ofOffset(-1), std::invalid_argument); + EXPECT_THROW(OffsetOption::ofTailN(-1), std::invalid_argument); + EXPECT_THROW(OffsetOption::ofTimestamp(-1), std::invalid_argument); +} + +TEST(OffsetOptionTest, testZeroOffsetIsValid) { + EXPECT_NO_THROW(OffsetOption::ofOffset(0)); + EXPECT_NO_THROW(OffsetOption::ofTailN(0)); + EXPECT_NO_THROW(OffsetOption::ofTimestamp(0)); +} + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/rocketmq/tests/LiteSubscriptionManagerTest.cpp b/cpp/source/rocketmq/tests/LiteSubscriptionManagerTest.cpp new file mode 100644 index 000000000..f6418a968 --- /dev/null +++ b/cpp/source/rocketmq/tests/LiteSubscriptionManagerTest.cpp @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +#include "ClientManagerMock.h" +#include "LiteSubscriptionManager.h" +#include "Protocol.h" +#include "rocketmq/ErrorCode.h" +#include "rocketmq/OffsetOption.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using ::testing::_; +using ::testing::Invoke; +using ::testing::Return; +using ::testing::NiceMock; + +ROCKETMQ_NAMESPACE_BEGIN + +class LiteSubscriptionManagerTest : public ::testing::Test { +protected: + void SetUp() override { + mock_client_manager_ = std::make_shared>(); + + // Setup default mock behavior for syncLiteSubscription - return success + ON_CALL(*mock_client_manager_, syncLiteSubscription(_, _, _, _, _)) + .WillByDefault([](const std::string&, const Metadata&, const SyncLiteSubscriptionRequest&, + std::chrono::milliseconds, + const std::function& cb) { + SyncLiteSubscriptionResponse response; + cb(std::error_code{}, response); + }); + + // Set up client config + client_config_.client_id = "test-client-id"; + client_config_.request_timeout = absl::FromChrono(std::chrono::seconds(3)); + + // Set up group resource + group_.set_resource_namespace("test-namespace"); + group_.set_name("test-consumer-group"); + } + + std::shared_ptr> mock_client_manager_; + ClientConfig client_config_; + rmq::Resource group_; + std::string bind_topic_ = "test-bind-topic"; +}; + +// ============================================================================ +// Constructor Tests +// ============================================================================ + +TEST_F(LiteSubscriptionManagerTest, testConstructor) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + EXPECT_EQ(manager.getBindTopicName(), "test-bind-topic"); + EXPECT_EQ(manager.getConsumerGroupName(), "test-consumer-group"); +} + +TEST_F(LiteSubscriptionManagerTest, testInitialLiteTopicSetIsEmpty) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + auto topic_set = manager.getLiteTopicSet(); + EXPECT_TRUE(topic_set.empty()); +} + +// ============================================================================ +// syncSettings Tests +// ============================================================================ + +TEST_F(LiteSubscriptionManagerTest, testSyncSettingsWithSubscription) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + rmq::Settings settings; + auto* subscription = settings.mutable_subscription(); + subscription->set_lite_subscription_quota(10); + subscription->set_max_lite_topic_size(128); + + manager.syncSettings(settings); + + // Settings should be updated - we can verify by trying to subscribe + // (quota check will pass with quota=10) + std::error_code ec; + absl::flat_hash_set endpoints = {"localhost:8081"}; + manager.setEndpointsProvider([&endpoints]() { return endpoints; }); + manager.subscribeLite("test-topic", nullptr, ec); + EXPECT_FALSE(ec); +} + +TEST_F(LiteSubscriptionManagerTest, testSyncSettingsWithoutSubscription) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + rmq::Settings settings; + // No subscription field + + manager.syncSettings(settings); + + // Default max_lite_topic_size should remain 64 + // Try to subscribe with a topic longer than 64 characters + std::error_code ec; + absl::flat_hash_set endpoints = {"localhost:8081"}; + manager.setEndpointsProvider([&endpoints]() { return endpoints; }); + std::string long_topic(65, 'a'); + manager.subscribeLite(long_topic, nullptr, ec); + EXPECT_EQ(ec, ErrorCode::IllegalLiteTopic); +} + +// ============================================================================ +// subscribeLite Tests +// ============================================================================ + +TEST_F(LiteSubscriptionManagerTest, testSubscribeLiteSuccess) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + // Set quota + rmq::Settings settings; + settings.mutable_subscription()->set_lite_subscription_quota(10); + manager.syncSettings(settings); + + absl::flat_hash_set endpoints = {"localhost:8081"}; + manager.setEndpointsProvider([&endpoints]() { return endpoints; }); + + std::error_code ec; + manager.subscribeLite("test-topic", nullptr, ec); + + EXPECT_FALSE(ec); + auto topic_set = manager.getLiteTopicSet(); + EXPECT_EQ(topic_set.size(), 1); + EXPECT_TRUE(topic_set.count("test-topic")); +} + +TEST_F(LiteSubscriptionManagerTest, testSubscribeLiteAlreadySubscribed) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + rmq::Settings settings; + settings.mutable_subscription()->set_lite_subscription_quota(10); + manager.syncSettings(settings); + + absl::flat_hash_set endpoints = {"localhost:8081"}; + manager.setEndpointsProvider([&endpoints]() { return endpoints; }); + + std::error_code ec; + manager.subscribeLite("test-topic", nullptr, ec); + EXPECT_FALSE(ec); + + // Subscribe again - should be idempotent + manager.subscribeLite("test-topic", nullptr, ec); + EXPECT_FALSE(ec); + + auto topic_set = manager.getLiteTopicSet(); + EXPECT_EQ(topic_set.size(), 1); +} + +TEST_F(LiteSubscriptionManagerTest, testSubscribeLiteWithOffsetOption) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + rmq::Settings settings; + settings.mutable_subscription()->set_lite_subscription_quota(10); + manager.syncSettings(settings); + + absl::flat_hash_set endpoints = {"localhost:8081"}; + manager.setEndpointsProvider([&endpoints]() { return endpoints; }); + + std::error_code ec; + auto offset = OffsetOption::ofOffset(100); + manager.subscribeLite("test-topic", &offset, ec); + + EXPECT_FALSE(ec); + auto topic_set = manager.getLiteTopicSet(); + EXPECT_TRUE(topic_set.count("test-topic")); +} + +TEST_F(LiteSubscriptionManagerTest, testSubscribeLiteBlankTopic) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + std::error_code ec; + manager.subscribeLite("", nullptr, ec); + EXPECT_EQ(ec, ErrorCode::IllegalLiteTopic); +} + +TEST_F(LiteSubscriptionManagerTest, testSubscribeLiteTopicTooLong) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + // Default max_lite_topic_size is 64 + std::string long_topic(65, 'a'); + + std::error_code ec; + manager.subscribeLite(long_topic, nullptr, ec); + EXPECT_EQ(ec, ErrorCode::IllegalLiteTopic); +} + +TEST_F(LiteSubscriptionManagerTest, testSubscribeLiteQuotaExceeded) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + // Set quota to 0 + rmq::Settings settings; + settings.mutable_subscription()->set_lite_subscription_quota(0); + manager.syncSettings(settings); + + std::error_code ec; + manager.subscribeLite("test-topic", nullptr, ec); + EXPECT_EQ(ec, ErrorCode::LiteSubscriptionQuotaExceeded); +} + +TEST_F(LiteSubscriptionManagerTest, testSubscribeLiteNoEndpoints) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + rmq::Settings settings; + settings.mutable_subscription()->set_lite_subscription_quota(10); + manager.syncSettings(settings); + + // No endpoints provider set - endpoints will be empty + std::error_code ec; + manager.subscribeLite("test-topic", nullptr, ec); + EXPECT_EQ(ec, ErrorCode::NotFound); +} + +// ============================================================================ +// unsubscribeLite Tests +// ============================================================================ + +TEST_F(LiteSubscriptionManagerTest, testUnsubscribeLiteSuccess) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + rmq::Settings settings; + settings.mutable_subscription()->set_lite_subscription_quota(10); + manager.syncSettings(settings); + + absl::flat_hash_set endpoints = {"localhost:8081"}; + manager.setEndpointsProvider([&endpoints]() { return endpoints; }); + + std::error_code ec; + manager.subscribeLite("test-topic", nullptr, ec); + EXPECT_FALSE(ec); + + manager.unsubscribeLite("test-topic", ec); + EXPECT_FALSE(ec); + + auto topic_set = manager.getLiteTopicSet(); + EXPECT_TRUE(topic_set.empty()); +} + +TEST_F(LiteSubscriptionManagerTest, testUnsubscribeLiteNotSubscribed) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + std::error_code ec; + manager.unsubscribeLite("non-existent-topic", ec); + // Should not set error, just return silently + EXPECT_FALSE(ec); + + auto topic_set = manager.getLiteTopicSet(); + EXPECT_TRUE(topic_set.empty()); +} + +// ============================================================================ +// onNotifyUnsubscribeLiteCommand Tests +// ============================================================================ + +TEST_F(LiteSubscriptionManagerTest, testOnNotifyUnsubscribeLiteCommand) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + rmq::Settings settings; + settings.mutable_subscription()->set_lite_subscription_quota(10); + manager.syncSettings(settings); + + absl::flat_hash_set endpoints = {"localhost:8081"}; + manager.setEndpointsProvider([&endpoints]() { return endpoints; }); + + std::error_code ec; + manager.subscribeLite("test-topic", nullptr, ec); + EXPECT_FALSE(ec); + + // Server pushes unsubscribe command + manager.onNotifyUnsubscribeLiteCommand("test-topic"); + + auto topic_set = manager.getLiteTopicSet(); + EXPECT_FALSE(topic_set.count("test-topic")); +} + +TEST_F(LiteSubscriptionManagerTest, testOnNotifyUnsubscribeLiteCommandWithEmptyTopic) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + rmq::Settings settings; + settings.mutable_subscription()->set_lite_subscription_quota(10); + manager.syncSettings(settings); + + absl::flat_hash_set endpoints = {"localhost:8081"}; + manager.setEndpointsProvider([&endpoints]() { return endpoints; }); + + std::error_code ec; + manager.subscribeLite("test-topic", nullptr, ec); + EXPECT_FALSE(ec); + + // Empty topic should not remove anything + manager.onNotifyUnsubscribeLiteCommand(""); + + auto topic_set = manager.getLiteTopicSet(); + EXPECT_TRUE(topic_set.count("test-topic")); +} + +// ============================================================================ +// Multiple Operations Tests +// ============================================================================ + +TEST_F(LiteSubscriptionManagerTest, testMultipleOperationsSequence) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + rmq::Settings settings; + settings.mutable_subscription()->set_lite_subscription_quota(10); + manager.syncSettings(settings); + + absl::flat_hash_set endpoints = {"localhost:8081"}; + manager.setEndpointsProvider([&endpoints]() { return endpoints; }); + + std::error_code ec; + + // Subscribe to multiple topics + manager.subscribeLite("topic1", nullptr, ec); + EXPECT_FALSE(ec); + + auto offset = OffsetOption::minOffset(); + manager.subscribeLite("topic2", &offset, ec); + EXPECT_FALSE(ec); + + // Subscribe to topic1 again (should be idempotent) + manager.subscribeLite("topic1", nullptr, ec); + EXPECT_FALSE(ec); + + auto topic_set = manager.getLiteTopicSet(); + EXPECT_EQ(topic_set.size(), 2); + EXPECT_TRUE(topic_set.count("topic1")); + EXPECT_TRUE(topic_set.count("topic2")); + + // Unsubscribe topic1 + manager.unsubscribeLite("topic1", ec); + EXPECT_FALSE(ec); + + topic_set = manager.getLiteTopicSet(); + EXPECT_EQ(topic_set.size(), 1); + EXPECT_FALSE(topic_set.count("topic1")); + EXPECT_TRUE(topic_set.count("topic2")); +} + +TEST_F(LiteSubscriptionManagerTest, testGetLiteTopicSetReturnsCopy) { + LiteSubscriptionManager manager(mock_client_manager_, client_config_, bind_topic_, group_); + + rmq::Settings settings; + settings.mutable_subscription()->set_lite_subscription_quota(10); + manager.syncSettings(settings); + + absl::flat_hash_set endpoints = {"localhost:8081"}; + manager.setEndpointsProvider([&endpoints]() { return endpoints; }); + + std::error_code ec; + manager.subscribeLite("topic1", nullptr, ec); + EXPECT_FALSE(ec); + + // Get set and modify it + auto topic_set = manager.getLiteTopicSet(); + topic_set.insert("should-not-appear"); + + // Original should not be affected + auto actual_set = manager.getLiteTopicSet(); + EXPECT_EQ(actual_set.size(), 1); + EXPECT_FALSE(actual_set.count("should-not-appear")); +} + +ROCKETMQ_NAMESPACE_END