Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/velox_backend_x86.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,10 @@ jobs:

cd /work
bash dev/builddeps-veloxbe.sh --run_setup_script=OFF --build_arrow=OFF --build_tests=ON --build_benchmarks=ON --enable_gpu=ON

cd /work/cpp/build && ctest -V

cd /work
rm -rf ep/build-velox/build/velox_ep
build/mvn clean package -Pbackends-velox -Pspark-3.4 -DskipTests
ccache -s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,12 @@ object VeloxConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(true)

// Upper bound on the number of tasks allowed to run GPU work at the same time.
// Registered as a static conf. The native backend reads the same key in VeloxBackend::init
// (only when cuDF is enabled) and forwards it to configureGpuTaskConcurrency(), which throws
// std::invalid_argument for 0, so the configured value must be positive.
val CUDF_CONCURRENT_GPU_TASKS =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.cudf.concurrentGpuTasks")
.doc("The number of concurrent GPU tasks to run.")
.intConf
.createWithDefault(1)

val CUDF_BATCH_SIZE =
buildConf("spark.gluten.sql.columnar.backend.velox.cudf.batchSize")
.doc("Cudf input batch size after shuffle reader")
Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "utils/qat/QatCodec.h"
#endif
#ifdef GLUTEN_ENABLE_GPU
#include "cudf/GpuLock.h"
#include "operators/plannodes/CudfVectorStream.h"
#include "velox/experimental/cudf/CudfConfig.h"
#include "velox/experimental/cudf/connectors/hive/CudfHiveConnector.h"
Expand Down Expand Up @@ -168,6 +169,7 @@ void VeloxBackend::init(

#ifdef GLUTEN_ENABLE_GPU
if (backendConf_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
configureGpuTaskConcurrency(backendConf_->get<uint32_t>(kCudfConcurrentGpuTasks, kCudfConcurrentGpuTasksDefault));
std::unordered_map<std::string, std::string> options = {
{velox::cudf_velox::CudfConfig::kCudfEnabled, "true"},
{velox::cudf_velox::CudfConfig::kCudfDebugEnabled, backendConf_->get(kDebugCudf, kDebugCudfDefault)},
Expand Down
4 changes: 4 additions & 0 deletions cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ const std::string kCudfMemoryResourceDefault =
const std::string kCudfMemoryPercent = "spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent";
const std::string kCudfMemoryPercentDefault = "50";

// Maximum number of concurrent tasks allowed to execute GPU work.
// Read in VeloxBackend::init() when cuDF is enabled and passed to
// configureGpuTaskConcurrency(), which rejects 0. With the default of 1 only
// one task at a time can hold a GPU execution permit.
const std::string kCudfConcurrentGpuTasks = "spark.gluten.sql.columnar.backend.velox.cudf.concurrentGpuTasks";
const uint32_t kCudfConcurrentGpuTasksDefault = 1;

/// Preferred size of batches in bytes to be returned by operators.
const std::string kVeloxPreferredBatchBytes = "spark.gluten.sql.columnar.backend.velox.preferredBatchBytes";

Expand Down
70 changes: 42 additions & 28 deletions cpp/velox/cudf/GpuLock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@
#include <glog/logging.h>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <stdexcept>

namespace gluten {

namespace {
thread_local bool gThreadGpuHolder = false;

struct GpuLockState {
std::mutex gGpuMutex;
std::condition_variable gGpuCv;
std::optional<std::thread::id> gGpuOwner;
size_t maxConcurrentTasks{1};
size_t activeTasks{0};
};

GpuLockState& getGpuLockState() {
Expand All @@ -37,51 +39,63 @@ GpuLockState& getGpuLockState() {
}
} // namespace

/// Sets the maximum number of GPU execution permits that may be held at once.
///
/// The limit is updated under the GPU lock mutex, and every thread waiting on
/// the condition variable (i.e. blocked in lockGpu()) is woken so it
/// re-evaluates the permit check against the new limit.
///
/// @param maxConcurrentTasks New permit limit; must be greater than 0.
/// @throws std::invalid_argument if maxConcurrentTasks is 0.
void configureGpuTaskConcurrency(size_t maxConcurrentTasks) {
  // size_t is unsigned, so zero is the only value that can violate "> 0".
  if (maxConcurrentTasks == 0) {
    throw std::invalid_argument("configureGpuTaskConcurrency() requires maxConcurrentTasks > 0");
  }

  auto& state = getGpuLockState();
  std::lock_guard<std::mutex> lock(state.gGpuMutex);
  state.maxConcurrentTasks = maxConcurrentTasks;
  // Wake all waiters rather than one: raising the limit can free several permits.
  state.gGpuCv.notify_all();
}

void lockGpu() {
std::thread::id tid = std::this_thread::get_id();
std::unique_lock<std::mutex> lock(getGpuLockState().gGpuMutex);
if (getGpuLockState().gGpuOwner == tid) {
// Reentrant call from the same thread — do nothing
if (gThreadGpuHolder) {
return;
}

// Wait until the GPU lock becomes available
getGpuLockState().gGpuCv.wait(lock, [] { return !getGpuLockState().gGpuOwner.has_value(); });
std::unique_lock<std::mutex> lock(getGpuLockState().gGpuMutex);
getGpuLockState().gGpuCv.wait(
lock, [] { return getGpuLockState().activeTasks < getGpuLockState().maxConcurrentTasks; });

// Acquire ownership
getGpuLockState().gGpuOwner = tid;
++getGpuLockState().activeTasks;
gThreadGpuHolder = true;
}

bool tryLockGpu() {
std::thread::id tid = std::this_thread::get_id();
std::unique_lock<std::mutex> lock(getGpuLockState().gGpuMutex);
if (getGpuLockState().gGpuOwner == tid) {
if (gThreadGpuHolder) {
return true;
}
if (getGpuLockState().gGpuOwner.has_value()) {
return false;

std::unique_lock<std::mutex> lock(getGpuLockState().gGpuMutex);

// Check if a permit is available without blocking
if (getGpuLockState().activeTasks < getGpuLockState().maxConcurrentTasks) {
++getGpuLockState().activeTasks;
gThreadGpuHolder = true;
return true;
}
getGpuLockState().gGpuOwner = tid;
return true;

return false;
}

void unlockGpu() {
std::thread::id tid = std::this_thread::get_id();
std::unique_lock<std::mutex> lock(getGpuLockState().gGpuMutex);
if (!getGpuLockState().gGpuOwner.has_value()) {
LOG(INFO) << "unlockGpu() called when no thread holds the lock!";
if (!gThreadGpuHolder) {
LOG(INFO) << "unlockGpu() called by non-owner thread!" << std::endl;
return;
}

if (getGpuLockState().gGpuOwner != tid) {
throw std::runtime_error("unlockGpu() called by other thread!");
}
gThreadGpuHolder = false;

// Release ownership
getGpuLockState().gGpuOwner = std::nullopt;
{
std::lock_guard<std::mutex> lock(getGpuLockState().gGpuMutex);
if (getGpuLockState().activeTasks == 0) {
throw std::runtime_error("unlockGpu() called with no active GPU tasks!");
}

--getGpuLockState().activeTasks;
}

// Notify one waiting thread
lock.unlock();
getGpuLockState().gGpuCv.notify_one();
}

Expand Down
12 changes: 11 additions & 1 deletion cpp/velox/cudf/GpuLock.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,23 @@

#pragma once

#include <cstddef>
#include <thread>

namespace gluten {

/// Configure the maximum number of concurrent GPU tasks, i.e. how many threads
/// may hold a GPU execution permit at the same time.
///
/// @param maxConcurrentTasks New limit. Must be greater than 0.
/// @throws std::invalid_argument if maxConcurrentTasks is 0.
void configureGpuTaskConcurrency(size_t maxConcurrentTasks);

/// Acquire a GPU execution permit, blocking until one is available.
/// Reentrant within the same thread: a thread that already holds a permit
/// returns immediately. Reentrant acquisitions are not counted, so a single
/// unlockGpu() releases the permit.
void lockGpu();

/// Try to acquire a GPU execution permit without blocking (reentrant within the same thread).
/// Returns true if the permit was acquired, false otherwise.
bool tryLockGpu();

/// Release a GPU execution permit held by the current thread.
void unlockGpu();

} // namespace gluten
37 changes: 15 additions & 22 deletions cpp/velox/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,20 @@ function(add_velox_test TEST_EXEC)
endfunction()

if(ENABLE_GPU)
function(import_library TARGET_NAME LIB_PATH)
if(NOT EXISTS ${LIB_PATH})
message(FATAL_ERROR "Library does not exist: ${LIB_PATH}")
endif()
add_library(${TARGET_NAME} STATIC IMPORTED)
set_target_properties(${TARGET_NAME} PROPERTIES IMPORTED_LOCATION
${LIB_PATH})
endfunction()
import_library(
facebook::velox::velox_cudf_expression
${VELOX_BUILD_PATH}/velox/experimental/cudf/expression/libvelox_cudf_expression.a
)
import_library(
facebook::velox::velox_cudf_exec
${VELOX_BUILD_PATH}/velox/experimental/cudf/exec/libvelox_cudf_exec.a)
import_library(
facebook::velox::velox_cudf_vector
${VELOX_BUILD_PATH}/velox/experimental/cudf/vector/libvelox_cudf_vector.a)
import_library(
facebook::velox::velox_cudf_hive_connector
${VELOX_BUILD_PATH}/velox/experimental/cudf/connectors/hive/libvelox_cudf_hive_connector.a
)

function(add_velox_gpu_test TEST_EXEC)
set(options)
Expand All @@ -61,20 +67,6 @@ if(ENABLE_GPU)
add_executable(${TEST_EXEC} ${SOURCES} ${VELOX_TEST_COMMON_SRCS})
target_include_directories(${TEST_EXEC} PRIVATE ${CMAKE_SOURCE_DIR}/velox
${CMAKE_SOURCE_DIR}/src)
import_library(
facebook::velox::velox_cudf_expression
${VELOX_BUILD_PATH}/velox/experimental/cudf/expression/libvelox_cudf_expression.a
)
import_library(
facebook::velox::velox_cudf_exec
${VELOX_BUILD_PATH}/velox/experimental/cudf/exec/libvelox_cudf_exec.a)
import_library(
facebook::velox::velox_cudf_vector
${VELOX_BUILD_PATH}/velox/experimental/cudf/vector/libvelox_cudf_vector.a)
import_library(
facebook::velox::velox_cudf_hive_connector
${VELOX_BUILD_PATH}/velox/experimental/cudf/connectors/hive/libvelox_cudf_hive_connector.a
)
target_compile_definitions(
${TEST_EXEC} PUBLIC LIBCUDACXX_ENABLE_EXPERIMENTAL_MEMORY_RESOURCE)
target_link_libraries(
Expand All @@ -94,6 +86,7 @@ if(ENABLE_GPU)

add_velox_gpu_test(velox_gpu_shuffle_writer_test SOURCES
VeloxGpuShuffleWriterTest.cc)
add_velox_gpu_test(velox_gpu_lock_test SOURCES GpuLockTest.cc)
endif()

set(VELOX_TEST_COMMON_SRCS JsonToProtoConverter.cc FilePathGenerator.cc)
Expand Down
123 changes: 123 additions & 0 deletions cpp/velox/tests/GpuLockTest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "cudf/GpuLock.h"

#include <atomic>
#include <chrono>
#include <future>
#include <stdexcept>
#include <thread>

#include <gtest/gtest.h>

namespace gluten {

// configureGpuTaskConcurrency() must reject a limit of 0 with std::invalid_argument.
TEST(GpuLockTest, configureRejectsZeroConcurrency) {
// Start from a known-valid limit so the test does not depend on what ran before it.
configureGpuTaskConcurrency(1);
EXPECT_THROW(configureGpuTaskConcurrency(0), std::invalid_argument);
}

// A thread that already holds a permit can call lockGpu()/tryLockGpu() again
// without blocking, and can re-acquire the permit after releasing it.
TEST(GpuLockTest, sameThreadLockIsReentrant) {
configureGpuTaskConcurrency(1);
lockGpu();
// The limit is 1 and this thread holds the permit, so success here can only
// come from reentrancy.
EXPECT_TRUE(tryLockGpu());

// After releasing, the same thread must be able to obtain the permit again.
unlockGpu();
EXPECT_TRUE(tryLockGpu());

// Release the permit taken just above so no permit stays held after the test.
unlockGpu();
}

// With a limit of 1 and the permit held by the test thread, tryLockGpu() on a
// second thread must return false without blocking.
TEST(GpuLockTest, tryLockFailsWhilePermitHeldByAnotherThread) {
  configureGpuTaskConcurrency(1);
  lockGpu();

  auto acquired = std::async(std::launch::async, [] {
    const bool gotPermit = tryLockGpu();
    if (gotPermit) {
      // Unexpected success: the permit belongs to this short-lived thread, so
      // hand it back here. Otherwise it would leak when the thread exits and
      // every later lockGpu() in this binary would hang instead of this test
      // simply failing.
      unlockGpu();
    }
    return gotPermit;
  });
  EXPECT_FALSE(acquired.get());

  unlockGpu();
}

// lockGpu() on a second thread must block while the single permit is held by
// the test thread, and must return once the test thread calls unlockGpu().
TEST(GpuLockTest, lockBlocksUntilPermitIsReleased) {
configureGpuTaskConcurrency(1);
lockGpu();

std::promise<void> workerStarted;
auto workerStartedFuture = workerStarted.get_future();
std::atomic<bool> workerAcquired{false};

std::thread worker([&] {
workerStarted.set_value();
// The test thread still holds the only permit, so the non-blocking attempt must fail.
EXPECT_FALSE(tryLockGpu());
lockGpu();
workerAcquired.store(true);
unlockGpu();
});

workerStartedFuture.wait();
// Give the worker time to reach lockGpu(). This is a best-effort negative
// check: a slow worker can only make it pass vacuously, never fail spuriously.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_FALSE(workerAcquired.load());

// Releasing the permit is what allows the worker to get past lockGpu().
unlockGpu();
worker.join();

EXPECT_TRUE(workerAcquired.load());
}

// With a limit of 2, two threads must be able to hold permits simultaneously
// while a third, non-blocking attempt is refused.
TEST(GpuLockTest, allowsUpToConfiguredConcurrentTasks) {
  configureGpuTaskConcurrency(2);

  std::promise<void> firstLocked;
  std::promise<void> secondLocked;
  auto firstLockedFuture = firstLocked.get_future();
  auto secondLockedFuture = secondLocked.get_future();
  // Shared so both workers can wait on the same release signal.
  std::promise<void> releaseWorkers;
  auto releaseWorkersFuture = releaseWorkers.get_future().share();
  std::atomic<int> acquiredCount{0};

  std::thread first([&] {
    lockGpu();
    // Reentrancy must still hold when more than one permit exists.
    EXPECT_TRUE(tryLockGpu());
    acquiredCount.fetch_add(1);
    firstLocked.set_value();
    // Keep the permit until the main thread has probed with a third acquirer.
    releaseWorkersFuture.wait();
    unlockGpu();
  });

  std::thread second([&] {
    lockGpu();
    EXPECT_TRUE(tryLockGpu());
    acquiredCount.fetch_add(1);
    secondLocked.set_value();
    releaseWorkersFuture.wait();
    unlockGpu();
  });

  firstLockedFuture.wait();
  secondLockedFuture.wait();
  EXPECT_EQ(acquiredCount.load(), 2);

  // Both permits are now held, so a third thread must be turned away.
  auto thirdAcquire = std::async(std::launch::async, [] {
    const bool gotPermit = tryLockGpu();
    if (gotPermit) {
      // Unexpected success: release before this thread exits so the permit is
      // not leaked and later tests are not blocked by it.
      unlockGpu();
    }
    return gotPermit;
  });
  EXPECT_FALSE(thirdAcquire.get());

  releaseWorkers.set_value();
  first.join();
  second.join();

  // The limit is process-wide state; put it back to 1 so tests that run later
  // do not silently inherit a limit of 2.
  configureGpuTaskConcurrency(1);
}

} // namespace gluten
1 change: 1 addition & 0 deletions docs/velox-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ nav_order: 16
| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | ⚓ Static | 0 | Set prefetch cache min pct for velox file scan |
| spark.gluten.sql.columnar.backend.velox.checkUsageLeak | ⚓ Static | true | Enable check memory usage leak. |
| spark.gluten.sql.columnar.backend.velox.cudf.batchSize | 🔄 Dynamic | 2147483647 | Cudf input batch size after shuffle reader |
| spark.gluten.sql.columnar.backend.velox.cudf.concurrentGpuTasks | ⚓ Static | 1 | The number of concurrent GPU tasks to run. |
| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan | ⚓ Static | false | Enable cudf table scan |
| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation | ⚓ Static | true | Heuristics you can apply to validate a cuDF/GPU plan and only offload when the entire stage can be fully and profitably executed on GPU |
| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent | ⚓ Static | 50 | The initial percent of GPU memory to allocate for memory resource for one thread. |
Expand Down
Loading