Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ jobs:
sudo apt-get install -y gcc g++
mkdir -p build
cd build
cmake ../ -DCMAKE_VERBOSE_MAKEFILE=ON -DBUILD_TEST=ON ${{matrix.cmake_args}} -DOPENSSL_ROOT_DIR=/opt/openssl/
cmake ../ -DCMAKE_VERBOSE_MAKEFILE=ON -DBUILD_TEST=ON -DCMAKE_BUILD_TYPE=RelWithDebInfo ${{matrix.cmake_args}} -DOPENSSL_ROOT_DIR=/opt/openssl/
make
- name: Test
run: |
Expand Down
75 changes: 74 additions & 1 deletion gloo/test/base_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
#include "gloo/test/base_test.h"
#include "gloo/test/openssl_utils.h"

#if GLOO_HAVE_TRANSPORT_IBVERBS
#include <infiniband/verbs.h>
#endif

namespace gloo {
namespace test {

Expand Down Expand Up @@ -49,6 +53,72 @@ const std::vector<Transport> kTransportsForFunctionAlgorithms = {
#endif
};

#if GLOO_HAVE_TRANSPORT_IBVERBS
static bool probeIbverbs() {
int numDevices = 0;
struct ibv_device** deviceList = ibv_get_device_list(&numDevices);
if (!deviceList || numDevices == 0) {
if (deviceList)
ibv_free_device_list(deviceList);
return false;
}
struct ibv_context* ctx = ibv_open_device(deviceList[0]);
ibv_free_device_list(deviceList);
if (!ctx) {
return false;
}
struct ibv_pd* pd = ibv_alloc_pd(ctx);
if (!pd) {
ibv_close_device(ctx);
return false;
}
struct ibv_comp_channel* channel = ibv_create_comp_channel(ctx);
if (!channel) {
ibv_dealloc_pd(pd);
ibv_close_device(ctx);
return false;
}
struct ibv_cq* cq = ibv_create_cq(ctx, 64, nullptr, channel, 0);
if (!cq) {
ibv_destroy_comp_channel(channel);
ibv_dealloc_pd(pd);
ibv_close_device(ctx);
return false;
}
struct ibv_qp_init_attr qpAttr{};
qpAttr.send_cq = cq;
qpAttr.recv_cq = cq;
qpAttr.cap.max_send_wr = 16;
qpAttr.cap.max_recv_wr = 16;
qpAttr.cap.max_send_sge = 1;
qpAttr.cap.max_recv_sge = 1;
qpAttr.qp_type = IBV_QPT_RC;
struct ibv_qp* qp = ibv_create_qp(pd, &qpAttr);
if (!qp) {
ibv_destroy_cq(cq);
ibv_destroy_comp_channel(channel);
ibv_dealloc_pd(pd);
ibv_close_device(ctx);
return false;
}
ibv_destroy_qp(qp);
ibv_destroy_cq(cq);
ibv_destroy_comp_channel(channel);
ibv_dealloc_pd(pd);
ibv_close_device(ctx);
return true;
}
#endif

bool ibverbsAvailable() {
#if GLOO_HAVE_TRANSPORT_IBVERBS
static bool available = probeIbverbs();
return available;
#else
return false;
#endif
}

std::shared_ptr<::gloo::transport::Device> createDevice(Transport transport) {
#if GLOO_HAVE_TRANSPORT_TCP
if (transport == Transport::TCP) {
Expand Down Expand Up @@ -76,11 +146,14 @@ std::shared_ptr<::gloo::transport::Device> createDevice(Transport transport) {
#endif
#if GLOO_HAVE_TRANSPORT_IBVERBS
if (transport == Transport::IBVERBS) {
if (!ibverbsAvailable()) {
return nullptr;
}
gloo::transport::ibverbs::attr attr;
attr.port = 1;
try {
return ::gloo::transport::ibverbs::CreateDevice(attr);
} catch (const InvalidOperationException& e) {
} catch (const std::exception& e) {
GLOO_INFO("IBVERBS not available: ", e.what());
}
}
Expand Down
32 changes: 29 additions & 3 deletions gloo/test/base_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <gtest/gtest.h>

#include <atomic>
#include <exception>
#include <functional>
#include <stdexcept>
Expand Down Expand Up @@ -75,6 +76,14 @@ extern const std::vector<Transport> kTransportsForClassAlgorithms;
extern const std::vector<Transport> kTransportsForFunctionAlgorithms;
extern const std::vector<Transport> kTransportsForRDMA;

// Returns true if ibverbs is available with functional RDMA hardware.
// Probes once using raw ibverbs APIs (through ibv_create_qp) and caches
// the result. On CI runners without real RDMA hardware, rdma-core software
// providers let device open / PD alloc / CQ creation succeed but QP creation
// fails. Creating a gloo Device (which starts a background thread) on such
// systems causes segfaults after fork() in TransportMultiProcTest.
bool ibverbsAvailable();

std::shared_ptr<::gloo::transport::Device> createDevice(Transport transport);

class BaseTest : public ::testing::Test {
Expand Down Expand Up @@ -115,18 +124,31 @@ class BaseTest : public ::testing::Test {
Barrier barrier(size);
auto store = std::make_shared<::gloo::rendezvous::HashStore>();

// Track whether workers found the transport unavailable so we can
// call GTEST_SKIP() from the main thread after joining.
// GTEST_SKIP() is not safe to call from worker threads — concurrent
// calls race on GTest internals and can cause "terminate called
// recursively" (SIGABRT / exit code 134).
std::atomic<bool> transportUnavailable{false};

spawnThreads(size, [&](int rank) {
auto context =
std::make_shared<::gloo::rendezvous::Context>(rank, size, base);

// Create device per thread to avoid collisions then they are using the
// Create device per thread to avoid collisions when they are using the
// socket address.
auto device = device_creator(transport);
if (!device) {
GTEST_SKIP() << "Skipping test: transport not available";
transportUnavailable.store(true);
return;
}

try {
context->connectFullMesh(store, device);
} catch (const std::exception&) {
transportUnavailable.store(true);
return;
}
context->connectFullMesh(store, device);

try {
fn(context);
Expand All @@ -150,6 +172,10 @@ class BaseTest : public ::testing::Test {
context->closeConnections();
}
});

if (transportUnavailable.load()) {
GTEST_SKIP() << "Skipping test: transport not available";
}
}

void spawn(
Expand Down
15 changes: 15 additions & 0 deletions gloo/test/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,25 @@

// One-time init to use EPIPE errors instead of SIGPIPE
#ifndef _WIN32
#include <execinfo.h>
#include <signal.h>
#include <unistd.h>
#include <cstdio>

namespace {

static void segfault_handler(int sig) {
void* array[30];
int size = backtrace(array, 30);
fprintf(stderr, "[DIAG] Signal %d caught, backtrace:\n", sig);
backtrace_symbols_fd(array, size, STDERR_FILENO);
_exit(128 + sig);
}

struct Initializer {
Initializer() {
signal(SIGPIPE, SIG_IGN);
signal(SIGSEGV, segfault_handler);
}
};
Initializer initializer;
Expand Down
Loading