Article / 文章中心

新东方基于Hologres实时离线一体化数仓建设实践

发布时间:2022-02-18 点击数:499

事务介绍

新东方教育科技集团定坐落以学生全面成长为核心,以科技为驱动力的综合性教育集团。集团由1993年成立的北京新东方校园发展壮大而来,具有短期培训体系、基础教育体系、文化传播体系等事务。


在互联网大潮中,新东方在IT技术上也不断重构,持续投入大数据建造,研发大数据的相关技术和运用,然后快速而精准地呼应事务需求,并用数据为集团各级领导供给决议计划依据。新东方的大数据运用首要包含两部分:


  • 企业运用端的事务场景(B端):包含买卖,教育,人员等数据,数据规划为TB级。数据会被依照不同的条件和校园层级等,形成营收、教育、客服、财富人事等实时报表,为CRM体系的成千上万名事务参谋供给线索和商机的明细报表查询,一起也供各级管理人员了解事务的运行状况,辅佐事务决议计划。


  • 互联网直接面向用户场景(C端):首要为招生引流类、云教室等运用,包含网页版,App端,H5等,数据量为PB级。这部分数据记录了用户(学员和潜在用户)在新东方的教育闭环轨道,C端数据除了生成常规的运营报表外,还会制作用户画像,从而开发引荐体系和圈选等运用,改善C端各种运用的用户体验,进一步精细化运营。


数仓建造和运用痛点

为了满意日益增长的事务需求,集团开始投入数仓建造。在数据仓库建造的初期,以事务驱动为主。通过阿里云的MaxCompute为核心构建数据仓库,直接集成事务库数据以及WEB运用的OSS日志等,然后在数据仓库中剖析事务数据并发生统计剖析成果。初期的架构如下:

image.png


依据事务需求,将中小型规划的成果导入MySQL并支撑数据更新。数据规划较大的只读成果则导入 MongoDB。


然后Web服务层查询MySQL和MongoDB并向用户供给服务接口, Web服务层也能够通过Lightning加快接口直接查询MaxCompute的数据,


Lightning协议是MaxCompute查询加快服务,支撑以PostgreSQL协议及语法连接拜访MaxCompute数据,相比MaxCompute供给的odps jdbc接口速度要快得多。原因是后者把每次拜访作为一个Map-Reduce处理,即使很小的数据量查询呼应时刻也要超越10秒,而 Lightning能将延时降到百毫秒内,满意事务成果报表的展示需求。目前Lightning服务进入服务下线阶段,新的加快服务由Hologres加快集群代替。


运用这套架构能够在较短的时刻内满意报表开发、用户画像和引荐服务等需求,为新东方的日常运营和招生引流供给较好的数据支撑。可是跟着事务的开展,这套架构越来越难以满意用户的需求,首要体现在:

  1. 实时性,事务期望能够达到1分钟级甚至秒级的实时性,而运用MaxCompute只能完结批量处理,一般只能供给分钟级(一般5分钟以上)的延时
  2. 来自Web服务层的高并发查询,MaxCompute的大数据量查询只能支撑到100左右的QPS,满意不了来自C端运用的高并发查询
  3. 杂乱逻辑的大数据量剖析和Ad-hoc查询,跟着剖析数据敏捷从数百G上涨到TB级,在多个数亿行以上的数据进行杂乱报表开发,单实例MySQL难以支撑;而MongoDB无法运用规范的SQL进行杂乱查询,一起MongoDB本身杂乱的查询事务,开发功率很低。
  4. Lightning接口尽管支撑规范的SQL而且某些场景上速度比较快,可是Lightning开始逐步下线,需求找到替换的办法。


实时数仓选型

要处理以上的事务痛点,就需求找到能满意实时数仓建造需求的产品。大数据团队调研了多种实时数仓方案,依据新东方的数据和运用特色进行选型,方案比对如下:



产品

Ad-hoc查询

高并发支撑(QPS)

SQL支撑

TP(买卖)支撑

与MaxCompute/Flink集成

文档和技术支撑

ClickHouse 20.1

支撑PB级以上

默认支撑100的并发查询,qps取决于单个查询的呼应时刻

单表查询支撑较好,杂乱报表查询支撑较弱

通过mutation支撑update,较弱

支撑

文档丰厚,社区支撑较好

Doris 0.9

支撑PB级以上

数百

兼容MySQL

不支撑

通过兼容MySQL与MaxCompute集成,与Flink的集成 不明确

文档和社区都较差

Hologres 1.1

支撑PB级以上

数万以上

兼容PostgreSQL

DDL支撑

与MaxCompute直接在存储层集成,而且都兼容PostgreSQL,供给Flink Connector集成

阿里在线文档和技术支撑

Tidb 4.x (含Tiflash)

支撑PB级以上

数万以上

兼容MySQL

支撑

支撑

文档丰厚,社区支撑较好

Elastic Search 7.x

支撑PB级以上

数万以上

不支撑规范SQL

不支撑

支撑与MaxCompute集成,Flink Connector只支撑Source

文档丰厚,社区支撑较好



从以上的表格能看出,Tidb和Hologres能够较好地处理新东方在大数据方面面对的问题。可是Tidb需求私有云布置并运维,而MaxCompute布置在公有云,两者在不同的云环境。Hologres是阿里云供给的云原生服务,并与MaxCompute都布置在公有云,且在Pangu文件层紧密集成,数据交换功率远高于其他外部体系,两者都兼容PostgreSQL,从离线数据仓库开发迁移到实时数据仓库开发难度降低。


依据以上的剖析,挑选Hologres作为实时数仓。


实时数仓建造

实时数仓是在离线数仓的基础上,依据Lambda架构构建,离线和实时一起进行建造。有关Lambda的,参阅:Lambda architecture

新东方2.png


架构的各组件阐明:

1)数据源:

  • Binlog,即各类运用(B端和C端)的数据库Binlog,关于SQL SERVER的数据库则是CT log;
  • App音讯,即App运行时上报的事件;
  • Web日志/埋点日志,即Web服务器所发生的ngix日志,以及Web app/H5运行时埋点服务发生的日志


2)CDC数据总线(简称CDC)

  • CDC数据总线收集数据源,写入Kafka Topic。关于离线数仓和实时数仓, CDC都是直接交互的数据源/
  • CDC包含Source Connector、Kafka 集群、Sink Connector三部分。 Source Connector 负责从数据源收集数据并写入Kafka集群的Topic,而Sink Connector则将Kafka Topic的数据ETL到方针库,包含实时和离线数仓。
  • CDC易于布置和监控,并供给了简略的数据过滤,本钱较低,数据ETL使命尽量选用CDC。


3)离线数据处理

  • 离线数据处理依据MaxCompute建立,用于核算全量数据,数据源来自于CDC的实时导入。离线数据通过离线数仓核算(ODS->DWD/DWS→ADS)导入Hologres作为存量数据,一部分离线的DWD/DWS数据也导入Hologres作为维表的存量数据。
  • Flink核算使命会将ADS层成果Sink到MaxCompute, 用于数据备份。


4)实时数据处理

实时数据处理依据阿里云托管的 Flink流式核算引擎。与离线数仓处理固定日期的数据(如T+1)不同,实时数仓处理的是流式数据,从使命发动开始,就一直运行,除非反常终止,不然不会结束。数仓的层次与离线数仓类似,依据实时处理的特色做了简化。如下表所示:

数仓层次

描绘

数据载体

ODS层

与数据源表结构相似,数据未通过处理

Kafka Topic/cdc Connector

DWD/DWS层

数据仓库层,依据事务线/主题处理数据,可复用

Kafka Topic

DIM层

维度层

holo 维表,Kafka Topic

ADS层

运用层,面向运用创立,存储处理成果

holo实时成果表,Kafka Topic


5)Hologres 数据查询

Hologres一起作为实时数据和MaxCompute离线数据加快查询的剖析引擎,存储所有的实时数仓所需的数据表,包含维度数据表(维表)、实时成果表、存量数据表以及查询View和外表等。数据表的界说和用途如下表所示:

数据表名称

描绘

数仓层次

数据源

维度数据表

维度建模后的数据表,在实时核算时现实表通过JDBC查询

DIM层

  1. 初始化数据来自离线数仓dim 层
  2. CDC
  1. Flink维表核算使命

实时成果表

实时数仓的核算成果表

实时数仓DWS/ADS层

实时数仓的DWS/ADS层核算使命

存量成果表

离线数仓的核算成果表

实时数仓DWS/ADS层

离线数仓的DWS/ADS层核算使命

查询view

合并实时和存量成果,对外供给一致的展示View

实时数仓ADS层

存量成果表

实时成果表

外表

来自MaxCompute的数据表引证

各层次

离线数仓

备份表

备份实时核算一段时刻内的数据,用于做数据校验和问题确诊

DWD/DWS层

实时数仓


运用场景

通过新的架构,支撑了新东方集团内如下运用场景:

  1. 实时报表查询:为CRM体系的成千上万名事务参谋供给线索和商机的明细报表查询,一起为管理层供给实时活动看板服务,延时秒级,辅佐事务决议计划。
  2. Ad-hoc查询:B端和C端运营人员能够直接通过Hologres定制自己的杂乱事务查询
  3. 用户轨道和画像场景:实时处理用户来自B端和C端的数据,生成用户轨道和标签,为事务快速决议计划供给依据。
  4. 引荐体系和圈选事务:通过Maxcompute训练离线模型,并通过Flink数据调整模型的参数。依据用户的实时轨道数据圈选出契合条件的用户并推送服务,进一步精细化运营。


运用实践

一个典型的实时使命处理流程如下图所示:

新东方3.png


  1. ODS层数据通过CDC数据总线导入MaxCompute, 供给离线核算源数据。 一起也会将数据写入到Hologres,用于做数据验证。 在Hologres中,维表存储全量数据。而其他类型的ODS数据表一般存储时刻>离线的核算周期即可,如离线T+1,则存储2天,无相应的离线核算使命依据验证数据周期而定。


  1. Flink使命读取ODS层数据作为输入,与存储在Hologres中的维表做相关,核算的成果存储到DWD/DWS层的Kafka Topic中,一起将成果写入到Hologres用于数据验证,数据存储时刻与ODS层相同。


  1. Flink使命读取DWD/DWS层数据,与存储在Hologres中的维表做相关, 将结算的成果存储到Hologres。依据运用需求,假如是Lambda架构,存储时刻>离线的核算周期即可,如离线T+1,则存储2天,假如是Kappa架构,保留悉数数据, 一起将成果数据写入离线数仓用于离线剖析用(可选)。


下面具体介绍在每一步处理流程中的运用实践与经验优化,以协助达到更好的效果。


数据验证

因为实时处理源数据和成果都是动态的,数据验证无法在使命中进行。能够在Hologres中,对实时数仓的各层落仓成果进行验证。因为实时处理和时刻相关,每一层次的数据都需求带上一个处理时刻戳(Process Time)。在Lambda架构中,将实时成果和离线成果进行比对,假定离线处理周期为T+1, 则实时处理取时刻戳与昨天的数据进行比对,核算出准确率。假如是Kappa架构,需求进行逻辑验证,并与事务人员处理的成果数据进行比对。


全量数据初始化

Kafka Topic一般存储几天内的数据,不能供给全量数据,所以需求从离线数仓进行全量数据初始化,将维表、ADS层成果等导入Hologres。


Hologres维表的Lookup和功能优化

1)Lookup

在Flink核算使命中,流表和Hologres的维度数据表Join,便是Lookup。Lookup需求处理两个问题:

  1. 维表索引:实际处理过程是每条流表的数据,运用Join 条件的列去维表中查询,将成果回来。Hologres的维表的索引需求和Flink SQL的Join key一致。
  2. 维表的推迟:因为维表的数据导入是另外的使命(CDC使命或许Flink使命),就会呈现数据不同步的状况,流表数据已到,而相关的维度数据没有入库。


关于问题1, 在创立Hologres的维度表时,需求依据Flink SQL的需求去设置表的各类索引,尤其是Distribution key和Clustering key,使之与Join的相关条件列一致,有关Hologres维表的索引会在后边末节说到。


关于问题2,维表和流表Join中,处理两者数据不同步的问题,通过设置窗口能够处理大部分问题,可是因为watermark触发窗口履行,需求兼顾维表数据推迟较多的状况,因而watermark duration设置较高,然后导致了数据处理使命的Latency很高,有时不契合快速呼应的事务要求,这时能够选用联合Join,,将双流Join和Lookup结合起来。


维表数据包含两部分: 1. Hologres维表,查询全量数据. 2. 从维表对应的Kafka Topic创立的流表,查询最新的数据。Join时,先取维表对应的流表数据,假如不存在取Hologres维表的数据。


以下是一个例子,t_student(学员表)的流表和t_account(用户表) Join获取学员的user id

combined join //stream table:stream_uc_account val streamUcAccount: String = s""" CREATE TABLE `stream_t_account` ( `user_id` VARCHAR ,`mobile` VARCHAR .......(omitted) ,WATERMARK FOR event_time AS event_time - INTERVAL '20' SECOND ) WITH (  'connector' = 'kafka'  .......(omitted) ) """.stripMargin //dim table:t_account val odsUcAccount: String = s""" CREATE TABLE `t_account` WITH ( 'connector' = 'jdbc', .......(omitted) ) LIKE stream_t_account (excluding ALL) """.stripMargin //query sql: combined join val querySql:String = s""" select  coalesce(stm_acc.user_id,acc.user_id) as user_id from t_student stu LEFT JOIN stm_acc ON stu.stu_id  = stm_acc.student_id AND stu.modified_time  BETWEEN stm_acc.modified_time - INTERVAL '5' MINUTE  AND stm_acc.modified_time + INTERVAL '5' SECOND LEFT JOIN uc_account FOR SYSTEM_TIME AS OF stu.process_time AS acc ON stu.stu_id = acc.student_id


2)维表功能的优化

Flink SQL在Lookup时,流表每一条数据到来,会对Join的维表履行一次点查,Join的条件便是查询条件,例如关于流表stm_A和维表dim_B,Join条件为stm_A.id = dim.B.id

当 id=id1的stm_A数据到来时,会发生一条查询: select  from dim_B where  id=id1,因为维表查询的频率非常高,所以Join的维表列应该有索引。


Hologres索引包含: distribution key,clustering key,bitmap key,segment key(event timestamp) , 有关索引,能够参阅 holo表的创立和索引


注意:维表引荐用Hologres行存表,可是在实际状况中,因为维表还用于adhoc一类的剖析查询事务,所以本实践中大部分维表是列存表,以下实践定论是依据列存表和查询状况设定的,仅供参阅,请依据事务状况合理设置。


实践定论1:维表的Join列设置成distribution key

因为当时运用列存作为维度表,维表的列数会影响查询功能,关于同一个维表,8个列和16个列的功能相差50%以上,主张join用到的列都设置成distribution key,不能多也不能少。假如运用行存表,没有这个限制。


实践定论2:尽可能减少维表的特点列

在运用中,维表可能有多个维度列会被用于Join,例如表T1,有两个维度列F1、F2别离用做和流表A,B的Join条件。依据F1和F2之间的联系,假如F1..F2→1..n,就在F1上创立distribution key, 反过来则在F2上创立,即在粒度较大的维度列上创立distribution key。


实践定论3: 一个维度表有多个维度列而且是Hierarchy时,在粒度较大的列上创立distribution key,并用在Join条件中


假如 F1..F2是多对多的联系,阐明一个维表有两个交织的维度,而不是层次维度(hierarchy)上,需求进行拆分。查询时,不管Lookup是否必须用到distribution key索引列,都要把distribution key索引放在Join条件里。

示例: 维表t1有两个维度列:stu_code和roster_code,distribution key加在stu_code上

流表stm_t2需求 Lookup 维表t1,相关条件是两个表的roster_code相同

select <field list> from FROM stm_t2 stm JOIN t1 FOR SYSTEM_TIME AS OF stm.process_time AS dim ON stm.stu_code = dim.stu_code and stm.roster_code = dim.roster_code

事务价值

通过半年的实时数仓建造,并在集团内广泛运用。为事务的带来的价值如下:

  1. 为运营人员供给了1分钟级/秒级的实时看板服务和实时报表,事务人员能够及时了解到用户的反馈和事务的进程,然后调整运营战略,进步运营功率
  2. 为C端运用供给了秒级的实时用户画像服务和用户圈选服务,然后能够让引荐体系及时依据用户反馈调整引荐产品列表,进步用户体验
  3. 开发功率大为进步,开发人员从之前的架构迁移到Hologres+Flink SQL上后,开发功率进步了1-2倍,学习的梯度也降低了许多。
  4. 运维本钱下降,之前需求保护MySQL,  MongoDB等异构体系,而Hologres是云原生服务,无需保护一个运维团队。


作者:陈毓林, 新东方互联网技术中心大数据工程师。在新东方从事多年大数据架构研发,数据集成渠道开发,以及事务数据剖析等,首要技术领域包含依据flink的实时核算和优化,kafka相关的数据交换和集成等阿里云的云原生技术。