Apache Griffin with Cloudera Hadoop and Data Quality POC
Sunil Muniyal
Big data Operations (Cloudera) | Cloud Architecture (AWS / Azure) | Certified Azure Data Engineer
References
- https://github.com/apache/griffin/blob/master/griffin-doc/deploy/deploy-guide.md
- https://griffin.apache.org/
About Apache Griffin
Apache Griffin is an open source Data Quality solution for Big Data that supports both batch and streaming mode. It offers a unified process to measure data quality from different perspectives, helping you build trusted data assets and boosting confidence in your business.
Apache Griffin provides a well-defined data quality domain model that covers most common data quality problems. It also defines a data quality DSL to help users express their quality criteria. By extending the DSL, users can even implement their own specific features and functions in Apache Griffin.
Prerequisites
- Java 1.8 or higher
- PostgreSQL 10.4 or MySQL 8.x
- NodeJS (NPM) 6.0+
- Hadoop 2.6 or higher
- Elastic Search 6.8.x (the latest version of Elastic Search, i.e. 7.x, is currently not supported; a JIRA has been submitted for this)
- Spark 2.x
- Livy (facilitates submission of Spark jobs via REST). The latest version of Livy does not support Spark 1.6.0 (packaged in CDH 5.15.x); hence, Livy 0.4.0 is installed as a prerequisite.
Downloads
Note: The yum repository can be configured for the Elastic Search 6.x channel, which currently resolves to Elastic Search 6.8.1
Port Requirements
The following ports, referenced throughout this guide, must be reachable:
- 8080: Griffin service / UI
- 9200: Elastic Search
- 8998: Livy server
- 3306: MySQL
- 9083: Hive Metastore (thrift)
- 8020: HDFS NameNode
- 8088: YARN ResourceManager
- 7077 / 8082: Spark master / master web UI
Installation / Deployment of Apache Griffin (0.5.0)
Details are available in the Apache Griffin deployment guide (see References above).
Prerequisites setup
1. Java 1.8.0+
yum install java-1.8.0-openjdk
2. MySQL Server
Install MySQL Server
yum install mysql-server-5.7
Create database and tables (only for POC)
mysql -u <username> -e "create database quartz" -p
mysql -u <username> -p quartz < Init_quartz_mysql_innodb.sql
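To confirm that the Quartz schema loaded correctly, list the tables (a quick check, assuming the quartz database and credentials created above):
mysql -u <username> -p -e "show tables;" quartz
The QRTZ_-prefixed tables created by Init_quartz_mysql_innodb.sql should be listed.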
3. Update the .bashrc file to reflect the below properties (HADOOP_HOME and HIVE_HOME are also needed since they are referenced in PATH); an example is shown after this list
JAVA_HOME
HADOOP_HOME
HADOOP_CONF_DIR
HIVE_HOME
SPARK_HOME
LIVY_HOME
PATH=$PATH:$HIVE_HOME/bin:$HADOOP_HOME/bin:$SPARK_HOME/bin:$LIVY_HOME/bin
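A minimal sketch of the .bashrc entries, assuming typical OpenJDK and CDH parcel locations and the /apache/livy path used later in this guide (adjust all paths to your environment):
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk          # assumed OpenJDK 1.8 location
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop    # assumed CDH parcel layout
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive        # assumed CDH parcel layout
export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark      # assumed CDH parcel layout
export LIVY_HOME=/apache/livy
export PATH=$PATH:$HIVE_HOME/bin:$HADOOP_HOME/bin:$SPARK_HOME/bin:$LIVY_HOME/bin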
4. NodeJS (NPM) 6.0+
yum install nodejs
yum install npm
Check version using
node -v
npm -v
5. Hadoop 2.6 or higher
It is assumed that Cloudera Hadoop 5.x is already deployed and configured
6. Elastic Search 6.8.x
The latest version of Elastic Search (7.x) is currently not supported; a JIRA has been submitted for this: GRIFFIN-346 - Support for latest Elastic Search version (7.9.1)
Create the Elastic Search yum repository (e.g. /etc/yum.repos.d/elasticsearch.repo)
[elasticsearch]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=0
autorefresh=1
type=rpm-md
Install Elastic Search
yum install --enablerepo=elasticsearch elasticsearch
Enable and start the Elastic Search service (the stop command is shown for reference)
systemctl enable elasticsearch.service
systemctl start elasticsearch.service
systemctl stop elasticsearch.service
Test Elastic Search connection
curl -X GET "<ES Server IP>:9200/?pretty"
Configure Elastic Search (/etc/elasticsearch/elasticsearch.yml)
Update below fields:
network.host: <IP Address>
http.cors.enabled: true
http.cors.allow-origin: "*"
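After updating elasticsearch.yml, restart the service so the new network settings take effect, and re-run the connection test:
systemctl restart elasticsearch.service
curl -X GET "<ES Server IP>:9200/?pretty"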
7. Spark 2.x (preferred)
Since CDH 5.15.x is packaged with Spark 1.6.0, Livy 0.4.0 will be used.
It is assumed that Spark is already deployed and configured as part of Cloudera Hadoop Cluster deployment
Check $SPARK_HOME/conf/spark-env.sh
HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
SPARK_MASTER_HOST=localhost
SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8082
SPARK_LOCAL_IP=localhost
SPARK_PID_DIR=/apache/pids
Upload the Spark jars and Hive configuration to HDFS; otherwise you will hit "Error: Could not find or load main class org.apache.spark.deploy.yarn.ApplicationMaster" when you schedule Spark applications.
hdfs dfs -mkdir /home/spark_lib
hdfs dfs -mkdir /home/spark_conf
hdfs dfs -put $SPARK_HOME/jars/* hdfs:///home/spark_lib/
hdfs dfs -put $HIVE_HOME/conf/hive-site.xml hdfs:///home/spark_conf/
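One way to reference the uploaded jars (a sketch for Spark 2.x, where the spark.yarn.jars property supports globs; adjust the HDFS paths if yours differ) is to add the following to $SPARK_HOME/conf/spark-defaults.conf:
spark.yarn.jars hdfs:///home/spark_lib/*.jar
spark.yarn.dist.files hdfs:///home/spark_conf/hive-site.xml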
Restart Spark
8. Livy (facilitates submission of Spark jobs via REST). The latest version of Livy does not support Spark 1.6.0 (packaged in CDH 5.15.x); hence, Livy 0.4.0 is installed. Download and extract Livy 0.4.0 to /apache/livy (the location assumed by the paths below), then create the log directory
mkdir /apache/livy/logs
Update /apache/livy/conf/livy.conf
livy.server.host = <Livy Server IP>
livy.spark.master = yarn
livy.spark.deployMode = cluster
livy.repl.enableHiveContext = true
livy.server.port = 8998
Start Livy Server:
/apache/livy/bin/livy-server start
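To confirm the Livy server is running, its REST API can be queried (adjust the scheme to http or https to match your setup):
curl http://<Livy Server IP>:8998/sessions
A response such as {"from":0,"total":0,"sessions":[]} indicates the server is up with no active sessions.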
Install and configure Griffin
1. Update Application Properties
Location: /<Griffin package>/service/src/main/resources/application.properties
# Apache Griffin server port (default 8080)
server.port = 8080
spring.application.name=griffin_service
# db configuration
spring.datasource.url=jdbc:mysql://localhost:3306/quartz?autoReconnect=true&useSSL=false&allowPublicKeyRetrieval=true
spring.datasource.username=root
spring.datasource.password=Root@123
spring.jpa.generate-ddl=true
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.jpa.show-sql=true
# Hive metastore
hive.metastore.uris=thrift://master:9083
hive.metastore.dbname=default
hive.hmshandler.retry.attempts=15
hive.hmshandler.retry.interval=2000ms
# Hive cache time
cache.evict.hive.fixedRate.in.milliseconds=900000
# Kafka schema registry
kafka.schema.registry.url=https://localhost:8081
# Update job instance state at regular intervals
jobInstance.fixedDelay.in.milliseconds=60000
# Expired time of job instance (7 days = 604800000 milliseconds); the time unit only supports milliseconds
jobInstance.expired.milliseconds=604800000
# Schedule predicate job every 5 minutes and repeat 12 times at most
# Interval time unit s:second m:minute h:hour d:day; only these four units are supported
predicate.job.interval=5m
predicate.job.repeat.count=12
# External properties directory location
external.config.location=
# External BATCH or STREAMING env
external.env.location=
# Login strategy ("default" or "ldap")
login.strategy=default
# HDFS default name
fs.defaultFS=hdfs://master:8020
# YARN url
yarn.uri=https://master:8088
# Griffin event listener
internal.event.listeners=GriffinJobEventHook
# Livy
livy.uri=https://10.0.173.118:8998/batches
2. Update Quartz Properties
Location: service/src/main/resources/quartz.properties
org.quartz.scheduler.instanceName=spring-boot-quartz
org.quartz.scheduler.instanceId=AUTO
org.quartz.threadPool.threadCount=5
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
# If you use postgresql, set this property value to org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
# If you use mysql, set this property value to org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# If you use h2, it's ok to set this property value to StdJDBCDelegate, PostgreSQLDelegate or others
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties=true
org.quartz.jobStore.misfireThreshold=60000
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true
org.quartz.jobStore.clusterCheckinInterval=20000
3. Update Spark Properties
Location: /<Griffin package>/service/src/main/resources/sparkProperties.json
The griffin measure path ("file" in the JSON below) is the HDFS location where you should put the jar file of the measure module (see the Build Griffin steps later in this guide).
{
  "file": "hdfs:///griffin/griffin-measure.jar",
  "className": "org.apache.griffin.measure.Application",
  "name": "griffin",
  "queue": "default",
  "numExecutors": 3,
  "executorCores": 1,
  "driverMemory": "1g",
  "executorMemory": "1g",
  "conf": {
    "spark.yarn.dist.files": "hdfs:///hive/hive-site.xml"
  },
  "files": [],
  "jars": []
}
4. Update Env Batch and Env Streaming files
Location: /<Griffin package>/service/src/main/resources/env/env_batch.json AND env_streaming.json
Adjust the sinks according to your requirements. At a minimum, you will need to adjust the HDFS output directory (hdfs:///griffin/persist by default) and the Elasticsearch URL (https://<Elastic Search Server IP>:9200/griffin/accuracy by default). Similar changes are required in env_streaming.json.
env_batch.json:
{
  "spark": {
    "log.level": "WARN"
  },
  "sinks": [
    {
      "type": "CONSOLE",
      "config": {
        "max.log.lines": 10
      }
    },
    {
      "type": "HDFS",
      "config": {
        "path": "hdfs:///griffin/persist",
        "max.persist.lines": 10000,
        "max.lines.per.file": 10000
      }
    },
    {
      "type": "ELASTICSEARCH",
      "config": {
        "method": "post",
        "api": "https://worker3:9200/griffin/accuracy",
        "connection.timeout": "1m",
        "retry": 10
      }
    }
  ],
  "griffin.checkpoint": []
}
env_streaming.json:
{
  "spark": {
    "log.level": "WARN",
    "checkpoint.dir": "hdfs:///griffin/checkpoint/${JOB_NAME}",
    "init.clear": true,
    "batch.interval": "1m",
    "process.interval": "5m",
    "config": {
      "spark.default.parallelism": 4,
      "spark.task.maxFailures": 5,
      "spark.streaming.kafkaMaxRatePerPartition": 1000,
      "spark.streaming.concurrentJobs": 4,
      "spark.yarn.maxAppAttempts": 5,
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.max.executor.failures": 120,
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.hadoop.fs.hdfs.impl.disable.cache": true
    }
  },
  "sinks": [
    {
      "type": "CONSOLE",
      "config": {
        "max.log.lines": 100
      }
    },
    {
      "type": "HDFS",
      "config": {
        "path": "hdfs:///griffin/persist",
        "max.persist.lines": 10000,
        "max.lines.per.file": 10000
      }
    }
  ],
  "griffin.checkpoint": [
    {
      "type": "zk",
      "config": {
        "hosts": "zk:2181",
        "namespace": "griffin/infocache",
        "lock.path": "lock",
        "mode": "persist",
        "init.clear": true,
        "close.clear": false
      }
    }
  ]
}
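Optionally, the default HDFS sink path referenced in both env files can be created up front (adjust if you changed the path):
hdfs dfs -mkdir -p /griffin/persist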
5. Build the Griffin package using Maven.
Install Maven if it is not already available.
Build Griffin
mvn clean install
Copy the jars from the below two locations
/<Griffin Package>/service/target/service-0.5.0.jar
/<Griffin Package>/measure/target/measure-0.5.0.jar
Rename the jars as below
mv measure-0.5.0.jar griffin-measure.jar
mv service-0.5.0.jar griffin-service.jar
Upload jars to HDFS
hdfs dfs -put *.jar /griffin/
Start Griffin
nohup java -jar <local jar location>/griffin-service.jar > service.out 2>&1 &
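To confirm the service came up, check the startup log and the listening port (a quick check, assuming the default server.port of 8080):
tail -f service.out
ss -ltn | grep 8080      # the Griffin service should be listening on port 8080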
Access the Griffin UI using the link below
https://<Griffin server IP>:8080
Set up demo data on HDFS for Griffin
1. Create /home/spark_conf
hdfs dfs -mkdir -p /home/spark_conf
2. Upload hive-site.xml
hdfs dfs -put hive-site.xml /home/spark_conf/
3. Prepare demo tables
Create Demo tables
CREATE EXTERNAL TABLE `demo_src`(`id` bigint, `age` int, `desc` string) PARTITIONED BY (`dt` string, `hour` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LOCATION 'hdfs://<Name Node IP>:9000/griffin/data/batch/demo_src';
CREATE EXTERNAL TABLE `demo_tgt`(`id` bigint, `age` int, `desc` string) PARTITIONED BY (`dt` string, `hour` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LOCATION 'hdfs://<Name Node IP>:9000/griffin/data/batch/demo_tgt';
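The gen-hive-data.sh script in the next step runs hive -f create-table.hql, so these statements are expected in a file named create-table.hql. One way to apply and verify them, assuming that file name:
hive -f create-table.hql
hive -e "SHOW TABLES LIKE 'demo_*';"   # demo_src and demo_tgt should be listed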
Spawn Demo Data
wget https://griffin.apache.org/data/batch/gen_demo_data.sh
wget https://griffin.apache.org/data/batch/gen_delta_src.sh
wget https://griffin.apache.org/data/batch/demo_basic
wget https://griffin.apache.org/data/batch/delta_tgt
wget https://griffin.apache.org/data/batch/insert-data.hql.template
chmod 755 *.sh
./gen_demo_data.sh
./gen-hive-data.sh
(The contents of gen-hive-data.sh are shown below; it relies on create-table.hql and insert-data.hql.template.)
#!/bin/bash

#create table
hive -f create-table.hql
echo "create table done"

#current hour
sudo ./gen_demo_data.sh
cur_date=`date +%Y%m%d%H`
dt=${cur_date:0:8}
hour=${cur_date:8:2}
partition_date="dt='$dt',hour='$hour'"
sed s/PARTITION_DATE/$partition_date/ ./insert-data.hql.template > insert-data.hql
hive -f insert-data.hql
src_done_path=/griffin/data/batch/demo_src/dt=${dt}/hour=${hour}/_DONE
tgt_done_path=/griffin/data/batch/demo_tgt/dt=${dt}/hour=${hour}/_DONE
hadoop fs -mkdir -p /griffin/data/batch/demo_src/dt=${dt}/hour=${hour}
hadoop fs -mkdir -p /griffin/data/batch/demo_tgt/dt=${dt}/hour=${hour}
hadoop fs -touchz ${src_done_path}
hadoop fs -touchz ${tgt_done_path}
echo "insert data [$partition_date] done"

#last hour
sudo ./gen_demo_data.sh
cur_date=`date -d '1 hour ago' +%Y%m%d%H`
dt=${cur_date:0:8}
hour=${cur_date:8:2}
partition_date="dt='$dt',hour='$hour'"
sed s/PARTITION_DATE/$partition_date/ ./insert-data.hql.template > insert-data.hql
hive -f insert-data.hql
src_done_path=/griffin/data/batch/demo_src/dt=${dt}/hour=${hour}/_DONE
tgt_done_path=/griffin/data/batch/demo_tgt/dt=${dt}/hour=${hour}/_DONE
hadoop fs -mkdir -p /griffin/data/batch/demo_src/dt=${dt}/hour=${hour}
hadoop fs -mkdir -p /griffin/data/batch/demo_tgt/dt=${dt}/hour=${hour}
hadoop fs -touchz ${src_done_path}
hadoop fs -touchz ${tgt_done_path}
echo "insert data [$partition_date] done"

#next hours
set +e
while true
do
  sudo ./gen_demo_data.sh
  cur_date=`date +%Y%m%d%H`
  next_date=`date -d "+1hour" '+%Y%m%d%H'`
  dt=${next_date:0:8}
  hour=${next_date:8:2}
  partition_date="dt='$dt',hour='$hour'"
  sed s/PARTITION_DATE/$partition_date/ ./insert-data.hql.template > insert-data.hql
  hive -f insert-data.hql
  src_done_path=/griffin/data/batch/demo_src/dt=${dt}/hour=${hour}/_DONE
  tgt_done_path=/griffin/data/batch/demo_tgt/dt=${dt}/hour=${hour}/_DONE
  hadoop fs -mkdir -p /griffin/data/batch/demo_src/dt=${dt}/hour=${hour}
  hadoop fs -mkdir -p /griffin/data/batch/demo_tgt/dt=${dt}/hour=${hour}
  hadoop fs -touchz ${src_done_path}
  hadoop fs -touchz ${tgt_done_path}
  echo "insert data [$partition_date] done"
  sleep 3600
done
set -e
Verify that the Hive tables are created and that data has been loaded as expected (an example check follows).
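A quick way to check the partitions and row counts for the demo tables created above:
hive -e "SHOW PARTITIONS demo_src;"
hive -e "SELECT COUNT(*) FROM demo_src;"
hive -e "SELECT COUNT(*) FROM demo_tgt;"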
4. Create Elastic Search Index
You need to create the Elastic Search index in advance in order to set the number of shards, replicas, and other settings to the desired values.
curl -k -H "Content-Type: application/json" -X PUT https://<Elastic Server IP>:9200/griffin \
 -d '{
    "aliases": {},
    "mappings": {
        "accuracy": {
            "properties": {
                "name": {
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    },
                    "type": "text"
                },
                "tmst": {
                    "type": "date"
                }
            }
        }
    },
    "settings": {
        "index": {
            "number_of_replicas": "2",
            "number_of_shards": "5"
        }
    }
}'
Verify that the index was created correctly by browsing to https://<Elastic Server IP>:9200/griffin
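The same check can be done from the command line (the -k flag is carried over from the index-creation call above in case a self-signed certificate is used):
curl -k "https://<Elastic Server IP>:9200/griffin?pretty"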
Access Apache Griffin (0.5.0)
Login to Griffin
Log in to Griffin using the link below
https://<Griffin Server IP>:8080
Griffin User guide
Please refer to the Apache Griffin user guide to understand how to create measures and jobs and how to view metrics.