KafkaAdminClient-主題分區管理

主題管理

創建主題

public void create(String topic, int partitions, int replication, Map<string> configs) throws Exception {
// 為了兼容性增加一層副本系數和節點數量的判斷
if (replication > getBrokerNums())
throw new RuntimeException("副本系數不能大於broker節點數量");
short replication_short = (short) replication;
NewTopic newTopic = new NewTopic(topic, partitions, replication_short);
// 創建主題的相關配置
if (null != configs && configs.size() > 0)
newTopic.configs(configs);
CreateTopicsResult result = adminClient.createTopics(Arrays.asList(newTopic));
result.all().get(timeout, TimeUnit.SECONDS);
}
/<string>

修改主題

public void update(String topic, List<alterconfigop> alterConfigOps) throws Exception {
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
Map<configresource>> configs = new HashMap<>();
configs.put(resource, alterConfigOps);
adminClient.incrementalAlterConfigs(configs).all().get(timeout, TimeUnit.SECONDS);
}
// 新增、更新的參數
// ConfigEntry configEntry = new ConfigEntry(property.getKey(), property.getValue());
// AlterConfigOp alterConfigOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
// 刪除的參數
// ConfigEntry configEntry = new ConfigEntry(deleteProperty, null);
// AlterConfigOp alterConfigOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.DELETE);
/<configresource>/<alterconfigop>

刪除主題

public void delete(String topic) throws Exception {
// 服務端server.properties需要設置delete.topic.enable=true,才可以使用同步刪除,否則只是將主題標記為刪除
adminClient.deleteTopics(Arrays.asList(topic));
}

列出主題

public Set<string> list() throws Exception {
ListTopicsResult listTopicsResult = adminClient.listTopics();
Set<string> topics = listTopicsResult.names().get(timeout, TimeUnit.SECONDS);
return topics;
}
/<string>/<string>

描述主題

public TopicDescription describe(String topic) throws Exception {
TopicDescription description = adminClient.describeTopics(Arrays.asList(topic)).all()
.get(timeout, TimeUnit.SECONDS).get(topic);
return description;
}

分區管理

列出分區

public List<integer> partitions(String topic) throws Exception {
List<topicpartitioninfo> partitionInfos = describe(topic).partitions();
List<integer> result = new ArrayList<>();
for (TopicPartitionInfo partitionInfo : partitionInfos) {
result.add(partitionInfo.partition());
}
return result;
}
public List<topicpartition> topicPartitions(String topic) throws Exception {
List<topicpartitioninfo> partitionInfos = describe(topic).partitions();
List<topicpartition> result = new ArrayList<>();
for (TopicPartitionInfo partitionInfo : partitionInfos) {
result.add(new TopicPartition(topic, partitionInfo.partition()));
}
return result;
}
/<topicpartition>/<topicpartitioninfo>/<topicpartition>/<integer>/<topicpartitioninfo>/<integer>

新增分區

public void addPartitions(String topic, Integer numPartitions) throws Exception {
NewPartitions newPartitions = NewPartitions.increaseTo(numPartitions);
Map<string> map = new HashMap<>(1, 1);
map.put(topic, newPartitions);
adminClient.createPartitions(map).all().get(timeout, TimeUnit.SECONDS);
}
/<string>

本文由博客一文多發平臺 https://openwrite.cn?from=article_bottom 發佈!


分享到:


相關文章: