要做技術選型,數據處理選kafka還是RocketMQ?我徹底蒙了

場景描述:北京有很多電動車,這些車都會定時地向一個服務器發送狀態信息,這些信息可能包括:車的id、發送 時間、車的位置(經緯度)、車的速度、剩餘電量等等。有了這些信息我們可以做很多事情,比如:計算車 的軌跡、出租車的運行規律、電量維持時間等等。

要做技術選型,數據處理選kafka還是RocketMQ?我徹底蒙了

一、kafka到底在怎樣的應用場景下使用?

在類似這樣的場景下,項目開發中的數據量很大,一天上千萬,最初,數據存在HBase,我們想替換掉HBase ,原因如下:

1、數據量大了後,HBase運維成本很高

2、數據統計一般在Hive中進行,導致數據有一天的延時

那麼可實行的方案就是:用Kafka兜住熱數據,然後定時以 microbatch 的方式將數據落地到HDFS

要做技術選型,數據處理選kafka還是RocketMQ?我徹底蒙了

效果演示

回退環境

要做技術選型,數據處理選kafka還是RocketMQ?我徹底蒙了


MQ 選型

問:RocketMQ 異常優秀。是不是直接選用 RocketMQ?

答:RocketMQ 是在 Kafka 的基礎上重寫的,保留了 Kafka durable 機制、集群優勢,犧牲了一些 吞吐量,換取了更好的 數據可靠性。我們這個場景要求的就是吞吐量。

Kafka 更適合密集的數據,RocketMQ適合稀疏的數據:

要做技術選型,數據處理選kafka還是RocketMQ?我徹底蒙了


結論:

業務場景:用RocketMQ

數據場景:1、一般用 Kafka,2個例外:

》若有大量小 Topic,用 RocketMQ

》若對數據可靠性要求極高,用 RocketMQ

二、Kafka 基礎

1 Topic

Kafka對數據進行劃分唯一的邏輯單元

要做技術選型,數據處理選kafka還是RocketMQ?我徹底蒙了

2 、架構速覽

問:這樣的架構,能否保證 Topic 中數據的順序?

要做技術選型,數據處理選kafka還是RocketMQ?我徹底蒙了

要做技術選型,數據處理選kafka還是RocketMQ?我徹底蒙了


三、Kafka集群搭建

要進行這樣一個方案,我們首先需要一個Kafka集群,畢竟巧婦難為無米之炊

現在就帶著搭建一個生產級別的Kafka

今天帶著大家全手動搭建集群,這樣可以對集群原理有更好的認識

1、 安裝JDK8

JDK自行解決

2、 ZK 安裝

Kafka的元數據全部放在ZK上,Kafka強依賴ZK,所以PROD上轉kafka,要先裝ZK

<code>#統一各機器的時鐘

date -s 'Fri Nov 1 11:17:46 CST 2019'

#上傳安裝包

#解壓縮

tar -zxvf kafka_2.11-2.2.1.tgz

tar -zxvf zookeeper-3.4.13.tar.gz

#創建數據目錄

mkdir -p data/zookeeper/

mkdir -p data/kafka

  

cp zoo_sample.cfg zoo.cfg

vi zoo.cfg/<code>
<code># The number of milliseconds of each tick

tickTime=2000

# The number of ticks that the initial

# synchronization phase can take

initLimit=10

syncLimit=5

# example sakes.

dataDir=/home/zk/data/zookeeper

#change

# the port at which the clients will connect

clientPort=2181

server.1=192.168.90.131:8880:7770 #add

server.2=192.168.90.132:8880:7770 #add

server.3=192.168.90.133:8880:7770 #add/<code>
<code>#創建日誌目錄

mkdir -p /home/zk/zookeeper-3.4.13/logs

#指定日誌目

vi zkEnv.sh 添加如下行:

ZOO_LOG_DIR=/home/zk/zookeeper-3.4.13/logs/<code>
<code>#分發 安裝包

cd /home/zk/

scp -r zookeeper-3.4.13 192.168.90.132:`pwd`

scp -r zookeeper-3.4.13 192.168.90.133:`pwd`

#每臺機器配置 myid

cd /home/zk/data/zookeeper/

echo "1" > myid #在第1臺機器執行

echo "2" > myid #在第2臺機器執行

echo "3" > myid #在第3臺機器執行

 

#啟動ZK,每臺機器執行:

cd /home/zk/zookeeper-3.4.13

bin/zkServer.sh start
 

#檢查集群狀態

bin/zkServer.sh status

集群狀態為 leader 或 follower,則集群正常/<code>

3、Kafka 安裝

<code>#分發kafka安裝包

scp -r kafka_2.11-2.2.1 192.168.90.132:`pwd`

scp -r kafka_2.11-2.2.1 192.168.90.133:`pwd/<code>

修改 每臺機器,
confifig/server.properties

<code>broker.id=0

其他機器改為為1、2

log.dir=/home/zk/data/kafka listeners=PLAINTEXT://zkserver1:9092

zkserver1改為其他機器相應的 hostname

啟動kafka,每臺機器執行:

bin/kafka-server-start.sh config/server.properties &/<code>

5、測試Kafka

<code>#創建topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 2

#生產

bin/kafka-console-producer.sh --broker-list 192.168.90.131:9092 --topic test

#消費

bin/kafka-console-consumer.sh --bootstrap-server 192.168.90.131:9092 --topic test/<code>

四、producer端


1、 創建項目

創建項目,指定 compiler

<code>

1.8

1.8

/<code>

2、確定數據結構

<code>import java.sql.Date;

public class Electrocar {
    private String id;

    //數據發送時間
    private Date time;

    //經度
    private double longitude;
    private double latitude;

    //速度
    private double speed;

    //剩餘電量
    private double dump_energy;


    //構造函數,用於快速構造數據
    public Electrocar(String id,
                      Date time,
                      double longitude,
                      double latitude,
                      double speed,
                      double dump_energy){
        this.id = id;
        this.time = time;
        this.longitude = longitude;
        this.speed = speed;
        this.dump_energy = dump_energy;
    }


    //生成getter方法,不生成setter方法

    public String getId() {
        return id;
    }

    public Date getTime() {
        return time;
    }

    public double getLongitude() {
        return longitude;
    }

    public double getLatitude() {
        return latitude;
    }

    public double getSpeed() {
        return speed;
    }

    public double getDump_energy() {
        return dump_energy;
    }

}/<code>

2、生成數據

<code>public class CarDataSource {

    public static void main(String args[]) throws InterruptedException {

        while (true){

            ElectroCar car = nextRecord();  //生成數據

 

            System.out.println(String.format("%s|%f|%f", car.getId(), car.getLatitude(), car.getLongitude()));

            Thread.sleep(200);

        }

    }

    public static ElectroCar nextRecord(){

        //定義random,用於生成隨機值

        Random random = new Random();

 

        //構建 ElectroCar對象

        ElectroCar car = new ElectroCar(

                random.nextInt(10) + "",

                new Date(System.currentTimeMillis()),

                random.nextFloat(),

                random.nextFloat(),

                random.nextFloat(),

                random.nextFloat()

        );

        return car;

    } 

}/<code> 

3、producer 官網示例

<code>

            org.apache.kafka

            kafka-clients

            2.2.0

        /<code>
<code>Properties props = new Properties();

 props.put("bootstrap.servers", "localhost:9092");

 props.put("acks", "all");

 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 

 Producer producer = new KafkaProducer<>(props);

 for (int i = 0; i ("my-topic", Integer.toString(i), Integer.toString(i)));

 

 producer.close();/<code>

  

4 創建topic

<code>bin/kafka-topics.sh --create \

  --bootstrap-server 192.168.90.131:9092 \

  --replication-factor 3 \

  --partitions 3 \

  --topic electrocar/<code>

5 數據格式

思考:應該以什麼格式將數據 publish 到 Kafka? json不好, 要用二進制

要做技術選型,數據處理選kafka還是RocketMQ?我徹底蒙了

ObjectBinary測試

<code>public class ObjectBinaryUtil {

 

    public static void main(String args[]){

        Electrocar car = CarDataSource.nextRecord();

 

        byte[] arr = null;

 

        //將Car obj output 為byte[]

        //ByteArray輸出

        ByteArrayOutputStream bos = new ByteArrayOutputStream();

        try {

            //將oos輸出到bos

            ObjectOutputStream oos = new ObjectOutputStream(bos);

 

            //對象輸出到oos

            oos.writeObject(car);

 

            //獲取byte[]

            arr = bos.toByteArray();

            System.out.println("arr.length :" + arr.length);

        } catch (IOException e) {

            e.printStackTrace();

        }

        //將byte[] 轉成 obj

        //接受arr輸入

        ByteArrayInputStream bis = new ByteArrayInputStream(arr);

        try {

            //bis 轉為ObjectInput

            ObjectInputStream ois = new ObjectInputStream(bis);

            //從ObjectInput 讀取Obj

            Electrocar car1 = (Electrocar) ois.readObject();

 

            System.out.println("++++" + car.getLatitude());

        } catch (IOException e) {

            e.printStackTrace();

        } catch (ClassNotFoundException e) {

            e.printStackTrace();

        }

 

    }/<code>

ObjectBinearyUtil 封裝

<code>//Object to byte[]

    public static byte[] toBinary(Object obj){

        //將Car obj output 為byte[]

        //ByteArray輸出

        ByteArrayOutputStream bos = new ByteArrayOutputStream();

        ObjectOutputStream oos = null;

        try {

            //將oos輸出到bos

            oos = new ObjectOutputStream(bos);

 

            //對象輸出到oos

            oos.writeObject(obj);

 

            //獲取byte[]

            return bos.toByteArray();

        } catch (IOException e) {

            e.printStackTrace();

        }finally {

            if (bos !=null){

                try {

                    bos.close();

                } catch (IOException e) {

                    e.printStackTrace();

                }

            }

            if (oos !=null){

                try {

                    oos.close();

                } catch (IOException e) {

                    e.printStackTrace();

                }

            }

        }

        return null;

    }


    //byte[] to Object

    public static Object toObject(byte[] arr){

        //將byte[] 轉成 obj

        //接受arr輸入

        ByteArrayInputStream bis = new ByteArrayInputStream(arr);

        ObjectInputStream ois = null;

        try {

            //bis 轉為ObjectInput

            ois = new ObjectInputStream(bis);

            //從ObjectInput 讀取Obj

            return ois.readObject();


        } catch (IOException e) {

            e.printStackTrace();

        } catch (ClassNotFoundException e) {

            e.printStackTrace();

        }finally {

            if (bis!=null){

                try {

                    bis.close();

                } catch (IOException e) {

                    e.printStackTrace();

                }

            }

            if (ois !=null){

                if (ois !=null){

                    try {

                        ois.close();

                    } catch (IOException e) {

                        e.printStackTrace();

                    }

                }

            }

        }

        return null;

    }
/<code>

6、消息順序

思考:消息的順序丟失了,怎麼辦? 將相同id的數據放到同一個partition

<code>while (true){

            Electrocar car = nextRecord();

            byte[] carBinary = ObjectBinaryUtil.toBinary(car);   

 

            ProducerRecord record = new ProducerRecord(

                    "electrocar",

                    car.getId(),    //通過傳入carId,來保證消息的順序

                    carBinary);

            producer.send(record);

 

            Thread.sleep(200);

            System.out.println("published...");

        }

  /<code>

五、consumer 傳統方式

group.id

Kafka 中有一個消費者集群的概念,我們將其稱之為consumer group。

要做技術選型,數據處理選kafka還是RocketMQ?我徹底蒙了

auto.commit

1、問:consumer 重啟時,應該從何處開始繼續消費?

答:從關閉時的 offset開始消費,這就要 實時記錄消費進度

2、enable.auto.commit=true時,由 consumer 自動提交,false時手動提交

1

consumer.commitAsync(); //手動提交API

  

3、問: offset 提交到哪裡了呢?

答:在 offset早期,提交到ZK,提交到系統級別的topic

4、存在數據數據一致性問題

能夠理解的同學扣個1,不理解的扣個2

要做技術選型,數據處理選kafka還是RocketMQ?我徹底蒙了


exactly-once 方案

方案總述

要做技術選型,數據處理選kafka還是RocketMQ?我徹底蒙了


消費kafka

<code>//創建 demo2

//實例化consumer從demo1處拷貝

//修改數據類型 
KafkaConsumer consumer
ByteArrayDeserializer

//沒有 commit offset,不能用subscribe 方法
        List partitions = new ArrayList<>();
        for (int i=0; i<3; i++){
            //構建partition 對象
            TopicPartition p = new TopicPartition(topic, i);
            partitions.add(p);
        }

        //指定,當前consuer具體消費哪幾個paritions
        consumer.assign(partitions);/<code>

seek到具體Offset

重啟consumer時,要從MySQL中獲取offset,

根據該offset開始消費 toipic,

就要知道如何跳轉到 具體的 offset

<code>for (TopicPartition p : partitions){
            consumer.seek(p, 20);       //將partition seek到具體的offset開始消費
        }/<code>

建MySQL表

<code>CREATE TABLE `electrocar` (

  `topic` varchar(20) DEFAULT NULL,

  `pid` int(11) DEFAULT NULL,

  `offset` mediumtext,

  `id` int(11) DEFAULT NULL,

  `timestamp` date DEFAULT NULL,

  `longitude` float DEFAULT NULL,

  `latitude` float DEFAULT NULL,

  `speed` float DEFAULT NULL,

  `dump_energy` float DEFAULT NULL

) ENGINE=InnoDB DEFAULT CHARSET=utf8/<code>

落地數據

<code>//引入JdbcHelper

#創建連接

JdbcHelper jdbcHelper = new JdbcHelper("jdbc:mysql://192.168.90.131:3306/kafka", "kafka", "kafka");

Connection conn = jdbcHelper.getConnection();

System.out.println("MySQL conn inited...");

 

Statement stat = null;           //創建會話

try {

    stat = conn.createStatement();

    while (true) {                  //循環執行poll方法

        //到服務端拉取消息,得到一個集合

        ConsumerRecords records = consumer.poll(Duration.ofMillis(100));

 

        if (records.count() >0){    //有消息,才insert

            //將records 轉成 批量插入的SQL語句

            String sql = records2SQL(records);

            stat.execute(sql);

            System.out.println("inserted...");

        }else {

            System.out.println("no record...");

        }

    }

} catch (SQLException e) {

    e.printStackTrace();

}/<code>

records轉SQL

<code>public static String records2SQL(ConsumerRecords records){
        StringBuilder sb = new StringBuilder();

        sb.append("INSERT INTO kafka.electrocar VALUES ");

        Iterator itr = records.iterator();

        while (itr.hasNext()){
            ConsumerRecord record = (ConsumerRecord)itr.next();
            Electrocar car = (Electrocar) ObjectBinaryUtil.toObject(record.value());

            String strDateFormat = "yyyy-MM-dd HH:mm:ss";
            SimpleDateFormat sdf = new SimpleDateFormat(strDateFormat);
            String time = sdf.format(car.getTime());

            String sqlPiece = String.format("('%s',%d,%d,%s,'%s',%f,%f,%f,%f)",
                    record.topic(),
                    record.partition(),
                    record.offset(),
                    car.getId(),
                    time,
                    car.getLongitude(),
                    car.getLatitude(),
                    car.getSpeed(),
                    car.getDump_energy());

            sb.append(sqlPiece);

            if (itr.hasNext()){
                sb.append(",");
            }

        }

        //System.out.println(sb.toString());
        return sb.toString();
    }/<code>
<code>
      mysql
      mysql-connector-java
      5.1.25
    

    import com.mysql.jdbc.Driver;/<code>

封裝成通用工具

1、創建 ExactOnceConsumer

現在還只是一個demo,只能用於electrocar topic的消費,現在我們將其封裝成一個小框架,讓他能夠經過極少量的開發,就能消費其他的topic

2、重構

關注公眾號:艾編程,看完整本課程視頻+資料


分享到:


相關文章: