Preface
Alibaba's Double 11 sales dashboard has become something of a special attraction. Real-time dashboards are being adopted by more and more companies to present key data metrics promptly, and in practice you will certainly compute far more than one or two dimensions. Thanks to Flink's "true streaming" nature, it is a better fit for dashboard applications than Spark Streaming. This article abstracts a simple model from the author's hands-on work experience and briefly walks through the computation flow (which is, of course, mostly source code).
Data Format and Ingestion
A simplified sub-order message body looks like this.
<code>{
  "userId": 234567,
  "orderId": 2902306918400,
  "subOrderId": 2902306918401,
  "siteId": 10219,
  "siteName": "site_blabla",
  "cityId": 101,
  "cityName": "北京市",
  "warehouseId": 636,
  "merchandiseId": 187699,
  "price": 299,
  "quantity": 2,
  "orderStatus": 1,
  "isNewOrder": 0,
  "timestamp": 1572963672217
}</code>
Since an order may contain multiple kinds of merchandise, it is split into sub-orders; each JSON message represents one sub-order. We now want to compute the following metrics per natural day and render them on the dashboard at a 1-second refresh rate:
the total order count, sub-order count, sales quantity, and GMV of each site (site ID = siteId);
the current top-N merchandise by sales quantity (merchandise ID = merchandiseId), along with their quantities.
Since the dashboard's primary requirement is real-time freshness, waiting for late data is clearly unrealistic, so we use processing time as the time characteristic and checkpoint at a 1-minute interval.
<code>StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);</code>
Then subscribe to the Kafka order topic as the data source.
<code>Properties consumerProps = ParameterUtil.getFromResourceFile("kafka.properties");
DataStream<String> sourceStream = env
    .addSource(new FlinkKafkaConsumer011<>(
        ORDER_EXT_TOPIC_NAME,      // topic
        new SimpleStringSchema(),  // deserializer
        consumerProps              // consumer properties
    ))
    .setParallelism(PARTITION_COUNT)
    .name("source_kafka_" + ORDER_EXT_TOPIC_NAME)
    .uid("source_kafka_" + ORDER_EXT_TOPIC_NAME);</code>
Assigning operator IDs to stateful operators (by calling the uid() method) is a good habit: it guarantees that the Flink application can correctly restore its state when restarting from a savepoint. To be extra safe, the Flink documentation also recommends explicitly assigning an ID to every operator; see:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job
Next, we convert the JSON data into POJOs, using FastJSON as the JSON framework.
<code>DataStream<SubOrderDetail> orderStream = sourceStream
    .map(message -> JSON.parseObject(message, SubOrderDetail.class))
    .name("map_sub_order_detail").uid("map_sub_order_detail");</code>
The JSON is already pre-processed into a standardized format, so the POJO class SubOrderDetail can be greatly simplified with Lombok. If any JSON field were named non-standardly, you would hand-write its getter and setter and annotate them with @JSONField to declare the mapping (a hypothetical sketch follows the class below).
<code>@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class SubOrderDetail implements Serializable {
    private static final long serialVersionUID = 1L;

    private long userId;
    private long orderId;
    private long subOrderId;
    private long siteId;
    private String siteName;
    private long cityId;
    private String cityName;
    private long warehouseId;
    private long merchandiseId;
    private long price;
    private long quantity;
    private int orderStatus;
    private int isNewOrder;
    private long timestamp;
}</code>
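As a minimal illustration of the @JSONField mapping mentioned above, here is a hypothetical sketch: suppose the incoming JSON used a snake_case field sub_order_id (it does not in this article's data; the name is made up for the example). The mapping would then be declared on hand-written accessors:
<code>// Hypothetical example: the JSON field "sub_order_id" (made up for illustration)
// does not match the POJO property name, so the mapping is declared explicitly.
public class SubOrderDetailWithMapping implements Serializable {
    private static final long serialVersionUID = 1L;

    private long subOrderId;

    @JSONField(name = "sub_order_id")
    public long getSubOrderId() {
        return subOrderId;
    }

    @JSONField(name = "sub_order_id")
    public void setSubOrderId(long subOrderId) {
        this.subOrderId = subOrderId;
    }
}</code>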
Computing Site Metrics
Key the sub-order stream by site ID, open a 1-day tumbling window, and attach a ContinuousProcessingTimeTrigger so the window fires at a 1-second period. Mind the time-zone issue with processing time, that perennial topic: the Time.hours(-8) offset below aligns the daily window to UTC+8.
<code>WindowedStream<SubOrderDetail, Tuple, TimeWindow> siteDayWindowStream = orderStream
    .keyBy("siteId")
    .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)));</code>
Next comes the aggregate function.
<code>DataStream<OrderAccumulator> siteAggStream = siteDayWindowStream
    .aggregate(new OrderAndGmvAggregateFunc())
    .name("aggregate_site_order_gmv").uid("aggregate_site_order_gmv");</code>
<code>public static final class OrderAndGmvAggregateFunc
        implements AggregateFunction<SubOrderDetail, OrderAccumulator, OrderAccumulator> {
    private static final long serialVersionUID = 1L;

    @Override
    public OrderAccumulator createAccumulator() {
        return new OrderAccumulator();
    }

    @Override
    public OrderAccumulator add(SubOrderDetail record, OrderAccumulator acc) {
        if (acc.getSiteId() == 0) {
            acc.setSiteId(record.getSiteId());
            acc.setSiteName(record.getSiteName());
        }
        acc.addOrderId(record.getOrderId());
        acc.addSubOrderSum(1);
        acc.addQuantitySum(record.getQuantity());
        acc.addGmv(record.getPrice() * record.getQuantity());
        return acc;
    }

    @Override
    public OrderAccumulator getResult(OrderAccumulator acc) {
        return acc;
    }

    @Override
    public OrderAccumulator merge(OrderAccumulator acc1, OrderAccumulator acc2) {
        if (acc1.getSiteId() == 0) {
            acc1.setSiteId(acc2.getSiteId());
            acc1.setSiteName(acc2.getSiteName());
        }
        acc1.addOrderIds(acc2.getOrderIds());
        acc1.addSubOrderSum(acc2.getSubOrderSum());
        acc1.addQuantitySum(acc2.getQuantitySum());
        acc1.addGmv(acc2.getGmv());
        return acc1;
    }
}</code>
The accumulator class OrderAccumulator is straightforward; its structure is evident from how it is used, so I won't dwell on it. The only thing to watch out for is that order IDs may repeat, so they are kept in a HashSet named orderIds. A HashSet is fine at our current data scale; with truly massive data, consider switching to HyperLogLog.
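The original accumulator source is not reproduced in this article, but a minimal sketch can be inferred purely from the calls made on it above; the exact field types here are assumptions, not the author's original code.
<code>// A sketch of OrderAccumulator inferred from its usage; field types are assumed.
public static class OrderAccumulator implements Serializable {
    private static final long serialVersionUID = 1L;

    private long siteId;
    private String siteName;
    private final Set<Long> orderIds = new HashSet<>(); // de-duplicates repeated order IDs
    private long subOrderSum;
    private long quantitySum;
    private long gmv;

    public long getSiteId() { return siteId; }
    public void setSiteId(long siteId) { this.siteId = siteId; }
    public String getSiteName() { return siteName; }
    public void setSiteName(String siteName) { this.siteName = siteName; }
    public Set<Long> getOrderIds() { return orderIds; }
    public long getSubOrderSum() { return subOrderSum; }
    public long getQuantitySum() { return quantitySum; }
    public long getGmv() { return gmv; }

    public void addOrderId(long orderId) { orderIds.add(orderId); }
    public void addOrderIds(Set<Long> ids) { orderIds.addAll(ids); }
    public void addSubOrderSum(long count) { subOrderSum += count; }
    public void addQuantitySum(long quantity) { quantitySum += quantity; }
    public void addGmv(long amount) { gmv += amount; }
}</code>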
The next step is writing the results to Redis for the presentation layer to query. There is a catch here: within any given second, only a few sites actually see data changes, yet every firing of ContinuousProcessingTimeTrigger emits all the aggregated data in the window. That is a lot of wasted work, and it puts extra pressure on Redis. So we chain a ProcessFunction after the aggregation; the code follows.
<code>DataStream<Tuple2<Long, String>> siteResultStream = siteAggStream
    .keyBy("siteId")
    .process(new OutputOrderGmvProcessFunc(), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}))
    .name("process_site_gmv_changed").uid("process_site_gmv_changed");</code>
<code>public static final class OutputOrderGmvProcessFunc
        extends KeyedProcessFunction<Tuple, OrderAccumulator, Tuple2<Long, String>> {
    private static final long serialVersionUID = 1L;

    private MapState<Long, OrderAccumulator> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        state = this.getRuntimeContext().getMapState(new MapStateDescriptor<>(
            "state_site_order_gmv",
            Long.class,
            OrderAccumulator.class
        ));
    }

    @Override
    public void processElement(OrderAccumulator value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {
        long key = value.getSiteId();
        OrderAccumulator cachedValue = state.get(key);

        // Emit only when the site has no cached entry yet or its sub-order count changed
        if (cachedValue == null || value.getSubOrderSum() != cachedValue.getSubOrderSum()) {
            JSONObject result = new JSONObject();
            result.put("site_id", value.getSiteId());
            result.put("site_name", value.getSiteName());
            result.put("quantity", value.getQuantitySum());
            result.put("orderCount", value.getOrderIds().size());
            result.put("subOrderCount", value.getSubOrderSum());
            result.put("gmv", value.getGmv());
            out.collect(new Tuple2<>(key, result.toJSONString()));
            state.put(key, value);
        }
    }

    @Override
    public void close() throws Exception {
        state.clear();
        super.close();
    }
}</code>
It is quite simple: a MapState caches the current aggregates for all sites. Since the source emits per sub-order, a result has actually changed whenever the site ID is missing from the MapState or the cached sub-order count differs from the current one; only such records are allowed through.
Finally, we can safely attach the Redis sink; the results are stored in a Hash structure.
<code>// Readers: build a FlinkJedisPoolConfig suited to your own environment
FlinkJedisPoolConfig jedisPoolConfig = ParameterUtil.getFlinkJedisPoolConfig(false, true);
siteResultStream
    .addSink(new RedisSink<>(jedisPoolConfig, new GmvRedisMapper()))
    .name("sink_redis_site_gmv").uid("sink_redis_site_gmv")
    .setParallelism(1);</code>
<code>public static final class GmvRedisMapper implements RedisMapper<Tuple2<Long, String>> {
    private static final long serialVersionUID = 1L;
    private static final String HASH_NAME_PREFIX = "RT:DASHBOARD:GMV:";

    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, HASH_NAME_PREFIX);
    }

    @Override
    public String getKeyFromData(Tuple2<Long, String> data) {
        return String.valueOf(data.f0);
    }

    @Override
    public String getValueFromData(Tuple2<Long, String> data) {
        return data.f1;
    }

    @Override
    public Optional<String> getAdditionalKey(Tuple2<Long, String> data) {
        return Optional.of(
            HASH_NAME_PREFIX +
            new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) +
            "SITES"
        );
    }
}</code>
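On the query side, the presentation backend can read the whole hash back in one call. A minimal Jedis sketch, assuming the key layout produced by GmvRedisMapper above (todayString is a placeholder for the current date formatted with Consts.TIME_DAY_FORMAT):
<code>// Read today's per-site metrics; each hash field is a site ID, each value a JSON string.
String hashKey = "RT:DASHBOARD:GMV:" + todayString + "SITES";
Map<String, String> siteMetrics = jedis.hgetAll(hashKey);
siteMetrics.forEach((siteId, json) -> System.out.println(siteId + " -> " + json));</code>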
Merchandise Top N
We can directly reuse the orderStream produced earlier; the recipe is much the same as the GMV statistics above. A 1-second tumbling window is enough here.
<code>WindowedStream<SubOrderDetail, Tuple, TimeWindow> merchandiseWindowStream = orderStream
    .keyBy("merchandiseId")
    .window(TumblingProcessingTimeWindows.of(Time.seconds(1)));

DataStream<Tuple2<Long, Long>> merchandiseRankStream = merchandiseWindowStream
    .aggregate(new MerchandiseSalesAggregateFunc(), new MerchandiseSalesWindowFunc())
    .name("aggregate_merch_sales").uid("aggregate_merch_sales")
    .returns(TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() { }));</code>
The aggregate and window functions are even simpler; the final output is a 2-tuple of merchandise ID and sales quantity.
<code>public static final class MerchandiseSalesAggregateFunc
        implements AggregateFunction<SubOrderDetail, Long, Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(SubOrderDetail value, Long acc) {
        return acc + value.getQuantity();
    }

    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    @Override
    public Long merge(Long acc1, Long acc2) {
        return acc1 + acc2;
    }
}

public static final class MerchandiseSalesWindowFunc
        implements WindowFunction<Long, Tuple2<Long, Long>, Tuple, TimeWindow> {
    private static final long serialVersionUID = 1L;

    @Override
    public void apply(
        Tuple key,
        TimeWindow window,
        Iterable<Long> accs,
        Collector<Tuple2<Long, Long>> out) throws Exception {
        long merchId = ((Tuple1<Long>) key).f0;
        long acc = accs.iterator().next();
        out.collect(new Tuple2<>(merchId, acc));
    }
}</code>
Since the data ends up in Redis anyway, there is no need to compute the Top N on the Flink side at all: Redis's sorted set (zset) handles it directly, with the merchandise ID as the member and the sales quantity as the score. Simple and convenient. However, the flink-redis-connector project does not ship a ZINCRBY command implementation by default (one more thing to complain about); we can add it ourselves, following the same steps as my earlier article on adding the SETEX command, so I won't repeat them here; a rough sketch appears after the mapper. The RedisMapper is written as follows.
<code>public static final class RankingRedisMapper implements RedisMapper<Tuple2<Long, Long>> {
    private static final long serialVersionUID = 1L;
    private static final String ZSET_NAME_PREFIX = "RT:DASHBOARD:RANKING:";

    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.ZINCRBY, ZSET_NAME_PREFIX);
    }

    @Override
    public String getKeyFromData(Tuple2<Long, Long> data) {
        return String.valueOf(data.f0);
    }

    @Override
    public String getValueFromData(Tuple2<Long, Long> data) {
        return String.valueOf(data.f1);
    }

    @Override
    public Optional<String> getAdditionalKey(Tuple2<Long, Long> data) {
        return Optional.of(
            ZSET_NAME_PREFIX +
            new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) + ":" +
            "MERCHANDISE"
        );
    }
}</code>
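For reference, here is a rough sketch of the kind of patch involved, assuming the Bahir flink-connector-redis layout (an enum constant, a container method, and a sink dispatch case); the method and variable names are illustrative, not the author's actual patch. Jedis's zincrby(key, increment, member) is the underlying call.
<code>// 1. Add the command to the RedisCommand enum, bound to the sorted-set data type:
ZINCRBY(RedisDataType.SORTED_SET),

// 2. Declare zincrBy() in RedisCommandsContainer and implement it in RedisContainer,
//    delegating to Jedis's zincrby(key, increment, member):
@Override
public void zincrBy(String key, String score, String element) {
    Jedis jedis = null;
    try {
        jedis = getInstance();
        jedis.zincrby(key, Double.parseDouble(score), element);
    } finally {
        releaseInstance(jedis);
    }
}

// 3. Add a matching case to the switch in RedisSink.invoke():
case ZINCRBY:
    this.redisCommandsContainer.zincrBy(this.additionalKey, value, key);
    break;</code>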
On the backend, a ZREVRANGE command then retrieves the entries at any desired ranks. As long as the data volume is not unacceptably large and a Redis instance is already at hand, this scheme serves as a general-purpose implementation for all kinds of Top N requirements.
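For illustration, a minimal Jedis sketch of that backend query, reusing the key layout from RankingRedisMapper above (todayString is again a placeholder for the formatted current date):
<code>// Top 10 merchandise by sales quantity, highest score first.
String zsetKey = "RT:DASHBOARD:RANKING:" + todayString + ":MERCHANDISE";
Set<redis.clients.jedis.Tuple> topN = jedis.zrevrangeWithScores(zsetKey, 0, 9);
for (redis.clients.jedis.Tuple t : topN) {
    System.out.println(t.getElement() + " -> " + (long) t.getScore());
}</code>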
The End
The actual dashboard rendering is confidential, so there are no screenshots. Below is the execution plan that the Flink Web UI showed at submission time (the real job has more statistics tasks, not just 3 sinks). By reusing the source data, even more statistical requirements can be implemented within the same Flink job.