Apache Storm and big data
A bit of background:
Big data has been around for a while now. At the practical level, it helps us understand our clients better and analyze information better. It lets us do tasks we never imagined were doable, and eventually it might even turn the world into a better one. It has certainly already changed it.
Many open source projects exist to help us work with big data. In general, they can be divided into three categories:
- Batch processing frameworks, for instance Apache Hadoop
- Stream processing frameworks: Apache Storm, Apache Samza
- Hybrid frameworks: Apache Spark, Apache Flink
Processing frameworks mimic the way humans think and give us the tools to cope with huge amounts of data.
Analyzing streams of information has always fascinated me. In real life scenarios, data arrives piece by piece, sometimes building on earlier information, and we have to make decisions based on it. The catch is that we need to process the incoming facts or data in no time, send alerts (or whatever else is needed), and so on. For that to be effective, a system needs to be able to scale and to be reusable for many kinds of information.
In this article, we are going to focus on stream processing, and specifically on the Storm framework.
A stream is considered unbounded, as the information keeps coming in until the system is stopped.
Schematically, a system that processes a data stream looks like this:
- A source of streaming data pushes information into a shared component (the Mediator).
- The Mediator hands each piece of information to the Preprocessor for pre-processing.
- The pre-processed piece of information is then passed to workers that analyze it and perform the final processing.
If we did not need to scale the system, it would not be that difficult to build one in-house. But as we all know, this is not usually the case. Storm offers, out of the box, a simple-to-use framework that takes care of all the above components for us.
About Storm:
Storm is a distributed stream processing framework written in Clojure. It was originally developed by Nathan Marz at BackType, and was open sourced by Twitter after Twitter acquired BackType. As of this writing, the latest version of Storm is 1.1.1.
Why consider using Storm:
Storm takes care of the whole abstraction layer of managing and moving the information, so we are left to deal only with the aspects that relate directly to the business logic.
For instance, when using Storm we get full control over what happens inside the workers: we can process streaming data and still reach back to older data if that is needed, all in a few lines of code.
Storm gives us flexibility in the flow of processing the data. We get to decide which steps are needed, which step calls the next one, how many workers take part in the process, and so on. Storm scales easily and is very mature.
It is important to mention that Storm's nodes are stateless and the framework is fault-tolerant, so nothing happens to our data or flow if one node goes down. Bringing a node back up is fast and seamless.
For cluster coordination, Storm uses ZooKeeper. And since Storm is built on Thrift, workers can be written in any language that speaks Thrift. To date, Java, Ruby, Python, JavaScript, Perl and a few more are supported.
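As an illustration of that multi-language support, a worker (a Bolt, Storm's processing component described later in this article) whose logic lives in a Python script can be declared from Java by extending ShellBolt. This is just a sketch: process.py is a hypothetical script name, and it would need to ship with the topology's multilang resources and speak Storm's multilang protocol.
import java.util.Map;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;

// A thin Java wrapper; the real processing happens in the external (hypothetical) script.
public class PythonBolt extends ShellBolt implements IRichBolt {

    public PythonBolt() {
        // Launch the script with the Python interpreter.
        super("python", "process.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // The field the script is expected to emit; "word" is only an example name.
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}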
Storm can be blazingly fast, processing over a million small Tuples per second on a single node.
And it is nice to know that, to date, companies such as Twitter, Yahoo!, Spotify, Yelp and many others are using Storm.
Down to business:
In general, Storm is built from three types of components:
- Spout (the preprocessor)
- Bolt (the worker)
- Topology (the mediator)
The basic data object that’s used inside Storm is called a Tuple.
A Tuple is essentially a named list of values that can contain almost any type of information.
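For example, a Tuple carrying a single field named "time" (the field used in the code further down) can be read back either by position or by the field's name. A tiny fragment, assuming we are inside a Bolt (the worker component described below):
// Reading values out of a Tuple, either by position or by the declared field name.
long byIndex = tuple.getLong(0);
long byName = tuple.getLongByField("time");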
A Spout is the entry point for the streamed information that comes in.
A Spout consumes data from an external source and emits it as Tuples into the Bolts through the Topology.
The flow of execution goes like this:
The Spout's nextTuple() method is called to emit a new Tuple towards the Bolts.
To complete the process, ack() and fail() are used: ack() is called when Storm detects that a Tuple emitted from the Spout has been fully processed,
and fail() is called when the Tuple failed to complete the process.
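Note that ack() and fail() are only invoked for Tuples that the Spout emitted together with a message ID. A minimal sketch of such a reliable nextTuple(), inside a Spout like the one shown further down (here the timestamp itself doubles as the message ID, purely for illustration):
@Override
public void nextTuple(){
    long now = System.nanoTime();
    // The second argument is the message ID that Storm later hands back to ack(Object id) or fail(Object id).
    collector.emit(new Values(now), now);
}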
The actual processing of the information is done inside Bolts. Typically, Bolts do filtering, apply functions, aggregations and joins, insert data into databases, and really almost anything.
A Bolt can also forward information to another Bolt for further processing.
A Bolt processes the incoming data (which arrives formed as a Tuple) in its execute(Tuple) method. If further processing is needed, the Bolt can emit the processed information via the OutputCollector to the next Bolt in line. At the end, a Bolt must call OutputCollector.ack() so that Storm knows the Tuple has been completed.
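When a Bolt does forward data onward, a common pattern is to anchor the outgoing Tuple to the incoming one before acking it, so that a downstream failure can be traced back to the original Spout Tuple. A sketch of such an execute(), assuming the single "time" field used throughout this article:
@Override
public void execute(Tuple tuple){
    long time = tuple.getLongByField("time");
    // Anchoring: passing the input Tuple as the first argument ties the new Tuple's fate to it.
    collector.emit(tuple, new Values(time / 1000000));  // e.g. forward the time converted to milliseconds
    // Acknowledge the input Tuple once we are done with it.
    collector.ack(tuple);
}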
The Topology arranges the components, Spouts and Bolts, into a graph that describes the flow of actions.
So, in reference to the general diagram from before:
* each Tuple is a very small piece of information.
A quick reminder: the Preprocessors are now named Spouts, the Workers are Bolts, and the Mediator becomes the Topology.
In terms of code, what we have so far is:
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class DataSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    /**
     * Called once when the Spout is initialized; keep the collector for later emits.
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Called repeatedly by Storm; each call may emit a new Tuple.
     */
    @Override
    public void nextTuple() {
        // This is the point where you put your ongoing data in.
        collector.emit(new Values(System.nanoTime()));
    }

    /**
     * Called when a Tuple emitted with a message ID has been fully processed.
     */
    @Override
    public void ack(Object id) {
    }

    /**
     * Called when a Tuple emitted with a message ID failed to complete.
     */
    @Override
    public void fail(Object id) {
    }

    /**
     * Declares the names of the fields this Spout emits; here a single field, "time".
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("time"));
    }
}
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class DataBolt extends BaseRichBolt {

    private OutputCollector collector;

    /**
     * Called once when the Bolt is initialized; keep the collector for acks and emits.
     */
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Called for every incoming Tuple.
     */
    @Override
    public void execute(Tuple tuple) {
        long time = tuple.getLong(0);
        System.out.println(time);
        // Do something useful with this data.
        // Notify back that we are done with the processing.
        collector.ack(tuple);
    }

    /**
     * Declares the fields this Bolt would emit if it forwarded Tuples onward;
     * this Bolt is the last in line, so nothing is actually emitted.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("time"));
    }
}
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class StormTopology {

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // Optionally, builder.setSpout("input", new DataSpout(), 10); would create 10 instances.
        builder.setSpout("input", new DataSpout());
        // Optionally, builder.setBolt("time", new DataBolt(), 10) would create 10 instances working in parallel.
        builder.setBolt("time", new DataBolt())
               .shuffleGrouping("input");
        Config conf = new Config();
        // A local, in-process cluster for testing.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("topology", conf, builder.createTopology());
        // At the end, these should be called to stop Storm.
        //cluster.killTopology("topology");
        //cluster.shutdown();
    }
}
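The LocalCluster above runs everything in a single process and is meant for testing. To run the same Topology on a real Storm cluster, a common route (a sketch, assuming the project is packaged into a jar and launched with the storm command) is to replace the LocalCluster lines with StormSubmitter:
// Instead of the LocalCluster lines above, submit to a real cluster
// (requires import org.apache.storm.StormSubmitter).
conf.setNumWorkers(2);   // an illustrative number of worker processes for the cluster
// StormSubmitter.submitTopology declares checked exceptions, so main would need
// a throws clause (or a try/catch) around this call.
StormSubmitter.submitTopology("topology", conf, builder.createTopology());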
Enjoy Storm!