大迭代(一):离线属性计算解耦
需求背景
大迭代的第一个方向:解决离线计算属性的开发成本问题。
每次新增一个离线计算属性(比如"昨日浏览量"、"近7天平均评分"),都需要写 Java 代码、上线部署。这类代码的逻辑高度相似,却要一遍遍重复开发。
问题现状
以新增"昨日浏览量"属性为例,传统做法是这样的:
@Service
public class YesterdayViewCountService {
public void calculateYesterdayViewCount() {
// 1. 查询浏览日志表(亿级数据量)
List<ViewLog> logs = viewLogRepository.findByDate(yesterday);
// 2. 按图书ID聚合
Map<Long, Long> viewCountMap = logs.stream()
.collect(Collectors.groupingBy(ViewLog::getBookId, Collectors.counting()));
// 3. 写入扩展属性表
viewCountMap.forEach((bookId, count) -> {
BookExtensionAttribute attr = new BookExtensionAttribute();
attr.setBookId(bookId);
attr.setAttributeKey("yesterday_view_count");
attr.setAttributeValue(String.valueOf(count));
attr.setAttributeType("Long");
eavRepository.save(attr);
});
}
}
然后配一个定时任务每天执行。
这段代码有几个问题:在 Java 里处理亿级数据,性能很差;每次新增属性都要写类似的代码;无法利用大数据计算框架的优势。
核心矛盾
这类离线计算的本质是:对大规模数据做聚合运算,结果写入存储。这正是大数据计算框架最擅长的事,用 Java 来做是在用错误的工具解决问题。
方案设计
引入 Hive + DTS + MQ 的数据流水线,用 HiveSQL 替代 Java 代码做离线计算。
整体流程:
数据源(Hive 日分区表)
→ HiveSQL 计算任务
→ 结果写入 Hive 结果表
→ DTS 同步到 Kafka
→ MQ 消费者写入 EAV 表
→ 触发 ES 异构
步骤一:编写 HiveSQL
同样是计算"昨日浏览量",HiveSQL 只需要几行:
INSERT OVERWRITE TABLE book_yesterday_view_count
SELECT
book_id,
COUNT(*) as view_count,
'${dt}' as calc_date
FROM view_log
WHERE dt = '${dt}'
GROUP BY book_id;
复杂一点的,比如"近7天综合评分"(浏览量30% + 评论数40% + 收藏数30%):
INSERT OVERWRITE TABLE book_comprehensive_score
SELECT
t.book_id,
(t.norm_view * 0.3 + t.norm_comment * 0.4 + t.norm_favorite * 0.3) * 100 as score,
'${dt}' as calc_date
FROM (
SELECT
v.book_id,
v.view_count / max_v.max_view as norm_view,
c.comment_count / max_c.max_comment as norm_comment,
f.favorite_count / max_f.max_favorite as norm_favorite
FROM book_yesterday_view_count v
LEFT JOIN book_comment_stats c ON v.book_id = c.book_id AND c.dt = '${dt}'
LEFT JOIN book_favorite_stats f ON v.book_id = f.book_id AND f.dt = '${dt}'
CROSS JOIN (SELECT MAX(view_count) as max_view FROM book_yesterday_view_count WHERE dt = '${dt}') max_v
CROSS JOIN (SELECT MAX(comment_count) as max_comment FROM book_comment_stats WHERE dt = '${dt}') max_c
CROSS JOIN (SELECT MAX(favorite_count) as max_favorite FROM book_favorite_stats WHERE dt = '${dt}') max_f
) t;
步骤二:DTS 配置同步
DTS(Data Transfer Service)负责把 Hive 计算结果同步到 Kafka,配置化完成,无需代码:
source:
type: hive
table: book_yesterday_view_count
partition: dt='${dt}'
target:
type: kafka
topic: book_attribute_sync
format: json
transform:
- field: book_id → book_id
- field: view_count → attribute_value
- const: "yesterday_view_count" → attribute_key
- const: "Long" → attribute_type
步骤三:通用 MQ 消费者
消费者是通用的,不需要针对每个属性单独写:
@KafkaListener(topics = "book_attribute_sync", batch = "true")
public void consumeBatch(List<AttributeSyncMessage> messages) {
List<BookExtensionAttribute> attrs = messages.stream()
.map(this::convertToAttribute)
.collect(Collectors.toList());
eavRepository.saveAll(attrs); // 批量幂等写入
}
数据一致性保障
Hive 计算完成后,延时一定时间再触发 DTS 同步,确保下游数据就绪。同时定时运行对账 SQL,发现不一致主动修正:
SELECT h.book_id, h.attribute_key, h.value as hive_value, e.attribute_value as mysql_value
FROM hive_result h
LEFT JOIN book_extension_attribute e
ON h.book_id = e.book_id AND h.attribute_key = e.attribute_key
WHERE e.attribute_value IS NULL OR h.value != e.attribute_value;
成本对比
优化前(新增"昨日浏览量"):写 Java 计算逻辑 + 处理性能问题 + 单元测试 + 上线,耗时 2-3 天。
优化后:写 HiveSQL(10行)+ 配置 DTS 任务,耗时 半天。
后续再新增属性:只需写新的 HiveSQL,DTS 和消费者复用,耗时 2 小时。
| 维度 | 优化前 | 优化后 |
|---|---|---|
| 新增属性周期 | 2-3 天 | 半天(首次)/ 2小时(后续) |
| 代码上线 | 必须 | 任务上线,风险更低 |
| 大数据计算能力 | 受限 | Hive 全量支持 |
这一步解决了什么
离线计算属性从此不需要写 Java 代码了,只需要写 HiveSQL 任务。计算能力也从 Java 单机升级到了 Hive 分布式,可以处理亿级数据。
大迭代的第一个方向完成。接下来是第二个方向:对 CQRS 的异构链路进行重构。