大迭代(一):离线属性计算解耦

需求背景

大迭代的第一个方向:解决离线计算属性的开发成本问题。

每次新增一个离线计算属性(比如"昨日浏览量"、"近7天平均评分"),都需要写 Java 代码、上线部署。这类代码的逻辑高度相似,却要一遍遍重复开发。

问题现状

以新增"昨日浏览量"属性为例,传统做法是这样的:

@Service
public class YesterdayViewCountService {
    public void calculateYesterdayViewCount() {
        // 1. 查询浏览日志表(亿级数据量)
        List<ViewLog> logs = viewLogRepository.findByDate(yesterday);

        // 2. 按图书ID聚合
        Map<Long, Long> viewCountMap = logs.stream()
            .collect(Collectors.groupingBy(ViewLog::getBookId, Collectors.counting()));

        // 3. 写入扩展属性表
        viewCountMap.forEach((bookId, count) -> {
            BookExtensionAttribute attr = new BookExtensionAttribute();
            attr.setBookId(bookId);
            attr.setAttributeKey("yesterday_view_count");
            attr.setAttributeValue(String.valueOf(count));
            attr.setAttributeType("Long");
            eavRepository.save(attr);
        });
    }
}

然后配一个定时任务每天执行。

这段代码有几个问题:在 Java 里处理亿级数据,性能很差;每次新增属性都要写类似的代码;无法利用大数据计算框架的优势。

核心矛盾

这类离线计算的本质是:对大规模数据做聚合运算,结果写入存储。这正是大数据计算框架最擅长的事,用 Java 来做是在用错误的工具解决问题。

方案设计

引入 Hive + DTS + MQ 的数据流水线,用 HiveSQL 替代 Java 代码做离线计算。

整体流程:

数据源(Hive 日分区表)
  → HiveSQL 计算任务
  → 结果写入 Hive 结果表
  → DTS 同步到 Kafka
  → MQ 消费者写入 EAV 表
  → 触发 ES 异构

步骤一:编写 HiveSQL

同样是计算"昨日浏览量",HiveSQL 只需要几行:

INSERT OVERWRITE TABLE book_yesterday_view_count
SELECT
    book_id,
    COUNT(*) as view_count,
    '${dt}' as calc_date
FROM view_log
WHERE dt = '${dt}'
GROUP BY book_id;

复杂一点的,比如"近7天综合评分"(浏览量30% + 评论数40% + 收藏数30%):

INSERT OVERWRITE TABLE book_comprehensive_score
SELECT
    t.book_id,
    (t.norm_view * 0.3 + t.norm_comment * 0.4 + t.norm_favorite * 0.3) * 100 as score,
    '${dt}' as calc_date
FROM (
    SELECT
        v.book_id,
        v.view_count / max_v.max_view as norm_view,
        c.comment_count / max_c.max_comment as norm_comment,
        f.favorite_count / max_f.max_favorite as norm_favorite
    FROM book_yesterday_view_count v
    LEFT JOIN book_comment_stats c ON v.book_id = c.book_id AND c.dt = '${dt}'
    LEFT JOIN book_favorite_stats f ON v.book_id = f.book_id AND f.dt = '${dt}'
    CROSS JOIN (SELECT MAX(view_count) as max_view FROM book_yesterday_view_count WHERE dt = '${dt}') max_v
    CROSS JOIN (SELECT MAX(comment_count) as max_comment FROM book_comment_stats WHERE dt = '${dt}') max_c
    CROSS JOIN (SELECT MAX(favorite_count) as max_favorite FROM book_favorite_stats WHERE dt = '${dt}') max_f
) t;

步骤二:DTS 配置同步

DTS(Data Transfer Service)负责把 Hive 计算结果同步到 Kafka,配置化完成,无需代码:

source:
  type: hive
  table: book_yesterday_view_count
  partition: dt='${dt}'
target:
  type: kafka
  topic: book_attribute_sync
  format: json
transform:
  - field: book_id       → book_id
  - field: view_count    → attribute_value
  - const: "yesterday_view_count" → attribute_key
  - const: "Long"        → attribute_type

步骤三:通用 MQ 消费者

消费者是通用的,不需要针对每个属性单独写:

@KafkaListener(topics = "book_attribute_sync", batch = "true")
public void consumeBatch(List<AttributeSyncMessage> messages) {
    List<BookExtensionAttribute> attrs = messages.stream()
        .map(this::convertToAttribute)
        .collect(Collectors.toList());
    eavRepository.saveAll(attrs);  // 批量幂等写入
}

数据一致性保障

Hive 计算完成后,延时一定时间再触发 DTS 同步,确保下游数据就绪。同时定时运行对账 SQL,发现不一致主动修正:

SELECT h.book_id, h.attribute_key, h.value as hive_value, e.attribute_value as mysql_value
FROM hive_result h
LEFT JOIN book_extension_attribute e
  ON h.book_id = e.book_id AND h.attribute_key = e.attribute_key
WHERE e.attribute_value IS NULL OR h.value != e.attribute_value;

成本对比

优化前(新增"昨日浏览量"):写 Java 计算逻辑 + 处理性能问题 + 单元测试 + 上线,耗时 2-3 天

优化后:写 HiveSQL(10行)+ 配置 DTS 任务,耗时 半天

后续再新增属性:只需写新的 HiveSQL,DTS 和消费者复用,耗时 2 小时

维度优化前优化后
新增属性周期2-3 天半天(首次)/ 2小时(后续)
代码上线必须任务上线,风险更低
大数据计算能力受限Hive 全量支持

这一步解决了什么

离线计算属性从此不需要写 Java 代码了,只需要写 HiveSQL 任务。计算能力也从 Java 单机升级到了 Hive 分布式,可以处理亿级数据。

大迭代的第一个方向完成。接下来是第二个方向:对 CQRS 的异构链路进行重构。