Skip to content

Commit

Permalink
Supports evenly distribute topics count when splits bundle (apache#6241)
Browse files Browse the repository at this point in the history
### Motivation

Currently, bundle split divides a bundle into two parts whose hash ranges have the same size. When a bundle contains only a few topics, this does not work well: topics are assigned to brokers according to the hash value of the topic name, and hashing does not distribute a small number of topics evenly across the two halves of a split bundle.

So this PR introduces an option (`-balance-topic-count`) for bundle split. When it is set to true, the given bundle is split into two parts, each containing the same number of topics.

It also introduces a new Load Manager implementation named `org.apache.pulsar.broker.loadbalance.impl.BalanceTopicCountModularLoadManager`. The new implementation splits bundles so that the topic counts are balanced; in all other respects it behaves the same as ModularLoadManagerImpl.
  • Loading branch information
codelipenghui committed Feb 13, 2020
1 parent bc1f078 commit 1c099da
Show file tree
Hide file tree
Showing 28 changed files with 510 additions and 160 deletions.
8 changes: 8 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,14 @@ loadBalancerOverrideBrokerNicSpeedGbps=
# Name of load manager to use
loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl

# Names of the supported algorithms for namespace bundle split.
# "range_equally_divide" divides the bundle into two parts with the same hash range size.
# "topic_count_equally_divide" divides the bundle into two parts with the same topics count.
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide

# Default algorithm name for namespace bundle split
defaultNamespaceBundleSplitAlgorithm=range_equally_divide

### --- Replication --- ###

# Enable replication metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker;


import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import io.netty.util.internal.PlatformDependent;
Expand Down Expand Up @@ -1204,7 +1205,18 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Name of load manager to use"
)
private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl";

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "Supported algorithms name for namespace bundle split"
)
private List<String> supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide", "topic_count_equally_divide");
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "Default algorithm name for namespace bundle split"
)
private String defaultNamespaceBundleSplitAlgorithm = "range_equally_divide";
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "Option to override the auto-detected network interfaces max speed"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
Expand Down Expand Up @@ -152,6 +153,15 @@ private static class BrokerStarter {
throw new IllegalArgumentException("Max message size need smaller than jvm directMemory");
}

if (!NamespaceBundleSplitAlgorithm.availableAlgorithms.containsAll(brokerConfig.getSupportedNamespaceBundleSplitAlgorithms())) {
throw new IllegalArgumentException("The given supported namespace bundle split algorithm has unavailable algorithm. " +
"Available algorithms are " + NamespaceBundleSplitAlgorithm.availableAlgorithms);
}

if (!brokerConfig.getSupportedNamespaceBundleSplitAlgorithms().contains(brokerConfig.getDefaultNamespaceBundleSplitAlgorithm())) {
throw new IllegalArgumentException("Supported namespace bundle split algorithms must contains the default namespace bundle split algorithm");
}

// init functions worker
if (starterArguments.runFunctionsWorker || brokerConfig.isFunctionsWorkerEnabled()) {
WorkerConfig workerConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -775,7 +776,7 @@ public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritat
}

@SuppressWarnings("deprecation")
protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload) {
protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, String splitAlgorithmName) {
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);

validateSuperUserAccess();
Expand All @@ -793,19 +794,41 @@ protected void internalSplitNamespaceBundle(String bundleRange, boolean authorit
NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, true);

List<String> supportedNamespaceBundleSplitAlgorithms = pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
if (StringUtils.isNotBlank(splitAlgorithmName) && !supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
throw new RestException(Status.PRECONDITION_FAILED,
"Unsupported namespace bundle split algorithm, supported algorithms are " + supportedNamespaceBundleSplitAlgorithms);
}

try {
pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload).get();
pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)).get();
log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
} catch (IllegalArgumentException e) {
log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName,
} catch (ExecutionException e) {
if (e.getCause() instanceof IllegalArgumentException) {
log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName,
bundleRange, e.getMessage());
throw new RestException(Status.PRECONDITION_FAILED, "Split bundle failed due to invalid request");
throw new RestException(Status.PRECONDITION_FAILED, "Split bundle failed due to invalid request");
} else {
log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e);
throw new RestException(e.getCause());
}
} catch (Exception e) {
log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e);
throw new RestException(e);
}
}

/**
 * Resolves the bundle split algorithm to use for a split request.
 *
 * <p>Lookup order: the algorithm registered under {@code algorithmName}; if that lookup
 * yields {@code null}, the algorithm registered under the broker's configured default
 * name; if that also yields {@code null}, {@link NamespaceBundleSplitAlgorithm#rangeEquallyDivide}.
 *
 * @param algorithmName name of the requested split algorithm
 * @return the resolved algorithm, never {@code null}
 */
private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
    NamespaceBundleSplitAlgorithm requested = NamespaceBundleSplitAlgorithm.of(algorithmName);
    if (requested != null) {
        return requested;
    }

    // The requested name did not resolve; fall back to the broker-wide default.
    String defaultName = pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm();
    NamespaceBundleSplitAlgorithm configuredDefault = NamespaceBundleSplitAlgorithm.of(defaultName);

    // Last resort when the configured default does not resolve either.
    return configuredDefault != null ? configuredDefault : NamespaceBundleSplitAlgorithm.rangeEquallyDivide;
}

protected void internalSetPublishRate(PublishRate maxPublishMessageRate) {
log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);
validateSuperUserAccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
Expand Down Expand Up @@ -456,7 +457,7 @@ public void splitNamespaceBundle(@PathParam("property") String property, @PathPa
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload) {
validateNamespaceName(property, cluster, namespace);
internalSplitNamespaceBundle(bundleRange, authoritative, unload);
internalSplitNamespaceBundle(bundleRange, authoritative, unload, NamespaceBundleSplitAlgorithm.rangeEquallyDivideName);
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,10 @@ public void unloadNamespaceBundle(@PathParam("tenant") String tenant, @PathParam
public void splitNamespaceBundle(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload) {
@QueryParam("unload") @DefaultValue("false") boolean unload,
@QueryParam("splitAlgorithmName") String splitAlgorithmName) {
validateNamespaceName(tenant, namespace);
internalSplitNamespaceBundle(bundleRange, authoritative, unload);
internalSplitNamespaceBundle(bundleRange, authoritative, unload, splitAlgorithmName);
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ public void checkNamespaceBundleSplit() {
}
log.info("Load-manager splitting bundle {} and unloading {}", bundleName, unloadSplitBundles);
pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange,
unloadSplitBundles);
unloadSplitBundles, null);
// Make sure the same bundle is not selected again.
loadData.getBundleData().remove(bundleName);
localData.getLastStats().remove(bundleName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.broker.loadbalance.PlacementStrategy;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ResourceQuota;
Expand Down Expand Up @@ -1478,9 +1479,9 @@ public void doNamespaceBundleSplit() throws Exception {
for (String bundleName : bundlesToBeSplit) {
try {
pulsar.getAdminClient().namespaces().splitNamespaceBundle(
LoadManagerShared.getNamespaceNameFromBundleName(bundleName),
LoadManagerShared.getBundleRangeFromBundleName(bundleName),
pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled());
LoadManagerShared.getNamespaceNameFromBundleName(bundleName),
LoadManagerShared.getBundleRangeFromBundleName(bundleName),
pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(), null);
log.info("Successfully split namespace bundle {}", bundleName);
} catch (Exception e) {
log.error("Failed to split namespace bundle {}", bundleName, e);
Expand Down
Loading

0 comments on commit 1c099da

Please sign in to comment.