Skip to content

Commit d0aa1ff

Browse files
authored
perf: Add Comet config for native Iceberg reader's data file concurrency (#3584)
1 parent ea5dc31 commit d0aa1ff

File tree

5 files changed

+24
-2
lines changed

5 files changed

+24
-2
lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -148,6 +148,17 @@ object CometConf extends ShimCometConf {
148148
.booleanConf
149149
.createWithDefault(false)
150150

151+
val COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT: ConfigEntry[Int] =
152+
conf("spark.comet.scan.icebergNative.dataFileConcurrencyLimit")
153+
.category(CATEGORY_SCAN)
154+
.doc(
155+
"The number of Iceberg data files to read concurrently within a single task. " +
156+
"Higher values improve throughput for tables with many small files by overlapping " +
157+
"I/O latency, but increase memory usage. Values between 2 and 8 are suggested.")
158+
.intConf
159+
.checkValue(v => v > 0, "Data file concurrency limit must be positive")
160+
.createWithDefault(1)
161+
151162
val COMET_CSV_V2_NATIVE_ENABLED: ConfigEntry[Boolean] =
152163
conf("spark.comet.scan.csv.v2.enabled")
153164
.category(CATEGORY_TESTING)

native/core/src/execution/operators/iceberg_scan.rs

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -61,6 +61,8 @@ pub struct IcebergScanExec {
6161
catalog_properties: HashMap<String, String>,
6262
/// Pre-planned file scan tasks
6363
tasks: Vec<FileScanTask>,
64+
/// Number of data files to read concurrently
65+
data_file_concurrency_limit: usize,
6466
/// Metrics
6567
metrics: ExecutionPlanMetricsSet,
6668
}
@@ -71,6 +73,7 @@ impl IcebergScanExec {
7173
schema: SchemaRef,
7274
catalog_properties: HashMap<String, String>,
7375
tasks: Vec<FileScanTask>,
76+
data_file_concurrency_limit: usize,
7477
) -> Result<Self, ExecutionError> {
7578
let output_schema = schema;
7679
let plan_properties = Self::compute_properties(Arc::clone(&output_schema), 1);
@@ -83,6 +86,7 @@ impl IcebergScanExec {
8386
plan_properties,
8487
catalog_properties,
8588
tasks,
89+
data_file_concurrency_limit,
8690
metrics,
8791
})
8892
}
@@ -158,7 +162,7 @@ impl IcebergScanExec {
158162

159163
let reader = iceberg::arrow::ArrowReaderBuilder::new(file_io)
160164
.with_batch_size(batch_size)
161-
.with_data_file_concurrency_limit(context.session_config().target_partitions())
165+
.with_data_file_concurrency_limit(self.data_file_concurrency_limit)
162166
.with_row_selection_enabled(true)
163167
.build();
164168

native/core/src/execution/planner.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1176,12 +1176,14 @@ impl PhysicalPlanner {
11761176
.collect();
11771177
let metadata_location = common.metadata_location.clone();
11781178
let tasks = parse_file_scan_tasks_from_common(common, &scan.file_scan_tasks)?;
1179+
let data_file_concurrency_limit = common.data_file_concurrency_limit as usize;
11791180

11801181
let iceberg_scan = IcebergScanExec::new(
11811182
metadata_location,
11821183
required_schema,
11831184
catalog_properties,
11841185
tasks,
1186+
data_file_concurrency_limit,
11851187
)?;
11861188

11871189
Ok((

native/proto/src/proto/operator.proto

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -175,6 +175,9 @@ message IcebergScanCommon {
175175
repeated PartitionData partition_data_pool = 9;
176176
repeated DeleteFileList delete_files_pool = 10;
177177
repeated spark.spark_expression.Expr residual_pool = 11;
178+
179+
// Number of data files to read concurrently within a single task
180+
uint32 data_file_concurrency_limit = 12;
178181
}
179182

180183
message IcebergScan {

spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,7 @@ import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeExec}
3131
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceRDD, DataSourceRDDPartition}
3232
import org.apache.spark.sql.types._
3333

34-
import org.apache.comet.ConfigEntry
34+
import org.apache.comet.{CometConf, ConfigEntry}
3535
import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
3636
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
3737
import org.apache.comet.serde.ExprOuterClass.Expr
@@ -757,6 +757,8 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
757757
var totalTasks = 0
758758

759759
commonBuilder.setMetadataLocation(metadata.metadataLocation)
760+
commonBuilder.setDataFileConcurrencyLimit(
761+
CometConf.COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT.get())
760762
metadata.catalogProperties.foreach { case (key, value) =>
761763
commonBuilder.putCatalogProperties(key, value)
762764
}

0 commit comments

Comments (0)