Skip to content

Commit a5224ea

Browse files
feat(datafusion): Add $manifests system table (#287)
1 parent f4754f6 commit a5224ea

7 files changed

Lines changed: 390 additions & 1 deletion

File tree

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Mirrors Java [ManifestsTable](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java).
19+
20+
use std::any::Any;
21+
use std::sync::{Arc, OnceLock};
22+
23+
use async_trait::async_trait;
24+
use datafusion::arrow::array::{new_null_array, Int64Array, RecordBatch, StringArray};
25+
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
26+
use datafusion::catalog::Session;
27+
use datafusion::datasource::memory::MemorySourceConfig;
28+
use datafusion::datasource::{TableProvider, TableType};
29+
use datafusion::error::Result as DFResult;
30+
use datafusion::logical_expr::Expr;
31+
use datafusion::physical_plan::ExecutionPlan;
32+
use paimon::spec::{ManifestFileMeta, ManifestList};
33+
use paimon::table::{SnapshotManager, Table};
34+
35+
use crate::error::to_datafusion_error;
36+
37+
pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
38+
Ok(Arc::new(ManifestsTable { table }))
39+
}
40+
41+
fn manifests_schema() -> SchemaRef {
42+
static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
43+
SCHEMA
44+
.get_or_init(|| {
45+
Arc::new(Schema::new(vec![
46+
Field::new("file_name", DataType::Utf8, false),
47+
Field::new("file_size", DataType::Int64, false),
48+
Field::new("num_added_files", DataType::Int64, false),
49+
Field::new("num_deleted_files", DataType::Int64, false),
50+
Field::new("schema_id", DataType::Int64, false),
51+
Field::new("min_partition_stats", DataType::Utf8, true),
52+
Field::new("max_partition_stats", DataType::Utf8, true),
53+
Field::new("min_row_id", DataType::Int64, true),
54+
Field::new("max_row_id", DataType::Int64, true),
55+
]))
56+
})
57+
.clone()
58+
}
59+
60+
#[derive(Debug)]
61+
struct ManifestsTable {
62+
table: Table,
63+
}
64+
65+
#[async_trait]
66+
impl TableProvider for ManifestsTable {
67+
fn as_any(&self) -> &dyn Any {
68+
self
69+
}
70+
71+
fn schema(&self) -> SchemaRef {
72+
manifests_schema()
73+
}
74+
75+
fn table_type(&self) -> TableType {
76+
TableType::View
77+
}
78+
79+
async fn scan(
80+
&self,
81+
_state: &dyn Session,
82+
projection: Option<&Vec<usize>>,
83+
_filters: &[Expr],
84+
_limit: Option<usize>,
85+
) -> DFResult<Arc<dyn ExecutionPlan>> {
86+
let metas = collect_manifests(&self.table)
87+
.await
88+
.map_err(to_datafusion_error)?;
89+
90+
let n = metas.len();
91+
let mut file_names: Vec<String> = Vec::with_capacity(n);
92+
let mut file_sizes = Vec::with_capacity(n);
93+
let mut num_added = Vec::with_capacity(n);
94+
let mut num_deleted = Vec::with_capacity(n);
95+
let mut schema_ids = Vec::with_capacity(n);
96+
let mut min_row_ids: Vec<Option<i64>> = Vec::with_capacity(n);
97+
let mut max_row_ids: Vec<Option<i64>> = Vec::with_capacity(n);
98+
99+
for meta in metas {
100+
file_names.push(meta.file_name().to_string());
101+
file_sizes.push(meta.file_size());
102+
num_added.push(meta.num_added_files());
103+
num_deleted.push(meta.num_deleted_files());
104+
schema_ids.push(meta.schema_id());
105+
min_row_ids.push(meta.min_row_id());
106+
max_row_ids.push(meta.max_row_id());
107+
}
108+
109+
let schema = manifests_schema();
110+
let batch = RecordBatch::try_new(
111+
schema.clone(),
112+
vec![
113+
Arc::new(StringArray::from(file_names)),
114+
Arc::new(Int64Array::from(file_sizes)),
115+
Arc::new(Int64Array::from(num_added)),
116+
Arc::new(Int64Array::from(num_deleted)),
117+
Arc::new(Int64Array::from(schema_ids)),
118+
new_null_array(&DataType::Utf8, n),
119+
new_null_array(&DataType::Utf8, n),
120+
Arc::new(Int64Array::from(min_row_ids)),
121+
Arc::new(Int64Array::from(max_row_ids)),
122+
],
123+
)?;
124+
125+
Ok(MemorySourceConfig::try_new_exec(
126+
&[vec![batch]],
127+
schema,
128+
projection.cloned(),
129+
)?)
130+
}
131+
}
132+
133+
async fn collect_manifests(table: &Table) -> paimon::Result<Vec<ManifestFileMeta>> {
134+
let file_io = table.file_io();
135+
let sm = SnapshotManager::new(file_io.clone(), table.location().to_string());
136+
let snapshot = match sm.get_latest_snapshot().await? {
137+
Some(s) => s,
138+
None => return Ok(Vec::new()),
139+
};
140+
141+
let base_path = sm.manifest_path(snapshot.base_manifest_list());
142+
let delta_path = sm.manifest_path(snapshot.delta_manifest_list());
143+
let changelog_path = snapshot
144+
.changelog_manifest_list()
145+
.map(|c| sm.manifest_path(c));
146+
let base_fut = ManifestList::read(file_io, &base_path);
147+
let delta_fut = ManifestList::read(file_io, &delta_path);
148+
let changelog_fut = async {
149+
match &changelog_path {
150+
Some(p) => ManifestList::read(file_io, p).await,
151+
None => Ok(Vec::new()),
152+
}
153+
};
154+
let (base, delta, changelog) = futures::try_join!(base_fut, delta_fut, changelog_fut)?;
155+
let mut metas = base;
156+
metas.extend(delta);
157+
metas.extend(changelog);
158+
Ok(metas)
159+
}

crates/integrations/datafusion/src/system_tables/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use paimon::table::Table;
2929

3030
use crate::error::to_datafusion_error;
3131

32+
mod manifests;
3233
mod options;
3334
mod schemas;
3435
mod snapshots;
@@ -37,6 +38,7 @@ mod tags;
3738
type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;
3839

3940
const TABLES: &[(&str, Builder)] = &[
41+
("manifests", manifests::build),
4042
("options", options::build),
4143
("schemas", schemas::build),
4244
("snapshots", snapshots::build),
@@ -131,6 +133,9 @@ mod tests {
131133
assert!(is_registered("tags"));
132134
assert!(is_registered("Tags"));
133135
assert!(is_registered("TAGS"));
136+
assert!(is_registered("manifests"));
137+
assert!(is_registered("Manifests"));
138+
assert!(is_registered("MANIFESTS"));
134139
assert!(!is_registered("nonsense"));
135140
}
136141

crates/integrations/datafusion/tests/system_tables.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,3 +410,83 @@ async fn test_tags_system_table_with_seeded_tags() {
410410
assert_eq!(names, vec!["v1".to_string(), "v2".to_string()]);
411411
assert_eq!(snap_ids, vec![earliest.id(), earliest.id()]);
412412
}
413+
414+
#[tokio::test]
415+
async fn test_manifests_system_table() {
416+
let (ctx, catalog, _tmp) = create_context().await;
417+
let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$manifests");
418+
let batches = run_sql(&ctx, &sql).await;
419+
420+
assert!(!batches.is_empty(), "$manifests should return ≥1 batch");
421+
let arrow_schema = batches[0].schema();
422+
let expected_columns = [
423+
("file_name", DataType::Utf8),
424+
("file_size", DataType::Int64),
425+
("num_added_files", DataType::Int64),
426+
("num_deleted_files", DataType::Int64),
427+
("schema_id", DataType::Int64),
428+
("min_partition_stats", DataType::Utf8),
429+
("max_partition_stats", DataType::Utf8),
430+
("min_row_id", DataType::Int64),
431+
("max_row_id", DataType::Int64),
432+
];
433+
for (i, (name, dtype)) in expected_columns.iter().enumerate() {
434+
let field = arrow_schema.field(i);
435+
assert_eq!(field.name(), name, "column {i} name");
436+
assert_eq!(field.data_type(), dtype, "column {i} type");
437+
}
438+
439+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
440+
assert!(total_rows > 0, "fixture should have at least one manifest");
441+
442+
for batch in &batches {
443+
let names = batch
444+
.column(0)
445+
.as_any()
446+
.downcast_ref::<StringArray>()
447+
.expect("file_name is Utf8");
448+
let sizes = batch
449+
.column(1)
450+
.as_any()
451+
.downcast_ref::<Int64Array>()
452+
.expect("file_size is Int64");
453+
for i in 0..batch.num_rows() {
454+
assert!(!names.value(i).is_empty(), "file_name must be non-empty");
455+
assert!(sizes.value(i) >= 0, "file_size must be non-negative");
456+
}
457+
}
458+
459+
let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string());
460+
let table = catalog.get_table(&identifier).await.unwrap();
461+
let sm =
462+
paimon::table::SnapshotManager::new(table.file_io().clone(), table.location().to_string());
463+
let latest = sm
464+
.get_latest_snapshot()
465+
.await
466+
.unwrap()
467+
.expect("fixture has snapshots");
468+
let mut expected = paimon::spec::ManifestList::read(
469+
table.file_io(),
470+
&sm.manifest_path(latest.base_manifest_list()),
471+
)
472+
.await
473+
.unwrap()
474+
.len();
475+
expected += paimon::spec::ManifestList::read(
476+
table.file_io(),
477+
&sm.manifest_path(latest.delta_manifest_list()),
478+
)
479+
.await
480+
.unwrap()
481+
.len();
482+
if let Some(changelog) = latest.changelog_manifest_list() {
483+
expected += paimon::spec::ManifestList::read(table.file_io(), &sm.manifest_path(changelog))
484+
.await
485+
.unwrap()
486+
.len();
487+
}
488+
assert_eq!(
489+
total_rows, expected,
490+
"$manifests rows should match base + delta + changelog manifest entries of the latest snapshot"
491+
);
492+
}

crates/paimon/src/spec/avro/manifest_file_meta_decode.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ impl AvroRecordDecode for ManifestFileMeta {
3333
let mut num_deleted_files: Option<i64> = None;
3434
let mut partition_stats: Option<BinaryTableStats> = None;
3535
let mut schema_id: Option<i64> = None;
36+
let mut min_row_id: Option<i64> = None;
37+
let mut max_row_id: Option<i64> = None;
3638

3739
for field in &writer_schema.fields {
3840
match field.name.as_str() {
@@ -50,6 +52,8 @@ impl AvroRecordDecode for ManifestFileMeta {
5052
decode_nullable_binary_table_stats(cursor, &field.schema, field.nullable)?;
5153
}
5254
"_SCHEMA_ID" => schema_id = Some(read_long_field(cursor, field.nullable)?),
55+
"_MIN_ROW_ID" => min_row_id = read_optional_long(cursor, field.nullable)?,
56+
"_MAX_ROW_ID" => max_row_id = read_optional_long(cursor, field.nullable)?,
5357
_ => skip_nullable_field(cursor, &field.schema, field.nullable)?,
5458
}
5559
}
@@ -62,10 +66,22 @@ impl AvroRecordDecode for ManifestFileMeta {
6266
num_deleted_files.unwrap_or(0),
6367
partition_stats.unwrap_or_else(|| BinaryTableStats::new(vec![], vec![], vec![])),
6468
schema_id.unwrap_or(0),
69+
min_row_id,
70+
max_row_id,
6571
))
6672
}
6773
}
6874

75+
/// Reads an Avro `long` that may be wrapped in a nullable union.
///
/// When `nullable` is true a union index is read first, and index `0` is
/// treated as the null branch (yielding `None`). Otherwise the long is read
/// directly and returned as `Some`.
fn read_optional_long(cursor: &mut AvroCursor, nullable: bool) -> crate::Result<Option<i64>> {
    // Short-circuits: the union index is only consumed for nullable fields.
    if nullable && cursor.read_union_index()? == 0 {
        return Ok(None);
    }
    let value = cursor.read_long()?;
    Ok(Some(value))
}
84+
6985
/// Decode a nullable BinaryTableStats: union ["null", record] or direct record.
7086
pub(crate) fn decode_nullable_binary_table_stats(
7187
cursor: &mut AvroCursor,

crates/paimon/src/spec/manifest_file_meta.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,22 @@ pub struct ManifestFileMeta {
5050
/// schema id when writing this manifest file.
5151
#[serde(rename = "_SCHEMA_ID")]
5252
schema_id: i64,
53+
54+
/// minimum row id covered by this manifest, when row tracking is enabled.
55+
#[serde(
56+
rename = "_MIN_ROW_ID",
57+
default,
58+
skip_serializing_if = "Option::is_none"
59+
)]
60+
min_row_id: Option<i64>,
61+
62+
/// maximum row id covered by this manifest, when row tracking is enabled.
63+
#[serde(
64+
rename = "_MAX_ROW_ID",
65+
default,
66+
skip_serializing_if = "Option::is_none"
67+
)]
68+
max_row_id: Option<i64>,
5369
}
5470

5571
impl ManifestFileMeta {
@@ -94,6 +110,18 @@ impl ManifestFileMeta {
94110
self.version
95111
}
96112

113+
/// Get the minimum row id covered by this manifest (None when row tracking is disabled).
114+
#[inline]
115+
pub fn min_row_id(&self) -> Option<i64> {
116+
self.min_row_id
117+
}
118+
119+
/// Get the maximum row id covered by this manifest (None when row tracking is disabled).
120+
#[inline]
121+
pub fn max_row_id(&self) -> Option<i64> {
122+
self.max_row_id
123+
}
124+
97125
#[inline]
98126
pub fn new(
99127
file_name: String,
@@ -111,10 +139,13 @@ impl ManifestFileMeta {
111139
num_deleted_files,
112140
partition_stats,
113141
schema_id,
142+
min_row_id: None,
143+
max_row_id: None,
114144
}
115145
}
116146

117147
#[inline]
148+
#[allow(clippy::too_many_arguments)]
118149
pub(crate) fn new_with_version(
119150
version: i32,
120151
file_name: String,
@@ -123,6 +154,8 @@ impl ManifestFileMeta {
123154
num_deleted_files: i64,
124155
partition_stats: BinaryTableStats,
125156
schema_id: i64,
157+
min_row_id: Option<i64>,
158+
max_row_id: Option<i64>,
126159
) -> ManifestFileMeta {
127160
Self {
128161
version,
@@ -132,6 +165,8 @@ impl ManifestFileMeta {
132165
num_deleted_files,
133166
partition_stats,
134167
schema_id,
168+
min_row_id,
169+
max_row_id,
135170
}
136171
}
137172
}
@@ -156,7 +191,9 @@ pub const MANIFEST_FILE_META_SCHEMA: &str = r#"["null", {
156191
{"name": "_NULL_COUNTS", "type": ["null", {"type": "array", "items": ["null", "long"]}], "default": null}
157192
]
158193
}], "default": null},
159-
{"name": "_SCHEMA_ID", "type": "long"}
194+
{"name": "_SCHEMA_ID", "type": "long"},
195+
{"name": "_MIN_ROW_ID", "type": ["null", "long"], "default": null},
196+
{"name": "_MAX_ROW_ID", "type": ["null", "long"], "default": null}
160197
]
161198
}]"#;
162199

0 commit comments

Comments
 (0)