Skip to content

Commit acf5438

Browse files
authored
feat: introduce table scan to plan splits (#96)
1 parent 22bb7b7 commit acf5438

6 files changed

Lines changed: 215 additions & 9 deletions

File tree

crates/paimon/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,4 @@ pub mod file_index;
2424
pub mod io;
2525
pub mod spec;
2626
mod table;
27-
pub use table::{DataSplit, Plan, Table};
27+
pub use table::{DataSplit, DataSplitBuilder, Plan, SnapshotManager, Table, TableScan};

crates/paimon/src/spec/manifest_entry.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ pub struct ManifestEntry {
4747
total_buckets: i32,
4848

4949
#[serde(rename = "_FILE")]
50-
file: DataFileMeta,
50+
pub(crate) file: DataFileMeta,
5151

5252
#[serde(rename = "_VERSION")]
5353
version: i32,
@@ -59,7 +59,8 @@ impl ManifestEntry {
5959
&self.kind
6060
}
6161

62-
fn partition(&self) -> &Vec<u8> {
62+
/// Partition bytes for this entry (for grouping splits).
63+
pub fn partition(&self) -> &[u8] {
6364
&self.partition
6465
}
6566

crates/paimon/src/spec/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,13 @@ pub use index_file_meta::*;
3939

4040
mod index_manifest;
4141
mod manifest;
42+
pub use manifest::Manifest;
4243
mod manifest_common;
44+
pub use manifest_common::FileKind;
4345
mod manifest_entry;
46+
pub use manifest_entry::ManifestEntry;
4447
mod objects_file;
48+
pub use objects_file::from_avro_bytes;
4549
mod stats;
4650
mod types;
4751
pub use types::*;

crates/paimon/src/table/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
2020
mod snapshot_manager;
2121
mod source;
22+
mod table_scan;
2223

23-
pub use source::{DataSplit, Plan};
24+
pub use snapshot_manager::SnapshotManager;
25+
pub use source::{DataSplit, DataSplitBuilder, Plan};
26+
pub use table_scan::TableScan;
2427

2528
use crate::catalog::Identifier;
2629
use crate::io::FileIO;
@@ -71,4 +74,11 @@ impl Table {
7174
pub fn file_io(&self) -> &FileIO {
7275
&self.file_io
7376
}
77+
78+
/// Create a table scan for full table read (no incremental, no predicate).
79+
///
80+
/// Reference: [pypaimon TableScan](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/read/table_scan.py).
81+
pub fn new_scan(&self) -> TableScan {
82+
TableScan::new(self.clone())
83+
}
7484
}

crates/paimon/src/table/source.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub struct DataSplit {
3434
partition: BinaryRow,
3535
bucket: i32,
3636
bucket_path: String,
37-
total_buckets: Option<i32>,
37+
total_buckets: i32,
3838
data_files: Vec<DataFileMeta>,
3939
}
4040

@@ -51,7 +51,7 @@ impl DataSplit {
5151
pub fn bucket_path(&self) -> &str {
5252
&self.bucket_path
5353
}
54-
pub fn total_buckets(&self) -> Option<i32> {
54+
pub fn total_buckets(&self) -> i32 {
5555
self.total_buckets
5656
}
5757

@@ -78,7 +78,7 @@ pub struct DataSplitBuilder {
7878
partition: Option<BinaryRow>,
7979
bucket: i32,
8080
bucket_path: Option<String>,
81-
total_buckets: Option<i32>,
81+
total_buckets: i32,
8282
data_files: Option<Vec<DataFileMeta>>,
8383
}
8484

@@ -89,7 +89,7 @@ impl DataSplitBuilder {
8989
partition: None,
9090
bucket: -1,
9191
bucket_path: None,
92-
total_buckets: None,
92+
total_buckets: -1,
9393
data_files: None,
9494
}
9595
}
@@ -110,10 +110,14 @@ impl DataSplitBuilder {
110110
self.bucket_path = Some(bucket_path);
111111
self
112112
}
113-
pub fn with_total_buckets(mut self, total_buckets: Option<i32>) -> Self {
113+
pub fn with_total_buckets(mut self, total_buckets: i32) -> Self {
114114
self.total_buckets = total_buckets;
115115
self
116116
}
117+
pub fn with_data_files(mut self, data_files: Vec<DataFileMeta>) -> Self {
118+
self.data_files = Some(data_files);
119+
self
120+
}
117121

118122
pub fn build(self) -> crate::Result<DataSplit> {
119123
if self.snapshot_id == -1 {
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! TableScan for full table scan.
19+
//!
20+
//! Reference: [pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/read/table_scan.py)
21+
//! and [FullStartingScanner](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/read/scanner/full_starting_scanner.py).
22+
23+
use super::Table;
24+
use crate::io::FileIO;
25+
use crate::spec::{BinaryRow, FileKind, ManifestEntry, Snapshot};
26+
use crate::table::source::{DataSplitBuilder, Plan};
27+
use crate::table::SnapshotManager;
28+
use crate::Error;
29+
use std::collections::{HashMap, HashSet};
30+
31+
/// Path segment for manifest directory under table.
32+
const MANIFEST_DIR: &str = "manifest";
33+
34+
/// Reads a manifest list file (Avro) and returns manifest file metas.
35+
async fn read_manifest_list(
36+
file_io: &FileIO,
37+
table_path: &str,
38+
list_name: &str,
39+
) -> crate::Result<Vec<crate::spec::ManifestFileMeta>> {
40+
let path = format!(
41+
"{}/{}/{}",
42+
table_path.trim_end_matches('/'),
43+
MANIFEST_DIR,
44+
list_name
45+
);
46+
let input = file_io.new_input(&path)?;
47+
if !input.exists().await? {
48+
return Ok(Vec::new());
49+
}
50+
let bytes = input.read().await?;
51+
crate::spec::from_avro_bytes::<crate::spec::ManifestFileMeta>(&bytes)
52+
}
53+
54+
/// Reads all manifest entries for a snapshot (base + delta manifest lists, then each manifest file).
55+
async fn read_all_manifest_entries(
56+
file_io: &FileIO,
57+
table_path: &str,
58+
snapshot: &Snapshot,
59+
) -> crate::Result<Vec<ManifestEntry>> {
60+
let mut manifest_files =
61+
read_manifest_list(file_io, table_path, snapshot.base_manifest_list()).await?;
62+
let delta = read_manifest_list(file_io, table_path, snapshot.delta_manifest_list()).await?;
63+
manifest_files.extend(delta);
64+
65+
let manifest_path_prefix = format!("{}/{}", table_path.trim_end_matches('/'), MANIFEST_DIR);
66+
let mut all_entries = Vec::new();
67+
// todo: consider use multiple-threads read manifest
68+
for meta in manifest_files {
69+
let path = format!("{}/{}", manifest_path_prefix, meta.file_name());
70+
let entries = crate::spec::Manifest::read(file_io, &path).await?;
71+
all_entries.extend(entries);
72+
}
73+
Ok(all_entries)
74+
}
75+
76+
/// Merges add/delete manifest entries: keeps only ADD entries whose (partition, bucket, file_name) is not in DELETE set.
77+
fn merge_manifest_entries(entries: Vec<ManifestEntry>) -> Vec<ManifestEntry> {
78+
let mut deleted = HashSet::new();
79+
let mut added = Vec::new();
80+
for e in entries {
81+
// follow python code to use partition, bucket, filename as duplicator
82+
let key = (
83+
e.partition().to_vec(),
84+
e.bucket(),
85+
e.file().file_name.clone(),
86+
);
87+
match e.kind() {
88+
FileKind::Add => added.push(e),
89+
FileKind::Delete => {
90+
deleted.insert(key);
91+
}
92+
}
93+
}
94+
added
95+
.into_iter()
96+
.filter(|e| {
97+
!deleted.contains(&(
98+
e.partition().to_vec(),
99+
e.bucket(),
100+
e.file().file_name.clone(),
101+
))
102+
})
103+
.collect()
104+
}
105+
106+
/// TableScan for full table scan (no incremental, no predicate).
107+
///
108+
/// Reference: [pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/read/table_scan.py)
109+
#[derive(Debug, Clone)]
110+
pub struct TableScan {
111+
/// Table to scan; planning reads its file IO and location.
table: Table,
112+
}
113+
114+
impl TableScan {
115+
pub fn new(table: Table) -> Self {
116+
Self { table }
117+
}
118+
119+
/// Plan the full scan: read latest snapshot, manifest list, manifest entries, then build one DataSplit per (partition, bucket).
120+
pub async fn plan(&self) -> crate::Result<Plan> {
121+
let file_io = self.table.file_io();
122+
let table_path = self.table.location();
123+
let snapshot_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
124+
125+
let snapshot = match snapshot_manager.get_latest_snapshot().await? {
126+
Some(s) => s,
127+
None => return Ok(Plan::new(Vec::new())),
128+
};
129+
Self::plan_snapshot(snapshot, file_io, table_path).await
130+
}
131+
132+
pub async fn plan_snapshot(
133+
snapshot: Snapshot,
134+
file_io: &FileIO,
135+
table_path: &str,
136+
) -> crate::Result<Plan> {
137+
let entries = read_all_manifest_entries(file_io, table_path, &snapshot).await?;
138+
let entries = merge_manifest_entries(entries);
139+
if entries.is_empty() {
140+
return Ok(Plan::new(Vec::new()));
141+
}
142+
143+
// Group by (partition, bucket). Key = (partition_bytes, bucket).
144+
let mut groups: HashMap<(Vec<u8>, i32), Vec<ManifestEntry>> = HashMap::new();
145+
for e in entries {
146+
let key = (e.partition().to_vec(), e.bucket());
147+
groups.entry(key).or_default().push(e);
148+
}
149+
150+
let snapshot_id = snapshot.id();
151+
let base_path = table_path;
152+
let mut splits = Vec::new();
153+
154+
for ((_partition, bucket), group_entries) in groups {
155+
let total_buckets = group_entries
156+
.first()
157+
.map(|e| e.total_buckets())
158+
.ok_or_else(|| Error::UnexpectedError {
159+
message: format!("Manifest entry group for bucket {bucket} is empty, cannot determine total_buckets"),
160+
source: None,
161+
})?;
162+
let mut data_files = Vec::new();
163+
164+
// currently, only group by splits by bucket
165+
// todo: consider use binpack to generate split
166+
for manifest_entry in group_entries {
167+
let ManifestEntry { file, .. } = manifest_entry;
168+
data_files.push(file);
169+
}
170+
171+
// todo: consider partitioned table
172+
let bucket_path = format!("{base_path}/bucket-{bucket}");
173+
let partition = BinaryRow::new(0);
174+
175+
let split = DataSplitBuilder::new()
176+
.with_snapshot(snapshot_id)
177+
.with_partition(partition)
178+
.with_bucket(bucket)
179+
.with_bucket_path(bucket_path)
180+
.with_total_buckets(total_buckets)
181+
.with_data_files(data_files)
182+
.build()?;
183+
splits.push(split);
184+
}
185+
Ok(Plan::new(splits))
186+
}
187+
}

0 commit comments

Comments
 (0)