Skip to content

Commit 277b805

Browse files
feat(scan): support timestamp-based snapshot lookup (#246)
1 parent c7492e6 commit 277b805

16 files changed

Lines changed: 491 additions & 0 deletions

include/paimon/defs.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,18 @@ struct PAIMON_EXPORT Options {
351351
/// engine when delete records are received. Default value is "false".
352352
static const char AGGREGATION_REMOVE_RECORD_ON_DELETE[];
353353

354+
/// "scan.timestamp-millis" - Optional timestamp used in case of "from-timestamp" scan mode.
355+
/// For batch sources, produces the latest snapshot earlier than or equal to the timestamp.
356+
/// For streaming sources, starts from the first snapshot at or after the timestamp.
357+
/// "scan.timestamp" can be used as an alternative string input for the same mode.
358+
static const char SCAN_TIMESTAMP_MILLIS[];
359+
360+
/// "scan.timestamp" - Optional timestamp string used in case of "from-timestamp" scan mode,
361+
/// as an alternative to "scan.timestamp-millis".
362+
/// It will be automatically converted to timestamp in unix milliseconds, using local time zone.
363+
/// Supported formats: yyyy-MM-dd, yyyy-MM-dd HH:mm:ss, yyyy-MM-dd HH:mm:ss.SSS.
364+
static const char SCAN_TIMESTAMP[];
365+
354366
/// "scan.tag-name" - Optional tag name used in case of "from-snapshot" scan mode.
355367
static const char SCAN_TAG_NAME[];
356368
/// "write-only" - If set to "true", compactions and snapshot expiration will be skipped. This

include/paimon/table/source/startup_mode.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ class PAIMON_EXPORT StartupMode {
4848
/// specified by "scan.snapshot-id" but does not read new changes
4949
static const StartupMode FromSnapshotFull();
5050

51+
/// Starts from a timestamp specified by either "scan.timestamp-millis" or
52+
/// "scan.timestamp". For batch sources, produces the latest snapshot whose
53+
/// timestamp is <= the specified timestamp. For streaming sources, continuously
54+
/// reads changes starting from the first snapshot at or after the timestamp.
55+
static const StartupMode FromTimestamp();
56+
5157
public:
5258
std::string ToString() const;
5359
bool operator==(const StartupMode& other) const;

src/paimon/common/defs.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor";
8888
const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled";
8989
const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path";
9090
const char Options::AGGREGATION_REMOVE_RECORD_ON_DELETE[] = "aggregation.remove-record-on-delete";
91+
const char Options::SCAN_TIMESTAMP_MILLIS[] = "scan.timestamp-millis";
92+
const char Options::SCAN_TIMESTAMP[] = "scan.timestamp";
9193
const char Options::SCAN_TAG_NAME[] = "scan.tag-name";
9294
const char Options::WRITE_ONLY[] = "write-only";
9395
const char Options::COMPACTION_MIN_FILE_NUM[] = "compaction.min.file-num";

src/paimon/common/utils/string_utils.cpp

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,70 @@ Result<int32_t> StringUtils::StringToDate(const std::string& str) {
148148
return time / SECONDS_PER_DAY;
149149
}
150150

151+
/// Parses a timestamp string into unix milliseconds.
152+
/// Supported formats: "yyyy-MM-dd", "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd HH:mm:ss.SSS".
153+
/// Uses the default local time zone, consistent with Java Paimon behavior.
154+
Result<int64_t> StringUtils::StringToTimestampMillis(const std::string& str) {
155+
std::tm timeinfo{};
156+
timeinfo.tm_isdst = -1;
157+
158+
// Try "yyyy-MM-dd HH:mm:ss" first (also matches "yyyy-MM-dd HH:mm:ss.SSS")
159+
std::istringstream ss(str);
160+
ss >> std::get_time(&timeinfo, "%Y-%m-%d %H:%M:%S");
161+
int32_t millis_part = 0;
162+
163+
if (!ss.fail()) {
164+
// Check for optional fractional seconds ".SSS"
165+
if (ss.peek() == '.') {
166+
ss.get();
167+
std::string frac;
168+
while (frac.size() < 3 && ss.peek() != std::char_traits<char>::eof() &&
169+
std::isdigit(static_cast<unsigned char>(ss.peek()))) {
170+
frac += static_cast<char>(ss.get());
171+
}
172+
if (frac.empty()) {
173+
return Status::Invalid(
174+
fmt::format("failed to convert string '{}' to timestamp, "
175+
"expected digits after '.'",
176+
str));
177+
}
178+
// Pad to 3 digits: "1" -> 100, "12" -> 120, "123" -> 123
179+
while (frac.size() < 3) {
180+
frac += '0';
181+
}
182+
auto parsed = StringToValue<int32_t>(frac);
183+
if (parsed) {
184+
millis_part = parsed.value();
185+
}
186+
}
187+
} else {
188+
// Fall back to "yyyy-MM-dd" (date only, time defaults to 00:00:00)
189+
ss.clear();
190+
ss.str(str);
191+
timeinfo = std::tm{};
192+
timeinfo.tm_isdst = -1;
193+
ss >> std::get_time(&timeinfo, "%Y-%m-%d");
194+
if (ss.fail()) {
195+
return Status::Invalid(
196+
fmt::format("failed to convert string '{}' to timestamp, "
197+
"supported formats: yyyy-MM-dd, yyyy-MM-dd HH:mm:ss, "
198+
"yyyy-MM-dd HH:mm:ss.SSS",
199+
str));
200+
}
201+
}
202+
203+
if (ss.peek() != std::char_traits<char>::eof()) {
204+
return Status::Invalid(
205+
fmt::format("failed to convert string '{}' to timestamp, "
206+
"unexpected trailing characters",
207+
str));
208+
}
209+
210+
std::time_t time = mktime(&timeinfo);
211+
if (time == -1) {
212+
return Status::Invalid(fmt::format("failed to convert string '{}' to timestamp", str));
213+
}
214+
return static_cast<int64_t>(time) * 1000 + millis_part;
215+
}
216+
151217
} // namespace paimon

src/paimon/common/utils/string_utils.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@ class PAIMON_EXPORT StringUtils {
141141

142142
static Result<int32_t> StringToDate(const std::string& str);
143143

144+
static Result<int64_t> StringToTimestampMillis(const std::string& str);
145+
144146
template <typename T>
145147
static std::optional<T> StringToValue(const std::string& str);
146148
};

src/paimon/common/utils/string_utils_test.cpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "gtest/gtest.h"
2323
#include "paimon/status.h"
2424
#include "paimon/testing/utils/testharness.h"
25+
#include "paimon/testing/utils/timezone_guard.h"
2526

2627
namespace paimon::test {
2728
class StringUtilsTest : public ::testing::Test {
@@ -401,6 +402,53 @@ TEST_F(StringUtilsTest, TestStringToDate) {
401402
ASSERT_NOK(StringUtils::StringToDate("1970-XX-02"));
402403
}
403404

405+
TEST_F(StringUtilsTest, TestStringToTimestampMillis) {
406+
TimezoneGuard tz_guard("Asia/Shanghai");
407+
// "yyyy-MM-dd HH:mm:ss" format
408+
{
409+
ASSERT_OK_AND_ASSIGN(int64_t millis,
410+
StringUtils::StringToTimestampMillis("1970-01-01 00:00:00"));
411+
ASSERT_EQ(millis, -28800000);
412+
}
413+
// "yyyy-MM-dd HH:mm:ss.SSS" format
414+
{
415+
ASSERT_OK_AND_ASSIGN(int64_t millis1,
416+
StringUtils::StringToTimestampMillis("2023-06-01 00:00:00.000"));
417+
ASSERT_OK_AND_ASSIGN(int64_t millis2,
418+
StringUtils::StringToTimestampMillis("2023-06-01 00:00:00.123"));
419+
ASSERT_EQ(millis2 - millis1, 123);
420+
}
421+
// "yyyy-MM-dd" format (date only, time defaults to 00:00:00)
422+
{
423+
ASSERT_OK_AND_ASSIGN(int64_t millis1, StringUtils::StringToTimestampMillis("2023-06-01"));
424+
ASSERT_OK_AND_ASSIGN(int64_t millis2,
425+
StringUtils::StringToTimestampMillis("2023-06-01 00:00:00"));
426+
ASSERT_EQ(millis1, millis2);
427+
}
428+
// Fractional second padding: "1" -> 100ms, "12" -> 120ms
429+
{
430+
ASSERT_OK_AND_ASSIGN(int64_t millis_base,
431+
StringUtils::StringToTimestampMillis("2023-06-01 12:00:00.000"));
432+
ASSERT_OK_AND_ASSIGN(int64_t millis_1,
433+
StringUtils::StringToTimestampMillis("2023-06-01 12:00:00.1"));
434+
ASSERT_EQ(millis_1 - millis_base, 100);
435+
ASSERT_OK_AND_ASSIGN(int64_t millis_12,
436+
StringUtils::StringToTimestampMillis("2023-06-01 12:00:00.12"));
437+
ASSERT_EQ(millis_12 - millis_base, 120);
438+
}
439+
// Invalid strings
440+
ASSERT_NOK(StringUtils::StringToTimestampMillis(""));
441+
ASSERT_NOK(StringUtils::StringToTimestampMillis("not-a-date"));
442+
ASSERT_NOK(StringUtils::StringToTimestampMillis("2023-XX-01 00:00:00"));
443+
// Trailing garbage
444+
ASSERT_NOK(StringUtils::StringToTimestampMillis("2023-06-01 00:00:00abc"));
445+
ASSERT_NOK(StringUtils::StringToTimestampMillis("2023-06-01 00:00:00.12xyz"));
446+
ASSERT_NOK(StringUtils::StringToTimestampMillis("2023-06-01 00:00:00 "));
447+
ASSERT_NOK(StringUtils::StringToTimestampMillis("2023-06-01 00:00:00.12 "));
448+
// Trailing dot with no digits
449+
ASSERT_NOK(StringUtils::StringToTimestampMillis("2023-06-01 00:00:00."));
450+
}
451+
404452
TEST_F(StringUtilsTest, TestVectorToString) {
405453
class A {
406454
public:

src/paimon/core/core_options.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ struct CoreOptions::Impl {
369369
std::shared_ptr<FileFormat> manifest_file_format;
370370

371371
std::optional<int64_t> scan_snapshot_id;
372+
std::optional<int64_t> scan_timestamp_millis;
372373
ExpireConfig expire_config;
373374
std::vector<std::string> sequence_field;
374375
std::vector<std::string> remove_record_on_sequence_group;
@@ -657,6 +658,19 @@ struct CoreOptions::Impl {
657658
Status ParseScanAndBranchOptions(const ConfigParser& parser) {
658659
// Parse scan.snapshot-id - optional snapshot id for "from-snapshot" scan mode
659660
PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_SNAPSHOT_ID, &scan_snapshot_id));
661+
// Parse scan.timestamp-millis and scan.timestamp
662+
std::string scan_timestamp_str;
663+
PAIMON_RETURN_NOT_OK(parser.ParseString(Options::SCAN_TIMESTAMP, &scan_timestamp_str));
664+
PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_TIMESTAMP_MILLIS, &scan_timestamp_millis));
665+
if (scan_timestamp_millis != std::nullopt && !scan_timestamp_str.empty()) {
666+
return Status::Invalid(
667+
"scan.timestamp-millis and scan.timestamp cannot be set at the same time");
668+
}
669+
if (!scan_timestamp_str.empty()) {
670+
PAIMON_ASSIGN_OR_RAISE(int64_t millis,
671+
StringUtils::StringToTimestampMillis(scan_timestamp_str));
672+
scan_timestamp_millis = millis;
673+
}
660674
// Parse scan.mode - scanning behavior of the source, default "default"
661675
PAIMON_RETURN_NOT_OK(parser.ParseStartupMode(&startup_mode));
662676
// Parse scan.fallback-branch - fallback branch when partition not found
@@ -942,6 +956,9 @@ int64_t CoreOptions::GetSourceSplitOpenFileCost() const {
942956
std::optional<int64_t> CoreOptions::GetScanSnapshotId() const {
943957
return impl_->scan_snapshot_id;
944958
}
959+
std::optional<int64_t> CoreOptions::GetScanTimestampMillis() const {
960+
return impl_->scan_timestamp_millis;
961+
}
945962
int64_t CoreOptions::GetManifestTargetFileSize() const {
946963
return impl_->manifest_target_file_size;
947964
}
@@ -963,6 +980,9 @@ StartupMode CoreOptions::GetStartupMode() const {
963980
if (GetScanSnapshotId() != std::nullopt || GetScanTagName() != std::nullopt) {
964981
return StartupMode::FromSnapshot();
965982
}
983+
if (GetScanTimestampMillis() != std::nullopt) {
984+
return StartupMode::FromTimestamp();
985+
}
966986
return StartupMode::LatestFull();
967987
}
968988
return impl_->startup_mode;

src/paimon/core/core_options.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class PAIMON_EXPORT CoreOptions {
7575
int64_t GetSourceSplitTargetSize() const;
7676
int64_t GetSourceSplitOpenFileCost() const;
7777
std::optional<int64_t> GetScanSnapshotId() const;
78+
std::optional<int64_t> GetScanTimestampMillis() const;
7879

7980
int64_t GetManifestTargetFileSize() const;
8081
StartupMode GetStartupMode() const;

src/paimon/core/core_options_test.cpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "paimon/status.h"
3030
#include "paimon/testing/mock/mock_file_system.h"
3131
#include "paimon/testing/utils/testharness.h"
32+
#include "paimon/testing/utils/timezone_guard.h"
3233
namespace paimon::test {
3334

3435
TEST(CoreOptionsTest, TestDefaultValue) {
@@ -598,4 +599,51 @@ TEST(CoreOptionsTest, TestNormalizeValueInCoreOption) {
598599
ASSERT_TRUE(core_options.SequenceFieldSortOrderIsAscending());
599600
ASSERT_EQ(BucketFunctionType::MOD, core_options.GetBucketFunctionType());
600601
}
602+
603+
TEST(CoreOptionsTest, TestScanTimestampMillis) {
604+
ASSERT_OK_AND_ASSIGN(CoreOptions core_options,
605+
CoreOptions::FromMap({{Options::SCAN_TIMESTAMP_MILLIS, "1721614515032"}}));
606+
ASSERT_EQ(1721614515032, core_options.GetScanTimestampMillis().value());
607+
ASSERT_EQ(StartupMode::FromTimestamp(), core_options.GetStartupMode());
608+
}
609+
610+
TEST(CoreOptionsTest, TestScanTimestampMillisExplicitMode) {
611+
ASSERT_OK_AND_ASSIGN(CoreOptions core_options,
612+
CoreOptions::FromMap({{Options::SCAN_MODE, "from-timestamp"},
613+
{Options::SCAN_TIMESTAMP_MILLIS, "1721614515032"}}));
614+
ASSERT_EQ(StartupMode::FromTimestamp(), core_options.GetStartupMode());
615+
ASSERT_EQ(1721614515032, core_options.GetScanTimestampMillis().value());
616+
}
617+
618+
TEST(CoreOptionsTest, TestScanTimestampMillisNotSet) {
619+
ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap({}));
620+
ASSERT_EQ(std::nullopt, core_options.GetScanTimestampMillis());
621+
ASSERT_EQ(StartupMode::LatestFull(), core_options.GetStartupMode());
622+
}
623+
624+
TEST(CoreOptionsTest, TestScanTimestampString) {
625+
TimezoneGuard tz_guard("Asia/Shanghai");
626+
ASSERT_OK_AND_ASSIGN(CoreOptions core_options,
627+
CoreOptions::FromMap({{Options::SCAN_TIMESTAMP, "2023-06-01 00:00:00"}}));
628+
ASSERT_EQ(core_options.GetScanTimestampMillis().value(), 1685548800000);
629+
ASSERT_EQ(StartupMode::FromTimestamp(), core_options.GetStartupMode());
630+
}
631+
632+
TEST(CoreOptionsTest, TestScanTimestampStringDateOnly) {
633+
ASSERT_OK_AND_ASSIGN(CoreOptions opts1,
634+
CoreOptions::FromMap({{Options::SCAN_TIMESTAMP, "2023-06-01"}}));
635+
ASSERT_OK_AND_ASSIGN(CoreOptions opts2,
636+
CoreOptions::FromMap({{Options::SCAN_TIMESTAMP, "2023-06-01 00:00:00"}}));
637+
ASSERT_EQ(opts1.GetScanTimestampMillis().value(), opts2.GetScanTimestampMillis().value());
638+
}
639+
640+
TEST(CoreOptionsTest, TestScanTimestampMillisAndStringMutuallyExclusive) {
641+
ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::SCAN_TIMESTAMP_MILLIS, "1721614515032"},
642+
{Options::SCAN_TIMESTAMP, "2023-06-01 00:00:00"}}),
643+
"scan.timestamp-millis and scan.timestamp cannot be set at the same time");
644+
}
645+
646+
TEST(CoreOptionsTest, TestScanTimestampInvalidString) {
647+
ASSERT_NOK(CoreOptions::FromMap({{Options::SCAN_TIMESTAMP, "not-a-date"}}));
648+
}
601649
} // namespace paimon::test

src/paimon/core/table/source/abstract_table_scan.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,33 @@ class AbstractTableScan : public TableScan {
8383
return Status::Invalid(
8484
"scan.snapshot-id must be set when startup mode is FROM_SNAPSHOT_FULL");
8585
}
86+
} else if (startup_mode == StartupMode::FromTimestamp()) {
87+
std::optional<int64_t> timestamp_millis = core_options_.GetScanTimestampMillis();
88+
if (timestamp_millis == std::nullopt) {
89+
return Status::Invalid(
90+
"scan.timestamp-millis or scan.timestamp must be set when startup mode is "
91+
"FROM_TIMESTAMP");
92+
}
93+
if (is_streaming) {
94+
PAIMON_ASSIGN_OR_RAISE(
95+
std::optional<Snapshot> earlier_snapshot,
96+
snapshot_manager->EarlierThanTimeMillis(timestamp_millis.value()));
97+
int64_t start_id =
98+
earlier_snapshot ? earlier_snapshot->Id() + 1 : Snapshot::FIRST_SNAPSHOT_ID;
99+
return std::make_shared<ContinuousFromSnapshotStartingScanner>(snapshot_manager,
100+
start_id);
101+
} else {
102+
PAIMON_ASSIGN_OR_RAISE(
103+
std::optional<Snapshot> snapshot,
104+
snapshot_manager->EarlierOrEqualTimeMillis(timestamp_millis.value()));
105+
if (snapshot == std::nullopt) {
106+
return Status::Invalid(fmt::format(
107+
"There is currently no snapshot earlier than or equal to timestamp [{}]",
108+
timestamp_millis.value()));
109+
}
110+
return std::make_shared<StaticFromSnapshotStartingScanner>(snapshot_manager,
111+
snapshot->Id());
112+
}
86113
}
87114
return Status::Invalid(
88115
fmt::format("Unsupported snapshot startup mode {}", startup_mode.ToString()));

0 commit comments

Comments
 (0)