Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d37373e
proto: sync PulsarApi.proto from upstream (PIP-420 support)
codelipenghui Mar 13, 2026
f1055c6
error: add SchemaRegistry variant for external schema errors
codelipenghui Mar 13, 2026
766ff83
feat: add schema_id_util with magic-byte framing (PIP-420)
codelipenghui Mar 13, 2026
3e36b01
feat: add PulsarSchema trait and EncodeData struct
codelipenghui Mar 13, 2026
f8a8664
feat: add DefaultPulsarSchema bridging old traits to PulsarSchema
codelipenghui Mar 13, 2026
ae3bccc
feat: add KeyValueSchema composing two inner PulsarSchema instances
codelipenghui Mar 13, 2026
5ba8a8e
feat: re-export PulsarSchema, EncodeData, and schema sub-types
codelipenghui Mar 13, 2026
cb1a37d
refactor: Message<T> — replace _phantom with decoded field, add value()
codelipenghui Mar 13, 2026
284d4fa
refactor: TopicConsumer uses unified DecodedMessageReceiver channel
codelipenghui Mar 13, 2026
2e9ef94
feat: add with_schema() to ConsumerBuilder
codelipenghui Mar 13, 2026
b984920
feat: wire PulsarSchema decode task into TopicConsumer
codelipenghui Mar 13, 2026
db9a54d
feat: add PulsarSchema support to Producer/ProducerBuilder with schem…
codelipenghui Mar 13, 2026
61de0c3
feat: add pulsar-schema-confluent as workspace member
codelipenghui Mar 13, 2026
79c289e
fix: address PR review critical issues (C1-C5)
codelipenghui Mar 13, 2026
79d3b3a
fix: address PR review important issues (I1-I5)
codelipenghui Mar 13, 2026
d9efa43
fix: address PR review round 2 (6 findings)
codelipenghui Mar 13, 2026
26ac9de
fix: carry schema through build_multi_topic and fix clippy warnings
codelipenghui Mar 13, 2026
c22e159
fix: address review round 3 (C2, I1-I3)
codelipenghui Mar 13, 2026
f3296ca
fix: restore backward-compatible public API
codelipenghui Mar 13, 2026
22aa649
fix: address review round 4 (I1-I3, Message backward compat)
codelipenghui Mar 13, 2026
6ab55ee
style: run rustfmt (stable 1.94.0) to fix CI format check
codelipenghui Mar 13, 2026
ae49c8e
fix: minimize PulsarApi.proto diff to only PIP-420 fields
codelipenghui Mar 13, 2026
ee64fb2
fix: remove needless ..Default::default() after proto revert
codelipenghui Mar 13, 2026
8143563
fix: revert unrelated changes (clippy lints, doc cosmetics)
codelipenghui Mar 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[workspace]
members = ["pulsar-schema-confluent"]

[package]
name = "pulsar"
version = "6.4.1"
Expand Down
2 changes: 2 additions & 0 deletions PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ message Schema {
LocalTime = 18;
LocalDateTime = 19;
ProtobufNative = 20;
External = 22;
}

required string name = 1;
Expand Down Expand Up @@ -162,6 +163,7 @@ message MessageMetadata {

// Indicate if the message partition key is set
optional bool null_partition_key = 30 [default = false];
optional bytes schema_id = 32;
}

message SingleMessageMetadata {
Expand Down
5 changes: 1 addition & 4 deletions examples/batching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ struct TestData {
impl SerializeMessage for TestData {
fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
let payload = serde_json::to_vec(&input).map_err(|e| PulsarError::Custom(e.to_string()))?;
Ok(producer::Message {
payload,
..Default::default()
})
Ok(producer::Message::new(payload))
}
}

Expand Down
5 changes: 1 addition & 4 deletions examples/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ struct TestData {
impl SerializeMessage for TestData {
fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
let payload = serde_json::to_vec(&input).map_err(|e| PulsarError::Custom(e.to_string()))?;
Ok(producer::Message {
payload,
..Default::default()
})
Ok(producer::Message::new(payload))
}
}

Expand Down
5 changes: 1 addition & 4 deletions examples/round_trip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ struct TestData {
impl SerializeMessage for TestData {
fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
let payload = serde_json::to_vec(&input).map_err(|e| PulsarError::Custom(e.to_string()))?;
Ok(producer::Message {
payload,
..Default::default()
})
Ok(producer::Message::new(payload))
}
}

Expand Down
18 changes: 18 additions & 0 deletions pulsar-schema-confluent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "pulsar-schema-confluent"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
description = "Confluent Schema Registry integration for pulsar-rs"
repository = "https://github.com/streamnative/pulsar-rs"

[dependencies]
pulsar = { path = "..", default-features = false, features = ["tokio-runtime"] }
async-trait = "0.1"
schema_registry_converter = { version = "4.8", features = ["json"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
log = "0.4"

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
Loading
Loading