Apache Griffin with Cloudera Hadoop and Data Quality POC

References

  • https://github.com/apache/griffin/blob/master/griffin-doc/deploy/deploy-guide.md
  • https://griffin.apache.org/

About Apache Griffin

Apache Griffin is an open-source Data Quality solution for Big Data that supports both batch and streaming modes. It offers a unified process to measure your data quality from different perspectives, helping you build trusted data assets and boosting confidence in your business.

Apache Griffin offers a well-defined data quality domain model that covers most common data quality problems. It also defines a data quality DSL to help users express their quality criteria. By extending the DSL, users can even implement their own specific features/functions in Apache Griffin.

Prerequisites

  1. Java 1.8 or higher
  2. PostgreSQL 10.4 or MySQL 8.x
  3. NodeJS (NPM) 6.0+
  4. Hadoop 2.6 or higher
  5. Elastic Search 6.8.x (the latest version of Elastic Search, i.e. 7.x, is currently not supported; a JIRA has been submitted for this)
  6. Spark 2.x
  7. Livy (facilitates seamless submission of Spark jobs). The latest version of Livy does not support Spark 1.6.0 (packaged in CDH 5.15.x); hence, Livy 0.4.0 is installed as a prerequisite.

Downloads

  • NodeJS (NPM) can be downloaded from link.
  • Elastic Search can be downloaded from link.

Note: The repository can be configured for Elastic Search 6.x, which will download Elastic Search 6.8.1

  • Apache Griffin Package can be downloaded from link.
  • Apache Livy 0.4.0 can be downloaded from link.

Port Requirements

The following ports are referenced in this deployment and must be reachable between the respective hosts:

  • 8080 - Apache Griffin service / UI
  • 9200 - Elasticsearch
  • 8998 - Livy server
  • 3306 - MySQL
  • 9083 - Hive metastore
  • 8088 - YARN ResourceManager
  • 8020 - HDFS NameNode
  • 7077 / 8082 - Spark master / Spark master web UI

Installation / Deployment of Apache Griffin (0.5.0)

Details are available in the Apache Griffin deployment guide at this link

Prerequisites setup

1. Java 1.8.0+

yum install java-1.8.0-openjdk

2. MySQL

Install MySQL Server (the prerequisite list allows MySQL 8.x; the command below installs the 5.7 package, so adjust the package name or repository for 8.x)

yum install mysql-server-5.7

 Create database and tables (only for POC)

mysql -u <username> -e "create database quartz" -p

mysql -u <username> -p quartz < Init_quartz_mysql_innodb.sql
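
To confirm that the Quartz schema was created, you can list the tables (a quick optional check, using the same <username> as above):

mysql -u <username> -p -e "SHOW TABLES;" quartz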

3. Update the .bashrc file to reflect the properties below

JAVA_HOME

HADOOP_CONF_DIR

SPARK_HOME

LIVY_HOME

PATH=$PATH:$HIVE_HOME/bin:$HADOOP_HOME/bin:$SPARK_HOME/bin:$LIVY_HOME/bin
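
For reference, a minimal .bashrc sketch; the paths below are placeholders and will differ depending on where Java, CDH, and Livy are installed on your nodes:

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark
export LIVY_HOME=/apache/livy
export PATH=$PATH:$HIVE_HOME/bin:$HADOOP_HOME/bin:$SPARK_HOME/bin:$LIVY_HOME/bin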

4. NodeJS (NPM) 6.0+

yum install nodejs

yum install npm

Check version using

node -v

npm -v

5. Hadoop 2.6 or higher

It is assumed that Cloudera Hadoop 5.x is already deployed and configured

6. Elastic Search 6.8.x

The latest version of Elastic Search (7.x) is currently not supported; a JIRA has been submitted for this: GRIFFIN-346 - Support for Elastic Search latest version (7.9.1)

Create the Elastic Search yum repository (e.g. /etc/yum.repos.d/elasticsearch.repo) with the following content:

[elasticsearch]

name=Elasticsearch repository for 6.x packages

baseurl=https://artifacts.elastic.co/packages/6.x/yum

gpgcheck=1

gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch

enabled=0

autorefresh=1

type=rpm-md

 Install Elastic Search

yum install --enablerepo=elasticsearch elasticsearch

Enable and start the Elastic Search service (the stop command is listed for reference)

systemctl enable elasticsearch.service

systemctl start elasticsearch.service

systemctl stop elasticsearch.service

Test Elastic Search connection

curl -X GET "<ES Server IP>:9200/?pretty" 
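
If Elasticsearch is up, the call returns a small JSON document; an abbreviated sketch of what to expect (names and values will differ on your installation):

{
  "name" : "node-1",
  "cluster_name" : "elasticsearch",
  "version" : { "number" : "6.8.1" },
  "tagline" : "You Know, for Search"
}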

Configure Elastic Search (/etc/elasticsearch/elasticsearch.yml)

Update below fields:

network.host: <IP Address>

http.cors.enabled: true

http.cors.allow-origin: "*"

7. Spark 2.x (preferred).

Since CDH 5.15.x is packaged with Spark 1.6.0, Livy 0.4.0 will be used.

It is assumed that Spark is already deployed and configured as part of the Cloudera Hadoop cluster deployment.

Check $SPARK_HOME/conf/spark-env.sh

HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

SPARK_MASTER_HOST=localhost

SPARK_MASTER_PORT=7077

SPARK_MASTER_WEBUI_PORT=8082

SPARK_LOCAL_IP=localhost

SPARK_PID_DIR=/apache/pids 

Upload the Spark jars and Hive configuration to HDFS; otherwise you will hit "Error: Could not find or load main class org.apache.spark.deploy.yarn.ApplicationMaster" when you schedule Spark applications.

hdfs dfs -mkdir -p /home/spark_lib

hdfs dfs -mkdir -p /home/spark_conf

hdfs dfs -put $SPARK_HOME/jars/* hdfs:///home/spark_lib/

hdfs dfs -put $HIVE_HOME/conf/hive-site.xml hdfs:///home/spark_conf/
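
Optionally, confirm the uploads before restarting Spark:

hdfs dfs -ls hdfs:///home/spark_lib | head

hdfs dfs -ls hdfs:///home/spark_conf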

Restart Spark

8. Livy 0.4.0 (facilitates seamless submission of Spark jobs). The latest version of Livy does not support Spark 1.6.0 (packaged in CDH 5.15.x); hence, Livy 0.4.0 is installed.

mkdir /apache/livy/logs

Update /apache/livy/conf/livy.conf

livy.server.host = <Livy Server IP>

livy.spark.master = yarn

livy.spark.deployMode = cluster

livy.repl.enableHiveContext = true

livy.server.port = 8998

 Start Livy Server:

/apache/livy/bin/livy-server start
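
Once started, Livy's REST API can be used as a quick health check; it should return a JSON list of sessions (empty on a fresh start). Adjust the host and port to match livy.conf:

curl http://<Livy Server IP>:8998/sessions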

Install and configure Griffin

1. Update Application Properties

Location: /<Griffin package>/service/src/main/resources/application.properties

# Apache Griffin server port (default 8080)
server.port = 8080
spring.application.name=griffin_service


# db configuration
spring.datasource.url=jdbc:mysql://localhost:3306/quartz?autoReconnect=true&useSSL=false&allowPublicKeyRetrieval=true
spring.datasource.username=root
spring.datasource.password=Root@123
spring.jpa.generate-ddl=true
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.jpa.show-sql=true


# Hive metastore
hive.metastore.uris=thrift://master:9083
hive.metastore.dbname=default
hive.hmshandler.retry.attempts=15
hive.hmshandler.retry.interval=2000ms
# Hive cache time
cache.evict.hive.fixedRate.in.milliseconds=900000


# Kafka schema registry
kafka.schema.registry.url=https://localhost:8081


# Update job instance state at regular intervals
jobInstance.fixedDelay.in.milliseconds=60000


# Expired time of a job instance: 7 days, i.e. 604800000 milliseconds. The time unit only supports milliseconds
jobInstance.expired.milliseconds=604800000


# schedule predicate job every 5 minutes and repeat 12 times at most


# interval time unit - s: second, m: minute, h: hour, d: day; only these four units are supported
predicate.job.interval=5m
predicate.job.repeat.count=12


# external properties directory location
external.config.location=


# external BATCH or STREAMING env
external.env.location=


# login strategy ("default" or "ldap")
login.strategy=default


# hdfs default name
fs.defaultFS=hdfs://master:8020


# yarn url
yarn.uri=https://master:8088


# griffin event listener
internal.event.listeners=GriffinJobEventHook


# livy
livy.uri=https://10.0.173.118:8998/batches

2. Update Quartz Properties

Location: service/src/main/resources/quartz.properties

org.quartz.scheduler.instanceName=spring-boot-quartz
org.quartz.scheduler.instanceId=AUTO
org.quartz.threadPool.threadCount=5
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
# If you use postgresql, set this property value to org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
# If you use mysql, set this property value to org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# If you use h2, it's ok to set this property value to StdJDBCDelegate, PostgreSQLDelegate or others
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties=true
org.quartz.jobStore.misfireThreshold=60000
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true

org.quartz.jobStore.clusterCheckinInterval=20000

3. Update Spark Properties

Location: /<Griffin package>/service/src/main/resources/sparkProperties.json

The griffin measure path referenced below is the HDFS location where you should put the jar file of the measure module (griffin-measure.jar).

{
    "file": "hdfs:///griffin/griffin-measure.jar",
    "className": "org.apache.griffin.measure.Application",
    "name": "griffin",
    "queue": "default",
    "numExecutors": 3,
    "executorCores": 1,
    "driverMemory": "1g",
    "executorMemory": "1g",
    "conf": {
        "spark.yarn.dist.files": "hdfs:///hive/hive-site.xml"
    },
    "files": [
    ],
    "jars": [
    ]
}
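
The paths referenced in this file must exist in HDFS before jobs are submitted. A sketch of the upload commands, assuming the measure jar has already been built and renamed as described in the build step below:

hdfs dfs -mkdir -p /griffin /hive

hdfs dfs -put griffin-measure.jar /griffin/

hdfs dfs -put $HIVE_HOME/conf/hive-site.xml /hive/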

4. Update Env Batch and Env Streaming files

Location: /<Griffin package>/service/src/main/resources/env/env_batch.json AND env_streaming.json

Adjust the sinks according to your requirements. At a minimum, you will need to adjust the HDFS output directory (hdfs:///griffin/persist by default) and the Elasticsearch URL (https://<Elastic Search Server IP>:9200/griffin/accuracy by default). Similar changes are required in env_streaming.json.

env_batch.json:

{
  "spark": {
    "log.level": "WARN"
  },
  "sinks": [
    {
      "type": "CONSOLE",
      "config": {
        "max.log.lines": 10
      }
    },
    {
      "type": "HDFS",
      "config": {
        "path": "hdfs:///griffin/persist",
        "max.persist.lines": 10000,
        "max.lines.per.file": 10000
      }
    },
    {
      "type": "ELASTICSEARCH",
      "config": {
        "method": "post",
        "api": "https://worker3:9200/griffin/accuracy",
        "connection.timeout": "1m",
        "retry": 10
      }
    }
  ],
  "griffin.checkpoint": []
}

env_streaming.json:

{
  "spark": {
    "log.level": "WARN",
    "checkpoint.dir": "hdfs:///griffin/checkpoint/${JOB_NAME}",
    "init.clear": true,
    "batch.interval": "1m",
    "process.interval": "5m",
    "config": {
      "spark.default.parallelism": 4,
      "spark.task.maxFailures": 5,
      "spark.streaming.kafkaMaxRatePerPartition": 1000,
      "spark.streaming.concurrentJobs": 4,
      "spark.yarn.maxAppAttempts": 5,
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.max.executor.failures": 120,
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.hadoop.fs.hdfs.impl.disable.cache": true
    }
  },
  "sinks": [
    {
      "type": "CONSOLE",
      "config": {
        "max.log.lines": 100
      }
    },
    {
      "type": "HDFS",
      "config": {
        "path": "hdfs:///griffin/persist",
        "max.persist.lines": 10000,
        "max.lines.per.file": 10000
      }
    }
  ],
  "griffin.checkpoint": [
    {
      "type": "zk",
      "config": {
        "hosts": "zk:2181",
        "namespace": "griffin/infocache",
        "lock.path": "lock",
        "mode": "persist",
        "init.clear": true,
        "close.clear": false
      }
    }
  ]
}
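
The HDFS sink path configured above must exist and be writable by the user submitting the jobs; a quick sketch:

hdfs dfs -mkdir -p /griffin/persist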

5. Build Griffin package using maven.

Install Maven (see link) if it is not already available.

Build Griffin

mvn clean install 

Copy jars from below two locations

/<Griffin Package>/service/target/service-0.5.0.jar

/<Griffin Package>/measure/target/measure-0.5.0.jar

Rename the jars as below

mv measure-0.5.0.jar griffin-measure.jar

mv service-0.5.0.jar griffin-service.jar

 Upload jars to HDFS

hdfs dfs -put *.jar /griffin/ 

Start Griffin

nohup java -jar <local jar location>/griffin-service.jar > service.out 2>&1 &
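
To confirm the service has started, watch the log for the embedded server coming up on the configured port (8080):

tail -f service.out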

Access the Griffin UI using the link below

https://<Griffin server IP>:8080

Setup Demo data on HDFS for Griffin

1. Create /home/spark_conf

hdfs dfs -mkdir -p /home/spark_conf

2. Upload hive-site.xml

hdfs dfs -put hive-site.xml /home/spark_conf/ 

3. Prepare demo tables

Create Demo tables

CREATE EXTERNAL TABLE `demo_src` (`id` bigint, `age` int, `desc` string) PARTITIONED BY (`dt` string, `hour` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LOCATION 'hdfs://<Name Node IP>:9000/griffin/data/batch/demo_src';

CREATE EXTERNAL TABLE `demo_tgt` (`id` bigint, `age` int, `desc` string) PARTITIONED BY (`dt` string, `hour` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LOCATION 'hdfs://<Name Node IP>:9000/griffin/data/batch/demo_tgt';

 Spawn Demo Data

wget https://griffin.apache.org/data/batch/gen_demo_data.sh

wget https://griffin.apache.org/data/batch/gen_delta_src.sh

wget https://griffin.apache.org/data/batch/demo_basic

wget https://griffin.apache.org/data/batch/delta_tgt

wget https://griffin.apache.org/data/batch/insert-data.hql.template

chmod 755 *.sh

./gen_demo_data.sh

./gen-hive-data.sh

gen-hive-data.sh (contents shown below) drives the hourly data generation; save it alongside the files downloaded above:

#!/bin/bash


#create table (create-table.hql contains the CREATE EXTERNAL TABLE statements shown above)
hive -f create-table.hql
echo "create table done"


#current hour
sudo ./gen_demo_data.sh
cur_date=`date +%Y%m%d%H`
dt=${cur_date:0:8}
hour=${cur_date:8:2}
partition_date="dt='$dt',hour='$hour'"
sed s/PARTITION_DATE/$partition_date/ ./insert-data.hql.template > insert-data.hql
hive -f insert-data.hql
src_done_path=/griffin/data/batch/demo_src/dt=${dt}/hour=${hour}/_DONE
tgt_done_path=/griffin/data/batch/demo_tgt/dt=${dt}/hour=${hour}/_DONE
hadoop fs -mkdir -p /griffin/data/batch/demo_src/dt=${dt}/hour=${hour}
hadoop fs -mkdir -p /griffin/data/batch/demo_tgt/dt=${dt}/hour=${hour}
hadoop fs -touchz ${src_done_path}
hadoop fs -touchz ${tgt_done_path}
echo "insert data [$partition_date] done"


#last hour
sudo ./gen_demo_data.sh
cur_date=`date -d '1 hour ago' +%Y%m%d%H`
dt=${cur_date:0:8}
hour=${cur_date:8:2}
partition_date="dt='$dt',hour='$hour'"
sed s/PARTITION_DATE/$partition_date/ ./insert-data.hql.template > insert-data.hql
hive -f insert-data.hql
src_done_path=/griffin/data/batch/demo_src/dt=${dt}/hour=${hour}/_DONE
tgt_done_path=/griffin/data/batch/demo_tgt/dt=${dt}/hour=${hour}/_DONE
hadoop fs -mkdir -p /griffin/data/batch/demo_src/dt=${dt}/hour=${hour}
hadoop fs -mkdir -p /griffin/data/batch/demo_tgt/dt=${dt}/hour=${hour}
hadoop fs -touchz ${src_done_path}
hadoop fs -touchz ${tgt_done_path}
echo "insert data [$partition_date] done"


#next hours
set +e
while true
do
  sudo ./gen_demo_data.sh
  cur_date=`date +%Y%m%d%H`
  next_date=`date -d "+1hour" '+%Y%m%d%H'`
  dt=${next_date:0:8}
  hour=${next_date:8:2}
  partition_date="dt='$dt',hour='$hour'"
  sed s/PARTITION_DATE/$partition_date/ ./insert-data.hql.template > insert-data.hql
  hive -f insert-data.hql
  src_done_path=/griffin/data/batch/demo_src/dt=${dt}/hour=${hour}/_DONE
  tgt_done_path=/griffin/data/batch/demo_tgt/dt=${dt}/hour=${hour}/_DONE
  hadoop fs -mkdir -p /griffin/data/batch/demo_src/dt=${dt}/hour=${hour}
  hadoop fs -mkdir -p /griffin/data/batch/demo_tgt/dt=${dt}/hour=${hour}
  hadoop fs -touchz ${src_done_path}
  hadoop fs -touchz ${tgt_done_path}
  echo "insert data [$partition_date] done"
  sleep 3600
done
set -e

Verify that the Hive tables have been created and that data has been loaded as expected.
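
For example, a quick check from any node with the Hive client configured:

hive -e "SHOW PARTITIONS demo_src; SELECT COUNT(*) FROM demo_src;"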

4. Create Elastic Search Index

You need to create the Elasticsearch index in advance in order to set the number of shards, replicas, and other settings to the desired values.

curl -k -H "Content-Type: application/json" -X PUT https://<Elastic Server IP>:9200/griffin \
 -d '{
    "aliases": {},
    "mappings": {
        "accuracy": {
            "properties": {
                "name": {
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    },
                    "type": "text"
                },
                "tmst": {
                    "type": "date"
                }
            }
        }
    },
    "settings": {
        "index": {
            "number_of_replicas": "2",
            "number_of_shards": "5"
        }
    }
}'

Verify that the configuration is correct by browsing to https://<Elastic Server IP>:9200/griffin
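
The same check can be done from the command line, using the same connection options as the PUT above:

curl -k -X GET "https://<Elastic Server IP>:9200/griffin?pretty"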

Access Apache Griffin (0.5.0)

Login to Griffin

Log in to Griffin using the link below

https://<Griffin Server IP>:8080

Griffin User Guide

Please refer to the Apache Griffin user guide available at this link to understand how to create measures and jobs and how to view metrics.
