大數據必須瞭解的Flink實時數據架構

本文從上述現狀及實時數據需求出發,結合工業界案例、筆者的實時數據開發經驗, 梳理總結了實時數據體系建設的總體方案。

隨著互聯網的發展進入下半場,數據的時效性對企業的精細化運營越來越重要, 商場如戰場,在每天產生的海量數據中,如何能實時有效的挖掘出有價值的信息, 對企業的決策運營策略調整有很大幫助。此外,隨著 5G 技術的成熟、廣泛應用, 對於工業互聯網、物聯網等數據時效性要求非常高的行業,企業就更需要一套完整成熟的實時數據體系來提高自身的行業競爭力。

本文從上述現狀及實時數據需求出發,結合工業界案例、筆者的實時數據開發經驗, 梳理總結了實時數據體系建設的總體方案,本文主要分為三個部分:

  • 第一部分主要介紹了當下在工業界比較火熱的實時計算引擎 Flink 在實時數據體系建設過程中主要的應用場景及對應解決方案;
  • 第二部分從實時數據體系架構、實時數據模型分層、實時數據體系建設方式、流批一體實時數據架構發展等四個方面思考了實時數據體系的建設方案;
  • 第三部分則以一個具體案例介紹如何使用 Flink SQL 完成實時數據統計類需求。

一、Flink 實時應用場景

目前看來,Flink 在實時計算領域內的主要應用場景主要可分為四類場景, 分別是實時數據同步、流式 ETL、實時數據分析和複雜事件處理,具體的業務場景和對應的解決方案可詳細研究下圖, 文字層面不再詳述。

ss/202004/28/c751e39d76b71542872d04e0e6971eac.jpg" _fcksavedurl="https://s5.51cto.com/oss/202004/28/c751e39d76b71542872d04e0e6971eac.jpg" target="_blank">

ss/202004/28/858b3962449f8965f92ee85087656206.jpg" _fcksavedurl="https://s4.51cto.com/oss/202004/28/858b3962449f8965f92ee85087656206.jpg" target="_blank">

ss/202004/28/3acd52145f106e9f680cd411a172257b.jpg" _fcksavedurl="https://s3.51cto.com/oss/202004/28/3acd52145f106e9f680cd411a172257b.jpg" target="_blank">

ss="dp-sql">

  • ss="alt">ss="keyword">public class PageViewDeserializationSchema implements DeserializationSchema {
  • ss="alt"> ss="keyword">public ss="keyword">static final Logger LOG = LoggerFactory.getLogger(PageViewDeserializationSchema.class);
  • protected SimpleDateFormat dayFormatter;
  • ss="alt">
  • private final RowTypeInfo rowTypeInfo;
  • ss="alt">
  • ss="keyword">public PageViewDeserializationSchema(RowTypeInfo rowTypeInfo){
  • ss="alt"> dayFormatter = new SimpleDateFormat(ss="string">"yyyyMMdd", Locale.UK);
  • this.rowTypeInfo = rowTypeInfo;
  • ss="alt"> }
  • @Override
  • ss="alt"> ss="keyword">public Row deserialize(byte[] message) throws IOException {
  • Row row = new Row(rowTypeInfo.getArity());
  • ss="alt"> MobilePage mobilePage = ss="op">null;
  • try {
  • ss="alt"> mobilePage = MobilePage.parseFrom(message);
  • String mid = mobilePage.getMid();
  • ss="alt"> row.setField(0, mid);
  • Long timeLocal = mobilePage.getTimeLocal();
  • ss="alt"> String logDate = dayFormatter.format(timeLocal);
  • row.setField(1, logDate);
  • ss="alt"> row.setField(2, timeLocal);
  • }catch (Exception e){
  • ss="alt"> String mobilePageError = (mobilePage != ss="op">null) ? mobilePage.toString() : ss="string">"";
  • LOG.error(ss="string">"error parse bytes payload is {}, pageview error is {}", message.toString(), mobilePageError, e);
  • ss="alt"> }
  • ss="keyword">return ss="op">null;
  • ss="alt"> }

  • 3.2 編寫 Flink Job 主程序

    將 PV 數據解析為 Flink 的 Row 類型後,接下來就很簡單了,編寫主函數,寫 SQL 就能統計 UV 指標了,代碼如下:

    <code>ss="dp-sql">ss="alt">ss="keyword">public class RealtimeUV {  ss="alt">    ss="keyword">public ss="keyword">static void main(String[] args) throws Exception {         //step1 從properties配置文件中解析出需要的Kakfa、Hbase配置信息、ss="keyword">checkpoint參數信息 ss="alt">        Map config = PropertiesUtil.loadConfFromFile(args[0]);         String topic = config.get(ss="string">"source.kafka.topic"); ss="alt">        String groupId = config.get(ss="string">"source.group.id");         String sourceBootStrapServers = config.get(ss="string">"source.bootstrap.servers"); ss="alt">        String hbaseTable = config.get(ss="string">"hbase.table.name");         String hbaseZkQuorum = config.get(ss="string">"hbase.zk.quorum"); ss="alt">        String hbaseZkParent = config.get(ss="string">"hbase.zk.parent");         ss="keyword">int checkPointPeriod = ss="keyword">Integer.parseInt(config.get(ss="string">"checkpoint.period")); ss="alt">        ss="keyword">int checkPointTimeout = ss="keyword">Integer.parseInt(config.get(ss="string">"checkpoint.timeout"));  ss="alt">        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();         //step2 設置ss="keyword">Checkpoint相關參數,用於Failover容錯 ss="alt">        sEnv.getConfig().registerTypeWithKryoSerializer(MobilePage.class,                 ProtobufSerializer.class); ss="alt">        sEnv.getCheckpointConfig().setFailOnCheckpointingErrors(ss="keyword">false);         sEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); ss="alt">        sEnv.enableCheckpointing(checkPointPeriod, CheckpointingMode.EXACTLY_ONCE);         sEnv.getCheckpointConfig().setCheckpointTimeout(checkPointTimeout); ss="alt">        sEnv.getCheckpointConfig().enableExternalizedCheckpoints(                 CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); ss="alt">         //step3 使用Blink planner、創建TableEnvironment,並且設置狀態過期時間,避免Job OOM ss="alt">        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()                 .useBlinkPlanner() ss="alt">                .inStreamingMode()                 .build(); ss="alt">        StreamTableEnvironment tEnv = StreamTableEnvironment.ss="keyword">create(sEnv, environmentSettings);         tEnv.getConfig().setIdleStateRetentionTime(ss="keyword">Time.days(1), ss="keyword">Time.days(2)); ss="alt">         Properties sourceProperties = new Properties(); ss="alt">        sourceProperties.setProperty(ss="string">"bootstrap.servers", sourceBootStrapServers);         sourceProperties.setProperty(ss="string">"auto.commit.interval.ms", ss="string">"3000"); ss="alt">        sourceProperties.setProperty(ss="string">"group.id", groupId);  ss="alt">        //step4 初始化KafkaTableSource的ss="keyword">Schema信息,筆者這裡使用register TableSource的方式將源表註冊到Flink中,而沒有用register DataStream方式,也是因為想熟悉一下如何註冊KafkaTableSource到Flink中         TableSchema ss="keyword">schema = TableSchemaUtil.getAppPageViewTableSchema(); ss="alt">        Optional proctimeAttribute = Optional.empty();         List rowtimeAttributeDescriptors = Collections.emptyList(); ss="alt">        Map fieldMapping = new HashMap 
    <>();         List columnNames = new ArrayList<>(); ss="alt">        RowTypeInfo rowTypeInfo = new RowTypeInfo(ss="keyword">schema.getFieldTypes(), ss="keyword">schema.getFieldNames());         columnNames.addAll(Arrays.asList(ss="keyword">schema.getFieldNames())); ss="alt">        columnNames.forEach(ss="keyword">name -> fieldMapping.put(ss="keyword">name, ss="keyword">name));         PageViewDeserializationSchema deserializationSchema = new PageViewDeserializationSchema( ss="alt">                rowTypeInfo);         Map specificOffsets = new HashMap<>(); ss="alt">        Kafka011TableSource kafkaTableSource = new Kafka011TableSource(                 ss="keyword">schema, ss="alt">                proctimeAttribute,                 rowtimeAttributeDescriptors, ss="alt">                Optional.ss="keyword">of(fieldMapping),                 topic, ss="alt">                sourceProperties,                 deserializationSchema, ss="alt">                StartupMode.EARLIEST,                 specificOffsets); ss="alt">        tEnv.registerTableSource(ss="string">"pageview", kafkaTableSource);  ss="alt">        //step5 初始化Hbase TableSchema、寫入參數,並將其註冊到Flink中         HBaseTableSchema hBaseTableSchema = new HBaseTableSchema(); ss="alt">        hBaseTableSchema.setRowKey(ss="string">"log_date", String.class);         hBaseTableSchema.addColumn(ss="string">"f", ss="string">"UV", Long.class); ss="alt">        HBaseOptions hBaseOptions = HBaseOptions.builder()                 .setTableName(hbaseTable) ss="alt">                .setZkQuorum(hbaseZkQuorum)                 .setZkNodeParent(hbaseZkParent) ss="alt">                .build();         HBaseWriteOptions hBaseWriteOptions = HBaseWriteOptions.builder() ss="alt">                .setBufferFlushMaxRows(1000)                 .setBufferFlushIntervalMillis(1000) ss="alt">                .build();         HBaseUpsertTableSink hBaseSink = new HBaseUpsertTableSink(hBaseTableSchema, hBaseOptions, hBaseWriteOptions); ss="alt">        tEnv.registerTableSink(ss="string">"uv_index", hBaseSink);  ss="alt">        //step6 實時計算當天UV指標sql, 這裡使用最簡單的ss="keyword">group ss="keyword">by agg,沒有使用minibatch或窗口,在大數據量優化時最好使用後兩種方式         String uvQuery = ss="string">"insert into uv_index " ss="alt">                + ss="string">"select log_date,\n"                 + ss="string">"ROW(count(distinct mid) as UV)\n" ss="alt">                + ss="string">"from pageview\n"                 + ss="string">"group by log_date"; ss="alt">        tEnv.sqlUpdate(uvQuery);         //step7 執行Job ss="alt">        sEnv.ss="keyword">execute(ss="string">"UV Job");     } ss="alt">} /<code>

    以上就是一個簡單的使用 Flink SQL 統計 UV 的 case, 代碼非常簡單,只需要理清楚如何解析 Kafka 中數據,如何初始化 Table Schema,以及如何將表註冊到 Flink中,即可使用 Flink SQL 完成各種複雜的實時數據統計類的業務需求,學習成本比API 的方式低很多。說明一下,筆者這個 demo 是基於目前業務場景而開發的,在生產環境中可以真實運行起來,可能不能拆箱即用,你需要結合自己的業務場景自定義相應的 kafka 數據解析類。


    分享到:


    相關文章: