@@ -29,7 +29,7 @@ use crate::{
2929use arrow:: array:: { Array , RecordBatch , UInt32Array } ;
3030use arrow:: compute:: { take, TakeOptions } ;
3131use arrow:: datatypes:: DataType as ArrowDataType ;
32- use datafusion:: common:: { Result as DataFusionResult , ScalarValue } ;
32+ use datafusion:: common:: { DataFusionError , Result as DataFusionResult , ScalarValue } ;
3333use datafusion:: execution:: disk_manager:: DiskManagerMode ;
3434use datafusion:: execution:: memory_pool:: MemoryPool ;
3535use datafusion:: execution:: runtime_env:: RuntimeEnvBuilder ;
@@ -59,6 +59,7 @@ use datafusion_spark::function::string::concat::SparkConcat;
5959use datafusion_spark:: function:: string:: space:: SparkSpace ;
6060use futures:: poll;
6161use futures:: stream:: StreamExt ;
62+ use futures:: FutureExt ;
6263use jni:: objects:: JByteBuffer ;
6364use jni:: sys:: { jlongArray, JNI_FALSE } ;
6465use jni:: {
@@ -570,10 +571,29 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
570571 let ( tx, rx) = mpsc:: channel ( 2 ) ;
571572 let mut stream = stream;
572573 get_runtime ( ) . spawn ( async move {
573- while let Some ( batch) = stream. next ( ) . await {
574- if tx. send ( batch) . await . is_err ( ) {
575- break ;
574+ let result = std:: panic:: AssertUnwindSafe ( async {
575+ while let Some ( batch) = stream. next ( ) . await {
576+ if tx. send ( batch) . await . is_err ( ) {
577+ break ;
578+ }
576579 }
580+ } )
581+ . catch_unwind ( )
582+ . await ;
583+
584+ if let Err ( panic) = result {
585+ let msg = match panic. downcast_ref :: < & str > ( ) {
586+ Some ( s) => s. to_string ( ) ,
587+ None => match panic. downcast_ref :: < String > ( ) {
588+ Some ( s) => s. clone ( ) ,
589+ None => "unknown panic" . to_string ( ) ,
590+ } ,
591+ } ;
592+ let _ = tx
593+ . send ( Err ( DataFusionError :: Execution ( format ! (
594+ "native panic: {msg}"
595+ ) ) ) )
596+ . await ;
577597 }
578598 } ) ;
579599 exec_context. batch_receiver = Some ( rx) ;
0 commit comments