大数据 - DWD&DIM 业务数据

数据,dwd,dim,业务 · 浏览次数 : 195

小编点评

**业务数据的变化,我们可以通过 FlinkCDC 采集到,但是 FlinkCDC 是把全部数据统一写入一个 Topic 中,这些数据包括事实数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从 Kafka 的业务数据 ODS 层读取数据,经过处理后,将维度数据保存到 HBase,将事实数据写回 Kafka 作为业务数据的 DWD 层实现动态分流功能** **因为 FlinkCDC 是把全部数据统一写入一个 Topic 中,这样显然不利于日后的数据处理,所以需要把各个表拆开处理。** **由于每个表有不同的特点,有些表是维度表,有些表是事实表。** **在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL 等。** **一般把事实数据写入流中,进行进一步处理,最终形成宽表。** **维度数据不放 Redis 的原因:** 1. **数据量很大**:User 用户维度数据量很大,其它维度还行。 2. **并发压力大**:并发压力大这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张表,就要修改配置重启计算程序。 **所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。** **两种方案实现一种是用 Zookeeper 存储,通过 Watch 感知数据变化;另一种是用 mysql 数据库存储,周期性的同步。** **本文选择第二种方案,主要是 MySQL 对于配置数据初始化和维护管理,使用 FlinkCDC 读取配置信息表,将配置流作为广播流与主流进行连接。** **获取执行环境消费Kafka ods_base_db 主题数据创建流将每行数据转换为JSON对象并过滤(delete) 主流使用FlinkCDC消费配置表并处理成 广播流连接主流和广播流分流 处理数据 广播流数据,主流数据(根据广播流数据进行处理)提取Kafka流数据和HBase流数据将Kafka数据写入Kafka主题,将HBase数据写入Phoenix表启动任务table_processtable_process主健:sourceTable + typesourceTable根据表名分流type用来区分新增、变更的数据,不同类型的数据放到不同主题表不sinkType放Kafka还是其它地方sinkTable如果是维度表,就是Phoenix表名,如果是 kafka 就是 主题sinkColumns提供字段,为了自动建表pkPhoenix 建表必须有主健extend指定要不要做分区表,等等**

正文

业务数据的变化,我们可以通过 FlinkCDC 采集到,但是 FlinkCDC 是把全部数据统一写入一个 Topic 中, 这些数据包括事实数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从 Kafka 的业务数据 ODS 层读取数据,经过处理后,将维度数据保存到 HBase,将事实数据写回 Kafka 作为业务数据的 DWD 层

实现动态分流功能

由于 FlinkCDC 是把全部数据统一写入一个 Topic 中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表。

在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL 等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。

维度数据不放 Redis 的原因:User 用户维度数据量很大,其它维度还行。
为什么不放 MySQL: 并发压力大

这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。

这种可以有两个方案实现

  • 一种是用 Zookeeper 存储,通过 Watch 感知数据变化;

  • 另一种是用 mysql 数据库存储,周期性的同步;(有配置表,指定哪些表的数据发给哪些主题)

  • 另一种是用 mysql 数据库存储,使用广播流。
    这里选择第二种方案,主要是 MySQL 对于配置数据初始化和维护管理,使用 FlinkCDC 读取配置信息表,将配置流作为广播流与主流进行连接。
    image

  • 获取执行环境

  • 消费Kafka ods_base_db 主题数据创建流

  • 将每行数据转换为JSON对象并过滤(delete) 主流

  • 使用FlinkCDC消费配置表并处理成 广播流

  • 连接主流和广播流

  • 分流 处理数据 广播流数据,主流数据(根据广播流数据进行处理)

  • 提取Kafka流数据和HBase流数据

  • 将Kafka数据写入Kafka主题,将HBase数据写入Phoenix表

  • 启动任务

table_process

table_process 主健:sourceTable + type
sourceTable 根据表名分流
type 用来区分新增、变更的数据,不同类型的数据放到不同主题表不
sinkType 放Kafka还是其它地方
sinkTable 如果是维度表,就是Phoenix表名,如果是 kafka 就是 主题
sinkColumns 提供字段,为了自动建表
pk Phoenix 建表必须有主健
extend 指定要不要做分区表,等等

Demo

sourceTable type sinkType sinkTable
base_trademark insert hbase dim_xxx(Phoenix 表名)
order_info insert kafka dwd_xxx(主题名)
CREATE TABLE `table_process` (
`source_table` varchar(200) NOT NULL COMMENT '来源表',
`operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete',
`sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka',
`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

image

1.读取状态2.过滤数据
3.分流

ODS:

  • 数据源:行为数据,业务数据
  • 架构分析:
    FlinkCDC: DataStream/FlinkSOIFlinkCDC/Maxwell/Canal保持数据原貌,不做任何修改! ods_base_log,ods_base_db

DWD-DIM:

行为数据:DWD(Kafka)

1.过滤脏数据 --> 侧输出流 脏数据率
2.新老用户校验 --> 前台校验不准
3.分流 --> 侧输出流 页面、启动、曝光、动作、错误
4.写入Kafka

业务数据:DWD (Kafka)-DIM(Phoenix)

1.过滤数据-->删除数据
2.读取配置表创建广播流
3.连接主流和广播流并处理
1)广播流数据:

  • 解析数据
  • Phoenix 建表(HBase)
  • 写入状态广播

2)主流数据

  • 读取状态
  • 过滤字段
  • 分流(添加 SinkTable 字段)

4.提取Kafka和 HBase 流,分别对应的位置
5.HBase流:自定义 Sink
6.Kafka流:自定义序列化方式

image

大数据-数据仓库-实时数仓架构分析
大数据-业务数据采集-FlinkCDC
大数据 - DWD&DIM 行为数据
大数据 - DWD&DIM 业务数据
大数据 DWM层 业务实现

与大数据 - DWD&DIM 业务数据 相似的内容:

大数据 - DWD&DIM 业务数据

业务数据的变化,我们可以通过 FlinkCDC 采集到,但是 FlinkCDC 是把全部数据统一写入一个 Topic 中, 这些数据包括事实数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从 Kafka 的业务数据 ODS 层读取数据,经过处理后,将维度数据保存到 HBase,将事

大数据 - DWD&DIM 行为数据

我们前面采集的日志数据已经保存到 Kafka 中,作为日志数据的 ODS 层,从 Kafka 的ODS 层读取的日志数据分为 3 类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回 Kafka 不同主题中,作为日

大数据 - ODS&DWD&DIM-SQL分享

大数据 ODS&DWD&DIM-SQL分享 需求 思路一:等差数列 断2天、3天,嵌套太多 1.1 开窗,按照 id 分组,同时按照 dt 排序,求 Rank -- linux 中空格不能用 tab 键 select id,dt,rank() over(partition by id order b

大数据 - DWS层 业务实现

统计主题 需求指标【ADS】输出方式计算来源来源层级 访客【DWS】pv可视化大屏page_log 直接可求dwd UV(DAU)可视化大屏需要用 page_log 过滤去重dwm UJ 跳出率可视化大屏需要通过 page_log 行为判断dwm 进入页面数可视化大屏需要识别开始访问标识dwd 连续

大数据 - DWM层 业务实现

DWM 建表,需要看 DWS 需求。 DWS 来自维度(访客、商品、地区、关键词),为了出最终的指标 ADS 需求指标 DWT 为什么实时数仓没有DWT,因为它是历史的聚集,累积结果,实时数仓中不需要 DWD 不需要加工 DWM 需要加工的数据 统计主题 需求指标【ADS】输出方式计算来源来源层级

孙荣辛|大数据穿针引线进阶必看——Google经典大数据知识

大数据技术的发展是一个非常典型的技术工程的发展过程,荣辛通过对于谷歌经典论文的盘点,希望可以帮助工程师们看到技术的探索、选择过程,以及最终历史告诉我们什么是正确的选择。 何为大数据 “大数据”这个名字流行起来到现在,差不多已经有十年时间了。在这十年里,不同的人都按照自己的需要给大数据编出了自己的解释

大数据-数据仓库-实时数仓架构分析

![image](https://img2023.cnblogs.com/blog/80824/202211/80824-20221128173125005-1682211493.png) ![image](https://img2023.cnblogs.com/blog/80824/202211/

大数据-业务数据采集-FlinkCDC

CDC CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 CDC 的种类 CDC 主要分为基于查询和基于 Binl

大数据-业务数据采集-FlinkCDC DebeziumSourceFunction via the 'serverTimezone' configuration property

Caused by: org.apache.kafka.connect.errors.ConnectException: Error reading MySQL variables: The server time zone value '�й���׼ʱ��' is unrecognized or

大数据-业务数据采集-FlinkCDC The MySQL server is not configured to use a ROW binlog_format

Caused by: org.apache.kafka.connect.errors.ConnectException: The MySQL server is not configured to use a ROW binlog_format, which is required for this