Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -98,29 +98,6 @@ using o2::monitoring::tags::Value;

namespace o2::framework::readers
{
auto setEOSCallback(InitContext& ic)
{
ic.services().get<CallbackService>().set<CallbackService::Id::EndOfStream>(
[](EndOfStreamContext& eosc) {
auto& control = eosc.services().get<ControlService>();
control.endOfStream();
control.readyToQuit(QuitRequest::Me);
});
}

template <typename O>
static inline auto extractTypedOriginal(ProcessingContext& pc)
{
/// FIXME: this should be done in invokeProcess() as some of the originals may be compound tables
return O{pc.inputs().get<TableConsumer>(aod::MetadataTrait<O>::metadata::tableLabel())->asArrowTable()};
}

template <typename... Os>
static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingContext& pc)
{
return std::make_tuple(extractTypedOriginal<Os>(pc)...);
}

AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const& ctx)
{
// aod-parent-base-path-replacement is now a workflow option, so it needs to be
Expand Down
25 changes: 17 additions & 8 deletions Framework/AnalysisSupport/src/AODReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "Framework/DataProcessingHelpers.h"
#include "Framework/AlgorithmSpec.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/DataSpecViews.h"
#include "Framework/ConfigContext.h"
#include "Framework/DanglingEdgesContext.h"

Expand All @@ -29,6 +30,7 @@ struct Buildable {
bool exclusive = false;
std::string binding;
std::vector<std::string> labels;
std::vector<framework::ConcreteDataMatcher> matchers;
header::DataOrigin origin;
header::DataDescription description;
header::DataHeader::SubSpecificationType version;
Expand All @@ -52,6 +54,7 @@ struct Buildable {

for (auto const& r : records) {
labels.emplace_back(r.label);
matchers.emplace_back(r.matcher);
}
outputSchema = std::make_shared<arrow::Schema>([](std::vector<o2::soa::IndexRecord> const& recs) {
std::vector<std::shared_ptr<arrow::Field>> fields;
Expand All @@ -68,6 +71,7 @@ struct Buildable {
return {
exclusive,
labels,
matchers,
records,
outputSchema,
origin,
Expand Down Expand Up @@ -105,6 +109,7 @@ namespace
struct Spawnable {
std::string binding;
std::vector<std::string> labels;
std::vector<framework::ConcreteDataMatcher> matchers;
std::vector<expressions::Projector> projectors;
std::vector<std::shared_ptr<gandiva::Expression>> expressions;
std::shared_ptr<arrow::Schema> outputSchema;
Expand Down Expand Up @@ -132,14 +137,17 @@ struct Spawnable {
o2::framework::addLabelToSchema(outputSchema, binding.c_str());

std::vector<std::shared_ptr<arrow::Schema>> schemas;
for (auto& i : spec.metadata) {
if (i.name.starts_with("input-schema:")) {
labels.emplace_back(i.name.substr(13));
iws.clear();
auto json = i.defaultValue.get<std::string>();
iws.str(json);
schemas.emplace_back(ArrowJSONHelpers::read(iws));
}
for (auto const& i : spec.metadata | views::filter_string_params_starts_with("input-schema:")) {
labels.emplace_back(i.name.substr(13));
iws.clear();
auto json = i.defaultValue.get<std::string>();
iws.str(json);
schemas.emplace_back(ArrowJSONHelpers::read(iws));
}
for (auto const& i : spec.metadata | views::filter_string_params_starts_with("input:") | std::ranges::views::transform([](auto const& param) {
return DataSpecUtils::fromMetadataString(param.defaultValue.template get<std::string>());
})) {
matchers.emplace_back(std::get<ConcreteDataMatcher>(i.matcher));
}

std::vector<std::shared_ptr<arrow::Field>> fields;
Expand Down Expand Up @@ -169,6 +177,7 @@ struct Spawnable {
return {
binding,
labels,
matchers,
expressions,
makeProjector(),
outputSchema,
Expand Down
7 changes: 3 additions & 4 deletions Framework/AnalysisSupport/src/AODWriterHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,12 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
}

// get the TableConsumer and corresponding arrow table
auto msg = pc.inputs().get(ref.spec->binding);
if (msg.header == nullptr) {
if (ref.header == nullptr) {
LOGP(error, "No header for message {}:{}", ref.spec->binding, DataSpecUtils::describe(*ref.spec));
continue;
}
auto s = pc.inputs().get<TableConsumer>(ref.spec->binding);
auto table = s->asArrowTable();

auto table = pc.inputs().get<TableConsumer>(std::get<ConcreteDataMatcher>(ref.spec->matcher))->asArrowTable();
if (!table->Validate().ok()) {
LOGP(warning, "The table \"{}\" is not valid and will not be saved!", tableName);
continue;
Expand Down
5 changes: 3 additions & 2 deletions Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx)
if (m.name.starts_with("input:")) {
auto name = m.name.substr(6);
schemaMetadata->Append("sourceTable", name);
schemaMetadata->Append("sourceMatcher", DataSpecUtils::describe(std::get<ConcreteDataMatcher>(DataSpecUtils::fromMetadataString(m.defaultValue.get<std::string>()).matcher)));
continue;
}
// Ignore the non ccdb: entries
Expand All @@ -109,13 +110,13 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx)
for (auto& schema : schemas) {
std::vector<CCDBFetcherHelper::FetchOp> ops;
auto inputBinding = *schema->metadata()->Get("sourceTable");
auto inputMatcher = DataSpecUtils::fromString(*schema->metadata()->Get("sourceMatcher"));
auto outRouteDesc = *schema->metadata()->Get("outputRoute");
std::string outBinding = *schema->metadata()->Get("outputBinding");
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
"Fetching CCDB objects for %{public}s's columns with timestamps from %{public}s and putting them in route %{public}s",
outBinding.c_str(), inputBinding.c_str(), outRouteDesc.c_str());
auto ref = inputs.get<TableConsumer>(inputBinding);
auto table = ref->asArrowTable();
auto table = inputs.get<TableConsumer>(inputMatcher)->asArrowTable();
// FIXME: make the fTimestamp column configurable.
auto timestampColumn = table->GetColumnByName("fTimestamp");
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
Expand Down
63 changes: 54 additions & 9 deletions Framework/Core/include/Framework/ASoA.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#ifndef O2_FRAMEWORK_ASOA_H_
#define O2_FRAMEWORK_ASOA_H_

#include "Framework/ConcreteDataMatcher.h"
#include "Framework/Pack.h" // IWYU pragma: export
#include "Framework/FunctionalHelpers.h" // IWYU pragma: export
#include "Headers/DataHeader.h" // IWYU pragma: export
Expand Down Expand Up @@ -375,6 +376,12 @@ consteval const char* signature()
return o2::aod::Hash<R.desc_hash>::str;
}

template <soa::TableRef R>
constexpr framework::ConcreteDataMatcher matcher()
{
return {origin<R>(), description(signature<R>()), R.version};
}

/// hash identification concepts
template <typename T>
concept is_aod_hash = requires(T t) { t.hash; t.str; };
Expand Down Expand Up @@ -1393,6 +1400,12 @@ static constexpr std::pair<bool, std::string> hasKey(std::string const& key)
return {hasColumnForKey(typename aod::MetadataTrait<o2::aod::Hash<ref.desc_hash>>::metadata::columns{}, key), aod::label<ref>()};
}

template <TableRef ref>
static constexpr std::pair<bool, framework::ConcreteDataMatcher> hasKeyM(std::string const& key)
{
return {hasColumnForKey(typename aod::MetadataTrait<o2::aod::Hash<ref.desc_hash>>::metadata::columns{}, key), aod::matcher<ref>()};
}

template <typename... C>
static constexpr auto haveKey(framework::pack<C...>, std::string const& key)
{
Expand Down Expand Up @@ -1427,6 +1440,31 @@ static constexpr std::string getLabelFromTypeForKey(std::string const& key)
O2_BUILTIN_UNREACHABLE();
}

template <with_originals T, bool OPT = false>
static constexpr framework::ConcreteDataMatcher getMatcherFromTypeForKey(std::string const& key)
{
if constexpr (T::originals.size() == 1) {
auto locate = hasKeyM<T::originals[0]>(key);
if (locate.first) {
return locate.second;
}
} else {
auto locate = [&]<size_t... Is>(std::index_sequence<Is...>) {
return std::vector{hasKeyM<T::originals[Is]>(key)...};
}(std::make_index_sequence<T::originals.size()>{});
auto it = std::find_if(locate.begin(), locate.end(), [](auto const& x) { return x.first; });
if (it != locate.end()) {
return it->second;
}
}
if constexpr (!OPT) {
notFoundColumn(getLabelFromType<std::decay_t<T>>().data(), key.data());
} else {
return framework::ConcreteDataMatcher{header::DataOrigin{"AOD"}, header::DataDescription{"[MISSING]"}, 0};
}
O2_BUILTIN_UNREACHABLE();
}

template <typename B, typename... C>
consteval static bool hasIndexTo(framework::pack<C...>&&)
{
Expand Down Expand Up @@ -1477,15 +1515,18 @@ struct PreslicePolicyGeneral : public PreslicePolicyBase {
std::span<const int64_t> getSliceFor(int value) const;
};

template <typename T, typename Policy, bool OPT = false>
template <typename T>
concept is_preslice_policy = std::derived_from<T, PreslicePolicyBase>;

template <typename T, is_preslice_policy Policy, bool OPT = false>
struct PresliceBase : public Policy {
constexpr static bool optional = OPT;
using target_t = T;
using policy_t = Policy;
const std::string binding;

PresliceBase(expressions::BindingNode index_)
: Policy{PreslicePolicyBase{{o2::soa::getLabelFromTypeForKey<T, OPT>(std::string{index_.name})}, Entry(o2::soa::getLabelFromTypeForKey<T, OPT>(std::string{index_.name}), std::string{index_.name})}, {}}
: Policy{PreslicePolicyBase{{o2::soa::getLabelFromTypeForKey<T, OPT>(std::string{index_.name})}, Entry(o2::soa::getLabelFromTypeForKey<T, OPT>(std::string{index_.name}), o2::soa::getMatcherFromTypeForKey<T, OPT>(std::string{index_.name}), std::string{index_.name})}, {}}
{
}

Expand Down Expand Up @@ -1520,7 +1561,11 @@ template <typename T>
using PresliceOptional = PresliceBase<T, PreslicePolicySorted, true>;

template <typename T>
concept is_preslice = std::derived_from<T, PreslicePolicyBase>;
concept is_preslice = std::derived_from<T, PreslicePolicyBase>&&
requires(T)
{
T::optional;
};

/// Can be user to group together a number of Preslice declaration
/// to avoid the limit of 100 data members per task
Expand Down Expand Up @@ -1667,10 +1712,10 @@ auto doFilteredSliceBy(T const* table, o2::framework::PresliceBase<C, framework:
return prepareFilteredSlice(table, slice, offset);
}

template <typename T>
template <soa::is_table T>
auto doSliceByCached(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache)
{
auto localCache = cache.ptr->getCacheFor({o2::soa::getLabelFromTypeForKey<T>(node.name), node.name});
auto localCache = cache.ptr->getCacheFor({"", o2::soa::getMatcherFromTypeForKey<T>(node.name), node.name});
auto [offset, count] = localCache.getSliceFor(value);
auto t = typename T::self_t({table->asArrowTable()->Slice(static_cast<uint64_t>(offset), count)}, static_cast<uint64_t>(offset));
if (t.tableSize() != 0) {
Expand All @@ -1679,19 +1724,19 @@ auto doSliceByCached(T const* table, framework::expressions::BindingNode const&
return t;
}

template <typename T>
template <soa::is_filtered_table T>
auto doFilteredSliceByCached(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache)
{
auto localCache = cache.ptr->getCacheFor({o2::soa::getLabelFromTypeForKey<T>(node.name), node.name});
auto localCache = cache.ptr->getCacheFor({"", o2::soa::getMatcherFromTypeForKey<T>(node.name), node.name});
auto [offset, count] = localCache.getSliceFor(value);
auto slice = table->asArrowTable()->Slice(static_cast<uint64_t>(offset), count);
return prepareFilteredSlice(table, slice, offset);
}

template <typename T>
template <soa::is_table T>
auto doSliceByCachedUnsorted(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache)
{
auto localCache = cache.ptr->getCacheUnsortedFor({o2::soa::getLabelFromTypeForKey<T>(node.name), node.name});
auto localCache = cache.ptr->getCacheUnsortedFor({"", o2::soa::getMatcherFromTypeForKey<T>(node.name), node.name});
if constexpr (soa::is_filtered_table<T>) {
auto t = typename T::self_t({table->asArrowTable()}, localCache.getSliceFor(value));
if (t.tableSize() != 0) {
Expand Down
7 changes: 5 additions & 2 deletions Framework/Core/include/Framework/AnalysisHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ namespace o2::soa
{
struct IndexRecord {
std::string label;
framework::ConcreteDataMatcher matcher;
std::string columnLabel;
IndexKind kind;
int pos;
Expand Down Expand Up @@ -142,6 +143,7 @@ std::vector<std::shared_ptr<arrow::Table>> extractSources(ProcessingContext& pc,
struct Spawner {
std::string binding;
std::vector<std::string> labels;
std::vector<framework::ConcreteDataMatcher> matchers;
std::vector<std::shared_ptr<gandiva::Expression>> expressions;
std::shared_ptr<gandiva::Projector> projector = nullptr;
std::shared_ptr<arrow::Schema> schema = nullptr;
Expand All @@ -157,6 +159,7 @@ struct Spawner {
struct Builder {
bool exclusive;
std::vector<std::string> labels;
std::vector<framework::ConcreteDataMatcher> matchers;
std::vector<o2::soa::IndexRecord> records;
std::shared_ptr<arrow::Schema> outputSchema;
header::DataOrigin origin;
Expand Down Expand Up @@ -258,9 +261,9 @@ inline constexpr auto getIndexMapping()
([&idx]<TableRef ref, typename C>() mutable {
constexpr auto pos = o2::aod::MetadataTrait<o2::aod::Hash<ref.desc_hash>>::metadata::template getIndexPosToKey<Key>();
if constexpr (pos == -1) {
idx.emplace_back(o2::aod::label<ref>(), C::columnLabel(), IndexKind::IdxSelf, pos);
idx.emplace_back(o2::aod::label<ref>(), o2::aod::matcher<ref>(), C::columnLabel(), IndexKind::IdxSelf, pos);
} else {
idx.emplace_back(o2::aod::label<ref>(), C::columnLabel(), getIndexKind<typename C::type>(), pos);
idx.emplace_back(o2::aod::label<ref>(), o2::aod::matcher<ref>(), C::columnLabel(), getIndexKind<typename C::type>(), pos);
}
}.template operator()<refs[Is], typename framework::pack_element_t<Is, indices>>(),
...);
Expand Down
8 changes: 4 additions & 4 deletions Framework/Core/include/Framework/AnalysisManagers.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ template <size_t N, std::array<soa::TableRef, N> refs>
static inline auto extractOriginals(ProcessingContext& pc)
{
return [&]<size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
return {pc.inputs().get<TableConsumer>(o2::aod::matcher<refs[Is]>())->asArrowTable()...};
}(std::make_index_sequence<refs.size()>());
}
} // namespace
Expand Down Expand Up @@ -151,7 +151,7 @@ template <typename T>
concept with_base_table = requires { T::base_specs(); };

template <with_base_table T>
bool requestInputs(std::vector<InputSpec>& inputs, T const& entity)
bool requestInputs(std::vector<InputSpec>& inputs, T const& /*entity*/)
{
auto base_specs = T::base_specs();
for (auto base_spec : base_specs) {
Expand Down Expand Up @@ -586,7 +586,7 @@ bool registerCache(T& preslice, Cache& bsks, Cache&)
return true;
}
}
auto locate = std::find_if(bsks.begin(), bsks.end(), [&](auto const& entry) { return (entry.binding == preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); });
auto locate = std::find(bsks.begin(), bsks.end(), preslice.getBindingKey());
if (locate == bsks.end()) {
bsks.emplace_back(preslice.getBindingKey());
} else if (locate->enabled == false) {
Expand All @@ -604,7 +604,7 @@ bool registerCache(T& preslice, Cache&, Cache& bsksU)
return true;
}
}
auto locate = std::find_if(bsksU.begin(), bsksU.end(), [&](auto const& entry) { return (entry.binding == preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); });
auto locate = std::find(bsksU.begin(), bsksU.end(), preslice.getBindingKey());
if (locate == bsksU.end()) {
bsksU.emplace_back(preslice.getBindingKey());
} else if (locate->enabled == false) {
Expand Down
8 changes: 4 additions & 4 deletions Framework/Core/include/Framework/AnalysisTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ struct AnalysisDataProcessorBuilder {
auto key = std::string{"fIndex"} + o2::framework::cutString(soa::getLabelFromType<std::decay_t<G>>());
([&bk, &bku, &key, enabled]() mutable {
if constexpr (soa::relatedByIndex<std::decay_t<G>, std::decay_t<As>>()) {
auto binding = soa::getLabelFromTypeForKey<std::decay_t<As>>(key);
Entry e{soa::getLabelFromTypeForKey<std::decay_t<As>>(key), soa::getMatcherFromTypeForKey<std::decay_t<As>>(key), key, enabled};
if constexpr (o2::soa::is_smallgroups<std::decay_t<As>>) {
framework::updatePairList(bku, binding, key, enabled);
framework::updatePairList(bku, e);
} else {
framework::updatePairList(bk, binding, key, enabled);
framework::updatePairList(bk, e);
}
}
}(),
Expand Down Expand Up @@ -214,7 +214,7 @@ struct AnalysisDataProcessorBuilder {
template <soa::TableRef R>
static auto extractTableFromRecord(InputRecord& record)
{
auto table = record.get<TableConsumer>(o2::aod::label<R>())->asArrowTable();
auto table = record.get<TableConsumer>(o2::aod::matcher<R>())->asArrowTable();
if (table->num_rows() == 0) {
table = makeEmptyTable<R>();
}
Expand Down
Loading