Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion src/common/ref_resource_view.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
#include <type_traits> // for is_reference_v, remove_reference_t, is_same_v
#include <utility> // for swap, move

#include "io.h" // for ResourceHandler, AlignedResourceReadStream, MallocResource
#include "io.h" // for ResourceHandler, AlignedResourceReadStream, MallocResource
#include "threading_utils.h" // for ParallelForBlock
#include "xgboost/context.h" // for Context
#include "xgboost/logging.h"
#include "xgboost/span.h" // for Span

Expand Down Expand Up @@ -166,6 +168,22 @@ template <typename T>
return ref;
}

/**
 * @brief Make a fixed size `RefResourceView` with malloc resource.
 *
 * The allocated storage is filled with @p init in parallel, using the number of
 * threads configured in @p ctx (one contiguous block per thread).
 *
 * @param ctx        Context supplying the thread count used for initialization.
 * @param n_elements Number of elements of type T to allocate.
 * @param init       Value every element is initialized to.
 *
 * @return A view that shares ownership of the malloc-backed storage.
 */
template <typename T>
[[nodiscard]] RefResourceView<T> MakeFixedVecWithMalloc(Context const* ctx, std::size_t n_elements,
                                                        T const& init) {
  // Allocate raw bytes; the returned view keeps the resource alive via shared ownership.
  auto resource = std::make_shared<common::MallocResource>(n_elements * sizeof(T));
  auto ref = RefResourceView{resource->DataAs<T>(), n_elements, resource};
  // Initialize in parallel: each thread fills one contiguous block of the buffer.
  common::ParallelForBlock(n_elements, ctx->Threads(), [&](auto&& block) {
    std::fill_n(ref.data() + block.begin(), block.Size(), init);
  });
  return ref;
}

template <typename T>
class ReallocVector : public RefResourceView<T> {
static_assert(!std::is_reference_v<T>);
Expand Down
80 changes: 46 additions & 34 deletions src/common/threading_utils.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2019-2025, XGBoost Contributors
* Copyright 2019-2026, XGBoost Contributors
*/
#ifndef XGBOOST_COMMON_THREADING_UTILS_H_
#define XGBOOST_COMMON_THREADING_UTILS_H_
Expand Down Expand Up @@ -103,9 +103,7 @@ class BlockedSpace2d {
}

// Amount of blocks(tasks) in a space
[[nodiscard]] std::size_t Size() const {
return ranges_.size();
}
[[nodiscard]] std::size_t Size() const { return ranges_.size(); }

// get index of the first dimension of i-th block(task)
[[nodiscard]] std::size_t GetFirstDimension(std::size_t i) const {
Expand Down Expand Up @@ -136,7 +134,6 @@ class BlockedSpace2d {
std::vector<std::size_t> first_dimension_;
};


// Wrapper to implement nested parallelism with simple omp parallel for
template <typename Func>
void ParallelFor2d(const BlockedSpace2d& space, std::int32_t n_threads, Func&& func) {
Expand Down Expand Up @@ -200,48 +197,48 @@ void ParallelFor(Index size, std::int32_t n_threads, Sched sched, Func&& fn) {

dmlc::OMPException exc;
switch (sched.sched) {
case Sched::kAuto: {
case Sched::kAuto: {
#pragma omp parallel for num_threads(n_threads)
for (OmpInd i = 0; i < length; ++i) {
exc.Run(fn, i);
}
break;
}
case Sched::kDynamic: {
if (sched.chunk == 0) {
#pragma omp parallel for num_threads(n_threads) schedule(dynamic)
for (OmpInd i = 0; i < length; ++i) {
exc.Run(fn, i);
}
} else {
break;
}
case Sched::kDynamic: {
if (sched.chunk == 0) {
#pragma omp parallel for num_threads(n_threads) schedule(dynamic)
for (OmpInd i = 0; i < length; ++i) {
exc.Run(fn, i);
}
} else {
#pragma omp parallel for num_threads(n_threads) schedule(dynamic, sched.chunk)
for (OmpInd i = 0; i < length; ++i) {
exc.Run(fn, i);
for (OmpInd i = 0; i < length; ++i) {
exc.Run(fn, i);
}
}
break;
}
break;
}
case Sched::kStatic: {
if (sched.chunk == 0) {
case Sched::kStatic: {
if (sched.chunk == 0) {
#pragma omp parallel for num_threads(n_threads) schedule(static)
for (OmpInd i = 0; i < length; ++i) {
exc.Run(fn, i);
}
} else {
for (OmpInd i = 0; i < length; ++i) {
exc.Run(fn, i);
}
} else {
#pragma omp parallel for num_threads(n_threads) schedule(static, sched.chunk)
for (OmpInd i = 0; i < length; ++i) {
exc.Run(fn, i);
for (OmpInd i = 0; i < length; ++i) {
exc.Run(fn, i);
}
}
break;
}
break;
}
case Sched::kGuided: {
case Sched::kGuided: {
#pragma omp parallel for num_threads(n_threads) schedule(guided)
for (OmpInd i = 0; i < length; ++i) {
exc.Run(fn, i);
for (OmpInd i = 0; i < length; ++i) {
exc.Run(fn, i);
}
break;
}
break;
}
}
exc.Rethrow();
}
Expand Down Expand Up @@ -273,6 +270,21 @@ void ParallelFor1d(Index size, std::int32_t n_threads, Func&& fn) {
});
}

/**
 * @brief Parallel loop that uses n_threads as the number of blocks.
 *
 * The range [0, size) is split into at most n_threads contiguous blocks, one per
 * thread; trailing threads receive an empty block (and are skipped) when
 * size < n_threads.
 *
 * @param size      Total number of elements to process.
 * @param n_threads Number of threads, which is also the number of blocks.
 * @param fn        Callable invoked as fn(common::Range1d{begin, end}) for each
 *                  non-empty block; must return void.
 */
template <typename Index, typename Func>
void ParallelForBlock(Index size, std::int32_t n_threads, Func&& fn) {
  static_assert(std::is_void_v<std::invoke_result_t<Func, common::Range1d>>);
  // Guard against a non-positive thread count to avoid division by zero below.
  n_threads = std::max(n_threads, std::int32_t{1});
  // Work in an unsigned domain so the std::min below doesn't mix a signed Index
  // with std::size_t (which would fail template argument deduction).
  auto const n = static_cast<std::size_t>(size);
  auto const n_blocks = static_cast<std::size_t>(n_threads);
  // Ceiling division: blocks are large enough for n_blocks of them to cover n.
  std::size_t const blk_size = n / n_blocks + static_cast<std::size_t>(n % n_blocks > 0);
  ParallelFor(n_threads, n_threads, [&](auto tid) {
    auto blk_beg = static_cast<std::size_t>(tid) * blk_size;
    auto blk_end = std::min(blk_beg + blk_size, n);
    if (blk_end <= blk_beg) {
      return;  // Empty block for this thread.
    }
    fn(common::Range1d{blk_beg, blk_end});
  });
}

inline std::int32_t OmpGetThreadLimit() {
std::int32_t limit = omp_get_thread_limit();
CHECK_GE(limit, 1) << "Invalid thread limit for OpenMP.";
Expand Down
40 changes: 21 additions & 19 deletions src/data/gradient_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,22 @@ GHistIndexMatrix::GHistIndexMatrix(Context const *ctx, DMatrix *p_fmat, bst_bin_
cut = common::SketchOnDMatrix(ctx, p_fmat, max_bins_per_feat, sorted_sketch, hess);

const uint32_t nbins = cut.Ptrs().back();
hit_count = common::MakeFixedVecWithMalloc(nbins, std::size_t{0});
hit_count = common::MakeFixedVecWithMalloc(ctx, nbins, std::size_t{0});
hit_count_tloc_.resize(ctx->Threads() * nbins, 0);

size_t new_size = 1;
for (const auto &batch : p_fmat->GetBatches<SparsePage>()) {
new_size += batch.Size();
}

row_ptr = common::MakeFixedVecWithMalloc(new_size, std::size_t{0});
row_ptr = common::MakeFixedVecWithMalloc(ctx, new_size, std::size_t{0});

const bool isDense = p_fmat->IsDense();
this->isDense_ = isDense;
auto ft = p_fmat->Info().feature_types.ConstHostSpan();

for (const auto &batch : p_fmat->GetBatches<SparsePage>()) {
this->PushBatch(batch, ft, ctx->Threads());
this->PushBatch(ctx, batch, ft);
}
this->columns_ = std::make_unique<common::ColumnMatrix>();

Expand Down Expand Up @@ -83,32 +83,34 @@ GHistIndexMatrix::GHistIndexMatrix(Context const *, MetaInfo const &, EllpackPag

GHistIndexMatrix::~GHistIndexMatrix() = default;

void GHistIndexMatrix::PushBatch(SparsePage const &batch, common::Span<FeatureType const> ft,
int32_t n_threads) {
void GHistIndexMatrix::PushBatch(Context const *ctx, SparsePage const &batch,
common::Span<FeatureType const> ft) {
auto page = batch.GetView();
auto it = common::MakeIndexTransformIter([&](std::size_t ridx) { return page[ridx].size(); });
common::PartialSum(n_threads, it, it + page.Size(), static_cast<size_t>(0), row_ptr.begin());
common::PartialSum(ctx->Threads(), it, it + page.Size(), static_cast<size_t>(0), row_ptr.begin());
data::SparsePageAdapterBatch adapter_batch{page};
auto is_valid = [](auto) { return true; }; // SparsePage always contains valid entries
PushBatchImpl(n_threads, adapter_batch, 0, is_valid, ft);
auto is_valid = [](auto) {
return true;
}; // SparsePage always contains valid entries
PushBatchImpl(ctx, adapter_batch, 0, is_valid, ft);
}

GHistIndexMatrix::GHistIndexMatrix(SparsePage const &batch, common::Span<FeatureType const> ft,
common::HistogramCuts cuts, bst_bin_t max_bins_per_feat,
bool is_dense, double sparse_thresh, std::int32_t n_threads)
GHistIndexMatrix::GHistIndexMatrix(Context const *ctx, SparsePage const &batch,
common::Span<FeatureType const> ft, common::HistogramCuts cuts,
bst_bin_t max_bins_per_feat, bool is_dense, double sparse_thresh)
: cut{std::move(cuts)},
max_numeric_bins_per_feat{max_bins_per_feat},
base_rowid{batch.base_rowid},
isDense_{is_dense} {
CHECK_GE(n_threads, 1);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the n_threads >= 1 checked elsewhere?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as the number of threads comes from the Context class, it should go through this check:

n_threads = std::max(n_threads, 1);

CHECK_EQ(row_ptr.size(), 0);
row_ptr = common::MakeFixedVecWithMalloc(batch.Size() + 1, std::size_t{0});

const uint32_t nbins = cut.Ptrs().back();
hit_count = common::MakeFixedVecWithMalloc(nbins, std::size_t{0});
auto n_threads = ctx->Threads();
hit_count_tloc_.resize(n_threads * nbins, 0);

this->PushBatch(batch, ft, n_threads);
this->PushBatch(ctx, batch, ft);
this->columns_ = std::make_unique<common::ColumnMatrix>();
if (!std::isnan(sparse_thresh)) {
this->columns_->InitFromSparse(batch, *this, sparse_thresh, n_threads);
Expand Down Expand Up @@ -140,8 +142,8 @@ void GHistIndexMatrix::ResizeColumns(double sparse_thresh) {
this->columns_ = std::make_unique<common::ColumnMatrix>(*this, sparse_thresh);
}

void GHistIndexMatrix::ResizeIndex(const size_t n_index, const bool isDense) {
auto make_index = [this, n_index](auto t, common::BinTypeSize t_size) {
void GHistIndexMatrix::ResizeIndex(Context const *ctx, const size_t n_index, const bool isDense) {
auto make_index = [this, ctx, n_index](auto t, common::BinTypeSize t_size) {
// Must resize instead of allocating a new one. This function is called everytime a
// new batch is pushed, and we grow the size accordingly without loosing the data in
// the previous batches.
Expand All @@ -153,7 +155,7 @@ void GHistIndexMatrix::ResizeIndex(const size_t n_index, const bool isDense) {
decltype(this->data) new_vec;
if (!resource) {
CHECK(this->data.empty());
new_vec = common::MakeFixedVecWithMalloc(n_bytes, std::uint8_t{0});
new_vec = common::MakeFixedVecWithMalloc(ctx, n_bytes, std::uint8_t{0});
} else {
CHECK(resource->Type() == common::ResourceHandler::kMalloc);
auto malloc_resource = std::dynamic_pointer_cast<common::MallocResource>(resource);
Expand All @@ -165,8 +167,8 @@ void GHistIndexMatrix::ResizeIndex(const size_t n_index, const bool isDense) {
new_vec = {new_ptr, n_bytes / sizeof(std::uint8_t), malloc_resource};
}
this->data = std::move(new_vec);
this->index = common::Index{common::Span{data.data(), static_cast<size_t>(data.size())},
t_size};
this->index =
common::Index{common::Span{data.data(), static_cast<size_t>(data.size())}, t_size};
};

if ((MaxNumBinPerFeat() - 1 <= static_cast<int>(std::numeric_limits<uint8_t>::max())) &&
Expand Down Expand Up @@ -195,7 +197,7 @@ bst_bin_t GHistIndexMatrix::GetGindex(size_t ridx, size_t fidx) const {
return static_cast<bst_bin_t>(this->index[begin + fidx]);
}
auto end = RowIdx(ridx + 1);
auto const& cut_ptrs = cut.Ptrs();
auto const &cut_ptrs = cut.Ptrs();
auto f_begin = cut_ptrs[fidx];
auto f_end = cut_ptrs[fidx + 1];
return BinarySearchBin(begin, end, this->index, f_begin, f_end);
Expand Down
2 changes: 1 addition & 1 deletion src/data/gradient_index.cu
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ GHistIndexMatrix::GHistIndexMatrix(Context const* ctx, MetaInfo const& info,
this->cut.Values();
this->cut.MinValues();

this->ResizeIndex(info.num_nonzero_, page->IsDense());
this->ResizeIndex(ctx, info.num_nonzero_, page->IsDense());
if (page->IsDense()) {
this->index.SetBinOffset(page->Cuts().Ptrs());
}
Expand Down
19 changes: 9 additions & 10 deletions src/data/gradient_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ class GHistIndexMatrix {
/**
* @brief Push a sparse page into the index matrix.
*/
void PushBatch(SparsePage const& batch, common::Span<FeatureType const> ft,
std::int32_t n_threads);
void PushBatch(Context const* ctx, SparsePage const& batch, common::Span<FeatureType const> ft);

template <typename Batch, typename BinIdxType, typename GetOffset, typename IsValid>
void SetIndexData(common::Span<BinIdxType> index_data_span, size_t rbegin,
Expand Down Expand Up @@ -112,16 +111,17 @@ class GHistIndexMatrix {
}

template <typename Batch, typename IsValid>
void PushBatchImpl(int32_t n_threads, Batch const& batch, size_t rbegin, IsValid&& is_valid,
void PushBatchImpl(Context const* ctx, Batch const& batch, size_t rbegin, IsValid&& is_valid,
common::Span<FeatureType const> ft) {
// The number of threads is pegged to the batch size. If the OMP block is parallelized
// on anything other than the batch/block size, it should be reassigned
auto n_threads = ctx->Threads();
size_t batch_threads =
std::max(static_cast<size_t>(1), std::min(batch.Size(), static_cast<size_t>(n_threads)));

auto n_bins_total = cut.TotalBins();
const size_t n_index = row_ptr[rbegin + batch.Size()]; // number of entries in this page
ResizeIndex(n_index, isDense_);
ResizeIndex(ctx, n_index, isDense_);
if (isDense_) {
index.SetBinOffset(cut.Ptrs());
}
Expand Down Expand Up @@ -174,8 +174,7 @@ class GHistIndexMatrix {
* @brief Constructor for Quantile DMatrix. Initialize basic information and prepare
* for push batch.
*/
GHistIndexMatrix(MetaInfo const& info, common::HistogramCuts&& cuts,
bst_bin_t max_bin_per_feat);
GHistIndexMatrix(MetaInfo const& info, common::HistogramCuts&& cuts, bst_bin_t max_bin_per_feat);

/**
* @brief Constructor for the external memory Quantile DMatrix. Initialize basic
Expand All @@ -194,9 +193,9 @@ class GHistIndexMatrix {
/**
* @brief Constructor for external memory.
*/
GHistIndexMatrix(SparsePage const& page, common::Span<FeatureType const> ft,
GHistIndexMatrix(Context const* ctx, SparsePage const& page, common::Span<FeatureType const> ft,
common::HistogramCuts cuts, bst_bin_t max_bins_per_feat, bool is_dense,
double sparse_thresh, std::int32_t n_threads);
double sparse_thresh);
GHistIndexMatrix(); // also for ext mem, empty ctor so that we can read the cache back.

/**
Expand All @@ -220,7 +219,7 @@ class GHistIndexMatrix {
common::PartialSum(n_threads, it, it + batch.Size(), prev_sum, row_ptr.begin() + rbegin);
auto is_valid = data::IsValidFunctor{missing};

PushBatchImpl(ctx->Threads(), batch, rbegin, is_valid, ft);
PushBatchImpl(ctx, batch, rbegin, is_valid, ft);

if (rbegin + batch.Size() == n_samples_total) {
// finished
Expand All @@ -233,7 +232,7 @@ class GHistIndexMatrix {
void PushAdapterBatchColumns(Context const* ctx, Batch const& batch, float missing,
size_t rbegin);

void ResizeIndex(const size_t n_index, const bool isDense);
void ResizeIndex(Context const* ctx, const size_t n_index, const bool isDense);

void GetFeatureCounts(size_t* counts) const {
auto nfeature = cut.Ptrs().size() - 1;
Expand Down
6 changes: 4 additions & 2 deletions src/data/gradient_index_page_source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
namespace xgboost::data {
void GradientIndexPageSource::Fetch() {
if (!this->ReadCache()) {
auto ctx = Context{};
ctx.Init(Args{{"nthread", std::to_string(nthreads_)}});
// source is initialized to be the 0th page during construction, so when count_ is 0
// there's no need to increment the source.
if (this->count_ != 0 && !this->sync_) {
Expand All @@ -24,8 +26,8 @@ void GradientIndexPageSource::Fetch() {
CHECK_EQ(this->count_, this->source_->Iter());
auto const& csr = this->source_->Page();
CHECK_NE(this->cuts_.Values().size(), 0);
this->page_.reset(new GHistIndexMatrix{*csr, feature_types_, cuts_, max_bin_per_feat_,
is_dense_, sparse_thresh_, nthreads_});
this->page_.reset(new GHistIndexMatrix{&ctx, *csr, feature_types_, cuts_, max_bin_per_feat_,
is_dense_, sparse_thresh_});
this->WriteCache();
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/data/gradient_index_page_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ class GradientIndexPageSource
double sparse_thresh_;

public:
GradientIndexPageSource(float missing, std::int32_t nthreads, bst_feature_t n_features,
GradientIndexPageSource(Context const* ctx, float missing, bst_feature_t n_features,
bst_idx_t n_batches, std::shared_ptr<Cache> cache, BatchParam param,
common::HistogramCuts cuts, bool is_dense,
common::Span<FeatureType const> feature_types,
std::shared_ptr<SparsePageSource> source)
: PageSourceIncMixIn(missing, nthreads, n_features, n_batches, cache,
: PageSourceIncMixIn(missing, ctx->Threads(), n_features, n_batches, cache,
std::isnan(param.sparse_thresh)),
is_dense_{is_dense},
max_bin_per_feat_{param.max_bin},
Expand Down
Loading
Loading