From b8511386cde2cfb61241504c0490a4f4a16985cd Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 27 May 2026 15:13:33 +0100 Subject: [PATCH 1/2] support configure gpu concurrency --- .github/workflows/velox_backend_x86.yml | 4 + .../apache/gluten/config/VeloxConfig.scala | 6 + cpp/velox/compute/VeloxBackend.cc | 2 + cpp/velox/config/VeloxConfig.h | 4 + cpp/velox/cudf/GpuLock.cc | 70 ++++++---- cpp/velox/cudf/GpuLock.h | 12 +- cpp/velox/tests/CMakeLists.txt | 37 +++--- cpp/velox/tests/GpuLockTest.cc | 123 ++++++++++++++++++ docs/velox-configuration.md | 1 + 9 files changed, 208 insertions(+), 51 deletions(-) create mode 100644 cpp/velox/tests/GpuLockTest.cc diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml index 3c9efc44bd7..090ff48bd20 100644 --- a/.github/workflows/velox_backend_x86.yml +++ b/.github/workflows/velox_backend_x86.yml @@ -1268,6 +1268,10 @@ jobs: cd /work bash dev/builddeps-veloxbe.sh --run_setup_script=OFF --build_arrow=OFF --build_tests=ON --build_benchmarks=ON --enable_gpu=ON + + cd /work/cpp/build && ctest -R "velox_gpu.*" + + cd /work rm -rf ep/build-velox/build/velox_ep build/mvn clean package -Pbackends-velox -Pspark-3.4 -DskipTests ccache -s diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 52c964dfe25..3760f7881f6 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -743,6 +743,12 @@ object VeloxConfig extends ConfigRegistry { .booleanConf .createWithDefault(true) + val CUDF_CONCURRENT_GPU_TASKS = + buildStaticConf("spark.gluten.sql.columnar.backend.velox.cudf.concurrentGpuTasks") + .doc("The number of concurrent GPU tasks to run.") + .intConf + .createWithDefault(1) + val CUDF_BATCH_SIZE = buildConf("spark.gluten.sql.columnar.backend.velox.cudf.batchSize") .doc("Cudf input batch size after shuffle reader") diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 801fc9d8358..fcdd81bfbc0 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -30,6 +30,7 @@ #include "utils/qat/QatCodec.h" #endif #ifdef GLUTEN_ENABLE_GPU +#include "cudf/GpuLock.h" #include "operators/plannodes/CudfVectorStream.h" #include "velox/experimental/cudf/CudfConfig.h" #include "velox/experimental/cudf/connectors/hive/CudfHiveConnector.h" @@ -168,6 +169,7 @@ void VeloxBackend::init( #ifdef GLUTEN_ENABLE_GPU if (backendConf_->get(kCudfEnabled, kCudfEnabledDefault)) { + configureGpuTaskConcurrency(backendConf_->get(kCudfConcurrentGpuTasks, kCudfConcurrentGpuTasksDefault)); std::unordered_map options = { {velox::cudf_velox::CudfConfig::kCudfEnabled, "true"}, {velox::cudf_velox::CudfConfig::kCudfDebugEnabled, backendConf_->get(kDebugCudf, kDebugCudfDefault)}, diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index 89a9c4e05a4..d374a9b35c6 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -211,6 +211,10 @@ const std::string kCudfMemoryResourceDefault = const std::string kCudfMemoryPercent = "spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent"; const std::string kCudfMemoryPercentDefault = "50"; +// Maximum number of concurrent tasks allowed to execute GPU work. +const std::string kCudfConcurrentGpuTasks = "spark.gluten.sql.columnar.backend.velox.cudf.concurrentGpuTasks"; +const uint32_t kCudfConcurrentGpuTasksDefault = 1; + /// Preferred size of batches in bytes to be returned by operators. const std::string kVeloxPreferredBatchBytes = "spark.gluten.sql.columnar.backend.velox.preferredBatchBytes"; diff --git a/cpp/velox/cudf/GpuLock.cc b/cpp/velox/cudf/GpuLock.cc index f7caaf57f58..e03ab8ac2f3 100644 --- a/cpp/velox/cudf/GpuLock.cc +++ b/cpp/velox/cudf/GpuLock.cc @@ -19,16 +19,18 @@ #include #include #include -#include #include namespace gluten { namespace { +thread_local bool gThreadGpuHolder = false; + struct GpuLockState { std::mutex gGpuMutex; std::condition_variable gGpuCv; - std::optional gGpuOwner; + size_t maxConcurrentTasks{1}; + size_t activeTasks{0}; }; GpuLockState& getGpuLockState() { @@ -37,51 +39,63 @@ GpuLockState& getGpuLockState() { } } // namespace +void configureGpuTaskConcurrency(size_t maxConcurrentTasks) { + if (maxConcurrentTasks <= 0) { + throw std::invalid_argument("configureGpuTaskConcurrency() requires maxConcurrentTasks > 0"); + } + + std::lock_guard lock(getGpuLockState().gGpuMutex); + getGpuLockState().maxConcurrentTasks = maxConcurrentTasks; + getGpuLockState().gGpuCv.notify_all(); +} + void lockGpu() { - std::thread::id tid = std::this_thread::get_id(); - std::unique_lock lock(getGpuLockState().gGpuMutex); - if (getGpuLockState().gGpuOwner == tid) { - // Reentrant call from the same thread — do nothing + if (gThreadGpuHolder) { return; } - // Wait until the GPU lock becomes available - getGpuLockState().gGpuCv.wait(lock, [] { return !getGpuLockState().gGpuOwner.has_value(); }); + std::unique_lock lock(getGpuLockState().gGpuMutex); + getGpuLockState().gGpuCv.wait( + lock, [] { return getGpuLockState().activeTasks < getGpuLockState().maxConcurrentTasks; }); - // Acquire ownership - getGpuLockState().gGpuOwner = tid; + ++getGpuLockState().activeTasks; + gThreadGpuHolder = true; } bool tryLockGpu() { - std::thread::id tid = std::this_thread::get_id(); - std::unique_lock lock(getGpuLockState().gGpuMutex); - if (getGpuLockState().gGpuOwner == tid) { + if (gThreadGpuHolder) { return true; } - if (getGpuLockState().gGpuOwner.has_value()) { - return false; + + std::unique_lock lock(getGpuLockState().gGpuMutex); + + // Check if a permit is available without blocking + if (getGpuLockState().activeTasks < getGpuLockState().maxConcurrentTasks) { + ++getGpuLockState().activeTasks; + gThreadGpuHolder = true; + return true; } - getGpuLockState().gGpuOwner = tid; - return true; + + return false; } void unlockGpu() { - std::thread::id tid = std::this_thread::get_id(); - std::unique_lock lock(getGpuLockState().gGpuMutex); - if (!getGpuLockState().gGpuOwner.has_value()) { - LOG(INFO) << "unlockGpu() called when no thread holds the lock!"; + if (!gThreadGpuHolder) { + LOG(INFO) << "unlockGpu() called by non-owner thread!" << std::endl; return; } - if (getGpuLockState().gGpuOwner != tid) { - throw std::runtime_error("unlockGpu() called by other thread!"); - } + gThreadGpuHolder = false; - // Release ownership - getGpuLockState().gGpuOwner = std::nullopt; + { + std::lock_guard lock(getGpuLockState().gGpuMutex); + if (getGpuLockState().activeTasks == 0) { + throw std::runtime_error("unlockGpu() called with no active GPU tasks!"); + } + + --getGpuLockState().activeTasks; + } - // Notify one waiting thread - lock.unlock(); getGpuLockState().gGpuCv.notify_one(); } diff --git a/cpp/velox/cudf/GpuLock.h b/cpp/velox/cudf/GpuLock.h index bbf55491a70..08730554198 100644 --- a/cpp/velox/cudf/GpuLock.h +++ b/cpp/velox/cudf/GpuLock.h @@ -17,13 +17,23 @@ #pragma once +#include #include namespace gluten { -/// Acquire the GPU lock, blocking until available. Reentrant for the same thread. +/// Configure the maximum number of concurrent GPU tasks. +/// Must be greater than 0. +void configureGpuTaskConcurrency(size_t maxConcurrentTasks); + +/// Acquire a GPU execution permit (reentrant within the same thread). void lockGpu(); + +/// Try to acquire a GPU execution permit without blocking (reentrant within the same thread). +/// Returns true if the permit was acquired, false otherwise. bool tryLockGpu(); + +/// Release a GPU execution permit held by the current thread. void unlockGpu(); } // namespace gluten diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt index 00c0c2df69c..9999cb71198 100644 --- a/cpp/velox/tests/CMakeLists.txt +++ b/cpp/velox/tests/CMakeLists.txt @@ -37,14 +37,20 @@ function(add_velox_test TEST_EXEC) endfunction() if(ENABLE_GPU) - function(import_library TARGET_NAME LIB_PATH) - if(NOT EXISTS ${LIB_PATH}) - message(FATAL_ERROR "Library does not exist: ${LIB_PATH}") - endif() - add_library(${TARGET_NAME} STATIC IMPORTED) - set_target_properties(${TARGET_NAME} PROPERTIES IMPORTED_LOCATION - ${LIB_PATH}) - endfunction() + import_library( + facebook::velox::velox_cudf_expression + ${VELOX_BUILD_PATH}/velox/experimental/cudf/expression/libvelox_cudf_expression.a + ) + import_library( + facebook::velox::velox_cudf_exec + ${VELOX_BUILD_PATH}/velox/experimental/cudf/exec/libvelox_cudf_exec.a) + import_library( + facebook::velox::velox_cudf_vector + ${VELOX_BUILD_PATH}/velox/experimental/cudf/vector/libvelox_cudf_vector.a) + import_library( + facebook::velox::velox_cudf_hive_connector + ${VELOX_BUILD_PATH}/velox/experimental/cudf/connectors/hive/libvelox_cudf_hive_connector.a + ) function(add_velox_gpu_test TEST_EXEC) set(options) @@ -61,20 +67,6 @@ if(ENABLE_GPU) add_executable(${TEST_EXEC} ${SOURCES} ${VELOX_TEST_COMMON_SRCS}) target_include_directories(${TEST_EXEC} PRIVATE ${CMAKE_SOURCE_DIR}/velox ${CMAKE_SOURCE_DIR}/src) - import_library( - facebook::velox::velox_cudf_expression - ${VELOX_BUILD_PATH}/velox/experimental/cudf/expression/libvelox_cudf_expression.a - ) - import_library( - facebook::velox::velox_cudf_exec - ${VELOX_BUILD_PATH}/velox/experimental/cudf/exec/libvelox_cudf_exec.a) - import_library( - facebook::velox::velox_cudf_vector - ${VELOX_BUILD_PATH}/velox/experimental/cudf/vector/libvelox_cudf_vector.a) - import_library( - facebook::velox::velox_cudf_hive_connector - ${VELOX_BUILD_PATH}/velox/experimental/cudf/connectors/hive/libvelox_cudf_hive_connector.a - ) target_compile_definitions( ${TEST_EXEC} PUBLIC LIBCUDACXX_ENABLE_EXPERIMENTAL_MEMORY_RESOURCE) target_link_libraries( @@ -94,6 +86,7 @@ if(ENABLE_GPU) add_velox_gpu_test(velox_gpu_shuffle_writer_test SOURCES VeloxGpuShuffleWriterTest.cc) + add_velox_gpu_test(velox_gpu_lock_test SOURCES GpuLockTest.cc) endif() set(VELOX_TEST_COMMON_SRCS JsonToProtoConverter.cc FilePathGenerator.cc) diff --git a/cpp/velox/tests/GpuLockTest.cc b/cpp/velox/tests/GpuLockTest.cc new file mode 100644 index 00000000000..53a67b7bf9c --- /dev/null +++ b/cpp/velox/tests/GpuLockTest.cc @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "cudf/GpuLock.h" + +#include +#include +#include +#include +#include + +#include + +namespace gluten { + +TEST(GpuLockTest, configureRejectsZeroConcurrency) { + configureGpuTaskConcurrency(1); + EXPECT_THROW(configureGpuTaskConcurrency(0), std::invalid_argument); +} + +TEST(GpuLockTest, sameThreadLockIsReentrant) { + configureGpuTaskConcurrency(1); + lockGpu(); + EXPECT_TRUE(tryLockGpu()); + + unlockGpu(); + EXPECT_TRUE(tryLockGpu()); + + unlockGpu(); +} + +TEST(GpuLockTest, tryLockFailsWhilePermitHeldByAnotherThread) { + configureGpuTaskConcurrency(1); + lockGpu(); + + auto acquired = std::async(std::launch::async, [] { return tryLockGpu(); }); + EXPECT_FALSE(acquired.get()); + + unlockGpu(); +} + +TEST(GpuLockTest, lockBlocksUntilPermitIsReleased) { + configureGpuTaskConcurrency(1); + lockGpu(); + + std::promise workerStarted; + auto workerStartedFuture = workerStarted.get_future(); + std::atomic workerAcquired{false}; + + std::thread worker([&] { + workerStarted.set_value(); + EXPECT_FALSE(tryLockGpu()); + lockGpu(); + workerAcquired.store(true); + unlockGpu(); + }); + + workerStartedFuture.wait(); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_FALSE(workerAcquired.load()); + + unlockGpu(); + worker.join(); + + EXPECT_TRUE(workerAcquired.load()); +} + +TEST(GpuLockTest, allowsUpToConfiguredConcurrentTasks) { + configureGpuTaskConcurrency(2); + + std::promise firstLocked; + std::promise secondLocked; + auto firstLockedFuture = firstLocked.get_future(); + auto secondLockedFuture = secondLocked.get_future(); + std::promise releaseWorkers; + auto releaseWorkersFuture = releaseWorkers.get_future().share(); + std::atomic acquiredCount{0}; + + std::thread first([&] { + lockGpu(); + EXPECT_TRUE(tryLockGpu()); + acquiredCount.fetch_add(1); + firstLocked.set_value(); + releaseWorkersFuture.wait(); + unlockGpu(); + }); + + std::thread second([&] { + lockGpu(); + EXPECT_TRUE(tryLockGpu()); + acquiredCount.fetch_add(1); + secondLocked.set_value(); + releaseWorkersFuture.wait(); + unlockGpu(); + }); + + firstLockedFuture.wait(); + secondLockedFuture.wait(); + EXPECT_EQ(acquiredCount.load(), 2); + + auto thirdAcquire = std::async(std::launch::async, [] { return tryLockGpu(); }); + EXPECT_FALSE(thirdAcquire.get()); + + releaseWorkers.set_value(); + first.join(); + second.join(); +} + +} // namespace gluten diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index 7f727299564..92098c22e19 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -20,6 +20,7 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | ⚓ Static | 0 | Set prefetch cache min pct for velox file scan | | spark.gluten.sql.columnar.backend.velox.checkUsageLeak | ⚓ Static | true | Enable check memory usage leak. | | spark.gluten.sql.columnar.backend.velox.cudf.batchSize | 🔄 Dynamic | 2147483647 | Cudf input batch size after shuffle reader | +| spark.gluten.sql.columnar.backend.velox.cudf.concurrentGpuTasks | ⚓ Static | 1 | The number of concurrent GPU tasks to run. | | spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan | ⚓ Static | false | Enable cudf table scan | | spark.gluten.sql.columnar.backend.velox.cudf.enableValidation | ⚓ Static | true | Heuristics you can apply to validate a cuDF/GPU plan and only offload when the entire stage can be fully and profitably executed on GPU | | spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent | ⚓ Static | 50 | The initial percent of GPU memory to allocate for memory resource for one thread. | From 856f199e94c51e2b89f13551d35bff479bdafab9 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 27 May 2026 15:41:35 +0100 Subject: [PATCH 2/2] run cpp test --- .github/workflows/velox_backend_x86.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml index 090ff48bd20..8489869a8af 100644 --- a/.github/workflows/velox_backend_x86.yml +++ b/.github/workflows/velox_backend_x86.yml @@ -1269,7 +1269,7 @@ jobs: cd /work bash dev/builddeps-veloxbe.sh --run_setup_script=OFF --build_arrow=OFF --build_tests=ON --build_benchmarks=ON --enable_gpu=ON - cd /work/cpp/build && ctest -R "velox_gpu.*" + cd /work/cpp/build && ctest -V cd /work rm -rf ep/build-velox/build/velox_ep