11-azkaban

发布时间:2023年12月19日

一、简介

遇到了什么问题才会使用Azkaban ?
    比如: 想启动hadoop集群
      先启动 zk集群,再启动 hdfs ,再启动  yarn,再启动日志系统 
    工作过程中总会遇到 多个脚本执行的时候有顺序。
    任务可以有一个编排的工具。
1. 一个完整的大数据分析系统通常都是由大量任务单元组成:shell脚本程序,mapreduce程序、hive脚本、spark程序等。

hadoop jar topN.jar 参数
hive -e "sql语句"
spark任务  flink任务等

2. 各任务单元之间存在时间先后及前后依赖关系:先后关系、依赖关系、定时执行。
3. 为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行。

Azkban 就是一个任务调度和编排工具(类似于一个小型机器人)。
Oozie 也是一个这样的软件,但是一般没人使用了。比较的笨重,安装包都有1~2G左右,但是功能强大。
DolphinScheduler 也是一个相同功能的软件(目前非常流行的工具)

二、Azkaban的体系结构

image.png

主要分为三部分:

 WebServer :暴露Restful API,提供分发作业和调度作业功能;

      什么是接口? --> 一个功能,比如登录接口 ,你需要给我传递用户名密码,我返回true,false 

- ExecServer :对WebServer 暴露 API ,提供执行作业的功能;
- MySQL :数据存储,实现Web 和 Exec之间的数据共享和部分状态的同步。

如果是一个webServer +  1个  execServer = 单机版
      一个webServer +  N个  execServer = 集群版

?

三、Azkaban的安装步骤

安装分为两种,第一种使用源码安装,第二种使用已经编译好的安装包安装。

第一种方式:下载源码,编译(git,node,maven等工具)

Azkaban

第二种:使用安装包安装

?1、上传,解压

yum install unzip
-d的意思是解压到别的目录
unzip azkaban-3.56.0.zip -d /opt/installs/

[root@bigdata01 modules]# mv azkaban-3.56.0 /opt/installs/
[root@bigdata01 modules]# cd /opt/installs/
[root@bigdata01 installs]# mv azkaban-3.56.0/ azkaban

2、生成mysql的元数据

在mysql中创建一个数据库,叫做azkban

?

3、配置web-server

进入web-server文件夹:

cd /opt/installs/azkaban/web-server

在web-server文件夹下:
执行该命令为了生成keystore 文件
?

keytool -keystore keystore -alias jetty -genkey -keyalg RSA

密码默认都设置为:123456

请输入 Y ,否则再问一遍

修改web-server 下的conf 下的 azkaban.properties文件:

Jetty 是一个 web容器,用于运行web应用的,跟tomcat类似,只是比较小巧。

tomcat  开源免费 功能不算强大,不算一个正儿八经的web容器,只能算是servlet容器
Jetty  web容器,比tomcat 还小,功能还少
WebLogic  是一个收费的,重量级的web容器。

web容器:把你的网站页面和代码都放在这个软件的里面,一运行就出现页面,并且代码也可以运行。
# Azkaban Personalization Settings
azkaban.name=Test
azkaban.label=My Local Azkaban
azkaban.color=#FF3601
azkaban.default.servlet.path=/index
# 此处需要编写绝对路径,如果跟我的安装路径一样,不需要改
web.resource.dir=/opt/installs/azkaban/web-server/web
# 时区默认是美国时区,修改为上海时区
default.timezone.id=Asia/Shanghai

# Azkaban UserManager class
user.manager.class=azkaban.user.XmlUserManager
user.manager.xml.file=/opt/installs/azkaban/web-server/conf/azkaban-users.xml

# Loader for projects
executor.global.properties=/opt/installs/azkaban/web-server/conf/global.properties
azkaban.project.dir=projects
# 配置mysql数据库连接的
database.type=mysql
mysql.port=3306
mysql.host=bigdata01
mysql.database=azkaban
mysql.user=root
mysql.password=123456
mysql.numconnections=100

# Velocity dev mode
velocity.dev.mode=false

# Azkaban Jetty server properties.
jetty.use.ssl=false
jetty.maxThreads=25
jetty.port=8081

# 此处是我们的生成的秘钥密码
jetty.keystore=keystore
jetty.password=123456
jetty.keypassword=123456
jetty.truststore=keystore
jetty.trustpassword=123456

# Azkaban Executor settings
executor.maxThreads=50
executor.port=12321
executor.flow.threads=30

# mail settings
#mail.sender=邮箱
#mail.host=什么邮箱
#mail.user=邮箱
#mail.password=授权码
# User facing web server configurations used to construct the user facing server URLs. They are useful when there is a reverse proxy between Azkaban web servers and users.

# enduser -> myazkabanhost:443 -> proxy -> localhost:8081

# when this parameters set then these parameters are used to generate email links.

# if these parameters are not set then jetty.hostname, and jetty.port(if ssl configured jetty.ssl.port) are used.
# azkaban.webserver.external_hostname=myazkabanhost.com
# azkaban.webserver.external_ssl_port=443
# azkaban.webserver.external_port=8081
job.failure.email=
job.success.email=
lockdown.create.projects=false
cache.directory=cache

# JMX stats
jetty.connector.stats=true
executor.connector.stats=true
azkaban.native.lib=false
# Azkaban plugin settings

azkaban.jobtype.plugin.dir=plugins/jobtypes

web-server/conf 目录下 azkaban-users.xml

<user password="admin" roles="admin,metrics" username="admin" />

4、配置exec-server

修改exec-webserver 下 的conf 下的azkaban.properties

# Azkaban Personalization Settings
azkaban.name=Azkaban
azkaban.label=My Local Azkaban
azkaban.color=#FF3601
azkaban.default.servlet.path=/index
web.resource.dir=/opt/installs/azkaban/web-server/web/
default.timezone.id=Asia/Shanghai

# Azkaban UserManager class
user.manager.class=azkaban.user.XmlUserManager
user.manager.xml.file=/opt/installs/azkaban/web-server/conf/azkaban-users.xml

# Loader for projects
executor.global.properties=/opt/installs/azkaban/exec-server/conf/global.properties
azkaban.project.dir=projects/
azkaban.execution.dir=execution/
executor.flow.threads=30
flow.num.job.threads=10
job.log.chunk.size=100
job.log.backup.index=10
job.max.Xms=1
job.max.Xmx=2
azkaban.server.flow.max.running.minutes=-1



# Azkaban mysql settings by default. Users should configure their own username and password.
database.type=mysql
mysql.port=3306
mysql.host=bigdata01
mysql.database=azkaban
mysql.user=root
mysql.password=123456
mysql.numconnections=100
# Azkaban Executor settings
executor.maxThreads=50
executor.flow.threads=30


# Azkaban Executor settings
executor.maxThreads=50
executor.port=12321
executor.flow.threads=30

# JMX stats
jetty.connector.stats=true
executor.connector.stats=true

azkaban.native.lib=lib/
#azkaban.jobtype.plugin.dir=plugins/jobtypes


# uncomment to enable inmemory stats for azkaban
#executor.metric.reports=true
#executor.metric.milisecinterval.default=60000

修改exec-webserver 下插件下的一个配置文件:

plugins/jobtypes/commonprivate.properties

set execute-as-user
execute.as.user=false
memCheck.enabled=false

修改驱动包:

因为之前编译这个azkaban的时候使用的驱动包比较老,所以删除掉,换新的。

此处记得修改两个文件夹下的lib (web-server,exec-server)

cd /opt/installs/azkaban/exec-server/lib
rm -rf mysql-connector-java-5.1.28.jar

cp /opt/installs/sqoop/lib/mysql-connector-java-8.0.26.jar ./


cd /opt/installs/azkaban/web-server/lib
rm -rf mysql-connector-java-5.1.28.jar

cp /opt/installs/sqoop/lib/mysql-connector-java-8.0.26.jar ./

5、修改所有的.sh 的执行权限

需要将exec-server下以及 web-server 下所有的执行脚本都赋权限

修改web-server 下的执行权限:

cd /opt/installs/azkaban/web-server/bin
chmod 777 start-web.sh shutdown-web.sh
cd internal/
chmod 777 internal-start-web.sh util.sh 

修改exec-server 下的执行权限:

cd /opt/installs/azkaban/exec-server/bin
chmod 777 start-exec.sh shutdown-exec.sh
cd internal/
chmod 777 internal-start-executor.sh util.sh 

6、启动 web-server 以及 exec-server

cd   /opt/installs/azkaban-3.56.0/web-server/bin
./start-web.sh

image.png

cd   /opt/installs/azkaban-3.56.0/exec-server/bin
./start-exec.sh

启动无法访问,可以通过查看日志进行:

四、Azkaban的使用

1、使用Flow1.0(比较老旧)

需要注意的点:

1. azkaban的job流文件,后缀是.job
	1)  type属性 必须赋值
		值有:command,java,pig
2. azkaban执行的job必须要提前打包,打包的格式必须是zip格式
     不能打包文件夹,打包文件
3. 流文件里的书写格式:
	1)一定要注意行末不要有空格
	2)编码集的问题,如果在window上实在不行,可以上传到linux进行zip压缩,然后下载到windows上,再上传到azkaban上

先在我们的azkaban上创建一个项目:

Name 不能是中文的

Description 不能为空,可以是中文

编写一个job任务:?

type=command
command=echo "hello world"

使用windows编辑这个文件的时候,一般要注意,格式,需要是unix:

对这个hello.job 进行打包,不要打包文件夹,格式必须是zip格式

打成zip 包

上传至项目中:

执行的时候报了错:

进行如下修复:

在exec-server中,修改配置文件 azkaban.proerties 添加如下代码,注意安装路径

azkaban.jobtype.plugin.dir=/opt/installs/azkaban/exec-server/plugins/jobtypes

重启exec-server即可。

从这个里面是不是可以看到一个任务,被调用了。

通过案例可以看到:我们可以将编写好的shell,以及mr任务,以及定时任务以及hive脚本都可以写成job任务,让azkaban帮助我们调度。

2、Flow2.0的用法

1、小试牛刀

需要创建两个文件 JobA.flow JobA.project

在JobA.project中编写一句话:

azkaban-flow-version: 2.0

在JobA.flow中,编写任务,需要注意YAML语句:

nodes:
   - name: jobA
     type: command
     config:
        command: echo "this is a simple test"

注意事项:

如果是如下界面,选择如图所示:

打包两个文件到一个zip中

2、YAML格式的数据

听说过JSON、XML,后来出现YAML,作用跟json和xml是一样的都是一种文件,可以传递信息。

?

JSON  和  XML 都是用来存储和传输数据的。
JSON  存储数据,比如 datax中大量使用了json
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "column": [
              "id",
              "age"
            ],
            "splitPk": "id",
            "connection": [
              {
                "table": [
                  "user"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://bigdata01:3306/sqoop"
                ]
              }
            ]
          }
        }
编写好了一个json ,里面含有大量的信息,肯定是有人用了。用的人肯定解析了这个文件。
JSON 传输数据:   html --> Java(SSM SSH SpringBoot)
xml 也可以做到。但是慢慢的淘汰了。
这三类文件格式都可以充当配置文件(存储数据)
JSON:  core.json
XML : hdfs-site.xml
YAML : 即将开始

比如SpringBoot 里面支持两种配置文件:application.properties 和 application.yaml

1. 大小写敏感
2. 使用缩进表示层级关系 ;
3. 缩进长度没有限制,只要元素对齐就表示这些元素属于一个层级;
4. 使用#表示注释 ;
5. 字符串默认不用加单双引号,但单引号和双引号都可以使用,双引号表示不需要对特殊字符进行转义;
6. YAML 中提供了多种常量结构,包括:整数,浮点数,字符串,NULL,日期,布尔,时间。
7、缩进不允许使用tab,只允许空格

语法格式:

# value 与 : 符号之间必须要有一个空格
key: value

map的写法

# 写法一 同一缩进的所有键值对属于一个map
key: 
      key1: value1
      key2: value2

# 写法二
{key1: value1, key2: value2}

数组的表示方式

# 写法一 使用一个短横线加一个空格代表一个数组项
- a
- b
- c

# 写法二
[a,b,c]

关于单双引号

s1: '内容\n 字符串'
s2: "内容\n 字符串"

转换后:
{ s1: '内容\\n 字符串', s2: "内容\n 字符串" }

如果你想表示一个换行:
s1: "内容\n 字符串"
Flow 2.0 建议将公共参数定义在 `config` 下,并通过 `${}` 进行引用。

3、多任务依赖

many.flow

nodes:
  - name: jobE
    type: command
    config:
      command: echo "This is job E"
    # jobE depends on jobD
    dependsOn: 
      - jobD
    
  - name: jobD
    type: command
    config:
      command: echo "This is job D"
    # jobD depends on jobA、jobB、jobC
    dependsOn:
      - jobA
      - jobB
      - jobC

  - name: jobA
    type: command
    config:
      command: echo "This is job A"

  - name: jobB
    type: command
    config:
      command: echo "This is job B"

  - name: jobC
    type: command
    config:
      command: echo "This is job C"

?many.project

azkaban-flow-version: 2.0

可以一次执行多个任务,也可以单独执行某个任务:

总结:多任务编排,靠dependsOn 这个属性。

定时任务:

实战:将我们刚才的多任务的job,定时执行:

4、内嵌流(嵌套流)案例

Azkaban中一个流可以包含其他的流,方便分组和排序,也方便协作
因为在工作过程中不一定azkaban是你一个人在用,假如你的同事已经写好了好几个流,你需要放入到你的流文件中,可以直接copy,不需要修改其他的。

不带参数传递的内嵌流的使用:

nodes:
  - name: jobC
    type: command
    config:
      command: echo "This is job C"
    dependsOn:
      - embedded_flow
  - name: embedded_flow
    type: flow
    nodes:
      - name: jobB
        type: command
        config:
          command: echo "This is job B"
        dependsOn:
          - jobA
      - name: jobA
        type: command
        config:
          command: echo "This is job A"

带有参数的内嵌流:

nodes:
  - name: jobC
    type: command
    config:
      command: echo "This is job C"
    dependsOn:
      - embedded_flow

  - name: embedded_flow
    type: flow
    config:
      name: zhangsan
    nodes:
      - name: jobB
        type: command
        config:
          command: echo "This is job B ${name}"
        dependsOn:
          - jobA

      - name: jobA
        type: command
        config:
          command: echo "This is job A"

neiqianflow.project

azkaban-flow-version: 2.0

一定注意字符集,使用utf-8的无bom格式

5、动态传参

代码演示:

nodes:
  - name: jobC
    type: command
    config:
      command: echo "This is job C"
    dependsOn:
      - test_flow
  - name: test_flow
    #此处是type=flow
    type: flow
    config:
      prop: ${canshu}
    #此处有nodes
    nodes:
      - name: jobB
        type: command
        config:
          command: echo "This is job B ${prop}"
        dependsOn:
          - jobA
      - name: jobA
        type: command
        config:
          command: echo "This is job A"

打包,上传

3、Azkaban的报警机制

1)邮箱通知

必须有一个邮箱服务器,你想接受一个邮件,肯定是另一个邮箱给你发送的。

拿到授权码,设置中找到POP3/SMTP.....选项,获取授权码

接着进行配置:在azkaban的 web-server 端,进行邮箱的配置:

?

mail.sender=邮箱
mail.host=smtp.163.com
mail.user=邮箱
mail.password=授权码

重启web-server服务。

执行一个任务的时候,就可以设置要接受的邮箱地址了。

?

image.png

?

2)电话报警机制

睿象云-智能运维管理平台-智能运维系统-自动化运维性能监控平台

?

会生成一个appKey:?

?

?

最后一个Azkaban的重试机制:

 重试次数
retries: 3
# 每次重试时间间隔
retry.backoff: 10000


案例:
nodes:
   - name: jobA
     type: command
     config:
        command: echo "this is a simple test"
        retries: 3
        retry.backoff: 10000

?4、关于azkaban脚本优化的问题

目前遇到的问题:
1、启动azkaban的时候,需要启动两个甚至多个脚本,比较麻烦
    start-web.sh  start-exec.sh
2、日志查看没有放在一块,需要切换目录比较麻烦

优化方案:
编写一个脚本  azkaban.sh

源码如下:

#!/bin/bash
# 参数个数一个或者两个 not equal to --> ne
if  [ $# -lt 1 ] && [ $# -gt 2 ] 
then
echo -e  "\033[32mUsage1:\033[0m azkaban.sh 1 \033[32mor\033[0m azkaban.sh 0"
echo -e  "\033[32mUsage2:\033[0m azkaban.sh 1 w \033[32mor\033[0m azkaban.sh 0 w"
exit;
fi


# 输入 1 为启动, 输入 0 为关闭
v1="web"
v2="exec"

if [ $1 -eq 1 ]
then
stat="start-"
elif [ $1 -eq 0 ]
then
stat="shutdown-"
else
echo -e "\033[32mUsage:\033[0m azkaban.sh 1 \033[32mor\033[0m azkaban.sh 0"
exit;
fi

cd /opt/installs/azkaban/logs
# 传入第二个参数 w 代表 web-server , e 代表 exec-server 不传第二个默认全部启动/停止
if [ "$2" = 'w' ]
then
path1=/opt/installs/azkaban/${v1}-server/bin/${stat}${v1}.sh
echo -e "\033[32m${stat}${v1}.sh\033[0m"
sh  ${path1}
elif [ "$2" = 'e' ]
then
path2=/opt/installs/azkaban/${v2}-server/bin/${stat}${v2}.sh
echo -e "\033[32m${stat}${v2}.sh\033[0m"
sh  ${path2}
elif [ 'a'"$2"'a' = 'aa' ]
then
path1=/opt/installs/azkaban/${v1}-server/bin/${stat}${v1}.sh
echo -e "\033[32m${stat}${v1}.sh\033[0m"
sh  ${path1}
sleep 2
path2=/opt/installs/azkaban/${v2}-server/bin/${stat}${v2}.sh
echo -e "\033[32m${stat}${v2}.sh\033[0m"
sh  ${path2}
else
echo -e "\033[32mUsage1:\033[0m azkaban.sh 1 w \033[32mor\033[0m azkaban.sh 0 w"
echo -e "\033[32mUsage2:\033[0m azkaban.sh 1 e \033[32mor\033[0m azkaban.sh 0 e"
exit;
fi


sleep 2
echo -e "\033[32m-------------------jps--------------------\033[0m"
jps

配置脚本:

cd /usr/local/bin
将脚本放入进去

chmod 777 azkaban.sh

创建logs目录:

cd /opt/installs/azkaban/
用于存储两个服务的日志
mkdir logs 

使用:

# 使用方法
# 1.启动azkaban
[root@hadoop10 ~]# azkaban.sh 1
# 单独启动Web
[root@hadoop10 ~]# azkaban.sh 1 w
# 单独启动exec
[root@hadoop10 ~]# azkaban.sh 1 e

# 2.关闭azkaban
[root@hadoop10 ~]# azkaban.sh 0
# 单独关闭Web
[root@hadoop10 ~]# azkaban.sh 0 w
# 单独关闭exec
[root@hadoop10 ~]# azkaban.sh 0 e

五、使用java代码编写一个任务,给别人发邮件。

<dependency>
            <groupId>com.sun.mail</groupId>
            <artifactId>javax.mail</artifactId>
            <version>1.6.2</version>
        </dependency>
        <dependency>
            <groupId>javax.mail</groupId>
            <artifactId>javax.mail-api</artifactId>
            <version>1.4.7</version>
        </dependency>

编写代码:


package com.email;

/***
 * @Date(时间)2023-05-12
 * @Author xx
 *
 *
 *
 * 发送邮件
 */
public class Email_Code {

    //测试一下
    public static void main(String[] args) throws Exception {

        for (int i = 0; i < 10; i++) {
            EmailUtils.sendEmail("发件人邮箱","收件人邮箱","发件人","收件人","信息","信息");
        }

    }


}

工具类代码:

package com.email;

import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import java.util.Date;
import java.util.Properties;

public class EmailUtils {

    public static void sendEmail(String fromAccount,String toAccount,String fromName,String toName,String title,String content) throws Exception {

        // 参数配置,?于连接邮件服务器
        Properties props = new Properties();
        // 使?协议
        props.setProperty("mail.transport.protocol", "smtp");
        // 发件?邮箱的 SMTP 服务器地址
        props.setProperty("mail.smtp.host", "smtp.163.com");
        // 需要请求认证
        props.setProperty("mail.smtp.auth", "true");
        // 创建会话对象,?于与邮箱服务器交互
        Session session = Session.getInstance(props);
        // 设置为debug模式,在控制台中可以查看详细的发送?志
        session.setDebug(true);

        // 1.创建邮件对象
        MimeMessage message = new MimeMessage(session);
        // 2.设置发件?,其中 InternetAddress 有三个参数,分别为:邮箱,显示的昵称,昵称的字符集编码
        message.setFrom(new InternetAddress(fromAccount, fromName, "UTF-8"));

        // 3.设置收件? - MimeMessage.RecipientType.TO
        message.setRecipient(MimeMessage.RecipientType.TO, new InternetAddress(toAccount ,toName, "UTF-8"));

        // 7.设置邮件主题
        message.setSubject(title,"UTF-8");

        // 8.设置邮件正?内容,指定格式为HTML格式
        message.setContent(content, "text/html;charset=UTF-8");
        // 9.设置显示发件时间
        message.setSentDate(new Date());

        // 10.保存设置
        message.saveChanges();

        // 根据 Session 获取邮件传输对象
        Transport transport = session.getTransport();
        // 连接邮件服务器
        transport.connect("18638147931@163.com", "MAGBDQDGKEHCBVQA");
        // 发送邮件
        transport.sendMessage(message, message.getAllRecipients());
        // 关闭连接
        transport.close();
    }
}

文章来源:https://blog.csdn.net/qq_43819048/article/details/132762622
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。