Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ find_package(gRPC CONFIG REQUIRED)
find_package(absl REQUIRED)
find_package(OpenSSL REQUIRED)

# Thread safety annotation macros
# Map legacy names to ABSL_-prefixed names (newer abseil >= 20230125)
# so that existing code using GUARDED_BY, LOCKS_EXCLUDED, etc. continues to work.
# Function-style macros cannot be passed via -D on the command line, so we
# force-include a compatibility header that defines them instead.
add_compile_options(
"-include"
"${CMAKE_CURRENT_SOURCE_DIR}/source/base/include/thread_annotations_compat.h"
)

if(NOT EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/proto/apache/rocketmq/v2/definition.proto")
message(FATAL_ERROR "Proto files not found. Run: git submodule update --init --recursive")
endif()
Expand Down
22 changes: 22 additions & 0 deletions cpp/examples/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,26 @@ cc_binary(
"//source/rocketmq:rocketmq_library",
"@com_github_gflags_gflags//:gflags",
]
)

cc_binary(
name = "example_lite_push_consumer",
srcs = [
"ExampleLitePushConsumer.cpp",
],
deps = [
"//source/rocketmq:rocketmq_library",
"@com_github_gflags_gflags//:gflags",
],
)

cc_binary(
name = "example_lite_producer",
srcs = [
"ExampleLiteProducer.cpp",
],
deps = [
"//source/rocketmq:rocketmq_library",
"@com_github_gflags_gflags//:gflags",
],
)
4 changes: 3 additions & 1 deletion cpp/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ add_example(example_producer_with_timed_message ExampleProducerWithTimedMessage.
add_example(example_producer_with_priority_message ExampleProducerWithPriorityMessage.cpp)
add_example(example_producer_with_transactional_message ExampleProducerWithTransactionalMessage.cpp)
add_example(example_push_consumer ExamplePushConsumer.cpp)
add_example(example_simple_consumer ExampleSimpleConsumer.cpp)
add_example(example_simple_consumer ExampleSimpleConsumer.cpp)
add_example(example_lite_push_consumer ExampleLitePushConsumer.cpp)
add_example(example_lite_producer ExampleLiteProducer.cpp)
90 changes: 90 additions & 0 deletions cpp/examples/ExampleLiteProducer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <chrono>
#include <iostream>
#include <thread>

#include "gflags/gflags.h"
#include "rocketmq/Logger.h"
#include "rocketmq/Producer.h"

using namespace ROCKETMQ_NAMESPACE;

DEFINE_string(topic, "LiteTopic", "Parent topic for lite messages");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
DEFINE_int32(message_count, 5, "Number of lite messages to send");

int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);

auto& logger = getLogger();
logger.setConsoleLevel(Level::Info);
logger.setLevel(Level::Info);
logger.init();

std::cout << "=== Lite Producer Example ===" << std::endl;

CredentialsProviderPtr credentials_provider;
if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
}

auto producer = Producer::newBuilder()
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withRequestTimeout(std::chrono::seconds(3))
.withCredentialsProvider(credentials_provider)
.withSsl(FLAGS_tls)
.build())
.build();

std::cout << "Producer started successfully" << std::endl;

// Send lite topic messages
for (int i = 1; i <= FLAGS_message_count; ++i) {
std::string lite_topic_name = "lite-topic-" + std::to_string(i);
std::string message_body = "This is a lite message for Apache RocketMQ, index=" + std::to_string(i);

auto message = Message::newBuilder()
.withTopic(FLAGS_topic)
.withBody(message_body)
.withKeys({"key-" + std::to_string(i)})
.withLiteTopic(lite_topic_name)
.build();

std::error_code ec;
auto send_receipt = producer.send(std::move(message), ec);

if (ec) {
std::cerr << "Failed to send message " << i << ": " << ec.message() << std::endl;
} else {
std::cout << "Message " << i << " sent successfully" << std::endl;
std::cout << " - Topic: " << FLAGS_topic << std::endl;
std::cout << " - Lite Topic: " << lite_topic_name << std::endl;
std::cout << " - Message ID: " << send_receipt.message_id << std::endl;
}

std::this_thread::sleep_for(std::chrono::seconds(1));
}

std::cout << "All messages sent!" << std::endl;

return EXIT_SUCCESS;
}
105 changes: 105 additions & 0 deletions cpp/examples/ExampleLitePushConsumer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <chrono>
#include <iostream>
#include <system_error>
#include <thread>

#include "gflags/gflags.h"
#include "rocketmq/LitePushConsumer.h"
#include "rocketmq/Logger.h"
#include "rocketmq/OffsetOption.h"

using namespace ROCKETMQ_NAMESPACE;

DEFINE_string(topic, "LiteTopic", "Bind topic to which lite topics belong");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_string(group, "LitePushConsumer", "GroupId, created through your instance management console");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");

int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);

auto& logger = getLogger();
logger.setConsoleLevel(Level::Info);
logger.setLevel(Level::Info);
logger.init();

auto listener = [](const Message& message) {
std::cout << "Received a message[topic=" << message.topic()
<< ", liteTopic=" << message.liteTopic()
<< ", MsgId=" << message.id()
<< ", body=" << message.body() << "]" << std::endl;
return ConsumeResult::SUCCESS;
};

CredentialsProviderPtr credentials_provider;
if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
}

// Build LitePushConsumer
auto lite_consumer = LitePushConsumer::newBuilder()
.bindTopic(FLAGS_topic)
.withGroup(FLAGS_group)
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withRequestTimeout(std::chrono::seconds(3))
.withCredentialsProvider(credentials_provider)
.withSsl(FLAGS_tls)
.build())
.withConsumeThreads(4)
.withListener(listener)
.build();

std::cout << "LitePushConsumer started, group=" << FLAGS_group
<< ", bindTopic=" << FLAGS_topic << std::endl;

// Subscribe to lite topics
std::error_code ec;
lite_consumer.subscribeLite("lite-topic-1", ec);
if (ec) {
std::cerr << "Failed to subscribe lite-topic-1: " << ec.message() << std::endl;
} else {
std::cout << "Subscribed to lite-topic-1" << std::endl;
}

lite_consumer.subscribeLite("lite-topic-2", OffsetOption::lastOffset(), ec);
if (ec) {
std::cerr << "Failed to subscribe lite-topic-2: " << ec.message() << std::endl;
} else {
std::cout << "Subscribed to lite-topic-2 with LAST offset" << std::endl;
}

// Print current lite topic set
auto topic_set = lite_consumer.getLiteTopicSet();
std::cout << "Current lite topic set size: " << topic_set.size() << std::endl;
for (const auto& topic : topic_set) {
std::cout << " - " << topic << std::endl;
}

// Keep running
std::this_thread::sleep_for(std::chrono::minutes(30));

// Shutdown
lite_consumer.shutdown();
std::cout << "LitePushConsumer shutdown" << std::endl;

return EXIT_SUCCESS;
}
15 changes: 15 additions & 0 deletions cpp/include/rocketmq/ErrorCode.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ enum class ErrorCode : int {
IllegalFilterExpression = 40010,
InvalidReceiptHandle = 40011,

/**
* @brief Format of lite topic is illegal.
*/
IllegalLiteTopic = 40020,

/**
* @brief Message property conflicts with its type.
*/
Expand Down Expand Up @@ -164,6 +169,16 @@ enum class ErrorCode : int {
*/
TooManyRequests = 42900,

/**
* @brief Lite topic quota exceeded.
*/
LiteTopicQuotaExceeded = 42901,

/**
* @brief Lite subscription quota exceeded.
*/
LiteSubscriptionQuotaExceeded = 42902,

/**
* @brief The server is unwilling to process the request because either an
* individual header field, or all the header fields collectively, are too
Expand Down
Loading
Loading