新东方基于Hologres实时离线一体化数仓建设实践
事务介绍
新东方教育科技集团定坐落以学生全面成长为核心,以科技为驱动力的综合性教育集团。集团由1993年成立的北京新东方校园发展壮大而来,具有短期培训体系、基础教育体系、文化传播体系等事务。
在互联网大潮中,新东方在IT技术上也不断重构,持续投入大数据建造,研发大数据的相关技术和运用,然后快速而精准地呼应事务需求,并用数据为集团各级领导供给决议计划依据。新东方的大数据运用首要包含两部分:
- 企业运用端的事务场景(B端):包含买卖,教育,人员等数据,数据规划为TB级。数据会被依照不同的条件和校园层级等,形成营收、教育、客服、财富人事等实时报表,为CRM体系的成千上万名事务参谋供给线索和商机的明细报表查询,一起也供各级管理人员了解事务的运行状况,辅佐事务决议计划。
- 互联网直接面向用户场景(C端):首要为招生引流类、云教室等运用,包含网页版,App端,H5等,数据量为PB级。这部分数据记录了用户(学员和潜在用户)在新东方的教育闭环轨道,C端数据除了生成常规的运营报表外,还会制作用户画像,从而开发引荐体系和圈选等运用,改善C端各种运用的用户体验,进一步精细化运营。
数仓建造和运用痛点
为了满意日益增长的事务需求,集团开始投入数仓建造。在数据仓库建造的初期,以事务驱动为主。通过阿里云的MaxCompute为核心构建数据仓库,直接集成事务库数据以及WEB运用的OSS日志等,然后在数据仓库中剖析事务数据并发生统计剖析成果。初期的架构如下:
依据事务需求,将中小型规划的成果导入MySQL并支撑数据更新。数据规划较大的只读成果则导入 MongoDB。
然后Web服务层查询MySQL和MongoDB并向用户供给服务接口, Web服务层也能够通过Lightning加快接口直接查询MaxCompute的数据,
Lightning协议是MaxCompute查询加快服务,支撑以PostgreSQL协议及语法连接拜访MaxCompute数据,相比MaxCompute供给的odps jdbc接口速度要快得多。原因是后者把每次拜访作为一个Map-Reduce处理,即使很小的数据量查询呼应时刻也要超越10秒,而 Lightning能将延时降到百毫秒内,满意事务成果报表的展示需求。目前Lightning服务进入服务下线阶段,新的加快服务由Hologres加快集群代替。
运用这套架构能够在较短的时刻内满意报表开发、用户画像和引荐服务等需求,为新东方的日常运营和招生引流供给较好的数据支撑。可是跟着事务的开展,这套架构越来越难以满意用户的需求,首要体现在:
- 实时性,事务期望能够达到1分钟级甚至秒级的实时性,而运用MaxCompute只能完结批量处理,一般只能供给分钟级(一般5分钟以上)的延时
- 来自Web服务层的高并发查询,MaxCompute的大数据量查询只能支撑到100左右的QPS,满意不了来自C端运用的高并发查询
- 杂乱逻辑的大数据量剖析和Ad-hoc查询,跟着剖析数据敏捷从数百G上涨到TB级,在多个数亿行以上的数据进行杂乱报表开发,单实例MySQL难以支撑;而MongoDB无法运用规范的SQL进行杂乱查询,一起MongoDB本身杂乱的查询事务,开发功率很低。
- Lightning接口尽管支撑规范的SQL而且某些场景上速度比较快,可是Lightning开始逐步下线,需求找到替换的办法。
实时数仓选型
要处理以上的事务痛点,就需求找到能满意实时数仓建造需求的产品。大数据团队调研了多种实时数仓方案,依据新东方的数据和运用特色进行选型,方案比对如下:
产品 |
Ad-hoc查询 |
高并发支撑(QPS) |
SQL支撑 |
TP(买卖)支撑 |
与MaxCompute/Flink集成 |
文档和技术支撑 |
ClickHouse 20.1 |
支撑PB级以上 |
默认支撑100的并发查询,qps取决于单个查询的呼应时刻 |
单表查询支撑较好,杂乱报表查询支撑较弱 |
通过mutation支撑update,较弱 |
支撑 |
文档丰厚,社区支撑较好 |
Doris 0.9 |
支撑PB级以上 |
数百 |
兼容MySQL |
不支撑 |
通过兼容MySQL与MaxCompute集成,与Flink的集成 不明确 |
文档和社区都较差 |
Hologres 1.1 |
支撑PB级以上 |
数万以上 |
兼容PostgreSQL |
DDL支撑 |
与MaxCompute直接在存储层集成,而且都兼容PostgreSQL,供给Flink Connector集成 |
阿里在线文档和技术支撑 |
Tidb 4.x (含Tiflash) |
支撑PB级以上 |
数万以上 |
兼容MySQL |
支撑 |
支撑 |
文档丰厚,社区支撑较好 |
Elastic Search 7.x |
支撑PB级以上 |
数万以上 |
不支撑规范SQL |
不支撑 |
支撑与MaxCompute集成,Flink Connector只支撑Source |
文档丰厚,社区支撑较好 |
从以上的表格能看出,Tidb和Hologres能够较好地处理新东方在大数据方面面对的问题。可是Tidb需求私有云布置并运维,而MaxCompute布置在公有云,两者在不同的云环境。Hologres是阿里云供给的云原生服务,并与MaxCompute都布置在公有云,且在Pangu文件层紧密集成,数据交换功率远高于其他外部体系,两者都兼容PostgreSQL,从离线数据仓库开发迁移到实时数据仓库开发难度降低。
依据以上的剖析,挑选Hologres作为实时数仓。
实时数仓建造
实时数仓是在离线数仓的基础上,依据Lambda架构构建,离线和实时一起进行建造。有关Lambda的,参阅:Lambda architecture
架构的各组件阐明:
1)数据源:
- Binlog,即各类运用(B端和C端)的数据库Binlog,关于SQL SERVER的数据库则是CT log;
- App音讯,即App运行时上报的事件;
- Web日志/埋点日志,即Web服务器所发生的ngix日志,以及Web app/H5运行时埋点服务发生的日志
2)CDC数据总线(简称CDC)
- CDC数据总线收集数据源,写入Kafka Topic。关于离线数仓和实时数仓, CDC都是直接交互的数据源/
- CDC包含Source Connector、Kafka 集群、Sink Connector三部分。 Source Connector 负责从数据源收集数据并写入Kafka集群的Topic,而Sink Connector则将Kafka Topic的数据ETL到方针库,包含实时和离线数仓。
- CDC易于布置和监控,并供给了简略的数据过滤,本钱较低,数据ETL使命尽量选用CDC。
3)离线数据处理
- 离线数据处理依据MaxCompute建立,用于核算全量数据,数据源来自于CDC的实时导入。离线数据通过离线数仓核算(ODS->DWD/DWS→ADS)导入Hologres作为存量数据,一部分离线的DWD/DWS数据也导入Hologres作为维表的存量数据。
- Flink核算使命会将ADS层成果Sink到MaxCompute, 用于数据备份。
4)实时数据处理
实时数据处理依据阿里云托管的 Flink流式核算引擎。与离线数仓处理固定日期的数据(如T+1)不同,实时数仓处理的是流式数据,从使命发动开始,就一直运行,除非反常终止,不然不会结束。数仓的层次与离线数仓类似,依据实时处理的特色做了简化。如下表所示:
数仓层次 |
描绘 |
数据载体 |
ODS层 |
与数据源表结构相似,数据未通过处理 |
Kafka Topic/cdc Connector |
DWD/DWS层 |
数据仓库层,依据事务线/主题处理数据,可复用 |
Kafka Topic |
DIM层 |
维度层 |
holo 维表,Kafka Topic |
ADS层 |
运用层,面向运用创立,存储处理成果 |
holo实时成果表,Kafka Topic |
5)Hologres 数据查询
Hologres一起作为实时数据和MaxCompute离线数据加快查询的剖析引擎,存储所有的实时数仓所需的数据表,包含维度数据表(维表)、实时成果表、存量数据表以及查询View和外表等。数据表的界说和用途如下表所示:
数据表名称 |
描绘 |
数仓层次 |
数据源 |
维度数据表 |
维度建模后的数据表,在实时核算时现实表通过JDBC查询 |
DIM层 |
|
实时成果表 |
实时数仓的核算成果表 |
实时数仓DWS/ADS层 |
实时数仓的DWS/ADS层核算使命 |
存量成果表 |
离线数仓的核算成果表 |
实时数仓DWS/ADS层 |
离线数仓的DWS/ADS层核算使命 |
查询view |
合并实时和存量成果,对外供给一致的展示View |
实时数仓ADS层 |
存量成果表 实时成果表 |
外表 |
来自MaxCompute的数据表引证 |
各层次 |
离线数仓 |
备份表 |
备份实时核算一段时刻内的数据,用于做数据校验和问题确诊 |
DWD/DWS层 |
实时数仓 |
运用场景
通过新的架构,支撑了新东方集团内如下运用场景:
- 实时报表查询:为CRM体系的成千上万名事务参谋供给线索和商机的明细报表查询,一起为管理层供给实时活动看板服务,延时秒级,辅佐事务决议计划。
- Ad-hoc查询:B端和C端运营人员能够直接通过Hologres定制自己的杂乱事务查询
- 用户轨道和画像场景:实时处理用户来自B端和C端的数据,生成用户轨道和标签,为事务快速决议计划供给依据。
- 引荐体系和圈选事务:通过Maxcompute训练离线模型,并通过Flink数据调整模型的参数。依据用户的实时轨道数据圈选出契合条件的用户并推送服务,进一步精细化运营。
运用实践
一个典型的实时使命处理流程如下图所示:
- ODS层数据通过CDC数据总线导入MaxCompute, 供给离线核算源数据。 一起也会将数据写入到Hologres,用于做数据验证。 在Hologres中,维表存储全量数据。而其他类型的ODS数据表一般存储时刻>离线的核算周期即可,如离线T+1,则存储2天,无相应的离线核算使命依据验证数据周期而定。
- Flink使命读取ODS层数据作为输入,与存储在Hologres中的维表做相关,核算的成果存储到DWD/DWS层的Kafka Topic中,一起将成果写入到Hologres用于数据验证,数据存储时刻与ODS层相同。
- Flink使命读取DWD/DWS层数据,与存储在Hologres中的维表做相关, 将结算的成果存储到Hologres。依据运用需求,假如是Lambda架构,存储时刻>离线的核算周期即可,如离线T+1,则存储2天,假如是Kappa架构,保留悉数数据, 一起将成果数据写入离线数仓用于离线剖析用(可选)。
下面具体介绍在每一步处理流程中的运用实践与经验优化,以协助达到更好的效果。
数据验证
因为实时处理源数据和成果都是动态的,数据验证无法在使命中进行。能够在Hologres中,对实时数仓的各层落仓成果进行验证。因为实时处理和时刻相关,每一层次的数据都需求带上一个处理时刻戳(Process Time)。在Lambda架构中,将实时成果和离线成果进行比对,假定离线处理周期为T+1, 则实时处理取时刻戳与昨天的数据进行比对,核算出准确率。假如是Kappa架构,需求进行逻辑验证,并与事务人员处理的成果数据进行比对。
全量数据初始化
Kafka Topic一般存储几天内的数据,不能供给全量数据,所以需求从离线数仓进行全量数据初始化,将维表、ADS层成果等导入Hologres。
Hologres维表的Lookup和功能优化
1)Lookup
在Flink核算使命中,流表和Hologres的维度数据表Join,便是Lookup。Lookup需求处理两个问题:
- 维表索引:实际处理过程是每条流表的数据,运用Join 条件的列去维表中查询,将成果回来。Hologres的维表的索引需求和Flink SQL的Join key一致。
- 维表的推迟:因为维表的数据导入是另外的使命(CDC使命或许Flink使命),就会呈现数据不同步的状况,流表数据已到,而相关的维度数据没有入库。
关于问题1, 在创立Hologres的维度表时,需求依据Flink SQL的需求去设置表的各类索引,尤其是Distribution key和Clustering key,使之与Join的相关条件列一致,有关Hologres维表的索引会在后边末节说到。
关于问题2,维表和流表Join中,处理两者数据不同步的问题,通过设置窗口能够处理大部分问题,可是因为watermark触发窗口履行,需求兼顾维表数据推迟较多的状况,因而watermark duration设置较高,然后导致了数据处理使命的Latency很高,有时不契合快速呼应的事务要求,这时能够选用联合Join,,将双流Join和Lookup结合起来。
维表数据包含两部分: 1. Hologres维表,查询全量数据. 2. 从维表对应的Kafka Topic创立的流表,查询最新的数据。Join时,先取维表对应的流表数据,假如不存在取Hologres维表的数据。
以下是一个例子,t_student(学员表)的流表和t_account(用户表) Join获取学员的user id
combined join //stream table:stream_uc_account val streamUcAccount: String = s""" CREATE TABLE `stream_t_account` ( `user_id` VARCHAR ,`mobile` VARCHAR .......(omitted) ,WATERMARK FOR event_time AS event_time - INTERVAL '20' SECOND ) WITH ( 'connector' = 'kafka' .......(omitted) ) """.stripMargin //dim table:t_account val odsUcAccount: String = s""" CREATE TABLE `t_account` WITH ( 'connector' = 'jdbc', .......(omitted) ) LIKE stream_t_account (excluding ALL) """.stripMargin //query sql: combined join val querySql:String = s""" select coalesce(stm_acc.user_id,acc.user_id) as user_id from t_student stu LEFT JOIN stm_acc ON stu.stu_id = stm_acc.student_id AND stu.modified_time BETWEEN stm_acc.modified_time - INTERVAL '5' MINUTE AND stm_acc.modified_time + INTERVAL '5' SECOND LEFT JOIN uc_account FOR SYSTEM_TIME AS OF stu.process_time AS acc ON stu.stu_id = acc.student_id
2)维表功能的优化
Flink SQL在Lookup时,流表每一条数据到来,会对Join的维表履行一次点查,Join的条件便是查询条件,例如关于流表stm_A和维表dim_B,Join条件为stm_A.id = dim.B.id
当 id=id1的stm_A数据到来时,会发生一条查询: select from dim_B where id=id1,因为维表查询的频率非常高,所以Join的维表列应该有索引。
Hologres索引包含: distribution key,clustering key,bitmap key,segment key(event timestamp) , 有关索引,能够参阅 holo表的创立和索引
注意:维表引荐用Hologres行存表,可是在实际状况中,因为维表还用于adhoc一类的剖析查询事务,所以本实践中大部分维表是列存表,以下实践定论是依据列存表和查询状况设定的,仅供参阅,请依据事务状况合理设置。
实践定论1:维表的Join列设置成distribution key
因为当时运用列存作为维度表,维表的列数会影响查询功能,关于同一个维表,8个列和16个列的功能相差50%以上,主张join用到的列都设置成distribution key,不能多也不能少。假如运用行存表,没有这个限制。
实践定论2:尽可能减少维表的特点列
在运用中,维表可能有多个维度列会被用于Join,例如表T1,有两个维度列F1、F2别离用做和流表A,B的Join条件。依据F1和F2之间的联系,假如F1..F2→1..n,就在F1上创立distribution key, 反过来则在F2上创立,即在粒度较大的维度列上创立distribution key。
实践定论3: 一个维度表有多个维度列而且是Hierarchy时,在粒度较大的列上创立distribution key,并用在Join条件中
假如 F1..F2是多对多的联系,阐明一个维表有两个交织的维度,而不是层次维度(hierarchy)上,需求进行拆分。查询时,不管Lookup是否必须用到distribution key索引列,都要把distribution key索引放在Join条件里。
示例: 维表t1有两个维度列:stu_code和roster_code,distribution key加在stu_code上
流表stm_t2需求 Lookup 维表t1,相关条件是两个表的roster_code相同
select <field list> from FROM stm_t2 stm JOIN t1 FOR SYSTEM_TIME AS OF stm.process_time AS dim ON stm.stu_code = dim.stu_code and stm.roster_code = dim.roster_code
事务价值
通过半年的实时数仓建造,并在集团内广泛运用。为事务的带来的价值如下:
- 为运营人员供给了1分钟级/秒级的实时看板服务和实时报表,事务人员能够及时了解到用户的反馈和事务的进程,然后调整运营战略,进步运营功率。
- 为C端运用供给了秒级的实时用户画像服务和用户圈选服务,然后能够让引荐体系及时依据用户反馈调整引荐产品列表,进步用户体验
- 开发功率大为进步,开发人员从之前的架构迁移到Hologres+Flink SQL上后,开发功率进步了1-2倍,学习的梯度也降低了许多。
- 运维本钱下降,之前需求保护MySQL, MongoDB等异构体系,而Hologres是云原生服务,无需保护一个运维团队。
作者:陈毓林, 新东方互联网技术中心大数据工程师。在新东方从事多年大数据架构研发,数据集成渠道开发,以及事务数据剖析等,首要技术领域包含依据flink的实时核算和优化,kafka相关的数据交换和集成等阿里云的云原生技术。