Skip to content

Commit 5949cee

Browse files
authored
feat: support arrow_result_version. (#769)
1 parent 9144be3 commit 5949cee

14 files changed

Lines changed: 162 additions & 67 deletions

File tree

.github/workflows/bindings.python.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ jobs:
138138
- name: Prepare
139139
working-directory: tests
140140
run: make up
141+
env:
142+
DATABEND_META_VERSION: v1.2.898-nightly
143+
DATABEND_QUERY_VERSION: v1.2.898-nightly
141144
- name: Download artifact
142145
uses: actions/download-artifact@v4
143146
with:

cli/src/main.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -360,11 +360,11 @@ pub async fn main() -> Result<()> {
360360
println!("Authenticate failed wrong password user {user}");
361361
return Ok(());
362362
}
363-
databend_driver::Error::Arrow(arrow::error::ArrowError::IpcError(ipc_err)) => {
364-
if ipc_err.contains("Unauthenticated") {
365-
println!("Authenticate failed wrong password user {user}");
366-
return Ok(());
367-
}
363+
databend_driver::Error::Arrow(arrow::error::ArrowError::IpcError(ipc_err))
364+
if ipc_err.contains("Unauthenticated") =>
365+
{
366+
println!("Authenticate failed wrong password user {user}");
367+
return Ok(());
368368
}
369369
_ => {}
370370
}

cli/src/session.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -107,19 +107,15 @@ impl Session {
107107
}
108108
databend_driver::Error::Arrow(arrow::error::ArrowError::IpcError(
109109
ref ipc_err,
110-
)) => {
111-
if ipc_err.contains("Unauthenticated")
112-
|| ipc_err.contains("Connection refused")
113-
{
114-
return Err(err.into());
115-
}
110+
)) if (ipc_err.contains("Unauthenticated")
111+
|| ipc_err.contains("Connection refused")) =>
112+
{
113+
return Err(err.into());
116114
}
117115
databend_driver::Error::Api(databend_client::Error::Request(
118116
ref resp_err,
119-
)) => {
120-
if resp_err.contains("error sending request for url") {
121-
return Err(err.into());
122-
}
117+
)) if resp_err.contains("error sending request for url") => {
118+
return Err(err.into());
123119
}
124120
_ => {}
125121
}

core/src/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,7 @@ impl APIClient {
598598
// body
599599
let session_state = self.session_state();
600600
let need_sticky = session_state.need_sticky.unwrap_or(false);
601-
let req = QueryRequest::new(sql)
601+
let mut req = QueryRequest::new(sql)
602602
.with_pagination(self.make_pagination())
603603
.with_session(Some(session_state))
604604
.with_stage_attachment(stage_attachment_config);
@@ -609,6 +609,7 @@ impl APIClient {
609609
if self.capability.arrow_data && self.query_result_format == "arrow" && !force_json_body {
610610
debug!("accept arrow data");
611611
headers.insert(ACCEPT, HeaderValue::from_static(CONTENT_TYPE_ARROW_OR_JSON));
612+
req = req.with_arrow();
612613
}
613614

614615
if need_sticky {

core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,6 @@ pub use presign::PresignedResponse;
4444
pub use response::QueryStats;
4545
pub use response::SchemaField;
4646
pub use settings::GeometryDataType;
47+
pub use settings::QueryResultFormatSettings;
4748
pub use settings::ResultFormatSettings;
4849
pub use stage::StageLocation;

core/src/pages.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@ use crate::client::QueryState;
1616
use crate::error::Result;
1717
use crate::response::QueryResponse;
1818
use crate::schema::Schema;
19-
use crate::settings::ResultFormatSettings;
19+
use crate::settings::{QueryResultFormatSettings, ResultFormatSettings};
2020
use crate::{APIClient, Error, QueryStats, SchemaField};
2121
use arrow_array::RecordBatch;
2222
use log::debug;
2323
use parking_lot::Mutex;
24-
use std::collections::BTreeMap;
2524
use std::future::Future;
2625
use std::mem;
2726
use std::pin::Pin;
@@ -36,7 +35,7 @@ pub struct Page {
3635
pub data: Vec<Vec<Option<String>>>,
3736
pub batches: Vec<RecordBatch>,
3837
pub stats: QueryStats,
39-
pub settings: Option<BTreeMap<String, String>>,
38+
pub settings: Option<QueryResultFormatSettings>,
4039
}
4140

4241
impl Page {
@@ -51,6 +50,9 @@ impl Page {
5150
}
5251

5352
pub fn update(&mut self, p: Page) {
53+
if self.settings.is_none() {
54+
self.settings = p.settings.clone();
55+
}
5456
self.raw_schema = p.raw_schema;
5557
if self.data.is_empty() {
5658
self.data = p.data
@@ -130,7 +132,7 @@ impl Pages {
130132
s.try_into()
131133
.map_err(|e| Error::Decode(format!("fail to decode string schema: {e}")))?
132134
};
133-
let settings = ResultFormatSettings::from_map(&page.settings)?;
135+
let settings = ResultFormatSettings::try_from(&page.settings)?;
134136

135137
self.add_back(page);
136138
let last_access_time = self.last_access_time.clone();

core/src/request.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ pub struct QueryRequest<'a> {
2626
pagination: Option<PaginationConfig>,
2727
#[serde(skip_serializing_if = "Option::is_none")]
2828
stage_attachment: Option<StageAttachmentConfig<'a>>,
29+
#[serde(skip_serializing_if = "Option::is_none")]
30+
arrow_result_version_max: Option<i64>,
2931
}
3032

3133
#[derive(Serialize, Debug)]
@@ -54,6 +56,7 @@ impl<'r, 't: 'r> QueryRequest<'r> {
5456
sql,
5557
pagination: None,
5658
stage_attachment: None,
59+
arrow_result_version_max: None,
5760
}
5861
}
5962

@@ -74,6 +77,11 @@ impl<'r, 't: 'r> QueryRequest<'r> {
7477
self.stage_attachment = stage_attachment;
7578
self
7679
}
80+
81+
pub fn with_arrow(mut self) -> Self {
82+
self.arrow_result_version_max = Some(2);
83+
self
84+
}
7785
}
7886

7987
#[cfg(test)]

core/src/response.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515
use crate::error_code::ErrorCode;
1616
use crate::session::SessionState;
17+
use crate::settings::QueryResultFormatSettings;
1718
use serde::{Deserialize, Serialize};
18-
use std::collections::BTreeMap;
1919

2020
#[derive(Deserialize, Debug, Default)]
2121
pub struct QueryStats {
@@ -78,7 +78,7 @@ pub struct QueryResponse {
7878
pub schema: Vec<SchemaField>,
7979
pub data: Vec<Vec<Option<String>>>,
8080
pub state: String,
81-
pub settings: Option<BTreeMap<String, String>>,
81+
pub settings: Option<QueryResultFormatSettings>,
8282
pub error: Option<ErrorCode>,
8383
// make it optional for backward compatibility
8484
pub warnings: Option<Vec<String>>,

core/src/schema.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -471,21 +471,17 @@ fn parse_type_desc(s: &str) -> Result<TypeDesc<'_>> {
471471
start = i + 1;
472472
}
473473
}
474-
',' => {
475-
if depth == 1 {
476-
let s = &s[start..i];
477-
args.push(parse_type_desc(s)?);
478-
start = i + 1;
479-
}
474+
',' if depth == 1 => {
475+
let s = &s[start..i];
476+
args.push(parse_type_desc(s)?);
477+
start = i + 1;
480478
}
481-
' ' => {
482-
if depth == 0 {
483-
let s = &s[start..i];
484-
if !s.is_empty() {
485-
name = s;
486-
}
487-
start = i + 1;
479+
' ' if depth == 0 => {
480+
let s = &s[start..i];
481+
if !s.is_empty() {
482+
name = s;
488483
}
484+
start = i + 1;
489485
}
490486
_ => {}
491487
}

core/src/settings.rs

Lines changed: 108 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,51 +16,65 @@ use crate::error::Result;
1616
use crate::Error;
1717
use jiff::tz::TimeZone;
1818
use serde::Deserialize;
19-
use std::collections::BTreeMap;
2019
use std::str::FromStr;
2120

2221
#[derive(Debug, Clone)]
2322
pub struct ResultFormatSettings {
2423
pub geometry_output_format: GeometryDataType,
2524
pub timezone: TimeZone,
25+
pub arrow_result_version: Option<i64>,
26+
pub binary_output_format: BinaryFormat,
2627
}
2728

2829
impl Default for ResultFormatSettings {
2930
fn default() -> Self {
3031
Self {
3132
geometry_output_format: GeometryDataType::default(),
33+
binary_output_format: BinaryFormat::default(),
3234
timezone: TimeZone::UTC,
35+
arrow_result_version: None,
3336
}
3437
}
3538
}
3639

37-
impl ResultFormatSettings {
38-
pub fn from_map(settings: &Option<BTreeMap<String, String>>) -> Result<Self> {
39-
match settings {
40-
None => Ok(Default::default()),
41-
Some(settings) => {
42-
let timezone = match settings.get("timezone") {
43-
None => TimeZone::UTC,
44-
Some(t) => TimeZone::get(t).map_err(|e| Error::Decode(e.to_string()))?,
45-
};
46-
47-
let geometry_output_format = match settings.get("geometry_output_format") {
48-
None => GeometryDataType::default(),
49-
Some(t) => {
50-
GeometryDataType::from_str(t).map_err(|e| Error::Decode(e.to_string()))?
51-
}
52-
};
53-
54-
Ok(Self {
55-
timezone,
56-
geometry_output_format,
57-
})
58-
}
59-
}
40+
impl TryFrom<&Option<QueryResultFormatSettings>> for ResultFormatSettings {
41+
type Error = Error;
42+
43+
fn try_from(settings: &Option<QueryResultFormatSettings>) -> Result<Self> {
44+
let settings = settings.clone().unwrap_or_default();
45+
let timezone = match settings.timezone {
46+
None => TimeZone::UTC,
47+
Some(t) => TimeZone::get(&t).map_err(|e| Error::Decode(e.to_string()))?,
48+
};
49+
50+
let geometry_output_format = match settings.geometry_output_format {
51+
None => GeometryDataType::default(),
52+
Some(t) => GeometryDataType::from_str(&t).map_err(|e| Error::Decode(e.to_string()))?,
53+
};
54+
55+
let binary_output_format = match settings.binary_output_format {
56+
None => BinaryFormat::default(),
57+
Some(t) => BinaryFormat::from_str(&t).map_err(|e| Error::Decode(e.to_string()))?,
58+
};
59+
60+
Ok(Self {
61+
geometry_output_format,
62+
timezone,
63+
arrow_result_version: settings.arrow_result_version,
64+
binary_output_format,
65+
})
6066
}
6167
}
6268

63-
#[derive(Debug, Clone, Copy, Default, Deserialize)]
69+
#[derive(Debug, Clone, Default, Deserialize)]
70+
pub struct QueryResultFormatSettings {
71+
pub timezone: Option<String>,
72+
pub geometry_output_format: Option<String>,
73+
pub arrow_result_version: Option<i64>,
74+
pub binary_output_format: Option<String>,
75+
}
76+
77+
#[derive(Debug, Clone, Copy, Default, Deserialize, PartialEq, Eq)]
6478
pub enum GeometryDataType {
6579
WKB,
6680
WKT,
@@ -84,3 +98,72 @@ impl FromStr for GeometryDataType {
8498
}
8599
}
86100
}
101+
102+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
103+
pub enum BinaryFormat {
104+
#[default]
105+
Hex,
106+
Base64,
107+
Utf8,
108+
Utf8Lossy,
109+
}
110+
111+
impl FromStr for BinaryFormat {
112+
type Err = Error;
113+
114+
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
115+
match s.to_ascii_lowercase().as_str() {
116+
"hex" => Ok(BinaryFormat::Hex),
117+
"base64" => Ok(BinaryFormat::Base64),
118+
"utf-8" | "utf8" => Ok(BinaryFormat::Utf8),
119+
"utf-8-lossy" | "utf8-lossy" => Ok(BinaryFormat::Utf8Lossy),
120+
other => Err(Error::Decode(format!(
121+
"Invalid binary format '{other}', valid values: HEX | BASE64 | UTF-8 | UTF-8-LOSSY"
122+
))),
123+
}
124+
}
125+
}
126+
127+
#[cfg(test)]
128+
mod tests {
129+
use super::*;
130+
131+
#[test]
132+
fn deserialize_query_result_format_settings_from_strings() {
133+
let settings: QueryResultFormatSettings = serde_json::from_str(
134+
r#"{
135+
"timezone": "Asia/Shanghai",
136+
"geometry_output_format": "wkt",
137+
"arrow_result_version": 2,
138+
"binary_output_format": "utf-8"
139+
}"#,
140+
)
141+
.unwrap();
142+
143+
let settings = ResultFormatSettings::try_from(&Some(settings)).unwrap();
144+
assert_eq!(settings.geometry_output_format, GeometryDataType::WKT);
145+
assert_eq!(settings.arrow_result_version, Some(2));
146+
assert_eq!(settings.binary_output_format, BinaryFormat::Utf8);
147+
assert_eq!(settings.timezone.iana_name(), Some("Asia/Shanghai"));
148+
}
149+
150+
#[test]
151+
fn deserialize_query_result_format_settings_with_defaults() {
152+
let settings: QueryResultFormatSettings = serde_json::from_str(r#"{}"#).unwrap();
153+
154+
let settings = ResultFormatSettings::try_from(&Some(settings)).unwrap();
155+
assert_eq!(settings.geometry_output_format, GeometryDataType::default());
156+
assert_eq!(settings.arrow_result_version, None);
157+
assert_eq!(settings.binary_output_format, BinaryFormat::default());
158+
assert_eq!(settings.timezone.iana_name(), Some("UTC"));
159+
}
160+
161+
#[test]
162+
fn deserialize_query_result_format_settings_accepts_numeric_arrow_version() {
163+
let settings: QueryResultFormatSettings =
164+
serde_json::from_str(r#"{"arrow_result_version": 2}"#).unwrap();
165+
166+
let settings = ResultFormatSettings::try_from(&Some(settings)).unwrap();
167+
assert_eq!(settings.arrow_result_version, Some(2));
168+
}
169+
}

0 commit comments

Comments
 (0)