Kafka-消費者訂閱主題

消費者訂閱

消費者通過 subscribe() 和 assign() 兩種方式訂閱主題

subscribe()

使用 subscribe() 可以訂閱一個或多個主題,對於這個方法而言,可以以集合的方式訂閱多個主題,也可以以正則表達式的形式訂閱特定模式的主題。

subscribe 的幾個重載方法如下:

public void subscribe(Collection<string> topics)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
/<string>

對於消費者以集合的方式訂閱主題,如果前後兩次訂閱了不同的主題,以最後一次的訂閱為準,前面的訂閱都會失效,如:

consumer.subscribe(Arrays.asList("topic1"));
consumer.subscribe(Arrays.asList("topic2"));

上述的示例,最終訂閱的是 topic2,由此可以看出來 topic1 訂閱失效了。

如果以正則表達式的方式訂閱主題,在之後的過程中,如果新創建了新的主題,並且主題的名稱與正則表達式相匹配,那麼這個消費者就可以消費到這個新添加的主題中的消息。

示例如下:

consumer.subscribe(Pattern.compile("superz-*"));

assign()

使用 assign() 可以指定訂閱的主題分區

assign() 的方法定義如下:

public void assign(Collection<topicpartition> partitions)
/<topicpartition>

兩者的區別

分區分配的區別:

  • subscribe() 是有 Kafka 內部算法為消費者自動分配分區
  • assign() 則需要開發者手動為消費者指定消費的分區

通過分區分配的區別可以看出來,採用 subscribe() 方式的訂閱,多個消費者之間消費的消息不會重複,且所有消費者消費的消息是一個主題的全部消息;但使用 assign() 方式的訂閱,在位移未提交的情況下,多個消費者訂閱相同的主題分區,消費到的消息是完全一樣的。

建議:assign() 與 subscribe() 不要混用

因為 assign() 、subscribe() 訂閱並配置 enable.auto.commit=true 的情況下,poll() 會提交偏移量,這樣會造成 assign()、subscribe() 都會對同一個主題分區提交偏移量,這樣的偏移量對其中的一些訂閱是有問題的。

消費者取消訂閱

在 KafkaConsumer 中可以使用 unsubscribe() 方法來取消主題的訂閱。

這個方法可以取消如下的訂閱方式:

  • 以 subscribe(Collection) 方式實現的訂閱
  • 以 subscribe(Pattern) 方式實現的訂閱
  • 以 assign() 方式實現的訂閱

使用方式的示例如下:

consumer.unsubscribe();

如果將 subscribe(Collection) 或 assign() 中的集合參數設置為空集合,也可以實現取消訂閱,以下三種方式都可以取消訂閱:

consumer.unsubscribe();
consumer.subscribe(new ArrayList<string>());
consumer.assign(new ArrayList<topicpartition>());
/<topicpartition>/<string>

本文由博客一文多發平臺 https://openwrite.cn?from=article_bottom 發佈!


分享到:


相關文章: