爱奇艺数据湖实战
2023-02-04 18:52
热度:1724
作者:龙成创业
01 什么是数据湖?
数据湖概念于2010年[1]首次提出,经过多年的演变,目前演化出两种不同的定义——公有云数据湖、非公有云数据湖。
公有云数据湖
AWS[2]、GoogleCloud[3]以及国内的阿里云、腾讯云等公有云厂商对数据湖的定义是一个集中的、近乎无限空间的数据存储区,支持结构化、半结构化、非结构化等各种类型数据。在公有云厂商的语境下,数据湖一般就是各家的云存储产品,比如AWSS3、GoogleCloudStorage、阿里云OSS等。
在云计算出现之前,公司数据主要分散在不同的业务数据库中,由于存储空间有限,存放的是经过处理后的结构化数据,丢失了部分原始信息。随着业务发展,这类传统的数据库/数据仓库已不能满足多样化的数据应用场景需求。开源Hadoop及公有云云存储的出现正是为了解决这一痛点,将不同类型的业务数据导入到Hadoop或云存储中进行后续不同场景的处理,随用随取,因此被称为数据湖。
关于各家公有云的数据湖架构及解决方案,可以参看这篇介绍文章:《数据湖|一文读懂DataLake的概念、特征、架构与案例》[4]。
非公有云数据湖
Hadoop、公有云存储支持文件级别的操作,如上传文件、删除文件,不支持对文件内容里行级别的操作,如添加/删除/更新某行。因此,基于Hadoop或公有云存储构建的数据仓库不支持实时增量数据更新、不支持流式数据,延迟通常在小时级乃至T+1。
为此,Uber、Netflix、Databricks等几家公司在2017-2019期间相继推出了Hudi[5]、Iceberg[6]、DeltaLake等,试图在Hadoop、公有云存储层之上提供一个通用的表格格式(TableFormat)层。国内(非公有云场合)一般称这三者为数据湖。这种叫法是不准确的,但业界一般都这么称呼,我们也跟着“将错就错”。在非公有云场合,如果不特别说明,数据湖一般就是指Hudi、Iceberg、DeltaLake三者之一。
爱奇艺数据湖
综合以上两种定义,我们理解的数据湖应当具备以下几个特性:
统一存储:支持灵活的存储底座(公有云/私有云、HDD/SSD/缓存),具备集中的、足够大的存储空间
通用数据抽象/组织层:支持结构化、半结构化、非结构化等不同数据类型,并抽象出一层统一的数据组织形式,Hudi、Iceberg、DeltaLake等目前仅统一了结构化数据格式
支持批处理、流计算、机器学习等不同计算类型
统一的数据管理(元数据中心、生命周期、数据治理等),避免形成数据孤岛
这类数据湖,我们称之为广义数据湖,与之对应的狭义数据湖就专指Hudi、Iceberg、DeltaLake。我们目前启动了多个项目推进广义数据湖建设,后续会另有专文阐述,本文及本系列后续文章将集中介绍狭义数据湖(下文简称“数据湖”)。
02 为什么需要数据湖?
在上一节解释数据湖的定义时,已大体介绍了业务对数据湖的需求,本节将结合三个典型的场景来说明业务为什么需要数据湖。
场景一-事件流实时分析
事件流实时分析是大数据中常见的需求,典型的业务场景如分析广告投放效果、视频实时运营、日志排障等。而相应的技术产品也非常多
各OLAP引擎特点比较
数据湖相比其他产品有如下核心优势:
时效性好:数据湖里的数据近实时(1-5分钟)可见,比Hive离线延时优势明显,能满足大部分业务的要求
规模大:数据湖存储使用的是HDFS,写入吞吐几乎无瓶颈
成本低:数据湖无需单独服务器,机器成本低,运维成本低
查询支持:数据湖支持明细数据查询,也支持各种复杂的联合分析
数据分享:数据湖支持Spark、Trino、Flink等各类引擎分析,而数据进入ElasticSearch、Kudu、Druid、ClickHouse后都只能使用专用的查询引擎分析,数据分享需额外导出为Parquet等格式
传统上受规模、成本等限制,业务仅最终报表或者关键链路会使用实时分析,其他场景仍以离线分析为主。随着业务发展,需要一种更大规模、更便宜的近实时分析手段,数据湖就提供了这样的解决方案。
场景二-变更数据分析
数据变更有如下几种类型,一种是行级变更,如MySQL中会员订单,用户特征等,其拥有确定的行主键,且列变更频繁,对于此类数据进行聚合分析主要有如下方案:
大数据支持行更新常见方案
离线导出:缺点是同步延迟大,通常是T+1;另一方面是同步代价大,每次需全量导出,给数据源MySQL/HBase带来较大读压力;
实时同步进Kudu:面临规模小、成本大等痛点
数据变更的另一种形式是新增行或列。Hive若一个分区计算已经完成,那么晚到的数据只能丢弃,否则需要覆写整个分区。Hive历史分区新增一个列也需要覆写整个分区。
数据湖解决了业务的上述痛点,业务能将MySQL、MongoDB中的变更近实时地入湖。新增行仅需将相应行写文件即可,新增列也支持单独写列文件,无需覆写分区。
场景三-流批一体
爱奇艺大量业务采用图2-2上半部分所示的Lambda架构,实时链路采用Kafka+Flink技术,为业务提供实时推荐、实时监控等能力;离线链路采用HDFS+HiveQL技术,用于校准数据和扫描查询,该架构具有如下痛点:
离线通路时效性差、而实时通路容量低
维护两套逻辑,开发效率低
容易出现数据不一致
维护两套服务机器,成本高
传统Lambda架构和数据湖流批一体架构
而使用数据湖业务能实现流批一体,消费Kafka数据近实时入湖(5分钟延迟),既能满足离线的批量扫描、数据覆盖场景,也能满足大部分实时场景(需要秒级延迟的除外)。由此具有如下优势:
支持海量数据近实时更新
一套代码,避免重复开发
避免数据不一致
服务存储和计算成本更低
满足上述场景产品特性总结
总结上述几类场景,可以用图2-3描述数据湖产品所需要的能力,归纳起来其需要具备如下关键特性:
规模大,成本低:能支持PB级别数据规模
支持更新:包括历史分区新增数据、行级更新等
增量拉取:将表的变更转成流数据用于构建下游表
时效性:近实时(5分钟)
查询快:交互级查询速度
归纳数据湖产品具备的能力
03 数据湖选型及原理说明
爱奇艺从2020年开始调研Hudi、Iceberg、DeltaLake这三个主流的数据湖产品(见表3-1),综合评估各产品的读写性能、平台适配(爱奇艺实时计算平台是基于Flink构建的)、未来发展潜力等因素,我们选择使用ApacheIceberg作为爱奇艺数据湖的核心基础。
狭义数据湖3大开源产品调研比较
本章后续内容将介绍Iceberg的基本原理,来进一步阐述数据湖的优势。
Iceberg表格式
表格式定义
表格式[7]是Iceberg设计的核心概念,因而需要首先明确表格式的定义。从用户的角度,表格式用于回答“表里面有哪些数据”,表格式的关键目标是“让用户和工具能高效地处理表下的数据”。Iceberg是一种新设计的用于大规模数据集分析开源表格式,图3-1简单说明了Iceberg所属的生态位:
Iceberg不是存储引擎:其支持HDFS、S3对象存储作为底层存储引擎
Iceberg不是文件格式:使用Parquet存储数据文件
Iceberg不是查询引擎:可通过Spark/Flink/Trino/Hive查询
Iceberg生态位示意图
Hive表格式
Hive是一个非常宽泛的概念,包括Hive表格式、HiveQL、Hive执行引擎等。具体到Hive表格式上,其自十多年前诞生以来改变并不大,Hive表格式可以用图3-2简单地进行说明,其设计关键点是:
MySQLMetastore存储元数据,包括库、表和分区信息,不包含文件信息,最小的原子操作是分区级替换
用目录树组织数据文件,通过LIST目录接口获取分区下的数据文件列表,可实现分区级过滤
Hive表格式说明
Hive表格式概念非常简单,并成为事实上的标准,几乎所有的处理引擎均支持。然而Hive表格式的设计在大数据量和变更场景下有缺陷。
设计一元信息不包含文件信息。Hive元信息仅存储了分区级的信息,获取分区下文件需通过文件系统列举分区目录,导致如下缺点:
制定执行计划慢:假设一个表以小时分区且每小时有100个子分区,则7天范围共16.8K个分区,一个简单地扫描任务需执行O(N)次NameNodeRPC调用,N=扫描分区数,假设一次RPC调用需2ms,制定执行计划需耗时33.6秒;
无法应用文件级过滤:例如表存储的是广告点击记录,且写入时按照广告主ID排序,此时我们查询特定广告主ID的记录,每个分区下仅命中少量文件,但Hive并无相关信息用于过滤掉其他文件;
设计二最小的原子操作为分区替换。Hive大量操作都是先将数据写到临时目录,然后通过将临时目录移动到目标路径完成操作,其缺点是:
不支持修改:分区任何修改都需执行分区级覆盖,如历史分区新增一列;不支持行级修改;
不支持增量:若有一个任务消费表A更新表B,假设表A新增了迟到的增量文件,无法获取表A的增量更新部分触发计算更新表B。业务要么选择丢弃增量部分不往下游传递,要么对整个分区进行重算;
依赖文件系统重命名:对于对象存储不友好;
Iceberg表格式
Iceberg的表格式简化说明可参考图3-3。HiveMetastore记录表名,并指向当前快照S1,其指向了本次提交包含的所有数据文件;读操作访问快照S1,写服务更新快照S2,S2在被提交前读不可见;S2指向本次提交的增量部分数据文件,并将Parent指向S1,S2全量文件为S2和S1的总和;
Iceberg表格式说明
Iceberg和Hive最大的区别是,Hive元数据仅记录到分区级别,Iceberg元数据记录到文件级别。这一根本的修改,使得Iceberg具备如下几个优势。
优势一快照之间的隔离
读写互不干扰:读写可操作不同的快照,写在提交前不可见;
支持并行写入:采用乐观锁的机制,写的过程不加锁,提交前检查是否冲突,无冲突则提交成功,包含冲突内容则放弃提交稍后重试;
优势二更快地计划和执行速度
执行计划快:如前文所述Hive制定执行计划耗时和查询涉及的分区数正相关,而Iceberg直接读取元数据文件即可获得文件列表,制定执行计划耗时大幅缩短;
文件过滤加速执行:Iceberg记录了文件的统计信息,不同的执行引擎可基于统计信息(MinMax值、字典、布隆过滤器等)过滤掉无关的文件,大幅减少实际读取的文件数加速执行;
优势三高效地实现小的修改
新增数据:Iceberg支持往已有的表/分区中添加少量文件,无需分区级覆盖;
获取增量:Iceberg支持获取2次快照间的文件变化,并支持流式地读取变更,从而实现增量更新下游表;
行级更新支持
行级更新是数据湖的一个非常有吸引力的特性,有行级更新能力后可以支持很多新的应用场景,典型的如MySQL表实时同步到数据湖进行分析。Iceberg在V2格式[8]中实现了行级更新,采取MergeOnRead的策略,其原理示意可用图3-4进行说明,关键概念如下:
新定义DeleteFile:格式上仍然是DataFile,记录本次提交删除的行;
MergeonRead:读取时将DataFile和DeleteFile内容合并,得到准确的结果
Iceberg行级更新一个例子
以一个示例进行说明,快照2包含一个DataFile(id=4)和DeleteFile(id=2),读取S2快照实际返回id集合是(1,3,4);
从原理分析可知,当Delete文件非常多时,相关表的查询性能会非常差。Iceberg通过合并操作将DataFile和DeleteFile复写为真正的DataFile,V2格式表需配置合并任务,定期合并以控制表的文件数量。
技术小结-Iceberg如何实现设计目标
前文介绍了数据湖的诸多特性和其设计原理,接下来我们总结一下其每个特性是如何实现的,有什么具体的限制:
04 数据湖业务落地
本节介绍数据湖在爱奇艺一些业务具体的落地,重点介绍各业务原架构的痛点,使用数据湖后架构的演进和相应的业务价值。
Venus日志采集平台
业务痛点
Venus是爱奇艺自研的日志服务平台,支持采集机器、容器上的日志进行集中存储分析。之前使用ElasticSearch作为存储,图4-1上半部分展现了Venus使用ElasticSearch作为存储引擎的架构。Venus场景的特点是业务众多,且各个业务日志流写QPS大小不一,单个业务流量还可能会快速增加。由于ElasticSearch成本高昂,且单集群支持的写QPS有限,Venus团队做出如下优化:
大部分业务配置的是0副本:因ES写入成本高,所有业务配置1副本写入成本需翻倍;0副本导致任意硬盘/结点/集群故障都会影响部分业务写入;
业务隔离:给高优业务以独立ElasticSearch集群,低优业务共用公共集群,避免低优业务流量增长影响高优业务,但无法解决高优业务自身增长的问题;
流量调度:单个集群流量到瓶颈时,将部分流量调度到其他空闲集群;单个集群故障时,将业务流量调度到其他ElasticSearch集群;
即使应用以上优化,仍面临如下几个痛点:
写入失败多:业务排查时经常遇到日志延迟半小时以上,甚至写入失败,日志丢失等情况
排障压力大:由于0Replica很容易导致写入失败,每天需处理10+的运维请求;
成本高:ElasticSearch设计上是牺牲写入时性能以换取查询性能,而日志类特点是写入QPS大,查询QPS低,Venus机器经常磁盘达到瓶颈,而CPU和内存大量浪费;
新架构
深入分析Venus的业务需求后,我们可以总结其核心需求:
数据延迟低:日志采集到查询需要分钟级的延时;
查询速度快:交互式排障需要查询在秒级返回;
写入带宽高:峰值QPS千万/秒,总数据量在PB级;
Iceberg完全符合上述业务要求,并且由于Venus平台封装了查询的入口,替换底下的存储引擎对业务是透明的。切换后架构图见图4-1下半部分::
Venus日志存储由ElasticSearch切换为Iceberg
落地效果
Venus团队在2021年三季度开始逐步灰度流量,于2022年一季度全部切换为Iceberg,最终取得如下收益:
成本优化:Iceberg存储复用的HDFS,查询所有业务共用一个Trino集群,无需部署独立的集群,节省大量机器成本;
写入稳定:由于Iceberg存储是HDFS3副本,单个硬盘/结点故障不影响写入,且Iceberg写入带宽近乎无限,几乎不再发生达到写入瓶颈、存储容量不足、日志丢失的情况;
排障减少:Venus团队统计入湖后运维量降低80%,节省一个运维人力;
审核数据
业务痛点
审核团队业务原架构可用图4-2虚线以上半部分进行说明,它包含如下关键组件:
MongoDB:存储全量审核数据,规模在百亿行,仅对ID构建索引,无法对其他列开启索引;
ElasticSearch:存储用于检索的列,因数据量限制不存储原始消息;线上服务查询某个关键字的记录时,先通过ElasticSearch服务筛选命中的ID列表,再对ID列表逐一查询MongoDB获得原始记录;
MySQL:针对一些报表需求,通过定时任务查询ES并将聚合结果存储在MySQL;
Hive:业务原计划将MongoDB全量导出为Hive用于离线分析;
在原有通路中,报表分析场景面临诸多痛点:
开发成本高:每新增一个报表需求,需开发一个ES定时查询任务,将结果记录为一个MySQL表,并在报表页面进行适配,无法满足快速变化的分析需求;
ES查询瓶颈:当定时查询任务较多时,给ES服务造成较大的压力,影响线上通路性能和稳定性;
数据质量:当历史数据发生变更,如曾经审核通过的记录当前审核不通过,并不会更新已算好的统计值(如审核通过率),从而报表数据质量会逐渐下降;
存储容量:ES容量有限,当前MongoDB诸多大表不在ES通路;
Hive通路:业务初步调研后发现不可行,一方面全量导出耗时很久,执行一天仍未完成,另一方面导出过程给MongoDB造成较大的压力;
新架构
审核业务数据架构图
在和审核团队共同评估业务需求后,可归纳出业务需求的特性:
行级更新:审核的记录会一直变化,如审核状态、修改时间等;
高效查询:支持基于不同列的高效过滤分析,支持和其他表联合分析;
容量大:支持百亿量级,且未来还有更多场景接入;
分析下来Iceberg满足业务的需求,由此设计了采用Iceberg完成报表分析的方案,见图4-2虚线以下部分。其关键为审核将变更消息投递到Kafka,通过实时计算平台配置任务消费Kafka更新Iceberg。业务初始时需将MongoDB全量导出到Iceberg,后续通过行级更新保证Iceberg数据的一致性。报表系统使用SparkSQL查询Iceberg,业务除报表外还可通过魔镜满足各类即席分析场景。Iceberg方案具有如下优势:
开发成本低:撰写SQL即可;
查询可扩展:SparkSQL算力可水平扩展,且不影响线上通路;
数据质量高:行级更新保证Iceberg数据和MongoDB完全一致;
存储容量大:Iceberg存储是HDFS,支持PB以上的规模;
时效性好:数据延迟在5分钟,近实时地反映数据变更;
落地效果
审核团队在Iceberg表落地后赋予业务了一系列新的可能性,审核团队基于Iceberg表拓展了一系列从无到有的场景,其中部分场景如下:
数据统计:审核团队人效统计、风险监控实时报警;
基于关键字下线:原先需对ElasticSearch表全量扫描,影响线上稳定性,现在批量扫描Iceberg表即可;
导出数据:由于MongoDB无法做GroupBy分析,需导出到CSV后再用Shell脚本处理,需十几个小时;当前大幅降低工作量,执行SQL语句即可,耗时缩短到5小时;
降低风险:对数由原先16小时缩短到5小时,降低漏审/误审带来的内容安全风险。
Pingback流批一体
业务痛点
端上埋点在爱奇艺内部习惯被称为Pingback,其本质是对事件的描述,在一些特定过程中收集用户行为数据,来研究对象的使用状况,为后续的优化和运营策略提供数据支撑。当前Pingback链路可以见图4-4实时通路和离线通路两部分,其中离线通路以HDFS、Hive/Spark作为技术栈构建,将数据以分区管理,数据的延时在小时级别,支持全量读取与分区读取。实时通路以Kafka、Flink作为技术栈构建,支持记录级别的增量获取,数据延时在秒级,不支持全量数据读取。Pingback当前通路有如下问题:
离线通路有小时以上的延时,无法对最新数据做分析;
实时通路不支持数据明细查询,全量分析;
为了同时支持全量分析和低延时数据可见性,构建Lambda架构,而同时维护两个开发链路导致开发维护成本高、实时离线数据不一致等问题;
新架构
Pingback原Lambda架构和新添加的近实时通路
基于Iceberg的特点,Pingback计划新构建图4-4中的流批一体的“近实时通路”,该通路具体包括如下核心环节:
生产ODS层表:使用Flink增量消费Kafka中的全量数据,解析并按投递规则拆分生成ODS层表,一个Flink任务会拆分生成数百张Iceberg表;
生产DWD层表:通过Flink增量消费Iceberg表,进行维度扩展、标准化等加工生成DWD层表
下游Pipeline:下游业务可通过RCP实时计算平台、Babel离线计算平台继续构建Pipeline,也可在RAP实时分析平台、魔镜离线分析平台进行查询分析
理论上近实时通路可以完全代替掉离线通路,并能替换部分接受5分钟延时的实时通路;近实时通路将带来如下收益:
相比离线通路:数据可见性延时降低到5分钟以内,并且支持增量读取、版本回退等新特性;
相比实时通路:如果可接受5分钟的延时,具备实时通路增量读取的特性,并能支持全量读取、明细数据查询,具有更好的容错性;
相比Lambda架构,能做到存储计算的流批一体,避免开发维护两套代码及实时离线数据不一致的问题;
成本收益:预期近实时通路成本和离线通路接近,同时节省大量实时通路资源;
落地效果
当前已支持按需生产Pingback数据湖数据,应用情况如下:
播放Pingback;已生产播放Pingback峰值QPS百万级的数据,并使用增量读取数据湖的方式构建了爱奇艺的点播、直播报表,数据与已有离线数据一致,延时在1分钟左右,相比实时通路成本下降90%
QOSPingback:QOSPingback是监控APP运行状态的埋点信息,用于监控和排障。相比通过离线明细数据进行故障定位,使用近实时通路,在发现问题后,可立即查询明细数据定位故障,将大幅缩短故障定位时间。当前已稳定生产了QOSPingback的600多张表,正在推动业务迁移到近实时通路。
后续计划逐步推动业务将离线通路,及接受分钟级延时的实时通路迁移到近实时通路,构建分钟级延时的Pingback流批一体通路。
会员订单
业务痛点
会员订单信息是公司非常关键的信息,其原始信息存储在MySQL中,有非常多场景需要对订单信息进行聚合分析。MySQL聚合分析性能不好,以及需要和其他Hive表JOIN,因而需将订单表同步到OLAP引擎,目前通路可见图4-5虚线以上部分,主要有2种不同的通路:
通路一MySQL全量导出到Hive,其具有如下缺点:
数据时延大:当前导出是天级,业务只能分析一天前的数据;
MySQL压力大:每天全量导出数据量非常大,容易打满MySQLCPU;
通路二消费CDC变更流直接写Kudu,其具有如下缺点:
Kudu压力大:订单表消耗了KuduTB级的写内存,一方面机器成本高,经常需运维集群,另一方面未来扩展性差,难以承接其他MySQL场景;
写任务运维:Kudu集群写入性能有波动,会造成消费CDC变更流写Kudu任务堆积,需运维处理;
Spark任务失败:风控业务定期扫描分析Kudu表,一旦Kudu表Tablet有迁移会造成任务失败;
新架构
IcebergV2格式支持行级更新后,官方给出了Flink消费CDC入湖的解决方案,经过多个版本的迭代,解决了一致性、避免重复消费Binlog等问题。在此基础上大数据也解决了多个实际使用的问题,如写入性能差、小文件合并、BloomFilter加速查询等。以订单表为例,接入数据湖有如下优势:
延时低:近实时延迟,低至5分钟/1分钟;
查询快:通过SparkSQL查询,结合文件合并等优化,性能和Kudu方案接近;
成本低:Iceberg无需单独集群,机器成本非常低;
运维低:不会给MySQL造成巨大压力,无需特殊运维;
会员订单表MySQL同步到OLAP引擎聚合分析
落地效果
目前订单表已完成入湖,数据延时在1分钟,通过应用小文件合并、BloomFilter等技术,SparkSQL查询速度和Impala/Kudu方案接近。
05 总结及规划
当前数据湖发展非常迅速,Iceberg社区、爱奇艺内部应用都在快速成长,在已有的落地场景中可以看到其能给业务带来巨大的价值,优化业务的数据架构,加速数据分析,降低成本。本篇文章介绍了数据湖的基本原理和公司已落地的场景,我们也看到一些潜在的需求场景,如用户增长业务可应用数据湖进行实时归因、智能出价;奇谱视频元信息因会频繁变更,实时数据存储在HBase中,分析需先导出为Parquet,入湖后可直接分析;进一步推广流批一体在广告、BI的落地;使用数据湖将特征生产提速到分钟级、支持晚到数据和样本修正。