Skip to content

Commit 9ff131f

Browse files
feat(datafusion): Add $snapshots system table (#264)
1 parent 553e4a3 commit 9ff131f

5 files changed

Lines changed: 323 additions & 1 deletion

File tree

crates/integrations/datafusion/src/system_tables/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,15 @@ use crate::error::to_datafusion_error;
3131

3232
mod options;
3333
mod schemas;
34+
mod snapshots;
3435

3536
type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;
3637

37-
const TABLES: &[(&str, Builder)] = &[("options", options::build), ("schemas", schemas::build)];
38+
const TABLES: &[(&str, Builder)] = &[
39+
("options", options::build),
40+
("schemas", schemas::build),
41+
("snapshots", snapshots::build),
42+
];
3843

3944
/// Parse a Paimon object name into `(base_table, optional system_table_name)`.
4045
///
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Mirrors Java [SnapshotsTable](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java).
19+
20+
use std::any::Any;
21+
use std::sync::{Arc, OnceLock};
22+
23+
use async_trait::async_trait;
24+
use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray, TimestampMillisecondArray};
25+
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
26+
use datafusion::catalog::Session;
27+
use datafusion::datasource::memory::MemorySourceConfig;
28+
use datafusion::datasource::{TableProvider, TableType};
29+
use datafusion::error::Result as DFResult;
30+
use datafusion::logical_expr::Expr;
31+
use datafusion::physical_plan::ExecutionPlan;
32+
use paimon::table::{SnapshotManager, Table};
33+
34+
use crate::error::to_datafusion_error;
35+
36+
pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
37+
Ok(Arc::new(SnapshotsTable { table }))
38+
}
39+
40+
fn snapshots_schema() -> SchemaRef {
41+
static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
42+
SCHEMA
43+
.get_or_init(|| {
44+
Arc::new(Schema::new(vec![
45+
Field::new("snapshot_id", DataType::Int64, false),
46+
Field::new("schema_id", DataType::Int64, false),
47+
Field::new("commit_user", DataType::Utf8, false),
48+
Field::new("commit_identifier", DataType::Int64, false),
49+
Field::new("commit_kind", DataType::Utf8, false),
50+
Field::new(
51+
"commit_time",
52+
DataType::Timestamp(TimeUnit::Millisecond, None),
53+
false,
54+
),
55+
Field::new("base_manifest_list", DataType::Utf8, false),
56+
Field::new("delta_manifest_list", DataType::Utf8, false),
57+
Field::new("changelog_manifest_list", DataType::Utf8, true),
58+
Field::new("total_record_count", DataType::Int64, true),
59+
Field::new("delta_record_count", DataType::Int64, true),
60+
Field::new("changelog_record_count", DataType::Int64, true),
61+
Field::new("watermark", DataType::Int64, true),
62+
Field::new("next_row_id", DataType::Int64, true),
63+
]))
64+
})
65+
.clone()
66+
}
67+
68+
#[derive(Debug)]
69+
struct SnapshotsTable {
70+
table: Table,
71+
}
72+
73+
#[async_trait]
74+
impl TableProvider for SnapshotsTable {
75+
fn as_any(&self) -> &dyn Any {
76+
self
77+
}
78+
79+
fn schema(&self) -> SchemaRef {
80+
snapshots_schema()
81+
}
82+
83+
fn table_type(&self) -> TableType {
84+
TableType::View
85+
}
86+
87+
async fn scan(
88+
&self,
89+
_state: &dyn Session,
90+
projection: Option<&Vec<usize>>,
91+
_filters: &[Expr],
92+
_limit: Option<usize>,
93+
) -> DFResult<Arc<dyn ExecutionPlan>> {
94+
let sm = SnapshotManager::new(
95+
self.table.file_io().clone(),
96+
self.table.location().to_string(),
97+
);
98+
let snapshots = sm.list_all().await.map_err(to_datafusion_error)?;
99+
100+
let n = snapshots.len();
101+
let mut snapshot_ids = Vec::with_capacity(n);
102+
let mut schema_ids = Vec::with_capacity(n);
103+
let mut commit_users: Vec<String> = Vec::with_capacity(n);
104+
let mut commit_identifiers = Vec::with_capacity(n);
105+
let mut commit_kinds: Vec<String> = Vec::with_capacity(n);
106+
let mut commit_times = Vec::with_capacity(n);
107+
let mut base_manifest_lists: Vec<String> = Vec::with_capacity(n);
108+
let mut delta_manifest_lists: Vec<String> = Vec::with_capacity(n);
109+
let mut changelog_manifest_lists: Vec<Option<String>> = Vec::with_capacity(n);
110+
let mut total_record_counts: Vec<Option<i64>> = Vec::with_capacity(n);
111+
let mut delta_record_counts: Vec<Option<i64>> = Vec::with_capacity(n);
112+
let mut changelog_record_counts: Vec<Option<i64>> = Vec::with_capacity(n);
113+
let mut watermarks: Vec<Option<i64>> = Vec::with_capacity(n);
114+
let mut next_row_ids: Vec<Option<i64>> = Vec::with_capacity(n);
115+
116+
for snap in &snapshots {
117+
snapshot_ids.push(snap.id());
118+
schema_ids.push(snap.schema_id());
119+
commit_users.push(snap.commit_user().to_string());
120+
commit_identifiers.push(snap.commit_identifier());
121+
commit_kinds.push(snap.commit_kind().to_string());
122+
commit_times.push(snap.time_millis() as i64);
123+
base_manifest_lists.push(snap.base_manifest_list().to_string());
124+
delta_manifest_lists.push(snap.delta_manifest_list().to_string());
125+
changelog_manifest_lists.push(snap.changelog_manifest_list().map(str::to_string));
126+
total_record_counts.push(snap.total_record_count());
127+
delta_record_counts.push(snap.delta_record_count());
128+
changelog_record_counts.push(snap.changelog_record_count());
129+
watermarks.push(snap.watermark());
130+
next_row_ids.push(snap.next_row_id());
131+
}
132+
133+
let schema = snapshots_schema();
134+
let batch = RecordBatch::try_new(
135+
schema.clone(),
136+
vec![
137+
Arc::new(Int64Array::from(snapshot_ids)),
138+
Arc::new(Int64Array::from(schema_ids)),
139+
Arc::new(StringArray::from(commit_users)),
140+
Arc::new(Int64Array::from(commit_identifiers)),
141+
Arc::new(StringArray::from(commit_kinds)),
142+
Arc::new(TimestampMillisecondArray::from(commit_times)),
143+
Arc::new(StringArray::from(base_manifest_lists)),
144+
Arc::new(StringArray::from(delta_manifest_lists)),
145+
Arc::new(StringArray::from(changelog_manifest_lists)),
146+
Arc::new(Int64Array::from(total_record_counts)),
147+
Arc::new(Int64Array::from(delta_record_counts)),
148+
Arc::new(Int64Array::from(changelog_record_counts)),
149+
Arc::new(Int64Array::from(watermarks)),
150+
Arc::new(Int64Array::from(next_row_ids)),
151+
],
152+
)?;
153+
154+
Ok(MemorySourceConfig::try_new_exec(
155+
&[vec![batch]],
156+
schema,
157+
projection.cloned(),
158+
)?)
159+
}
160+
}

crates/integrations/datafusion/tests/system_tables.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,3 +251,83 @@ async fn test_missing_base_table_for_system_table_errors() {
251251
"expected error to mention both base table and system name, got: {msg}"
252252
);
253253
}
254+
255+
#[tokio::test]
256+
async fn test_snapshots_system_table() {
257+
let (ctx, catalog, _tmp) = create_context().await;
258+
let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$snapshots");
259+
let batches = run_sql(&ctx, &sql).await;
260+
261+
assert!(!batches.is_empty(), "$snapshots should return ≥1 batch");
262+
263+
let arrow_schema = batches[0].schema();
264+
let expected_columns = [
265+
("snapshot_id", DataType::Int64),
266+
("schema_id", DataType::Int64),
267+
("commit_user", DataType::Utf8),
268+
("commit_identifier", DataType::Int64),
269+
("commit_kind", DataType::Utf8),
270+
(
271+
"commit_time",
272+
DataType::Timestamp(TimeUnit::Millisecond, None),
273+
),
274+
("base_manifest_list", DataType::Utf8),
275+
("delta_manifest_list", DataType::Utf8),
276+
("changelog_manifest_list", DataType::Utf8),
277+
("total_record_count", DataType::Int64),
278+
("delta_record_count", DataType::Int64),
279+
("changelog_record_count", DataType::Int64),
280+
("watermark", DataType::Int64),
281+
("next_row_id", DataType::Int64),
282+
];
283+
for (i, (name, dtype)) in expected_columns.iter().enumerate() {
284+
let field = arrow_schema.field(i);
285+
assert_eq!(field.name(), name, "column {i} name");
286+
assert_eq!(field.data_type(), dtype, "column {i} type");
287+
}
288+
289+
// Row count must match the snapshot directory listing.
290+
let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string());
291+
let table = catalog
292+
.get_table(&identifier)
293+
.await
294+
.expect("fixture table should load");
295+
let sm =
296+
paimon::table::SnapshotManager::new(table.file_io().clone(), table.location().to_string());
297+
let all = sm.list_all().await.expect("list_all should succeed");
298+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
299+
assert_eq!(
300+
total_rows,
301+
all.len(),
302+
"$snapshots rows should match list_all() length"
303+
);
304+
305+
// snapshot_id column must be ascending.
306+
let mut ids: Vec<i64> = Vec::new();
307+
for batch in &batches {
308+
let col = batch
309+
.column(0)
310+
.as_any()
311+
.downcast_ref::<Int64Array>()
312+
.expect("snapshot_id is Int64");
313+
for i in 0..batch.num_rows() {
314+
ids.push(col.value(i));
315+
}
316+
}
317+
let mut sorted = ids.clone();
318+
sorted.sort_unstable();
319+
assert_eq!(ids, sorted, "snapshot_id should be ascending");
320+
321+
// commit_kind column must contain a known variant.
322+
let last_batch = batches.last().unwrap();
323+
let kind_col = last_batch
324+
.column(4)
325+
.as_any()
326+
.downcast_ref::<StringArray>()
327+
.expect("commit_kind is Utf8");
328+
let kind = kind_col.value(last_batch.num_rows() - 1);
329+
assert!(
330+
["APPEND", "COMPACT", "OVERWRITE", "ANALYZE"].contains(&kind),
331+
"unexpected commit_kind: {kind}"
332+
);
333+
}

crates/paimon/src/spec/snapshot.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,17 @@ pub enum CommitKind {
3535
ANALYZE,
3636
}
3737

38+
impl std::fmt::Display for CommitKind {
39+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40+
match self {
41+
Self::APPEND => write!(f, "APPEND"),
42+
Self::COMPACT => write!(f, "COMPACT"),
43+
Self::OVERWRITE => write!(f, "OVERWRITE"),
44+
Self::ANALYZE => write!(f, "ANALYZE"),
45+
}
46+
}
47+
}
48+
3849
/// Snapshot for paimon.
3950
///
4051
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/Snapshot.java#L68>.

crates/paimon/src/table/snapshot_manager.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
//! Reference:[org.apache.paimon.utils.SnapshotManager](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java).
2121
use crate::io::FileIO;
2222
use crate::spec::Snapshot;
23+
use futures::future::try_join_all;
2324
use std::str;
2425

2526
const SNAPSHOT_DIR: &str = "snapshot";
@@ -150,6 +151,37 @@ impl SnapshotManager {
150151
self.find_by_list_files(i64::min).await
151152
}
152153

154+
/// List all snapshot ids sorted ascending. Returns an empty vector when
155+
/// the snapshot directory does not exist.
156+
pub async fn list_all_ids(&self) -> crate::Result<Vec<i64>> {
157+
let snapshot_dir = self.snapshot_dir();
158+
let statuses = match self.file_io.list_status(&snapshot_dir).await {
159+
Ok(s) => s,
160+
Err(crate::Error::IoUnexpected { ref source, .. })
161+
if source.kind() == opendal::ErrorKind::NotFound =>
162+
{
163+
return Ok(Vec::new());
164+
}
165+
Err(e) => return Err(e),
166+
};
167+
let mut ids: Vec<i64> = statuses
168+
.into_iter()
169+
.filter(|s| !s.is_dir)
170+
.filter_map(|s| {
171+
let name = s.path.rsplit('/').next().unwrap_or(&s.path);
172+
name.strip_prefix(SNAPSHOT_PREFIX)?.parse::<i64>().ok()
173+
})
174+
.collect();
175+
ids.sort_unstable();
176+
Ok(ids)
177+
}
178+
179+
/// List all snapshots sorted by id ascending.
180+
pub async fn list_all(&self) -> crate::Result<Vec<Snapshot>> {
181+
let ids = self.list_all_ids().await?;
182+
try_join_all(ids.into_iter().map(|id| self.get_snapshot(id))).await
183+
}
184+
153185
/// Get a snapshot by id.
154186
pub async fn get_snapshot(&self, snapshot_id: i64) -> crate::Result<Snapshot> {
155187
let snapshot_path = self.snapshot_path(snapshot_id);
@@ -366,4 +398,38 @@ mod tests {
366398
let hint = sm.read_hint(&sm.latest_hint_path()).await;
367399
assert_eq!(hint, Some(42));
368400
}
401+
402+
#[tokio::test]
403+
async fn test_list_all_ids_empty() {
404+
let (_, sm) = setup("memory:/test_list_empty").await;
405+
assert!(sm.list_all_ids().await.unwrap().is_empty());
406+
}
407+
408+
#[tokio::test]
409+
async fn test_list_all_ids_missing_dir_returns_empty() {
410+
let file_io = test_file_io();
411+
let sm = SnapshotManager::new(file_io, "memory:/test_list_missing".to_string());
412+
assert!(sm.list_all_ids().await.unwrap().is_empty());
413+
assert!(sm.list_all().await.unwrap().is_empty());
414+
}
415+
416+
#[tokio::test]
417+
async fn test_list_all_ids_sorted() {
418+
let (_, sm) = setup("memory:/test_list_sorted").await;
419+
for id in [3, 1, 2] {
420+
sm.commit_snapshot(&test_snapshot(id)).await.unwrap();
421+
}
422+
assert_eq!(sm.list_all_ids().await.unwrap(), vec![1, 2, 3]);
423+
}
424+
425+
#[tokio::test]
426+
async fn test_list_all_loads_in_order() {
427+
let (_, sm) = setup("memory:/test_list_all").await;
428+
for id in [2, 1, 3] {
429+
sm.commit_snapshot(&test_snapshot(id)).await.unwrap();
430+
}
431+
let snaps = sm.list_all().await.unwrap();
432+
let ids: Vec<i64> = snaps.iter().map(|s| s.id()).collect();
433+
assert_eq!(ids, vec![1, 2, 3]);
434+
}
369435
}

0 commit comments

Comments
 (0)