Skip to content

Commit db44ddd

Browse files
[V1.0.6] Refactored and added new logic for builder
1 parent cf13dd5 commit db44ddd

File tree

13 files changed

+623
-132
lines changed

13 files changed

+623
-132
lines changed

engine/.DS_Store

6 KB
Binary file not shown.

engine/Cargo.lock

Lines changed: 455 additions & 126 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
[workspace]
2+
members = ["py_lib"]
3+
14
[package]
25
name = "engine"
36
version = "0.1.0"
@@ -12,4 +15,8 @@ tempfile = "3.3.0"
1215
async-trait = "0.1"
1316
object_store = { version = "0.11.2", features=["aws"] }
1417
regex = "1.11.1"
15-
url = "2.3.1"
18+
url = "2.3.1"
19+
20+
[lib]
21+
name = "engine"
22+
crate-type = ["rlib"]

engine/py_lib/Cargo.toml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Python binding crate for the engine: built as a cdylib so Python can
# load it as the `sa_rust` extension module.
[package]
name = "py_lib"
version = "0.1.0"
edition = "2021"

[dependencies]
# PyO3 + pyo3-asyncio bridge Rust async fns into Python awaitables
# (tokio runtime feature matches the engine's async runtime).
pyo3 = { version = "0.18", features = ["extension-module"] }
pyo3-asyncio = { version = "0.18", features = ["tokio-runtime"] }
arrow = "54.0.0"
arrow-array = "54.0.0"
futures = "0.3"

# The core engine crate lives one directory up in the workspace.
engine = {path = "../"}

[lib]
# Module name exposed to Python (`import sa_rust`); must match the
# #[pymodule] function name in src/lib.rs.
name = "sa_rust"
crate-type = ["cdylib"]

engine/py_lib/src/lib.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use pyo3::prelude::PyResult;
2+
use pyo3::exceptions::PyRuntimeError;
3+
use engine::builder::pipelines;
4+
use pyo3::prelude::*;
5+
use pyo3_asyncio::tokio::future_into_py;
6+
7+
8+
async fn py_sa_to_arrow_ipc_pipeline(stm: &str) -> PyResult<Vec<u8>> {
9+
Ok(
10+
pipelines::sa_to_arrow_ipc_pipeline(stm).await.map_err(|e| {
11+
PyRuntimeError::new_err(format!("Failed to convert PyError: {}", e))
12+
}
13+
)?
14+
)
15+
}
16+
17+
18+
#[pyfunction]
19+
fn execute_sql<'py>(
20+
py: Python<'py>,
21+
query: String,
22+
) -> PyResult<&'py PyAny> {
23+
future_into_py(py, async move { py_sa_to_arrow_ipc_pipeline(query.as_str()).await })
24+
}
25+
26+
27+
/// Python module definition
28+
#[warn(unused_variables)]
29+
#[pymodule]
30+
fn sa_rust(py: Python, m: &PyModule) -> PyResult<()> {
31+
m.add_function(wrap_pyfunction!(execute_sql, m)?)?;
32+
Ok(())
33+
}

engine/py_lib/src/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
/// Placeholder binary target for the py_lib crate.
fn main() {
    let greeting = "Hello, world!";
    println!("{}", greeting);
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use engine::builder::pipelines::sa_to_arrow_ipc_pipeline;
2+
use datafusion::common::Result;
3+
4+
// Example driver: runs a join over two local CSV files through the
// SQL → Arrow IPC pipeline and dumps the raw bytes.
//
// NOTE(review): the file:// URIs are machine-specific absolute paths —
// this example only runs on the original author's machine; parameterize
// the paths (CLI arg or env var) before shipping.
#[tokio::main]
async fn main() -> Result<()> {
    let stm: &str = r#"SELECT
        st."id" AS "Student Id",
        st."name" AS "Student Name",
        st."age" AS "Student Age",
        s."subject" AS "Subject",
        s."score" AS "Score"
    FROM
        "file:///Users/nathanngo/Projects/my/SQLAnyWhere/engine/.data/bin/ex-local-storage-application/students.csv" AS st
    JOIN
        "file:///Users/nathanngo/Projects/my/SQLAnyWhere/engine/.data/bin/ex-local-storage-application/scores.csv" AS s
    ON
        st.id = s.student_id
    "#;
    // Execute the full pipeline; prints the serialized Arrow IPC stream
    // (a Vec<u8>) rather than a tabular view.
    let buffer: Vec<u8> = sa_to_arrow_ipc_pipeline(stm).await?;
    println!("{:?}", buffer);
    Ok(())
}

engine/src/builder/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
// Builder module: exposes the ready-made query pipelines.
pub mod pipelines;

engine/src/builder/pipelines.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use crate::datafusion::SaDataFusion;
2+
use crate::object_storage::{SaS3, SaLocalStorage};
3+
use crate::helper;
4+
use std::sync::Arc;
5+
use datafusion::common::Result;
6+
use datafusion::datasource::file_format::{
7+
FileFormat,
8+
csv::CsvFormat,
9+
parquet::ParquetFormat
10+
};
11+
use datafusion::prelude::DataFrame;
12+
use datafusion::arrow::array::RecordBatch;
13+
use datafusion::arrow::ipc::writer::StreamWriter;
14+
use std::env;
15+
16+
17+
async fn sa_query(sa_datafusion: SaDataFusion, stm: &str) -> Result<DataFrame> {
18+
let uris: Vec<&str> = helper::sql_parser(stm);
19+
for uri in uris {
20+
println!("[sa_query]: Detected URI: {}", uri);
21+
let file_format: Arc<dyn FileFormat> = if uri.ends_with(".csv") {
22+
Arc::new(CsvFormat::default())
23+
} else if uri.ends_with(".parquet") {
24+
Arc::new(ParquetFormat::default())
25+
} else {
26+
panic!("[sa_query]: Unsupported file format")
27+
};
28+
29+
if uri.starts_with("file://") {
30+
let local_storage: SaLocalStorage = SaLocalStorage::new_with_file_uri(uri)
31+
.init_table_provider(
32+
&sa_datafusion,
33+
file_format,
34+
Some(false)
35+
).await?;
36+
sa_datafusion.register_sa_storage(Arc::new(local_storage)).await?;
37+
38+
}
39+
else if uri.starts_with("s3://") {
40+
let s3_region: String = env::var("AWS_S3_REGION").unwrap_or("us-east-1".to_string());
41+
let s3_storage: SaS3 = SaS3::new_from_s3_uri(uri)
42+
.init_table_provider(
43+
s3_region.as_str(),
44+
&sa_datafusion,
45+
file_format,
46+
Some(false)
47+
).await?;
48+
sa_datafusion.register_sa_storage(Arc::new(s3_storage)).await?;
49+
}
50+
else {
51+
panic!("[sa_query]: Unsupported file protocal")
52+
}
53+
}
54+
let df: DataFrame = sa_datafusion.execute_sql(&stm.to_string()).await?;
55+
Ok(df)
56+
}
57+
58+
59+
pub async fn sa_to_dataframe_pipeline(stm: &str) -> Result<DataFrame> {
60+
let sa_datafusion: SaDataFusion = SaDataFusion::new();
61+
Ok(sa_query(sa_datafusion, stm).await?)
62+
}
63+
64+
65+
pub async fn sa_to_arrow_ipc_pipeline(stm: &str) -> Result<Vec<u8>> {
66+
let sa_datafusion: SaDataFusion = SaDataFusion::new();
67+
let df: DataFrame = sa_query(sa_datafusion, stm).await?;
68+
let record_batches: Vec<RecordBatch> = df.collect().await?;
69+
// Serialize RecordBatches to Arrow IPC
70+
let mut buffer: Vec<u8> = Vec::new();
71+
{
72+
let mut writer: StreamWriter<&mut Vec<u8>> = StreamWriter::try_new(&mut buffer, &record_batches[0].schema())?;
73+
for batch in record_batches {
74+
writer.write(&batch)?
75+
}
76+
writer.finish()?;
77+
}
78+
79+
Ok(buffer)
80+
}

engine/src/datafusion/datafusion.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use datafusion::execution::SessionState;
99
use url::Url;
1010

1111

12+
#[derive(Clone)]
1213
pub struct SaDataFusion {
1314
pub ctx: SessionContext,
1415
}
@@ -27,7 +28,6 @@ impl SaDataFusion {
2728
self.ctx.sql(stm).await
2829
}
2930

30-
3131
pub fn register_object_store(&self, url: &Url, object_store: Arc<dyn ObjectStore>) {
3232
self.ctx.runtime_env().register_object_store(url, object_store);
3333
}

0 commit comments

Comments (0)