diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp
index 6679b91c4..614fe2d8e 100644
--- a/include/mscclpp/core.hpp
+++ b/include/mscclpp/core.hpp
@@ -597,6 +597,12 @@ class Connection {
   /// @param newValue The new value to write.
   virtual void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) = 0;
 
+  /// Atomically add an 8-byte value to a destination RegisteredMemory.
+  /// @param dst The destination RegisteredMemory.
+  /// @param dstOffset The offset in bytes from the start of the destination RegisteredMemory.
+  /// @param value The value to add.
+  virtual void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) = 0;
+
   /// Flush any pending writes to the remote process.
   /// @param timeoutUsec Timeout in microseconds. Default: -1 (no timeout)
   virtual void flush(int64_t timeoutUsec = -1) = 0;
diff --git a/include/mscclpp/gpu.hpp b/include/mscclpp/gpu.hpp
index c0c108ccb..a012e44ed 100644
--- a/include/mscclpp/gpu.hpp
+++ b/include/mscclpp/gpu.hpp
@@ -88,6 +88,7 @@ constexpr auto CU_MEM_ACCESS_FLAGS_PROT_READWRITE = hipMemAccessFlagsProtReadWrite;
 #define cudaIpcGetMemHandle(...) hipIpcGetMemHandle(__VA_ARGS__)
 #define cudaIpcOpenMemHandle(...) hipIpcOpenMemHandle(__VA_ARGS__)
 #define cudaIpcCloseMemHandle(...) hipIpcCloseMemHandle(__VA_ARGS__)
+#define cudaLaunchKernel(...) hipLaunchKernel(__VA_ARGS__)
 
 #define cuGetErrorString(...) hipDrvGetErrorString(__VA_ARGS__)
 #define cuMemAddressReserve(...) hipMemAddressReserve(__VA_ARGS__)
diff --git a/include/mscclpp/port_channel.hpp b/include/mscclpp/port_channel.hpp
index 0b7c5de20..d653e4821 100644
--- a/include/mscclpp/port_channel.hpp
+++ b/include/mscclpp/port_channel.hpp
@@ -73,7 +73,7 @@ class ProxyService : public BaseProxyService {
   /// Stop the proxy service.
   void stopProxy();
 
- private:
+ protected:
   std::vector<std::shared_ptr<Host2DeviceSemaphore>> semaphores_;
   std::vector<RegisteredMemory> memories_;
   std::shared_ptr<Proxy> proxy_;
diff --git a/python/mscclpp/core_py.cpp b/python/mscclpp/core_py.cpp
index 9fb803683..cfcc7f023 100644
--- a/python/mscclpp/core_py.cpp
+++ b/python/mscclpp/core_py.cpp
@@ -148,6 +148,7 @@ void register_core(nb::module_& m) {
             self->updateAndSync(dst, dstOffset, (uint64_t*)src, newValue);
           },
           nb::arg("dst"), nb::arg("dstOffset"), nb::arg("src"), nb::arg("newValue"))
+      .def("atomic_add", &Connection::atomicAdd, nb::arg("dst"), nb::arg("dstOffset"), nb::arg("value"))
       .def("flush", &Connection::flush, nb::call_guard<nb::gil_scoped_release>(), nb::arg("timeoutUsec") = (int64_t)3e7)
       .def("transport", &Connection::transport)
       .def("remote_transport", &Connection::remoteTransport)
diff --git a/src/connection.cc b/src/connection.cc
index b3165a863..c3ac74852 100644
--- a/src/connection.cc
+++ b/src/connection.cc
@@ -13,6 +13,7 @@
 #include 
 
 #include "api.h"
+#include "connection_kernels.hpp"
 #include "context.hpp"
 #include "debug.h"
 #include "endpoint.hpp"
@@ -141,6 +142,17 @@ void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src,
 #endif
 }
 
+void CudaIpcConnection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) {
+  validateTransport(dst, remoteTransport());
+
+  uint64_t* dstPtr = reinterpret_cast<uint64_t*>(reinterpret_cast<char*>(dst.data()) + dstOffset);
+  void* args[] = {reinterpret_cast<void*>(&dstPtr), &value};
+
+  stream_->launch(connectionAtomicAddKernelFunc(), dim3(1), dim3(1), args, 0);
+
+  INFO(MSCCLPP_P2P, "CudaIpcConnection atomicAdd: value %lu to %p", value, dstPtr);
+}
+
 void CudaIpcConnection::flush(int64_t timeoutUsec) {
 #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY)
   NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_CUDA_IPC_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
@@ -244,6 +256,19 @@ void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src,
 #endif
 }
 
+void IBConnection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) {
+  validateTransport(dst, remoteTransport());
+  auto dstTransportInfo = getImpl(dst).getTransportInfo(remoteTransport());
+  if (dstTransportInfo.ibLocal) {
+    throw Error("dst is local, which is not supported", ErrorCode::InvalidUsage);
+  }
+
+  auto dstMrInfo = dstTransportInfo.ibMrInfo;
+  qp_.lock()->stageAtomicAdd(dstTransportInfo_.ibMr, dstMrInfo, /*wrId=*/0, dstOffset, value, /*signaled=*/true);
+  qp_.lock()->postSend();
+  INFO(MSCCLPP_NET, "IBConnection atomicAdd: value %lu to %p", value, (uint8_t*)dstMrInfo.addr + dstOffset);
+}
+
 void IBConnection::flush(int64_t timeoutUsec) {
 #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_IB_FLUSH_ENTRY)
   NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_IB_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
@@ -409,6 +434,11 @@ void EthernetConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src,
 #endif
 }
 
+void EthernetConnection::atomicAdd([[maybe_unused]] RegisteredMemory dst, [[maybe_unused]] uint64_t dstOffset,
+                                   [[maybe_unused]] uint64_t value) {
+  throw mscclpp::Error("EthernetConnection does not support atomicAdd", ErrorCode::InvalidUsage);
+}
+
 void EthernetConnection::flush(int64_t) {
 #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY)
   NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_FLUSH_ENTRY, 0, 0, *NpKit::GetCpuTimestamp(), 0);
diff --git a/src/connection_kernels.cu b/src/connection_kernels.cu
new file mode 100644
index 000000000..a56f75081
--- /dev/null
+++ b/src/connection_kernels.cu
@@ -0,0 +1,19 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+#include <mscclpp/atomic_device.hpp>
+
+#include "connection_kernels.hpp"
+
+namespace mscclpp {
+
+__global__ void connectionAtomicAddKernel(uint64_t* dst, uint64_t value) {
+  atomicFetchAdd(dst, value, memoryOrderRelaxed);
+}
+
+const void* connectionAtomicAddKernelFunc() {
+  static const void* func = reinterpret_cast<const void*>(&connectionAtomicAddKernel);
+  return func;
+}
+
+}  // namespace mscclpp
diff --git a/src/context.cc b/src/context.cc
index b58987b20..a2abe92dc 100644
--- a/src/context.cc
+++ b/src/context.cc
@@ -35,6 +35,11 @@ void CudaIpcStream::memcpyH2D(void *dst, const void *src, size_t nbytes) {
   dirty_ = true;
 }
 
+void CudaIpcStream::launch(const void *func, dim3 gridDim, dim3 blockDim, void **args, size_t sharedMem) {
+  setStreamIfNeeded();
+  MSCCLPP_CUDATHROW(cudaLaunchKernel(func, gridDim, blockDim, args, sharedMem, *stream_));
+}
+
 void CudaIpcStream::sync() {
   setStreamIfNeeded();
   if (dirty_) {
diff --git a/src/include/connection.hpp b/src/include/connection.hpp
index ae6ea8d0f..9c7876f7e 100644
--- a/src/include/connection.hpp
+++ b/src/include/connection.hpp
@@ -28,8 +28,11 @@ class CudaIpcConnection : public Connection {
   void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
              uint64_t size) override;
 
   void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;
 
+  void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) override;
+
   void flush(int64_t timeoutUsec) override;
 };
@@ -51,8 +54,11 @@ class IBConnection : public Connection {
   void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
              uint64_t size) override;
 
   void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;
 
+  void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) override;
+
   void flush(int64_t timeoutUsec) override;
 };
@@ -82,8 +88,11 @@ class EthernetConnection : public Connection {
   void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
              uint64_t size) override;
 
   void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;
 
+  void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, uint64_t value) override;
+
   void flush(int64_t timeoutUsec) override;
 };
diff --git a/src/include/connection_kernels.hpp b/src/include/connection_kernels.hpp
new file mode 100644
index 000000000..f99dc65f3
--- /dev/null
+++ b/src/include/connection_kernels.hpp
@@ -0,0 +1,13 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+#ifndef MSCCLPP_CONNECTION_KERNEL_HPP_
+#define MSCCLPP_CONNECTION_KERNEL_HPP_
+
+namespace mscclpp {
+
+const void *connectionAtomicAddKernelFunc();
+
+}  // namespace mscclpp
+
+#endif  // MSCCLPP_CONNECTION_KERNEL_HPP_
diff --git a/src/include/context.hpp b/src/include/context.hpp
index 51cf3c375..ed624d989 100644
--- a/src/include/context.hpp
+++ b/src/include/context.hpp
@@ -28,6 +28,8 @@ class CudaIpcStream {
 
   void memcpyH2D(void *dst, const void *src, size_t nbytes);
 
+  void launch(const void *func, dim3 gridDim, dim3 blockDim, void **args, size_t sharedMem);
+
   void sync();
 
   operator cudaStream_t() const { return *stream_; }
diff --git a/test/mp_unit/communicator_tests.cu b/test/mp_unit/communicator_tests.cu
index 212f31fec..ad2187246 100644
--- a/test/mp_unit/communicator_tests.cu
+++ b/test/mp_unit/communicator_tests.cu
@@ -287,4 +287,97 @@ TEST_F(CommunicatorTest, WriteWithHostSemaphores) {
   ASSERT_TRUE(testWriteCorrectness());
   communicator->bootstrap()->barrier();
-}
\ No newline at end of file
+}
+
+TEST_F(CommunicatorTest, ConcurrentAtomicAdd) {
+  if (gEnv->rank >= numRanksToUse) return;
+
+  mscclpp::TransportFlags cudaIpcOrIb = mscclpp::AllIBTransports | mscclpp::Transport::CudaIpc;
+
+  // Check if any connections support atomicAdd
+  bool hasAtomicAddSupport = false;
+  int numAtomicAddConnections = 0;
+  for (auto& entry : connections) {
+    auto& conn = entry.second;
+    if (cudaIpcOrIb.has(conn->transport())) {
+      hasAtomicAddSupport = true;
+      if (entry.first == 0) {  // Only count connections to rank 0
+        numAtomicAddConnections++;
+      }
+    }
+  }
+
+  if (!hasAtomicAddSupport) {
+    GTEST_SKIP() << "No connections support atomicAdd in this configuration";
+    return;
+  }
+
+  // Initialize device buffers with zeros for atomic add test
+  size_t dataCount = deviceBufferSize / sizeof(uint64_t);
+  for (int n = 0; n < (int)devicePtr.size(); n++) {
+    std::vector<uint64_t> hostBuffer(dataCount, 0);
+    mscclpp::gpuMemcpy(reinterpret_cast<uint64_t*>(devicePtr[n].get()), hostBuffer.data(), dataCount,
+                       cudaMemcpyHostToDevice);
+  }
+  communicator->bootstrap()->barrier();
+
+  // All ranks (except rank 0) atomic add to offset 0 in rank 0's memory
+  if (gEnv->rank != 0) {
+    auto it = connections.find(0);
+    if (it != connections.end()) {
+      auto& conn = it->second;
+      auto& rank0Memory = remoteMemory[0].at(0);
+
+      // Check if connection to rank 0 supports atomicAdd
+      if (cudaIpcOrIb.has(conn->transport())) {
+        try {
+          // Each rank adds its rank value to offset 0 in rank 0's memory
+          uint64_t valueToAdd = gEnv->rank;
+          conn->atomicAdd(rank0Memory, 0, valueToAdd);
+          conn->flush();
+        } catch (const mscclpp::Error& e) {
+          if (e.getErrorCode() == mscclpp::ErrorCode::InvalidUsage) {
+            // Connection doesn't support atomicAdd, skip
+          } else {
+            throw;
+          }
+        }
+      }
+    }
+  }
+  communicator->bootstrap()->barrier();
+
+  // Only rank 0 checks the result
+  if (gEnv->rank == 0) {
+    // Calculate expected sum: sum of ranks that successfully added (excluding rank 0)
+    uint64_t expectedSum = 0;
+    for (int i = 1; i < gEnv->worldSize; i++) {
+      auto it = connections.find(i);
+      if (it != connections.end()) {
+        auto& conn = it->second;
+        if (cudaIpcOrIb.has(conn->transport())) {
+          expectedSum += i;  // Each rank i adds its rank value
+        }
+      }
+    }
+
+    // Poll until the atomic additions are complete
+    bool ready = false;
+    int niter = 0;
+    do {
+      std::vector<uint64_t> hostBuffer(dataCount, 0);
+      mscclpp::gpuMemcpy(hostBuffer.data(), reinterpret_cast<uint64_t*>(devicePtr[0].get()), dataCount,
+                         cudaMemcpyDeviceToHost);
+
+      uint64_t actualSum = hostBuffer[0];
+      ready = (actualSum == expectedSum);
+
+      niter++;
+      if (niter == 10000) {
+        FAIL() << "Polling is stuck. Expected sum: " << expectedSum << ", Actual sum: " << actualSum;
+      }
+    } while (!ready);
+  }
+
+  communicator->bootstrap()->barrier();
+}
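
For reference, a minimal host-side usage sketch of the new API (not part of the diff; `bumpRemoteCounter`, `conn`, `remoteCounterMem`, and `myRank` are illustrative names — only `Connection::atomicAdd` and `Connection::flush` come from this change):

```cpp
#include <cstdint>
#include <memory>

#include <mscclpp/core.hpp>

// Hypothetical helper: accumulate this rank's value into an 8-byte counter
// that a peer exposed as a RegisteredMemory. Per the diff, a CudaIpc
// connection launches a one-thread kernel on the connection's stream, an IB
// connection stages and posts an RDMA atomic fetch-and-add, and an
// EthernetConnection throws ErrorCode::InvalidUsage.
void bumpRemoteCounter(std::shared_ptr<mscclpp::Connection> conn,
                       mscclpp::RegisteredMemory remoteCounterMem, uint64_t myRank) {
  conn->atomicAdd(remoteCounterMem, /*dstOffset=*/0, /*value=*/myRank);
  conn->flush();  // ensure the queued atomic has actually been issued
}
```

This mirrors what the new `ConcurrentAtomicAdd` test does: every non-zero rank adds its rank value at offset 0 of rank 0's buffer, and rank 0 polls until the counter reaches the expected sum.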