|
1 | | -# PIP-298: Authorization Operation Metrics |
| 1 | +# PIP-298: Support read transaction buffer snapshot segments from earliest |
2 | 2 |
|
3 | | -# Background knowledge |
| 3 | +# Background |
4 | 4 |
|
5 | | -Pulsar brokers perform authorization checks before allowing clients, proxies, and administrative callers to access |
6 | | -topics, namespaces, tenants, brokers, and policy operations. These checks are enforced through the broker-side |
7 | | -`AuthorizationService`, which delegates the decision to the configured `AuthorizationProvider`. |
8 | | - |
9 | | -Pulsar already exposes several security-related metrics, especially around authentication. These metrics are useful for |
10 | | -detecting login failures, unhealthy client behavior, and changes in access patterns. However, Pulsar does not expose an |
11 | | -equivalent broker-level metric stream for authorization outcomes. In practice, authorization failures are primarily |
12 | | -visible through logs and request failures rather than through a dedicated metric. |
13 | | - |
14 | | -Pulsar also supports OpenTelemetry metrics. For operational consistency, new broker observability features should align |
15 | | -Prometheus-style metrics with OpenTelemetry counters rather than introducing instrumentation in only one pipeline. |
| 5 | +In the implementation of the Pulsar Transaction, each topic is configured with a `Transaction Buffer` to prevent |
| 6 | +consumers from reading uncommitted messages, which are invisible until the transaction is committed. Transaction Buffer |
| 7 | +works with Position (maxReadPosition) and `TxnID` Set (aborts). The broker only dispatches messages, before the |
| 8 | +maxReadPosition, to the consumers. When the broker dispatches the messages before maxReadPosition to the consumer, the |
| 9 | +messages sent by aborted transactions will get filtered by the Transaction Buffer. |
16 | 10 |
|
17 | 11 | # Motivation |
18 | 12 |
|
19 | | -Operators need a low-cardinality, broker-native signal that shows whether authorization checks are succeeding or |
20 | | -failing. This is needed for security alerting, baseline monitoring, and compliance reporting. |
21 | | - |
22 | | -Without a dedicated authorization metric, operators have to infer authorization denials from logs, HTTP status codes, |
23 | | -or client-side errors. That is brittle and does not support standard monitoring patterns such as: |
24 | | - |
25 | | -- Alerting on spikes in authorization failures. |
26 | | -- Comparing authorization failures against successful authorizations. |
27 | | -- Building dashboards that differentiate between authentication problems and authorization problems. |
28 | | -- Exporting equivalent signals through both Prometheus and OpenTelemetry. |
29 | | - |
30 | | -The lack of a generic metric also encourages overly narrow designs such as a failure-only counter. That limits |
31 | | -observability because operators often need both success and failure counts to understand whether a denial spike reflects |
32 | | -an attack, a rollout problem, or a normal traffic shift. |
33 | | - |
34 | | -# Goals |
| 13 | +Currently, Pulsar transactions do not have configurable isolation levels. By introducing isolation level configuration |
| 14 | +for consumers, we can enhance the flexibility of Pulsar transactions. |
| 15 | + |
| 16 | +Let's consider an example: |
| 17 | + |
| 18 | +**System**: Financial Transaction System |
| 19 | + |
| 20 | +**Operations**: Large volume of deposit and withdrawal operations, a |
| 21 | +small number of transfer operations. |
| 22 | + |
| 23 | +**Roles**: |
| 24 | + |
| 25 | +- **Client A1** |
| 26 | +- **Client A2** |
| 27 | +- **User Account B1** |
| 28 | +- **User Account B2** |
| 29 | +- **Request Topic C** |
| 30 | +- **Real-time Monitoring System D** |
| 31 | +- **Business Processing System E** |
| 32 | + |
| 33 | +**Client Operations**: |
| 34 | + |
| 35 | +- **Withdrawal**: Client A1 decreases the deposit amount from User |
| 36 | + Account B1 or B2. |
| 37 | +- **Deposit**: Client A1 increases the deposit amount in User Account B1 or B2. |
| 38 | +- **Transfer**: Client A2 decreases the deposit amount from User |
| 39 | + Account B1 and increases it in User Account B2. Or vice versa. |
| 40 | + |
| 41 | +**Real-time Monitoring System D**: Obtains the latest data from |
| 42 | +Request Topic C as quickly as possible to monitor transaction data and |
| 43 | +changes in bank reserves in real-time. This is necessary for the |
| 44 | +timely detection of anomalies and real-time decision-making. |
| 45 | + |
| 46 | +**Business Processing System E**: Reads data from Request Topic C, |
| 47 | +then actually operates User Accounts B1, B2. |
| 48 | + |
| 49 | +**User Scenario**: Client A1 sends a large number of deposit and |
| 50 | +withdrawal requests to Request Topic C. Client A2 writes a small |
| 51 | +number of transfer requests to Request Topic C. |
| 52 | + |
| 53 | +In this case, Business Processing System E needs a read-committed |
| 54 | +isolation level to ensure operation consistency and Exactly Once |
| 55 | +semantics. The real-time monitoring system does not care if a small |
| 56 | +number of transfer requests are incomplete (dirty data). What it |
| 57 | +cannot tolerate is a situation where a large number of deposit and |
| 58 | +withdrawal requests cannot be presented in real time due to a small |
| 59 | +number of transfer requests (the current situation is that uncommitted |
| 60 | +transaction messages can block the reading of committed transaction |
| 61 | +messages). |
| 62 | + |
| 63 | +In this case, it is necessary to set different isolation levels for |
| 64 | +different consumers/subscriptions. |
| 65 | +The uncommitted transactions do not impact actual users' bank accounts. |
| 66 | +Business Processing System E only reads committed transactional |
| 67 | +messages and operates users' accounts. It needs Exactly-once semantic. |
| 68 | +Real-time Monitoring System D reads uncommitted transactional |
| 69 | +messages. It does not need Exactly-once semantic. |
| 70 | + |
| 71 | +They use different subscriptions and choose different isolation |
| 72 | +levels. One needs transaction, one does not. |
| 73 | +In general, multiple subscriptions of the same topic do not all |
| 74 | +require transaction guarantees. |
| 75 | +Some want low latency without the exact-once semantic guarantee, and |
| 76 | +some must require the exactly-once guarantee. |
| 77 | +We just provide a new option for different subscriptions. |
| 78 | + |
| 79 | +# Goal |
35 | 80 |
|
36 | 81 | ## In Scope |
37 | 82 |
|
38 | | -- Add a low-cardinality broker authorization metric for operation outcomes. |
39 | | -- Record both successful and failed authorization decisions. |
40 | | -- Expose the metric through Prometheus-compatible broker metrics. |
41 | | -- Expose the same metric through OpenTelemetry. |
42 | | -- Centralize instrumentation in `AuthorizationService` so all broker authorization paths share the same metric model. |
| 83 | +Implement Read Committed and Read Uncommitted isolation levels for Pulsar transactions. Allow consumers to configure |
| 84 | +isolation levels during the building process. |
43 | 85 |
|
44 | 86 | ## Out of Scope |
45 | 87 |
|
46 | | -- Per-role, per-topic, per-tenant, or per-principal labels. |
47 | | -- Audit-log payloads or structured security event streams. |
48 | | -- New authorization APIs or protocol changes. |
49 | | -- Alert rule definitions for downstream monitoring stacks. |
50 | | - |
| 88 | +None. |
51 | 89 |
|
52 | 90 | # High Level Design |
53 | 91 |
|
54 | | -Introduce a generic authorization operation counter that is incremented whenever the broker finishes an authorization |
55 | | -decision. |
56 | | - |
57 | | -The metric is recorded centrally in `AuthorizationService`, which already serves as the broker-side entry point for |
58 | | -authorization checks across topic, namespace, tenant, broker, cluster, and policy operations. Each authorization check |
59 | | -will emit one result with a small, fixed label set: |
60 | | - |
61 | | -- what kind of resource was checked |
62 | | -- what operation category was requested |
63 | | -- whether the result was a success or failure |
64 | | - |
65 | | -This metric will be exported in two equivalent forms: |
66 | | - |
67 | | -- a Prometheus counter for the existing broker metrics endpoint |
68 | | -- an OpenTelemetry counter for modern metrics pipelines |
69 | | - |
70 | | -Invalid original-principal combinations in proxied authorization flows will also be counted as authorization failures, |
71 | | -because they represent rejected authorization attempts from the broker’s perspective. |
| 92 | +Add a configuration 'subscriptionIsolationLevel' in the consumer builder to allow users to choose different transaction |
| 93 | +isolation levels. |
72 | 94 |
|
73 | 95 | # Detailed Design |
74 | 96 |
|
75 | | -## Design & Implementation Details |
76 | | - |
77 | | -This proposal introduces a broker authorization metrics helper that owns: |
78 | | - |
79 | | -- a Prometheus counter for broker metrics scraping |
80 | | -- an OpenTelemetry `LongCounter` for broker metrics export |
81 | | - |
82 | | -The helper is instantiated by `AuthorizationService`. `AuthorizationService` records results after each completed |
83 | | -authorization decision. If the provider returns `true`, the helper records a success. If the provider returns `false`, |
84 | | -the helper records a failure. If `AuthorizationService` rejects a request before provider evaluation, such as an |
85 | | -invalid original-principal combination for proxied requests, the helper records a failure directly. |
86 | | - |
87 | | -The instrumentation is attached to the following authorization flows: |
88 | | - |
89 | | -- superuser checks |
90 | | -- tenant-admin checks |
91 | | -- tenant operations |
92 | | -- broker operations |
93 | | -- cluster operations |
94 | | -- cluster policy operations |
95 | | -- namespace operations |
96 | | -- namespace policy operations |
97 | | -- topic operations |
98 | | -- topic policy operations |
99 | | - |
100 | | -This proposal intentionally keeps the label space small. It does not include role names, topic names, tenant names, |
101 | | -client addresses, provider names, or error strings. |
102 | | - |
103 | 97 | ## Public-facing Changes |
104 | 98 |
|
105 | | -### Public API |
106 | | - |
107 | | -No public API changes. |
108 | | - |
109 | | -### Binary protocol |
110 | | - |
111 | | -No binary protocol changes. |
| 99 | +Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read |
| 100 | +Uncommitted. |
112 | 101 |
|
113 | | -### Configuration |
| 102 | +### Before the Change |
114 | 103 |
|
115 | | -No new configuration is required. |
| 104 | +The PulsarConsumer builder process currently does not include isolation level configurations. The consumer creation |
| 105 | +process might look like this: |
116 | 106 |
|
117 | | -### CLI |
| 107 | +``` |
| 108 | +PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); |
118 | 109 |
|
119 | | -No CLI changes. |
| 110 | +Consumer<String> consumer = client.newConsumer(Schema.STRING) |
| 111 | + .topic("persistent://my-tenant/my-namespace/my-topic") |
| 112 | + .subscriptionName("my-subscription") |
| 113 | + .subscriptionType(SubscriptionType.Shared) |
| 114 | + .subscribe(); |
| 115 | +``` |
120 | 116 |
|
121 | | -### Metrics |
| 117 | +### After the Change |
122 | 118 |
|
123 | | -Prometheus metric: |
| 119 | +Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read |
| 120 | +Uncommitted. Introduce a new method subscriptionIsolationLevel() in the consumer builder, which accepts an enumeration |
| 121 | +value representing the isolation level: |
124 | 122 |
|
125 | | -- Full name: `pulsar_authorization_operations_total` |
126 | | -- Description: Total number of broker authorization operations. |
127 | | -- Attributes: |
128 | | - - `resource_type` |
129 | | - - `operation` |
130 | | - - `result` |
131 | | -- Unit: operations |
| 123 | +``` |
| 124 | +public enum SubscriptionIsolationLevel { |
| 125 | + // Consumer can only consume all transactional messages which have been committed. |
| 126 | + READ_COMMITTED, |
132 | 127 |
|
133 | | -OpenTelemetry metric: |
| 128 | + // Consumer can consume all messages, even transactional messages which have been aborted. |
| 129 | + READ_UNCOMMITTED; |
| 130 | +} |
| 131 | +``` |
134 | 132 |
|
135 | | -- Full name: `pulsar.authorization.operation.count` |
136 | | -- Description: Total number of broker authorization operations. |
137 | | -- Attributes: |
138 | | - - `pulsar.authorization.type` |
139 | | - - `pulsar.authorization.operation` |
140 | | - - `pulsar.authorization.result` |
141 | | -- Unit: `{operation}` |
| 133 | +Then, modify the consumer creation process to include the new isolation level configuration: |
142 | 134 |
|
143 | | -Attribute values: |
| 135 | +``` |
| 136 | +PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); |
144 | 137 |
|
145 | | -- `result`: `success` or `failure` |
146 | | -- `resource_type`: fixed categories such as `topic`, `namespace`, `tenant`, `broker`, `cluster`, `superuser`, |
147 | | - `tenant_admin`, `topic_policy`, `namespace_policy`, and `cluster_policy` |
148 | | -- `operation`: the normalized operation name for the authorization check |
| 138 | +Consumer<String> consumer = client.newConsumer(Schema.STRING) |
| 139 | + .topic("persistent://my-tenant/my-namespace/my-topic") |
| 140 | + .subscriptionName("my-subscription") |
| 141 | + .subscriptionType(SubscriptionType.Shared) |
| 142 | + .subscriptionIsolationLevel(SubscriptionIsolationLevel.READ_COMMITTED) // Adding the isolation level configuration |
| 143 | + .subscribe(); |
| 144 | +``` |
149 | 145 |
|
| 146 | +With this change, users can now choose between Read Committed and Read Uncommitted isolation levels when creating a new |
| 147 | +consumer. If the isolationLevel() method is not called during the builder process, the default isolation level will be |
| 148 | +Read Committed. |
| 149 | +Note that this is a subscription dimension configuration, and all consumers under the same subscription need to be |
| 150 | +configured with the same IsolationLevel. |
150 | 151 |
|
151 | | -# Monitoring |
152 | | - |
153 | | -Operators should monitor both absolute authorization failures and the relationship between failures and successes. |
154 | | -Recommended patterns include: |
155 | | - |
156 | | -- Alert on sustained increases in `result="failure"`. |
157 | | -- Build dashboards that show `success` and `failure` together by `resource_type`. |
158 | | -- Investigate rollout regressions by comparing the failure rate before and after authorization policy changes. |
159 | | -- Distinguish authentication incidents from authorization incidents by correlating authorization failures with existing |
160 | | - authentication metrics. |
161 | | - |
162 | | -This proposal intentionally enables ratio-based alerting, such as failure/success comparisons, by including both result |
163 | | -types in the same metric family. |
164 | | - |
165 | | -# Security Considerations |
| 152 | +## Design & Implementation Details |
166 | 153 |
|
167 | | -This proposal improves security observability but does not change authorization semantics. |
| 154 | +### Client Changes |
168 | 155 |
|
169 | | -Because authorization decisions can be high volume and can involve sensitive identifiers, the metric must avoid |
170 | | -high-cardinality or identity-bearing labels. This proposal therefore excludes role names, topic names, namespaces, |
171 | | -tenants, and client network information from metric attributes. That preserves operational usefulness without turning |
172 | | -the metric into a data-leak or cardinality risk. |
| 156 | +Update the PulsarConsumer builder to accept isolation level configurations for Read Committed and Read Uncommitted levels. |
173 | 157 |
|
174 | | -Failed proxy original-principal validation is counted as an authorization failure because the broker rejects the |
175 | | -request during authorization handling. |
| 158 | +In order to achieve the above goals, the following modifications need to be made: |
176 | 159 |
|
177 | | -# Backward & Forward Compatibility |
| 160 | +- Added `IsolationLevel` related fields and methods in `ConsumerConfigurationData` and `ConsumerBuilderImpl` and `ConsumerImpl` |
178 | 161 |
|
179 | | -## Upgrade |
| 162 | +- Modify PulsarApi.CommandSubscribe, add field -- IsolationLevel |
180 | 163 |
|
181 | | -No special upgrade action is required. The new metrics appear automatically after upgrading brokers that include this |
182 | | -feature. |
| 164 | +``` |
| 165 | +message CommandSubscribe { |
183 | 166 |
|
184 | | -## Downgrade / Rollback |
| 167 | + enum IsolationLevel { |
| 168 | + READ_COMMITTED = 0; |
| 169 | + READ_UNCOMMITTED = 1; |
| 170 | + } |
| 171 | + optional IsolationLevel isolation_level = 20 [default = READ_COMMITTED]; |
| 172 | +} |
| 173 | +``` |
185 | 174 |
|
186 | | -Downgrading removes the metrics. Monitoring systems should tolerate missing-series behavior during rollback. |
| 175 | +### Broker changes |
187 | 176 |
|
188 | | -## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations |
| 177 | +Modify the transaction buffer and dispatching mechanisms to handle messages based on the chosen isolation level. |
189 | 178 |
|
190 | | -No geo-replication protocol or metadata compatibility changes are introduced. |
| 179 | +In order to achieve the above goals, the following modifications need to be made: |
191 | 180 |
|
192 | | -# Alternatives |
| 181 | +- Determine in the `readMoreEntries` method of Dispatchers such as `PersistentDispatcherSingleActiveConsumer` |
| 182 | + and `PersistentDispatcherMultipleConsumers`: |
193 | 183 |
|
194 | | -- Failure-only counter: |
195 | | - Rejected because operators often need both success and failure counts to interpret changes correctly and to build |
196 | | - ratio-based alerts. |
| 184 | + - If Subscription.isolationLevel == ReadCommitted, then MaxReadPosition = topic.getMaxReadPosition(), that is, |
| 185 | + transactionBuffer.getMaxReadPosition() |
197 | 186 |
|
198 | | -- Add detailed identity labels such as role or topic: |
199 | | - Rejected due to cardinality and privacy concerns. |
| 187 | + - If Subscription.isolationLevel == ReadUnCommitted, then MaxReadPosition = PositionImpl.LATEST |
200 | 188 |
|
201 | | -- Instrument each authorization call site independently: |
202 | | - Rejected because it would be error-prone and would likely produce inconsistent semantics across broker paths. |
| 189 | +- Add a new metrics `subscriptionIsolationLevel` in `SubscriptionStatsImpl`. |
203 | 190 |
|
204 | | -# General Notes |
| 191 | +# Monitoring |
205 | 192 |
|
206 | | -This proposal is intentionally limited to broker metrics. It does not attempt to replace audit logging or structured |
207 | | -security events. |
| 193 | +After this PIP, Users can query the subscription stats of a topic through the admin tool, and observe the `subscriptionIsolationLevel` in the subscription stats to determine the isolation level of the subscription. |
208 | 194 |
|
209 | 195 | # Links |
210 | 196 |
|
211 | | -* Mailing List discussion thread: |
212 | | -* Mailing List voting thread: |
| 197 | +* Mailing List discussion thread: https://lists.apache.org/thread/8ny0qtp7m9qcdbvnfjdvpnkc4c5ssyld |
| 198 | +* Mailing List voting thread: https://lists.apache.org/thread/4q1hrv466h8w9ccpf4moxt6jv1jxp1mr |
| 199 | +* Document link: https://github.com/apache/pulsar-site/pull/712 |
0 commit comments