Procedure to Run TensorFlow on Hadoop
Before jumping directly into the "TensorFlow on Hadoop" concept, let us first understand the critical terms used in this article.
What is a Tensor?
A tensor is a multi-dimensional array of numbers: a scalar is a rank-0 tensor, a vector is rank 1, and a matrix is rank 2.
[Fig 1: Source - Towards Data Science]
What is TensorFlow?
TensorFlow is an open-source machine learning framework that expresses computations as dataflow graphs operating on tensors. In Fig 2, "a" and "b" are two tensors.
[Fig 2: Source - Kdnuggets]
Why do we need Hadoop in Data Science?
Hadoop is a platform capable of slicing your problem into a data-parallel problem. For simple needs (design patterns like counting, aggregation, and filtering), Hadoop alone is enough; for more complex machine learning work such as Bayesian methods or SVMs, you need Mahout, which in turn needs Hadoop (now Apache Spark) to solve the problem using a data-parallel approach.
So Hadoop is a good platform to learn and really important for your batch-processing needs. Beyond Hadoop, you also need to know Spark (Mahout runs its algorithms on Spark) and Twitter Storm (for your real-time analytics needs). This list will continue to evolve, so if you are good with the building blocks (distributed computing, data-parallel problems (refer Fig 2), and so on) and know how one such platform (say Hadoop) operates, you will fairly quickly be up to speed on the others.
[Fig 3: source - Dallas Makerspace Talk]
How to run TensorFlow on Hadoop?
Before getting into HDFS (Hadoop Distributed File System), please go through the four methods below for importing data into a TensorFlow program:
- tf.data API: Easily construct a complex input pipeline. (Preferred method; see the sketch after this list.)
- Feeding: Python code provides the data when running each step.
- QueueRunner: a queue-based input pipeline reads the data from files at the beginning of a TensorFlow graph.
- Preloaded data: a constant or variable in the TensorFlow graph holds all the data (for small data sets).
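To make the preferred tf.data method concrete, here is a minimal sketch in the TensorFlow 1.x style used throughout this article. The file names and the two-column CSV layout are illustrative assumptions, not part of any real dataset:

# Minimal tf.data pipeline: read CSV lines, parse, shuffle, and batch.
import tensorflow as tf

# Hypothetical CSV files, each row holding "feature,label".
dataset = tf.data.TextLineDataset(["file1.csv", "file2.csv"])
dataset = dataset.map(
    lambda line: tf.decode_csv(line, record_defaults=[[0.0], [0.0]]))
dataset = dataset.shuffle(buffer_size=1000).batch(32)

iterator = dataset.make_one_shot_iterator()
features, labels = iterator.get_next()

with tf.Session() as sess:
    print(sess.run([features, labels]))  # fetch one parsed batch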
HDFS (Hadoop Distributed File System)
What is HDFS?
HDFS is Hadoop's Distributed File System (DFS). It stores large files as blocks replicated across the machines of a cluster.
[Fig 4: Source - Pitchengine]
While using HDFS with TensorFlow, change the file paths you use to read and write data to an HDFS path.
For example:
filename_queue = tf.train.string_input_producer([
    "hdfs://namenode:8020/path/to/file1.csv",
    "hdfs://namenode:8020/path/to/file2.csv",
])
To use the namenode specified in your HDFS configuration files, change the file prefix to hdfs://default/.
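As a sketch (the path is hypothetical), the earlier queue would then become:

filename_queue = tf.train.string_input_producer(
    ["hdfs://default/path/to/file1.csv"])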
Before launching your TensorFlow program, the following environment variables must be set:
· JAVA_HOME: The location of your Java installation.
· HADOOP_HDFS_HOME: The location of your HDFS installation. You can also set this environment variable by running:
source ${HADOOP_HOME}/libexec/hadoop-config.sh
· LD_LIBRARY_PATH: Must include the path to libjvm.so, and optionally the path to libhdfs.so if your Hadoop distribution does not install libhdfs.so in $HADOOP_HDFS_HOME/lib/native. On Linux:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${JAVA_HOME}/jre/lib/amd64/server
· CLASSPATH: The Hadoop jars must be added prior to running your TensorFlow program. The CLASSPATH set by ${HADOOP_HOME}/libexec/hadoop-config.sh is insufficient; globs must be expanded as described in the libhdfs documentation:
CLASSPATH=$(${HADOOP_HDFS_HOME}/bin/hadoop classpath --glob) python your_script.py
For versions of Hadoop/libhdfs older than 2.6.0, you have to expand the classpath wildcard manually.
For more details, please visit https://issues.apache.org/jira/browse/HADOOP-10903
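Since a missing variable typically surfaces only as an obscure library-loading error at runtime, a small sanity check like the following can be run at the top of your script. This is an illustrative sketch, not part of the TensorFlow API:

# Fail fast if the environment needed for HDFS access is incomplete.
import os

required = ["JAVA_HOME", "HADOOP_HDFS_HOME", "LD_LIBRARY_PATH", "CLASSPATH"]
missing = [name for name in required if not os.environ.get(name)]
if missing:
    raise EnvironmentError("Please set: " + ", ".join(missing))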
If the Hadoop cluster is in secure mode, the following environment variable must be set:
KRB5CCNAME: The path of the Kerberos ticket cache file. For example:
export KRB5CCNAME=/tmp/krb5cc_10002
If you are running Distributed TensorFlow, then all workers must have the environment variables set and Hadoop installed.
You must be wondering about Distributed TensorFlow by now!
Let's understand Distributed TensorFlow in depth.
Distributed TensorFlow
Now we will discuss how to create a cluster of TensorFlow servers, and how to distribute a computation graph across that cluster.
For example, the code below shows a simple TensorFlow cluster in action:
# Start a TensorFlow server as a single-process "cluster".
$ python
>>> import tensorflow as tf
>>> c = tf.constant("Hello, Data Scientist!")
>>> server = tf.train.Server.create_local_server()
>>> sess = tf.Session(server.target) # Create a session on the server.
>>> sess.run(c)
'Hello, Data Scientist!'
The tf.train.Server.create_local_server method creates a single-process cluster, with an in-process server.
TensorFlow Cluster
A TensorFlow "cluster" is a set of tasks. Each task is associated with a TensorFlow "server", which contains a "master" that can be used to create sessions and a "worker" that executes operations in the graph. A cluster can also be divided into one or more "jobs", where each job contains one or more tasks.
The steps below help create a cluster:
1. Create a tf.train.ClusterSpec that describes all of the tasks in the cluster. This should be the same for each task.
2. Create a tf.train.Server, passing the tf.train.ClusterSpec to the constructor, and identifying the local task with a job name and task index.
Create a tf.train.ClusterSpec to describe the cluster
The cluster specification dictionary maps job names to lists of network addresses. Pass this dictionary to the tf.train.ClusterSpec constructor. For example:
[Source: TensorFlow]
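The original example here is an image; reconstructed in plain code from the TensorFlow documentation, it looks like this (the example.com hostnames are the documentation's placeholders):

# One job named "local" with two tasks:
tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})

# Two jobs: "worker" with three tasks and "ps" with two tasks:
tf.train.ClusterSpec({
    "worker": [
        "worker0.example.com:2222",
        "worker1.example.com:2222",
        "worker2.example.com:2222"
    ],
    "ps": [
        "ps0.example.com:2222",
        "ps1.example.com:2222"
    ]
})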
Create a tf.train.Server instance in each task
A tf.train.Server object contains a set of local devices, a set of connections to other tasks in its tf.train.ClusterSpec, and a tf.Session that can use these to perform a distributed computation. Each server is a member of a specific named job and has a task index within that job. A server can communicate with any other server in the cluster.
For example, to launch a cluster with two servers running on localhost:2222 and localhost:2223, run the following snippets in two different processes on the local machine:
# In task 0:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=0)
# In task 1:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=1)
Specifying distributed devices in your model
To place operations on a particular process, you can use the same tf.device function that is used to specify whether ops run on the CPU or GPU. In the common pattern below, a "ps" (parameter server) job hosts the variables while a "worker" job runs the compute-heavy operations. For example:
with tf.device("/job:ps/task:0"):
weights_1 = tf.Variable(...)
biases_1 = tf.Variable(...)
with tf.device("/job:ps/task:1"):
weights_2 = tf.Variable(...)
biases_2 = tf.Variable(...)
with tf.device("/job:worker/task:7"):
input, labels = ...
layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
# ...
train_op = ...
with tf.Session("grpc://worker7.example.com:2222") as sess:
for _ in range(10000):
sess.run(train_op)
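Manually pinning each variable to a "ps" task becomes tedious as models grow. The library function tf.train.replica_device_setter can assign variables to parameter-server tasks automatically; a brief sketch, assuming `cluster` is a tf.train.ClusterSpec that defines "ps" and "worker" jobs like the two-job example earlier:

# Variables created in this scope are placed round-robin across the
# "ps" tasks, while all other ops stay on the specified worker.
with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:7", cluster=cluster)):
    weights_1 = tf.Variable(...)
    biases_1 = tf.Variable(...)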
Glossary:
Client
A client is typically a program that builds a TensorFlow graph and constructs a tensorflow::Session to interact with a cluster. Clients are typically written in Python or C++. A single client process can directly interact with multiple TensorFlow servers and a single server can serve multiple clients.
Cluster
A TensorFlow cluster comprises one or more "jobs", each divided into lists of one or more "tasks". A cluster is typically dedicated to a particular high-level objective, such as training a neural network, using many machines in parallel. A cluster is defined by a tf.train.ClusterSpec object.
Task
A task corresponds to a specific TensorFlow server, and typically corresponds to a single process. A task belongs to a particular "job" and is identified by its index within that job's list of tasks.
TensorFlow server
A process running a tf.train.Server instance, which is a member of a cluster and exports a "master service" and a "worker service".
**The code in this article is sourced from the TensorFlow documentation.**