主題管理
創建主題
public void create(String topic, int partitions, int replication, Map<string> configs) throws Exception {
// 為了兼容性增加一層副本系數和節點數量的判斷
if (replication > getBrokerNums())
throw new RuntimeException("副本系數不能大於broker節點數量");
short replication_short = (short) replication;
NewTopic newTopic = new NewTopic(topic, partitions, replication_short);
// 創建主題的相關配置
if (null != configs && configs.size() > 0)
newTopic.configs(configs);
CreateTopicsResult result = adminClient.createTopics(Arrays.asList(newTopic));
result.all().get(timeout, TimeUnit.SECONDS);
}
/<string>
修改主題
public void update(String topic, List<alterconfigop> alterConfigOps) throws Exception {
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
Map<configresource>> configs = new HashMap<>();
configs.put(resource, alterConfigOps);
adminClient.incrementalAlterConfigs(configs).all().get(timeout, TimeUnit.SECONDS);
}
// 新增、更新的參數
// ConfigEntry configEntry = new ConfigEntry(property.getKey(), property.getValue());
// AlterConfigOp alterConfigOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
// 刪除的參數
// ConfigEntry configEntry = new ConfigEntry(deleteProperty, null);
// AlterConfigOp alterConfigOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.DELETE);
/<configresource>/<alterconfigop>
刪除主題
/**
 * Deletes a topic and blocks until the deletion result is available or the
 * timeout elapses, so that failures reach the caller instead of being dropped.
 *
 * @param topic name of the topic to delete
 * @throws Exception if the deletion request fails or waiting for it takes
 *                   longer than {@code timeout} seconds
 */
public void delete(String topic) throws Exception {
    // The broker's server.properties must set delete.topic.enable=true for the
    // topic to actually be removed; otherwise it is only marked for deletion.
    adminClient.deleteTopics(Arrays.asList(topic)).all().get(timeout, TimeUnit.SECONDS);
}
列出主題
public Set<string> list() throws Exception {
ListTopicsResult listTopicsResult = adminClient.listTopics();
Set<string> topics = listTopicsResult.names().get(timeout, TimeUnit.SECONDS);
return topics;
}
/<string>/<string>
描述主題
/**
 * Looks up the description of a single topic.
 *
 * @param topic name of the topic to describe
 * @return the description stored under {@code topic} in the describe result
 * @throws Exception if waiting for the result fails or takes longer than
 *                   {@code timeout} seconds
 */
public TopicDescription describe(String topic) throws Exception {
    // Wait for the whole describe result first, then pick out the requested topic.
    Map<String, TopicDescription> descriptionsByName =
            adminClient.describeTopics(Arrays.asList(topic)).all().get(timeout, TimeUnit.SECONDS);
    return descriptionsByName.get(topic);
}
分區管理
列出分區
/**
 * Returns the partition ids of a topic, in the order reported by its description.
 *
 * @param topic name of the topic
 * @return one partition id per partition of the topic
 * @throws Exception if {@code describe(topic)} fails
 */
public List<Integer> partitions(String topic) throws Exception {
    List<TopicPartitionInfo> partitionInfos = describe(topic).partitions();
    List<Integer> result = new ArrayList<>(partitionInfos.size());
    for (TopicPartitionInfo partitionInfo : partitionInfos) {
        result.add(partitionInfo.partition());
    }
    return result;
}
public List<topicpartition> topicPartitions(String topic) throws Exception {
List<topicpartitioninfo> partitionInfos = describe(topic).partitions();
List<topicpartition> result = new ArrayList<>();
for (TopicPartitionInfo partitionInfo : partitionInfos) {
result.add(new TopicPartition(topic, partitionInfo.partition()));
}
return result;
}
/<topicpartition>/<topicpartitioninfo>/<topicpartition>/<integer>/<topicpartitioninfo>/<integer>
新增分區
public void addPartitions(String topic, Integer numPartitions) throws Exception {
NewPartitions newPartitions = NewPartitions.increaseTo(numPartitions);
Map<string> map = new HashMap<>(1, 1);
map.put(topic, newPartitions);
adminClient.createPartitions(map).all().get(timeout, TimeUnit.SECONDS);
}
/<string>
本文由博客一文多發平臺 https://openwrite.cn?from=article_bottom 發佈!
閱讀更多 走了關門的耳朵 的文章
關鍵字: List adminClient 管理