Skip to content

Conversation

@coderzc
Copy link
Member

@coderzc coderzc commented Jan 11, 2026

Fixes #xyz

Main Issue: #xyz

PIP: #xyz

Motivation

Modifications

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added PIP doc Your PR contains doc changes, no matter whether the changes are in markdown or code files. labels Jan 11, 2026
@coderzc coderzc force-pushed the pip-452 branch 7 times, most recently from 9bfb65e to 8362933 Compare January 12, 2026 12:54
@coderzc coderzc force-pushed the pip-452 branch 3 times, most recently from e2f22eb to 06e3b36 Compare January 12, 2026 16:41
pip/pip-452.md Outdated
.thenApply(topics -> TopicListingResult.success(topics, false));
}
}).thenCompose(listingResult -> {
List<String> rawTopics = listingResult.getTopics();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't have to copy so much code here, which does not focus on the key change. The only thing that matters is that what will replace the following existing code:

                        return getBrokerService().pulsar().getNamespaceService()
                                .getListOfUserTopics(namespaceName, mode)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified pseudocode

pip/pip-452.md Outdated
* Return an invalid result indicating that the caller should continue with the default logic.
*/
public static TopicListingResult passThrough() {
return PASS_THROUGH_INSTANCE;
Copy link
Contributor

@BewareMyPower BewareMyPower Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This design is a bit over-abstraction. IMO, the following design could be simpler and clearer.

public record TopicListingResult(List<String> topics, boolean filtered) {}
    /**
     * @return the future of the result, if it's empty, fall back to the built-in implementation to list all topics
     */
    default CompletableFuture<Optional<TopicListingResult>> interceptGetTopicsOfNamespace(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the built-in implementation is very simple that calls method directly on the NamespaceService, I think the fallback logic is unnecessary.

Copy link
Member Author

@coderzc coderzc Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we allow the configuration of multiple interceptors, the return here means that the result of current interception is not processed this method. If all the interceptors do not process the method, the default logic will be used.

In addition, the interception configuration is empty by default. In this case, we still need to fall back to the default logic.

Copy link
Member Author

@coderzc coderzc Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default CompletableFuture<Optional<TopicListingResult>> interceptGetTopicsOfNamespace()

Applied this suggestion

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces PIP-452, which proposes a design for customizable topic listing in Pulsar's GetTopicsOfNamespace command. The proposal aims to make topic discovery more flexible by allowing clients to pass context properties and enabling plugins to override the default metadata store scanning behavior.

Changes:

  • Adds protocol extension to include properties field in CommandGetTopicsOfNamespace
  • Introduces BrokerInterceptor interface method for custom topic listing logic
  • Extends REST API and CLI to support properties parameter for topic listing

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

* @param topicsPattern Optional regex pattern provided by the client.
* @param properties Context properties provided by the client.
* @return A CompletableFuture containing the result:
* If the future completes with {@code Optional#empty}, proceed to the next interceptor or broker's default logic to list all topic.
Copy link

Copilot AI Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spelling/grammar: The sentence ends with "list all topic" but should be "list all topics" (plural).

Suggested change
* If the future completes with {@code Optional#empty}, proceed to the next interceptor or broker's default logic to list all topic.
* If the future completes with {@code Optional#empty}, proceed to the next interceptor or broker's default logic to list all topics.

Copilot uses AI. Check for mistakes.

CLI:
```
pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2"
Copy link

Copilot AI Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent quote style: The CLI example uses a double quote at the end but no opening quote. Either remove the trailing quote or add both opening and closing quotes around the entire command for consistency.

Suggested change
pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2"
pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2

Copilot uses AI. Check for mistakes.
* @param topicsPattern Optional regex pattern provided by the client.
* @param properties Context properties provided by the client.
* @return A CompletableFuture containing the result:
* If the future completes with {@code Optional#empty}, proceed to the next interceptor or broker's default logic to list all topic.
Copy link

Copilot AI Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect Javadoc reference syntax: The Javadoc reference {@code Optional#empty} should be {@code Optional.empty()} or {@link Optional#empty()} to follow proper Javadoc conventions. The hash symbol is correct for linking to methods, but when using @code, it should show the actual method call syntax with parentheses, or use @link for proper cross-referencing.

Suggested change
* If the future completes with {@code Optional#empty}, proceed to the next interceptor or broker's default logic to list all topic.
* If the future completes with {@code Optional.empty()}, proceed to the next interceptor or broker's default logic to list all topic.

Copilot uses AI. Check for mistakes.
Comment on lines +149 to +154
// Wrap List<String> into TopicListingResult(topics, filtered=false) for unified processing
return getBrokerService().pulsar().getNamespaceService()
.getListOfUserTopics(namespaceName, mode)
.thenApply(topics -> new TopicListingResult(topics, false));
}
}).thenCompose(listingResult -> {
Copy link

Copilot AI Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent wrapping behavior: The code shows wrapping the default namespace service result in TopicListingResult(topics, false) at line 152, but the interceptor is expected to return Optional&lt;TopicListingResult&gt;. The logic then unwraps this at line 144-154, but the code flow shows that line 152 returns a TopicListingResult directly, not wrapped in an Optional. This should return Optional.of(new TopicListingResult(topics, false)) to maintain type consistency.

Suggested change
// Wrap List<String> into TopicListingResult(topics, filtered=false) for unified processing
return getBrokerService().pulsar().getNamespaceService()
.getListOfUserTopics(namespaceName, mode)
.thenApply(topics -> new TopicListingResult(topics, false));
}
}).thenCompose(listingResult -> {
// Wrap List<String> into TopicListingResult(topics, filtered=false) inside an Optional for unified processing
return getBrokerService().pulsar().getNamespaceService()
.getListOfUserTopics(namespaceName, mode)
.thenApply(topics -> Optional.of(new TopicListingResult(topics, false)));
}
}).thenCompose(listingResultOpt -> {
TopicListingResult listingResult = listingResultOpt.get();

Copilot uses AI. Check for mistakes.
Comment on lines +142 to +154
return interceptorFuture.thenCompose(interceptorResult -> {
// Decision branch: Use the interceptor result OR fall back to the default logic
if (interceptorResult != null && interceptorResult.isPresent()) {
// case A: The interceptor has taken over the request
return CompletableFuture.completedFuture(interceptorResult);
} else {
// case B: The interceptor did not handle, so the original query logic is executed.
// Wrap List<String> into TopicListingResult(topics, filtered=false) for unified processing
return getBrokerService().pulsar().getNamespaceService()
.getListOfUserTopics(namespaceName, mode)
.thenApply(topics -> new TopicListingResult(topics, false));
}
}).thenCompose(listingResult -> {
Copy link

Copilot AI Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type inconsistency in chain: The return type handling is problematic. Line 142 shows interceptorFuture.thenCompose(interceptorResult -> {...}) where interceptorResult is of type Optional&lt;TopicListingResult&gt;. Lines 146 and 152 appear to return different types - line 146 returns CompletableFuture.completedFuture(interceptorResult) (a CompletableFuture&lt;Optional&lt;TopicListingResult&gt;&gt;), while line 152 returns a TopicListingResult directly without Optional wrapping. Then line 154 expects listingResult to be a TopicListingResult. This type chain is broken and needs to be fixed.

Copilot uses AI. Check for mistakes.

This implementation limits the flexibility required for complex multi-tenant scenarios:

No Client Context: The broker cannot distinguish who is asking for the topics or why. It cannot filter topics based on client properties (This properties may be related to topic properties).
Copy link

Copilot AI Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grammar issue: The parenthetical remark should be clearer. Consider revising to "These properties may be related to topic properties" or better yet, clarify what type of properties are being referenced and their relationship to topic properties.

Suggested change
No Client Context: The broker cannot distinguish who is asking for the topics or why. It cannot filter topics based on client properties (This properties may be related to topic properties).
No Client Context: The broker cannot distinguish who is asking for the topics or why. It cannot filter topics based on client properties (these client properties may correspond to, or be derived from, topic properties).

Copilot uses AI. Check for mistakes.
Comment on lines +85 to +117
package org.apache.pulsar.broker.intercept;

import java.util.Optional;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import java.util.concurrent.CompletableFuture;

public interface BrokerInterceptor extends AutoCloseable {

// ... existing methods ...

/**
* Intercept the GetTopicsOfNamespace request.
* <p>
* This method allows plugins to override the default topic discovery logic (ZooKeeper scan).
* It enables fetching topics from external sources (e.g., databases, other metadata stores)
* based on the provided client context properties.
*
* @param namespace The namespace being queried.
* @param mode The query mode (PERSISTENT, NON_PERSISTENT, or ALL).
* @param topicsPattern Optional regex pattern provided by the client.
* @param properties Context properties provided by the client.
* @return A CompletableFuture containing the result:
* If the future completes with {@code Optional#empty}, proceed to the next interceptor or broker's default logic to list all topic.
*/
default CompletableFuture<Optional<TopicListingResult>> interceptGetTopicsOfNamespace(
NamespaceName namespace,
CommandGetTopicsOfNamespace.Mode mode,
Optional<String> topicsPattern,
Map<String, String> properties) {
return CompletableFuture.completedFuture(Optional.empty());
}
}
Copy link

Copilot AI Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing import statement: The code snippet references Map interface but doesn't include the required import statement. Add import java.util.Map; to the imports section.

Copilot uses AI. Check for mistakes.
Comment on lines +156 to +162
long actualSize = TopicListMemoryLimiter.estimateTopicListSize(listingResult.getTopics());
listSizeHolder.updateSize(actualSize);

return maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
isPermitRequestCancelled, permits -> {

List<String> finalTopics = listingResult.getTopics();
Copy link

Copilot AI Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing method accessor: The code calls listingResult.getTopics(), but TopicListingResult is defined as a Java record with a field named topics (line 73). For Java records, the accessor method should be topics() not getTopics().

Suggested change
long actualSize = TopicListMemoryLimiter.estimateTopicListSize(listingResult.getTopics());
listSizeHolder.updateSize(actualSize);
return maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
isPermitRequestCancelled, permits -> {
List<String> finalTopics = listingResult.getTopics();
long actualSize = TopicListMemoryLimiter.estimateTopicListSize(listingResult.topics());
listSizeHolder.updateSize(actualSize);
return maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
isPermitRequestCancelled, permits -> {
List<String> finalTopics = listingResult.topics();

Copilot uses AI. Check for mistakes.
isPermitRequestCancelled, permits -> {

List<String> finalTopics = listingResult.getTopics();
boolean hasAppliedFilter = listingResult.isFiltered();
Copy link

Copilot AI Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing method accessor: The code calls listingResult.isFiltered(), but TopicListingResult is defined as a Java record with a field named filtered (line 73). For Java records, the accessor method should be filtered() not isFiltered(). Note that for boolean fields in records, the accessor is simply the field name, not prefixed with 'is' or 'get'.

Suggested change
boolean hasAppliedFilter = listingResult.isFiltered();
boolean hasAppliedFilter = listingResult.filtered();

Copilot uses AI. Check for mistakes.

REST API:
```
GET /admin/v2/persistent/{tenant}/{namespace} with a query params properties=k1=v1,k2=v2
Copy link

Copilot AI Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unclear API design: The REST API format shows properties=k1=v1,k2=v2 at line 54, but it's not clear how this query parameter should be parsed. Is it a single parameter with comma-separated key-value pairs, or multiple parameters? The CLI example at line 59 shows -p k1=v1 -p k2=v2 which suggests multiple parameters. The REST API documentation should clarify the expected format and whether URL encoding is required for values.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc Your PR contains doc changes, no matter whether the changes are in markdown or code files. PIP

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants