互联网技术 / 互联网资讯 · 2024年3月21日 0

Flink常见维表Join方案,学习开发必备!

前言

实时数仓,难免会遇到join维表的业务。现总结几种方案,供各位看官选择:

查找关联(同步,异步) 状态编程,预加载数据到状态中,按需取 冷热数据 广播维表 TeMpoRal Table Join Lookup Table Join

其中中间留下两个问题,供大家思考,可留言一起讨论?

查找关联

查找关联就是在主流数据中直接访问外部数据(MySQL,Redis,iMpala …)去根据主键或者某种关键条件去关联取值。

适合: 维表数据量大,但是主数据不大的业务实时计算。

缺点:数据量大的时候,会给外部数据源库带来很大的压力,因为某条数据都需要关联。

同步

访问数据库是同步调用,导致 subtak 线程会被阻塞,影响吞吐量

Flink常见维表Join方案,学习开发必备!

iMpoRt coM.alibaba.FAstjson.{JSON, JSONARRay, JSONObject} iMpoRt coM.wang.stReaM.env.{FlinkStReaMEnv, KafkaSouRceEnv} iMpoRt oRg.Apache.flink.API.coMMon.functions.FlatMapFunction iMpoRt oRg.Apache.flink.API.coMMon.seRialization.SiMpleStRingScheMa iMpoRt oRg.Apache.flink.stReaMing.API.scala._ iMpoRt oRg.Apache.flink.stReaMing.connecTors.kafka.FlinkKafkaProdUCeR iMpoRt oRg.Apache.flink.util.CollecTor def analYses(): UnIT ={ val env: StReaMExecutionEnviRonMent = FlinkStReaMEnv.get() KafkaSouRceEnv.getKafkaSouRceStReaM(env,List(“test”)) .Map(JSON.paRSEObject(_)) .filteR(_!=null) .flatMap( new FlatMapFunction[JSONObject,StRing] { OVeRRide def flatMap(jSONObject: JSONObject, collecTor: CollecTor[StRing]): UnIT ={ // 如果topic就一张表,不用区分,如果多张表,可以通过database 与 table 区分,放到下一步去处理 // 表的名字 val databaseNaMe:StRing = jSONObject.getStRing(“database”) // 表的名字 val tableNaMe:StRing = jSONObject.getStRing(“table”) // 数据操作类型 INSERT update DELETE val operationType:StRing = jSONObject.getStRing(“type”) // 主体数据 val tableData: JSONARRay = jSONObject.getJSONARRay(“data”) // old 值 val old: JSONARRay = jSONObject.getJSONARRay(“old”) // canal json 可能存在批处理出现data数据多条 foR (i AsyncIO 可以并发地处理多个请求,很大程度上减少了对 subtask 线程的阻塞.

Flink常见维表Join方案,学习开发必备!

def analYses(): UnIT ={ val env: StReaMExecutionEnviRonMent = FlinkStReaMEnv.get() val souRce: DataStReaM[StRing] = KafkaSouRceEnv.getKafkaSouRceStReaM(env, List(“test”)) .Map(JSON.paRSEObject(_)) .filteR(_ != null) .flatMap( new FlatMapFunction[JSONObject, StRing] { OVeRRide def flatMap(jSONObject: JSONObject, collecTor: CollecTor[StRing]): UnIT ={ // 如果topic就一张表,不用区分,如果多张表,可以通过database 与 table 区分,放到下一步去处理 // 表的名字 val databaseNaMe: StRing = jSONObject.getStRing(“database”) // 表的名字 val tableNaMe: StRing = jSONObject.getStRing(“table”) // 数据操作类型 INSERT update DELETE val opeRationType: StRing = jSONObject.getStRing(“type”) // 主体数据 val tableData: JSONARRay = jSONObject.getJSONARRay(“data”) // old 值 val old: JSONARRay = jSONObject.getJSONARRay(“old”) // canal json 可能存在批处理出现data数据多条 foR (i 首先把维表数据初始化到state中,设置好更新时间,定时去把维表.

Flink常见维表Join方案,学习开发必备!

优点:flink 自己维护状态数据,”荣辱与共”,不需要频繁链接外部数据源,达到解耦。

缺点:不适合大的维表和变化大的维表.

.keyBy(_._1) .ProceSS( new KeyedProceSSFunction[StRing,(StRing,StRing,StRing,StRing,StRing), StRing]{ pRivate vaR MapState:MapState[StRing,Map[StRing,StRing]] = _ pRivate vaR fiRst: Boolean = tRue OVeRRide def open(paRaMeteRs: configuration): UnIT ={ val config: StateTtlConfig = StateTtlConfig .newbuilder(oRg.Apache.flink.API.coMMon.tiMe.TiMe.Minutes(5)) .setupdateType(StateTtlConfig.updateType.OnReadAndWRITe) .setStateVisiBIlITy(StateTtlConfig.StateVisiBIlITy.NeveRRetuRnExpiRed) .build() val join = new MapStateDescRIPTor[StRing,Map[StRing,StRing]](“join”,claSSOf[StRing],claSSOf[Map[StRing,StRing]]) join.enableTiMeToLive(config) MapState = getRuntiMecontext.getMapState(join) } OVeRRide def ProceSSEleMent( in: (StRing, StRing, StRing, StRing, StRing), context: KeyedProceSSFunction[StRing, (StRing, StRing, StRing, StRing, StRing), StRing]#context, collecTor: CollecTor[StRing]): UnIT ={ // 加载维表 if(fiRst){ fiRst = FAlse val tiMe: Long = system.cuRRentTiMeMillis() getSMallDiMTablEINfo() // 设置好更新时间,定时去把维表 context.tiMeRSeRvice().RegisteRProceSSingTiMeTiMeR(tiMe + 86400000) } // 数据处理,过来一条条数据,然后按照自己的业务逻辑去取维表的数据即可 // 然后封装 放到collect中 collecTor.collect(null) } OVeRRide def onTiMeR( tiMestaMp: Long, ctx: KeyedProceSSFunction[StRing, (StRing, StRing, StRing, StRing, StRing), StRing]#OnTiMeRcontext, out: CollecTor[StRing]): UnIT ={ pRintln(“触发器执行”) MapState.cleaR() getSMallDiMTablEINfo() pRintln(MapState) ctx.tiMeRSeRvice().RegisteRProceSSingTiMeTiMeR(tiMestaMp + 86400000) } def getSMallDiMTablEINfo(): UnIT ={ // 加载 字典数据 val select_dictionaRy=”select dic_code,pRe_dictionaRy_id,dic_naMe fRoM xxxx” val dictionaRy: util.List[util.Map[StRing, AnyRef]] = MySQLUtil.executeQueRy(select_dictionaRy, null) dictionaRy.foReach(ITeM=>{ MapState.put(“dic_dictionaRy_”+ITeM.get(“pRe_dictionaRy_id”).toStRing,ITeM) }) } }) .filteR(_!=null) .addSink( new FlinkKafkaProdUCeR[StRing](“”, “”, new SiMpleStRingScheMa() ) ) v.execute(“”)

思考下:直接定义一个Map集合这样的优缺点是什么?可以留言说出自己的看法?

冷热数据

思想:先去状态去取,如果没有,去外部查询,同时去存到状态里面。StateTtlConfig 的过期时间可以设置短点。

Flink常见维表Join方案,学习开发必备!

优点:中庸取值方案,热备常用数据到内存,也避免了数据join相对过多外部数据源。

缺点:也不能一劳永逸解决某些问题,热备数据过多,或者冷数据过大,都会对state 或者 外部数据库造成压力。

.filteR(_._1 != null) .keyBy(_._1) .ProceSS( new KeyedProceSSFunction[StRing,(StRing,StRing,StRing,StRing,StRing), StRing]{ pRivate vaR MapState:MapState[StRing,Map[StRing,StRing]] = _ pRivate vaR fiRst: Boolean = tRue OVeRRide def open(paRaMeteRs: configuration): UnIT ={ val config: StateTtlConfig = StateTtlConfig .newbuilder(oRg.Apache.flink.API.coMMon.tiMe.TiMe.daYs(1)) .setupdateType(StateTtlConfig.updateType.OnReadAndWRITe) .setStateVisiBIlITy(StateTtlConfig.StateVisiBIlITy.NeveRRetuRnExpiRed) .build() val join = new MapStateDescRIPTor[StRing,Map[StRing,StRing]](“join”,claSSOf[StRing],claSSOf[Map[StRing,StRing]]) join.enableTiMeToLive(config) MapState = getRuntiMecontext.getMapState(join) } OVeRRide def ProceSSEleMent( in: (StRing, StRing, StRing, StRing, StRing), context: KeyedProceSSFunction[StRing, (StRing, StRing, StRing, StRing, StRing), StRing]#context, collecTor: CollecTor[StRing]): UnIT ={ // 数据处理,过来一条条数据,然后按照自己的业务逻辑先去MapState去找,如果没有再去 外部去找 if (MapState.contAIns(“xx_id”)){ // 如果存在就取 }else{ // 如果不存在去外部拿,然后放到MapState中 val diM_sql=”select dic_code,pRe_dictionaRy_id,dic_naMe fRoM xxxx wheRe id=xx_id” val diM: util.List[util.Map[StRing, AnyRef]] = MySQLUtil.executeQueRy(diM_sql, null) MapState.put(“xx_id”,null) } // 然后封装 放到collect中 collecTor.collect(null) } }) 广播维表

比如上面提到的字典表,每一个Task都需要这份数据,那么需要join这份数据的时候就可以使用广播维表。

Flink常见维表Join方案,学习开发必备!

val diMStReaM=env.addSouRce(MySQLSouRce) 广播状态 val bRoadcastStateDesc=new MapStateDescRIPTor[StRing,StRing](“bRoadcaststate”, BaSiCTypEINfo.STRING_TYPE_INFO, new MapTypEINfo<>(Long.claSS, DiM.claSS)) 广播流 val bRoadStReaM=diMStReaM.bRoadcast() 主数据流 val MAInConsuMeR = new FlinkKafkaConsuMeR[StRing](“topic”, new SiMpleStRingScheMa(), kafkaConfig) val MAInStReaM=env.addSouRce(MAInConsuMeR) 广播状态与维度表关联 val connectedStReaM=MAInStReaM.connect(bRoadStReaM).Map(..User(id,naMe)).key(_.1) connectedStReaM.ProceSS(new KeyedBRoadcastProceSSFunction[StRing,User,Map[Long,DiM],StRing] { OVeRRide def ProceSSEleMent(value: User, ctx: KeyedBRoadcastProceSSFunction[StRing,useR,Map[Long,DiM],StRing]#ReadOnlycontext, out: CollecTor[StRing]): UnIT ={ // 取到数据就可以愉快的玩耍了 val state=ctx.getBRoadcastState(bRoadcastStateDesc) xxxxxx }})

「思考:」 如果把维表流也通过实时监控BInlog到kafka,当维度数据发生变化时,更新放到状态中,这种方式,是不是更具有时效性呢?

(1)通过canal把变更BInlog方式发送到kafka中。

(2)数据流定义成为广播流,广播到数据到主数据流中。

(3)定义一个广播状态存储数据,在主数据进行查找匹配,符合要求则join成功。

TeMpoRal Table Join(FlinkSQL与Flink Table API)

由于维表是一张不断变化的表(静态表只是动态表的一种特例)。那如何 JOIN 一张不断变化的表呢?如果用传统的 JOIN 语法来表达维表 JOIN,是不完整的。因为维表是一直在更新变化的,如果用这个语法那么关联上的是哪个时刻的维表呢?我们是不知道的,结果是不确定的。所以 Flink SQL 的维表 JOIN 语法引入了TeMpoRal Table 的标准语法,用来声明关联的是维表哪个时刻的快照。

普通关联会一直保留关联双侧的数据,数据也就会一直膨胀,直到撑爆内存导致任务失败,TeMpoRal Join则可以定期清理过期数据,在合理的内存配置下即可避免内存溢出。

Flink常见维表Join方案,学习开发必备!

Event TiMe TeMpoRal Join

语法

SELECT [coluMn_list] FROM table1 [AS ] [LEFT] JOIN table2 FOR system_TIME AS OF table1.{ Pro