Skip to content
This repository has been archived by the owner on May 7, 2022. It is now read-only.

WeiWan5675/FlinkSupport

Repository files navigation

Fkink Support

License

交流

  有想一起交流的同学可以扫下面的二维码添加我的微信,一起学习一起写代码谢谢!。

简介

  FlinkSupport是一个基于Flink框架,采用Java语言进行开发的支持类框架,框架主要分为几个模块,包含快速开发模块、数据ETL模块、FlinkSql交互式任务模块、FlinkUtils模块,通过对日常工作中Flink常用场景进行封装,提高Flink开发的便利性,提供插件化的Flink开发能力。

  FlinkSupport提供了一些快速开发的能力,能够帮助开发者快速进行程序开发,极大的降低了开发成本。并且框架提供了ETL数据插件的相关接口,使用ETL模块,能够快速将数据ETL任务通过配置文件+ETL插件的形式进行数据落地。

  目前FlinkSupport仅支持Flink-1.11.1以上版本,后期会支持Flink1.9+

快速开始

  • 要求

    • hadoop相关环境(运行) 2.6+
    • flink (运行) 1.9+
    • jdk 1.8+
    • maven 3.4.5+
  • Clone本项目

    git clone [email protected]:WeiWan5675/FlinkSupport.git
  • 打包

    mvn clean package install -Dmaven.test.skip=true -f pom.xml
  • 解压

    tar -zxvf FlinkSupport-1.0.tar.gz -C FlinkSupport-1.0
  • 部署运行

    • 初始化

      第一次运行,请执行以下命令,需要配置FLINK_HOME HADOOP_HOME 环境变量,如未配置,可以手动

      $FLINK_SUPPORT_HOME/bin/flink-support init [-hadoopHome "" -flinkHome ""]
    • 任务提交

      $FLINK_SUPPORT_HOME/bin/support-submit -c ./resource_dir/app-conf.yaml -resources resource_dir
    • 任务停止

      $FLINK_SUPPORT_HOME/bin/flink-support stop jobId
  • 程序开发

    • 引入依赖

          <dependency>
          	<groupId>com.weiwan</groupId>
          	<artifactId>support-core</artifactId>
          </dependency>
          <dependency>
          	<groupId>com.weiwan</groupId>
          	<artifactId>etl-framework</artifactId>
          </dependency>
          <dependency>
          	<groupId>com.weiwan</groupId>
          	<artifactId>support-utils</artifactId>
          </dependency>
    • 开发

      具体的程序开发相关说明请查看快速开发说明文档

        普通Flink程序Demo

        ETL框架插件

  • 其它

    FlinkSupport支持相关运行命令请查看命令手册

模块

  FlinkSupport模块划分主要分为两部分,包括面向用户快速开发模块以及FlinkSupport自身的运行时模块。

  • 面向用户(快速开发模块)
    • support-core
    • support-etl-framework
    • support-utils
  • FlinkSupport(运行时模块)
    • support-runtime
    • support-launcher
    • support-monitor

  具体的模块说明及相关的设计文档请查看设计文档

ETL模式

  ETL模式,通过对Flink相关能力进行封装,基于InputFormatSource、OutputFormatSink、MapFunction 抽象出三大数据插件 Reader、Processer、Writer。

  通过插件化,可以快速的在异构数据源中进行数据同步,数据处理等操作。

应用配置

  • 默认配置

    ##########################################################
    # FlinkSupport默认配置文件,包含以下内容:
    # 1. Support相关变量配置
    # 2. 大数据组件的ENV变量配置
    # 3. Flink任务相关参数设置
    # 4. 其它
    # 默认配置文件中,所有选项都可以在app-conf.yaml中进行覆盖,也可以通过修改默认配置文件,达到全局任务配置修改的目的
    ##########################################################
    
    HADOOP_HOME:
    YARN_HOME:
    HIVE_HOME:
    FLINK_HOME:
    FLINK_VERSION: 1.11.1
    SCALA_VERSION: 2.11
    HADOOP_USER_NAME: easylife
    
    SUPPORT_TASK_LOGDIR: /tmp/flink_support/logs
    
    flink:
      task:
        type: stream
        name: FlinkApplication
        common:
          parallelism: 1 #并行度
          restartMode: none #fixed-delay | failure-rate | none  默认fixed-delay
          restartNum: 1  #重启次数  默认3
          restartInterval: 30000  #重启延迟  默认30S
          restartFailMaxNum: 1 #最大重启失败次数
          queue: root.default
        batch:
          sessionTimeout:     #保存作业的中间结果的超时时间 暂未启用
        stream:
          timeCharacteristic:    #流处理的事件模式  默认processTime eventTime
        checkpoint:
          enable: false       #是否启用检查点
          interval: 60000         #检查点间隔  单位毫秒
          timeout: 60000       #检查点超时时间 单位毫秒
          mode: EXACTLY_ONCE #检查点模式: AT_LEAST_ONCE  EXACTLY_ONCE
          minInterval: 500 #最小检查点间隔 单位毫秒
          maxConcurrent: 1   #z最多有多少checkpoint可以在运行
          externalized:
            enable: false    #是否开启checkpoint的外部持久化
            cleanUp: DELETE_ON_CANCELLATION #DELETE_ON_CANCELLATION  自动删除   RETAIN_ON_CANCELLATION 保留
          onFail: true  #当checkpoint发生错误时,是否认为任务失败 true 失败  false 拒绝checkpoint继续任务
        stateBackend:
          type: Memory #三种 Memory  FileSystem  RocksDB
          async: true #仅在配置为Memory FileSystem 时生效 RocksDB默认为异步
          path:  #支持hdfs路径 或者本地文件路径 hdfs:https://namenode:40010/flink/checkpoints  file:https:///data/flink/checkpoints
    
    
    app:
      class: com.weiwan.tester.run.Tester
      name: Support Application
      etlMode: false
      sqlMode: false
    
  • ETL配置

    ###########################################
    # FlinkSupport-Etl模式模式配置文件
    # 1. 定义插件
    # 2. 重写默认应用配置文件
    ###########################################
    flink:
      task: #Flink相关任务配置
        type: stream
    
    app: #应用相关配置
      name: SupportEtlApplication
      etlMode: true
      sqlMode: false
    
    etl:
      reader:
        name: ExampleReader #Reader插件
        class: com.weiwan.support.plugins.reader.ExampleReader
        parallelism: 1
        example:
          readereVar: 1000
      processer:
        name: ExampleProcesser #Processer插件
        class: com.weiwan.support.plugins.processer.ExampleProcesser
        parallelism: 1
        example:
          channelVar: channel_var
      writer:
        name: ExampleWriter #Writer插件
        class: com.weiwan.support.plugins.writer.ExampleWriter
        parallelism: 1
        example:
          writerVar: writer_var
  • 其它

      配置文件由三部分组成,Flink配置、Application配置、ETL模块配置,关于配置文件详细的内容以及支持的参数等可以查看应用配置文档

注解支持

  FlinkSupport支持注解主要集中自动化接入数据方面,详见FlinkSupport注解说明

关于FlinkSupport

  关于该框架以后的维护,主要考虑从几个方向进行,框架代码优化、数据插件、注解支持、监控及控制台支持Flink1.9+等。

  在开发这个框架过程中,由于能力有限,代码写的比较简陋,我会继续努力,如果有大佬可以指点下,万分感激。

License

Apache License 2.0, see LICENSE.

About

Flink应用程序开发支持框架

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published