6 changes: 6 additions & 0 deletions include/mscclpp/core.hpp
@@ -597,6 +597,12 @@ class Connection {
  /// @param newValue The new value to write.
  virtual void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) = 0;

  /// Atomically add an 8-byte value to a destination RegisteredMemory.
  /// @param dst The destination RegisteredMemory.
  /// @param dstOffset The offset in bytes from the start of the destination RegisteredMemory.
  /// @param value The value to add.
  virtual void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) = 0;

  /// Flush any pending writes to the remote process.
  /// @param timeoutUsec Timeout in microseconds. Default: -1 (no timeout)
  virtual void flush(int64_t timeoutUsec = -1) = 0;
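
For orientation, a minimal caller sketch of the new API (the names bumpRemoteCounter, conn, and remoteMem are illustrative, not part of this change; it assumes an established connection and a RegisteredMemory received from the remote rank):

#include <cstdint>
#include <memory>

#include <mscclpp/core.hpp>

// Each rank adds its rank id into an 8-byte counter at offset 0 of the remote
// buffer, then flushes so the operation is posted before returning.
void bumpRemoteCounter(std::shared_ptr<mscclpp::Connection> conn, mscclpp::RegisteredMemory remoteMem,
                       uint64_t myRank) {
  conn->atomicAdd(remoteMem, /*dstOffset=*/0, /*value=*/myRank);
  conn->flush();
}
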
1 change: 1 addition & 0 deletions include/mscclpp/gpu.hpp
@@ -88,6 +88,7 @@ constexpr auto CU_MEM_ACCESS_FLAGS_PROT_READWRITE = hipMemAccessFlagsProtReadWri
#define cudaIpcGetMemHandle(...) hipIpcGetMemHandle(__VA_ARGS__)
#define cudaIpcOpenMemHandle(...) hipIpcOpenMemHandle(__VA_ARGS__)
#define cudaIpcCloseMemHandle(...) hipIpcCloseMemHandle(__VA_ARGS__)
#define cudaLaunchKernel(...) hipLaunchKernel(__VA_ARGS__)

#define cuGetErrorString(...) hipDrvGetErrorString(__VA_ARGS__)
#define cuMemAddressReserve(...) hipMemAddressReserve(__VA_ARGS__)
2 changes: 1 addition & 1 deletion include/mscclpp/port_channel.hpp
@@ -73,7 +73,7 @@ class ProxyService : public BaseProxyService {
  /// Stop the proxy service.
  void stopProxy();

- private:
+ protected:
  std::vector<std::shared_ptr<Host2DeviceSemaphore>> semaphores_;
  std::vector<RegisteredMemory> memories_;
  std::shared_ptr<Proxy> proxy_;
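
Making these members protected instead of private lets code outside the library derive from ProxyService and reuse its semaphore/memory bookkeeping. A hypothetical subclass sketch (LoggingProxyService is illustrative only; it assumes the base constructors are usable via a using-declaration):

#include <cstdio>

#include <mscclpp/port_channel.hpp>

// After this change a derived service can inspect the semaphores, memories,
// and proxy it inherits; previously these members were inaccessible.
class LoggingProxyService : public mscclpp::ProxyService {
 public:
  using ProxyService::ProxyService;
  void dumpState() { std::printf("managing %zu semaphores\n", semaphores_.size()); }
};
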
1 change: 1 addition & 0 deletions python/mscclpp/core_py.cpp
@@ -148,6 +148,7 @@ void register_core(nb::module_& m) {
        self->updateAndSync(dst, dstOffset, (uint64_t*)src, newValue);
      },
      nb::arg("dst"), nb::arg("dstOffset"), nb::arg("src"), nb::arg("newValue"))
      .def("atomic_add", &Connection::atomicAdd, nb::arg("dst"), nb::arg("dstOffset"), nb::arg("value"))
      .def("flush", &Connection::flush, nb::call_guard<nb::gil_scoped_release>(), nb::arg("timeoutUsec") = (int64_t)3e7)
      .def("transport", &Connection::transport)
      .def("remote_transport", &Connection::remoteTransport)
30 changes: 30 additions & 0 deletions src/connection.cc
@@ -13,6 +13,7 @@
#include <thread>

#include "api.h"
#include "connection_kernels.hpp"
#include "context.hpp"
#include "debug.h"
#include "endpoint.hpp"
@@ -141,6 +142,17 @@ void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset,
#endif
}

void CudaIpcConnection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) {
  validateTransport(dst, remoteTransport());

  uint64_t* dstPtr = reinterpret_cast<uint64_t*>(reinterpret_cast<char*>(dst.data()) + dstOffset);
  void* args[] = {reinterpret_cast<void**>(&dstPtr), &value};

  stream_->launch(connectionAtomicAddKernelFunc(), dim3(1), dim3(1), args, 0);

  INFO(MSCCLPP_P2P, "CudaIpcConnection atomicAdd: value %lu to %p", value, dstPtr);
}

void CudaIpcConnection::flush(int64_t timeoutUsec) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
@@ -244,6 +256,19 @@ void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint6
#endif
}

void IBConnection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) {
  validateTransport(dst, remoteTransport());
  auto dstTransportInfo = getImpl(dst).getTransportInfo(remoteTransport());
  if (dstTransportInfo.ibLocal) {
    throw Error("dst is local, which is not supported", ErrorCode::InvalidUsage);
  }

  auto dstMrInfo = dstTransportInfo.ibMrInfo;
  qp_.lock()->stageAtomicAdd(dstTransportInfo_.ibMr, dstMrInfo, /*wrId=*/0, dstOffset, value, /*signaled=*/true);
  qp_.lock()->postSend();
  INFO(MSCCLPP_NET, "IBConnection atomicAdd: value %lu to %p", value, (uint8_t*)dstMrInfo.addr + dstOffset);
}
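
For orientation, what this stages at the verbs level is an 8-byte RDMA fetch-and-add executed by the remote NIC. A sketch of the equivalent raw work request, assuming stageAtomicAdd builds something like the standard ibverbs atomic WR (the exact flags inside mscclpp's IbQp may differ):

#include <cstdint>

#include <infiniband/verbs.h>

// Fill a send work request for a remote atomic fetch-and-add. The local SGE
// receives the value the remote location held before the addition.
void fillFetchAddWr(ibv_send_wr* wr, ibv_sge* sge, uint64_t remoteAddr, uint32_t rkey, uint64_t value) {
  wr->opcode = IBV_WR_ATOMIC_FETCH_AND_ADD;
  wr->wr.atomic.remote_addr = remoteAddr;  // dst address + dstOffset
  wr->wr.atomic.compare_add = value;       // the addend
  wr->wr.atomic.rkey = rkey;
  wr->sg_list = sge;
  wr->num_sge = 1;
  wr->send_flags = IBV_SEND_SIGNALED;  // matches /*signaled=*/true above
}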

void IBConnection::flush(int64_t timeoutUsec) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_FLUSH_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
@@ -409,6 +434,11 @@ void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset,
#endif
}

void EthernetConnection::atomicAdd([[maybe_unused]] RegisteredMemory dst, [[maybe_unused]] uint64_t dstOffset,
                                   [[maybe_unused]] uint64_t value) {
  throw mscclpp::Error("EthernetConnection does not support atomicAdd", ErrorCode::InvalidUsage);
}

void EthernetConnection::flush(int64_t) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
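
Across the three backends, only CUDA IPC and IB implement the operation; Ethernet throws InvalidUsage. A minimal guard sketch for callers, mirroring the transport check used by the ConcurrentAtomicAdd test later in this diff (conn and dstMem are assumed to come from the caller):

#include <cstdint>
#include <memory>

#include <mscclpp/core.hpp>

// Gate the atomic on a capable transport instead of catching InvalidUsage.
void atomicAddIfSupported(std::shared_ptr<mscclpp::Connection> conn, mscclpp::RegisteredMemory dstMem,
                          uint64_t value) {
  mscclpp::TransportFlags capable = mscclpp::AllIBTransports | mscclpp::Transport::CudaIpc;
  if (capable.has(conn->transport())) {
    conn->atomicAdd(dstMem, /*dstOffset=*/0, value);
    conn->flush();
  }
}
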
19 changes: 19 additions & 0 deletions src/connection_kernels.cu
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#include <mscclpp/atomic_device.hpp>

#include "connection_kernels.hpp"

namespace mscclpp {

__global__ void connectionAtomicAddKernel(uint64_t* dst, uint64_t value) {
  atomicFetchAdd(dst, value, memoryOrderRelaxed);
}

const void* connectionAtomicAddKernelFunc() {
  static const void* func = reinterpret_cast<const void*>(&connectionAtomicAddKernel);
  return func;
}

} // namespace mscclpp
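
On the host side, cudaLaunchKernel takes an array of pointers to the kernel arguments, which is what CudaIpcConnection::atomicAdd packs into args above. A standalone launch sketch under that assumption (the device pointer and stream are the caller's; the library itself routes the launch through CudaIpcStream::launch):

#include <cstdint>

#include <cuda_runtime.h>

#include "connection_kernels.hpp"

// Equivalent to connectionAtomicAddKernel<<<1, 1, 0, stream>>>(dst, value).
void launchAtomicAddOnce(uint64_t* dst, uint64_t value, cudaStream_t stream) {
  void* args[] = {&dst, &value};  // pointers to the two kernel arguments
  cudaLaunchKernel(mscclpp::connectionAtomicAddKernelFunc(), dim3(1), dim3(1), args, 0, stream);
}
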
5 changes: 5 additions & 0 deletions src/context.cc
@@ -35,6 +35,11 @@ void CudaIpcStream::memcpyH2D(void *dst, const void *src, size_t nbytes) {
  dirty_ = true;
}

void CudaIpcStream::launch(const void *func, dim3 gridDim, dim3 blockDim, void **args, size_t sharedMem) {
  setStreamIfNeeded();
  MSCCLPP_CUDATHROW(cudaLaunchKernel(func, gridDim, blockDim, args, sharedMem, *stream_));
}

void CudaIpcStream::sync() {
  setStreamIfNeeded();
  if (dirty_) {
9 changes: 9 additions & 0 deletions src/include/connection.hpp
@@ -28,8 +28,11 @@ class CudaIpcConnection : public Connection {

  void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
             uint64_t size) override;

  void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;

  void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) override;

  void flush(int64_t timeoutUsec) override;
};

@@ -51,8 +54,11 @@ class IBConnection : public Connection {

  void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
             uint64_t size) override;

  void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;

  void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) override;

  void flush(int64_t timeoutUsec) override;
};

@@ -82,8 +88,11 @@ class EthernetConnection : public Connection {

  void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
             uint64_t size) override;

  void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;

  void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) override;

  void flush(int64_t timeoutUsec) override;
};

13 changes: 13 additions & 0 deletions src/include/connection_kernels.hpp
@@ -0,0 +1,13 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#ifndef MSCCLPP_CONNECTION_KERNEL_HPP_
#define MSCCLPP_CONNECTION_KERNEL_HPP_

namespace mscclpp {

const void *connectionAtomicAddKernelFunc();

} // namespace mscclpp

#endif // MSCCLPP_CONNECTION_KERNEL_HPP_
2 changes: 2 additions & 0 deletions src/include/context.hpp
@@ -28,6 +28,8 @@ class CudaIpcStream {

  void memcpyH2D(void *dst, const void *src, size_t nbytes);

  void launch(const void *func, dim3 gridDim, dim3 blockDim, void **args, size_t sharedMem);

  void sync();

  operator cudaStream_t() const { return *stream_; }
95 changes: 94 additions & 1 deletion test/mp_unit/communicator_tests.cu
@@ -287,4 +287,97 @@ TEST_F(CommunicatorTest, WriteWithHostSemaphores) {

  ASSERT_TRUE(testWriteCorrectness());
  communicator->bootstrap()->barrier();
-}
+}

TEST_F(CommunicatorTest, ConcurrentAtomicAdd) {
  if (gEnv->rank >= numRanksToUse) return;

  mscclpp::TransportFlags cudaIpcOrIb = mscclpp::AllIBTransports | mscclpp::Transport::CudaIpc;

  // Check if any connections support atomicAdd
  bool hasAtomicAddSupport = false;
  int numAtomicAddConnections = 0;
  for (auto& entry : connections) {
    auto& conn = entry.second;
    if (cudaIpcOrIb.has(conn->transport())) {
      hasAtomicAddSupport = true;
      if (entry.first == 0) {  // Only count connections to rank 0
        numAtomicAddConnections++;
      }
    }
  }

  if (!hasAtomicAddSupport) {
    GTEST_SKIP() << "No connections support atomicAdd in this configuration";
    return;
  }

  // Initialize device buffers with zeros for atomic add test
  size_t dataCount = deviceBufferSize / sizeof(uint64_t);
  for (int n = 0; n < (int)devicePtr.size(); n++) {
    std::vector<uint64_t> hostBuffer(dataCount, 0);
    mscclpp::gpuMemcpy<uint64_t>(reinterpret_cast<uint64_t*>(devicePtr[n].get()), hostBuffer.data(), dataCount,
                                 cudaMemcpyHostToDevice);
  }
  communicator->bootstrap()->barrier();

  // All ranks (except rank 0) atomic add to offset 0 in rank 0's memory
  if (gEnv->rank != 0) {
    auto it = connections.find(0);
    if (it != connections.end()) {
      auto& conn = it->second;
      auto& rank0Memory = remoteMemory[0].at(0);

      // Check if connection to rank 0 supports atomicAdd
      if (cudaIpcOrIb.has(conn->transport())) {
        try {
          // Each rank adds its rank value to offset 0 in rank 0's memory
          uint64_t valueToAdd = gEnv->rank;
          conn->atomicAdd(rank0Memory, 0, valueToAdd);
          conn->flush();
        } catch (const mscclpp::Error& e) {
          if (e.getErrorCode() == mscclpp::ErrorCode::InvalidUsage) {
            // Connection doesn't support atomicAdd, skip
          } else {
            throw;
          }
        }
      }
    }
  }
  communicator->bootstrap()->barrier();

  // Only rank 0 checks the result
  if (gEnv->rank == 0) {
    // Calculate expected sum: sum of ranks that successfully added (excluding rank 0)
    uint64_t expectedSum = 0;
    for (int i = 1; i < gEnv->worldSize; i++) {
      auto it = connections.find(i);
      if (it != connections.end()) {
        auto& conn = it->second;
        if (cudaIpcOrIb.has(conn->transport())) {
          expectedSum += i;  // Each rank i adds its rank value
        }
      }
    }

    // Poll until the atomic additions are complete
    bool ready = false;
    int niter = 0;
    do {
      std::vector<uint64_t> hostBuffer(dataCount, 0);
      mscclpp::gpuMemcpy<uint64_t>(hostBuffer.data(), reinterpret_cast<uint64_t*>(devicePtr[0].get()), dataCount,
                                   cudaMemcpyDeviceToHost);

      uint64_t actualSum = hostBuffer[0];
      ready = (actualSum == expectedSum);

      niter++;
      if (niter == 10000) {
        FAIL() << "Polling is stuck. Expected sum: " << expectedSum << ", Actual sum: " << actualSum;
      }
    } while (!ready);
  }

  communicator->bootstrap()->barrier();
}
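
When every non-zero rank reaches rank 0 over an atomic-capable transport, the expected sum in this test has a closed form; a small helper sketch (illustrative, not part of the test):

#include <cstdint>

// 1 + 2 + ... + (worldSize - 1) = worldSize * (worldSize - 1) / 2
uint64_t fullyConnectedExpectedSum(int worldSize) {
  return static_cast<uint64_t>(worldSize) * static_cast<uint64_t>(worldSize - 1) / 2;
}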