-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[feat][broker] PIP-452: Customizable Topic Listing in GetTopicsOfNamespace with properties #25134
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
…space with properties
9bfb65e to
8362933
Compare
e2f22eb to
06e3b36
Compare
pip/pip-452.md
Outdated
| .thenApply(topics -> TopicListingResult.success(topics, false)); | ||
| } | ||
| }).thenCompose(listingResult -> { | ||
| List<String> rawTopics = listingResult.getTopics(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't have to copy so much code here, which does not focus on the key change. The only thing that matters is that what will replace the following existing code:
return getBrokerService().pulsar().getNamespaceService()
.getListOfUserTopics(namespaceName, mode)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simplified pseudocode
pip/pip-452.md
Outdated
| * Return an invalid result indicating that the caller should continue with the default logic. | ||
| */ | ||
| public static TopicListingResult passThrough() { | ||
| return PASS_THROUGH_INSTANCE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This design is a bit over-abstraction. IMO, the following design could be simpler and clearer.
public record TopicListingResult(List<String> topics, boolean filtered) {} /**
* @return the future of the result, if it's empty, fall back to the built-in implementation to list all topics
*/
default CompletableFuture<Optional<TopicListingResult>> interceptGetTopicsOfNamespace(There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the built-in implementation is very simple that calls method directly on the NamespaceService, I think the fallback logic is unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we allow the configuration of multiple interceptors, the return here means that the result of current interception is not processed this method. If all the interceptors do not process the method, the default logic will be used.
In addition, the interception configuration is empty by default. In this case, we still need to fall back to the default logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
default CompletableFuture<Optional<TopicListingResult>> interceptGetTopicsOfNamespace()
Applied this suggestion
…operties in ListTopicsOptions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR introduces PIP-452, which proposes a design for customizable topic listing in Pulsar's GetTopicsOfNamespace command. The proposal aims to make topic discovery more flexible by allowing clients to pass context properties and enabling plugins to override the default metadata store scanning behavior.
Changes:
- Adds protocol extension to include properties field in CommandGetTopicsOfNamespace
- Introduces BrokerInterceptor interface method for custom topic listing logic
- Extends REST API and CLI to support properties parameter for topic listing
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| * @param topicsPattern Optional regex pattern provided by the client. | ||
| * @param properties Context properties provided by the client. | ||
| * @return A CompletableFuture containing the result: | ||
| * If the future completes with {@code Optional#empty}, proceed to the next interceptor or broker's default logic to list all topic. |
Copilot
AI
Jan 13, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spelling/grammar: The sentence ends with "list all topic" but should be "list all topics" (plural).
| * If the future completes with {@code Optional#empty}, proceed to the next interceptor or broker's default logic to list all topic. | |
| * If the future completes with {@code Optional#empty}, proceed to the next interceptor or broker's default logic to list all topics. |
|
|
||
| CLI: | ||
| ``` | ||
| pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2" |
Copilot
AI
Jan 13, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent quote style: The CLI example uses a double quote at the end but no opening quote. Either remove the trailing quote or add both opening and closing quotes around the entire command for consistency.
| pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2" | |
| pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2 |
| * @param topicsPattern Optional regex pattern provided by the client. | ||
| * @param properties Context properties provided by the client. | ||
| * @return A CompletableFuture containing the result: | ||
| * If the future completes with {@code Optional#empty}, proceed to the next interceptor or broker's default logic to list all topic. |
Copilot
AI
Jan 13, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incorrect Javadoc reference syntax: The Javadoc reference {@code Optional#empty} should be {@code Optional.empty()} or {@link Optional#empty()} to follow proper Javadoc conventions. The hash symbol is correct for linking to methods, but when using @code, it should show the actual method call syntax with parentheses, or use @link for proper cross-referencing.
| * If the future completes with {@code Optional#empty}, proceed to the next interceptor or broker's default logic to list all topic. | |
| * If the future completes with {@code Optional.empty()}, proceed to the next interceptor or broker's default logic to list all topic. |
| // Wrap List<String> into TopicListingResult(topics, filtered=false) for unified processing | ||
| return getBrokerService().pulsar().getNamespaceService() | ||
| .getListOfUserTopics(namespaceName, mode) | ||
| .thenApply(topics -> new TopicListingResult(topics, false)); | ||
| } | ||
| }).thenCompose(listingResult -> { |
Copilot
AI
Jan 13, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent wrapping behavior: The code shows wrapping the default namespace service result in TopicListingResult(topics, false) at line 152, but the interceptor is expected to return Optional<TopicListingResult>. The logic then unwraps this at line 144-154, but the code flow shows that line 152 returns a TopicListingResult directly, not wrapped in an Optional. This should return Optional.of(new TopicListingResult(topics, false)) to maintain type consistency.
| // Wrap List<String> into TopicListingResult(topics, filtered=false) for unified processing | |
| return getBrokerService().pulsar().getNamespaceService() | |
| .getListOfUserTopics(namespaceName, mode) | |
| .thenApply(topics -> new TopicListingResult(topics, false)); | |
| } | |
| }).thenCompose(listingResult -> { | |
| // Wrap List<String> into TopicListingResult(topics, filtered=false) inside an Optional for unified processing | |
| return getBrokerService().pulsar().getNamespaceService() | |
| .getListOfUserTopics(namespaceName, mode) | |
| .thenApply(topics -> Optional.of(new TopicListingResult(topics, false))); | |
| } | |
| }).thenCompose(listingResultOpt -> { | |
| TopicListingResult listingResult = listingResultOpt.get(); |
| return interceptorFuture.thenCompose(interceptorResult -> { | ||
| // Decision branch: Use the interceptor result OR fall back to the default logic | ||
| if (interceptorResult != null && interceptorResult.isPresent()) { | ||
| // case A: The interceptor has taken over the request | ||
| return CompletableFuture.completedFuture(interceptorResult); | ||
| } else { | ||
| // case B: The interceptor did not handle, so the original query logic is executed. | ||
| // Wrap List<String> into TopicListingResult(topics, filtered=false) for unified processing | ||
| return getBrokerService().pulsar().getNamespaceService() | ||
| .getListOfUserTopics(namespaceName, mode) | ||
| .thenApply(topics -> new TopicListingResult(topics, false)); | ||
| } | ||
| }).thenCompose(listingResult -> { |
Copilot
AI
Jan 13, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Type inconsistency in chain: The return type handling is problematic. Line 142 shows interceptorFuture.thenCompose(interceptorResult -> {...}) where interceptorResult is of type Optional<TopicListingResult>. Lines 146 and 152 appear to return different types - line 146 returns CompletableFuture.completedFuture(interceptorResult) (a CompletableFuture<Optional<TopicListingResult>>), while line 152 returns a TopicListingResult directly without Optional wrapping. Then line 154 expects listingResult to be a TopicListingResult. This type chain is broken and needs to be fixed.
|
|
||
| This implementation limits the flexibility required for complex multi-tenant scenarios: | ||
|
|
||
| No Client Context: The broker cannot distinguish who is asking for the topics or why. It cannot filter topics based on client properties (This properties may be related to topic properties). |
Copilot
AI
Jan 13, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Grammar issue: The parenthetical remark should be clearer. Consider revising to "These properties may be related to topic properties" or better yet, clarify what type of properties are being referenced and their relationship to topic properties.
| No Client Context: The broker cannot distinguish who is asking for the topics or why. It cannot filter topics based on client properties (This properties may be related to topic properties). | |
| No Client Context: The broker cannot distinguish who is asking for the topics or why. It cannot filter topics based on client properties (these client properties may correspond to, or be derived from, topic properties). |
| package org.apache.pulsar.broker.intercept; | ||
|
|
||
| import java.util.Optional; | ||
| import org.apache.pulsar.common.naming.NamespaceName; | ||
| import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; | ||
| import java.util.concurrent.CompletableFuture; | ||
|
|
||
| public interface BrokerInterceptor extends AutoCloseable { | ||
|
|
||
| // ... existing methods ... | ||
|
|
||
| /** | ||
| * Intercept the GetTopicsOfNamespace request. | ||
| * <p> | ||
| * This method allows plugins to override the default topic discovery logic (ZooKeeper scan). | ||
| * It enables fetching topics from external sources (e.g., databases, other metadata stores) | ||
| * based on the provided client context properties. | ||
| * | ||
| * @param namespace The namespace being queried. | ||
| * @param mode The query mode (PERSISTENT, NON_PERSISTENT, or ALL). | ||
| * @param topicsPattern Optional regex pattern provided by the client. | ||
| * @param properties Context properties provided by the client. | ||
| * @return A CompletableFuture containing the result: | ||
| * If the future completes with {@code Optional#empty}, proceed to the next interceptor or broker's default logic to list all topic. | ||
| */ | ||
| default CompletableFuture<Optional<TopicListingResult>> interceptGetTopicsOfNamespace( | ||
| NamespaceName namespace, | ||
| CommandGetTopicsOfNamespace.Mode mode, | ||
| Optional<String> topicsPattern, | ||
| Map<String, String> properties) { | ||
| return CompletableFuture.completedFuture(Optional.empty()); | ||
| } | ||
| } |
Copilot
AI
Jan 13, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing import statement: The code snippet references Map interface but doesn't include the required import statement. Add import java.util.Map; to the imports section.
| long actualSize = TopicListMemoryLimiter.estimateTopicListSize(listingResult.getTopics()); | ||
| listSizeHolder.updateSize(actualSize); | ||
|
|
||
| return maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize, | ||
| isPermitRequestCancelled, permits -> { | ||
|
|
||
| List<String> finalTopics = listingResult.getTopics(); |
Copilot
AI
Jan 13, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing method accessor: The code calls listingResult.getTopics(), but TopicListingResult is defined as a Java record with a field named topics (line 73). For Java records, the accessor method should be topics() not getTopics().
| long actualSize = TopicListMemoryLimiter.estimateTopicListSize(listingResult.getTopics()); | |
| listSizeHolder.updateSize(actualSize); | |
| return maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize, | |
| isPermitRequestCancelled, permits -> { | |
| List<String> finalTopics = listingResult.getTopics(); | |
| long actualSize = TopicListMemoryLimiter.estimateTopicListSize(listingResult.topics()); | |
| listSizeHolder.updateSize(actualSize); | |
| return maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize, | |
| isPermitRequestCancelled, permits -> { | |
| List<String> finalTopics = listingResult.topics(); |
| isPermitRequestCancelled, permits -> { | ||
|
|
||
| List<String> finalTopics = listingResult.getTopics(); | ||
| boolean hasAppliedFilter = listingResult.isFiltered(); |
Copilot
AI
Jan 13, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing method accessor: The code calls listingResult.isFiltered(), but TopicListingResult is defined as a Java record with a field named filtered (line 73). For Java records, the accessor method should be filtered() not isFiltered(). Note that for boolean fields in records, the accessor is simply the field name, not prefixed with 'is' or 'get'.
| boolean hasAppliedFilter = listingResult.isFiltered(); | |
| boolean hasAppliedFilter = listingResult.filtered(); |
|
|
||
| REST API: | ||
| ``` | ||
| GET /admin/v2/persistent/{tenant}/{namespace} with a query params properties=k1=v1,k2=v2 |
Copilot
AI
Jan 13, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unclear API design: The REST API format shows properties=k1=v1,k2=v2 at line 54, but it's not clear how this query parameter should be parsed. Is it a single parameter with comma-separated key-value pairs, or multiple parameters? The CLI example at line 59 shows -p k1=v1 -p k2=v2 which suggests multiple parameters. The REST API documentation should clarify the expected format and whether URL encoding is required for values.
Fixes #xyz
Main Issue: #xyz
PIP: #xyz
Motivation
Modifications
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: