Apache Storm and big data


Some background:

Big data has been around for a while now. At the practical level, big data helps us better understand our clients and better analyze information. It lets us do tasks we couldn't have imagined were doable, and eventually it might even turn the world into a better one. It has certainly already changed the world.

Many open source projects exist to help us manipulate big data. In general, they can be divided into three categories:

  1. Batch processing frameworks: Apache Hadoop
  2. Stream processing frameworks: Apache Storm, Apache Samza
  3. Hybrid frameworks: Apache Spark, Apache Flink

Processing frameworks give us the tools to make sense of amounts of data far beyond what we could handle on our own.

Analyzing streams of information has always fascinated me. In real-life scenarios, data arrives piece by piece, sometimes building on previously received information, and we make decisions based on it. The catch is that we need to process the incoming facts or data in near real time, send alerts (or whatever else is needed), and so on. For that to be effective, a system needs to be able to scale and to be reusable for many kinds of information.

In this article, we're going to focus on stream processing, and specifically on the Storm framework.

A stream is considered unbounded, as information keeps coming in until the system is stopped.

Schematically, a stream processing system looks like this:

  1. A source of streaming data pushes information into a shared component (the Mediator).
  2. The Mediator pre-processes each piece of information inside the Preprocessor.
  3. Each piece of information is then handed to workers that analyze it and do the final processing.

If we don't need to scale the system, building one in-house is not that difficult. But as we all know, that is not usually the case. Storm offers, out of the box, a simple-to-use framework that takes care of all the above components for us.

About Storm:

Storm is a distributed stream processing framework written in Clojure. It was originally developed by Nathan Marz at BackType and was open-sourced after BackType was acquired by Twitter. As of this writing, the latest version of Storm is 1.1.1.

Why consider using Storm:

Storm takes care of the whole abstraction layer of managing and moving the information, so we are left to deal only with aspects directly related to the business logic.

For instance, when using Storm we gain full control over what happens inside the workers; we can process streaming data and still touch base with older data if needed. All that in a few lines of code.

Storm gives us flexibility in the flow of processing the data. We get to decide which steps are needed, which step calls the next one, how many workers take part in the process, and so on (see the sketch below). Storm scales easily and is very mature.
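
For instance, here is a small sketch of the wiring (using the DataSpout and DataBolt classes that appear later in this article): both the step order and the number of parallel workers are expressed in a couple of lines when the flow is built.

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("input", new DataSpout(), 2);   // 2 Spout instances read the stream
builder.setBolt("process", new DataBolt(), 8)    // 8 workers process in parallel
       .shuffleGrouping("input");                // "process" is fed by "input"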

It's important to mention that Storm is stateless and fail-safe, so nothing happens to our data or flow if a node goes down. Bringing a node back up is fast and seamless.

To distribute and control the data, Storm uses ZooKeeper. And since Storm is built on Thrift, workers can be written in any language that supports Thrift; to date, Java, Ruby, Python, JavaScript, Perl and a few more are supported.

Storm can be as fast as processing over one million small Tuples per second on a single node.

And it's nice to know that, to date, companies such as Twitter, Yahoo!, Spotify and Yelp, among many others, are using Storm.

Down to business:

In general, Storm is divided into three types of components:

  1. Spout (the preprocessor)
  2. Bolt (the worker)
  3. Topology (the mediator)

The basic data object used inside Storm is called a Tuple.

A Tuple is essentially a list of objects that can contain almost any type of information.
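
A quick sketch of my own (the fields "user" and "score" are hypothetical): the Fields declared by the emitting component give the Values of a Tuple their names, so a receiving Bolt's execute(Tuple tuple) can read them by field name or by position.

declarer.declare(new Fields("user", "score")); // in the emitter's declareOutputFields()
collector.emit(new Values("alice", 42L));      // emits a two-field Tuple

String user = tuple.getStringByField("user");  // read by field name
long score = tuple.getLong(1);                 // or read by position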

A Spout is the entry point for the incoming streamed information.

A Spout reads data from an external source and emits it as Tuples into the Bolts through the Topology.

The flow of execution goes like this:

The Spout uses the nextTuple() method to emit a new Tuple into the Bolts.

To complete the process, ack() and fail() are used. ack() is called when Storm detects that a Tuple emitted from the Spout has completed successfully.

fail() is called when the processing fails.
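
To make ack() and fail() concrete, here is a small sketch of my own (not the article's example below): emitting with a message ID is what tells Storm to track the Tuple, and a hypothetical pending map (plus java.util.UUID) keeps the data around for replay.

@Override
public void nextTuple(){
    long value = System.nanoTime();
    String msgId = UUID.randomUUID().toString();
    pending.put(msgId, value);                // remember it until Storm acks it
    collector.emit(new Values(value), msgId); // the message ID enables tracking
}

@Override
public void ack(Object msgId){
    pending.remove(msgId);                    // fully processed; safe to forget
}

@Override
public void fail(Object msgId){
    Long value = pending.get(msgId);
    if(value != null){
        collector.emit(new Values(value), msgId); // replay the failed Tuple
    }
}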

The actual processing of the information is done inside Bolts. Typically, Bolts do filtering, functions, aggregations, joins, insert data into databases, and really anything else.

A Bolt can also forward information to another Bolt for further processing.

A Bolt uses execute(Tuple) to process the incoming data (which arrives as a Tuple). If further processing is needed, the Bolt can emit the processed information via the OutputCollector to the next Bolt in line. In the end, a Bolt must call OutputCollector.ack() so that Storm knows the Tuple has been completed.
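
As an illustration (a sketch assuming the DataBolt skeleton shown below), a forwarding Bolt would emit the new Tuple anchored to the incoming one, so that a failure further downstream fails the whole Tuple tree, and then ack:

@Override
public void execute(Tuple tuple){
    long time = tuple.getLong(0);
    collector.emit(tuple, new Values(time)); // anchored emit to the next Bolt in line
    collector.ack(tuple);                    // this Bolt is done with the incoming Tuple
}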

The Topology arranges the components, Spouts and Bolts, into a graph describing the flow of actions.

So, in reference to the general diagram from earlier:

* Each Tuple is a very small piece of information.

A quick reminder: the Preprocessors are now called Spouts, the Workers are Bolts, and the Mediator becomes the Topology.

In terms of code, here is what we have so far:

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class DataSpout extends BaseRichSpout{

    private SpoutOutputCollector collector;

    /**
     * Called once when the Spout is initialized.
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){

        // Keep the collector; it is used to emit Tuples.
        this.collector = collector;
    }

    /**
     * Called repeatedly by Storm to request the next Tuple.
     */
    @Override
    public void nextTuple(){

        // This is the point where you put your ongoing data in.
        collector.emit(new Values(System.nanoTime()));
    }

    /**
     * Called when a Tuple emitted with a message ID completed successfully.
     */
    @Override
    public void ack(Object id){
    }

    /**
     * Called when a Tuple emitted with a message ID failed; a good place to replay it.
     */
    @Override
    public void fail(Object id){
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer){

        // Name the single field of the Tuples this Spout emits.
        declarer.declare(new Fields("time"));
    }
}


import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class DataBolt extends BaseRichBolt{

    private OutputCollector collector;

    /**
     * Called once when the Bolt is initialized.
     */
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector){

        // Keep the collector; it is used to ack (and optionally emit) Tuples.
        this.collector = collector;
    }

    /**
     * Called once per incoming Tuple.
     */
    @Override
    public void execute(Tuple tuple){

        long time = tuple.getLong(0);
        System.out.println(time);
        // Do something useful with this data.
        // Notify back that we're done with the processing.
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer){

        // Only relevant if this Bolt emits Tuples onwards to another Bolt.
        declarer.declare(new Fields("time"));
    }
}


import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class StormTopology{

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args){

        TopologyBuilder builder = new TopologyBuilder();
        // Optionally, builder.setSpout("input", new DataSpout(), 10) will create 10 instances.
        builder.setSpout("input", new DataSpout());
        // Optionally, builder.setBolt("time", new DataBolt(), 10) will create 10 instances to work in parallel.
        builder.setBolt("time", new DataBolt())
               .shuffleGrouping("input"); // Tuples from "input" are distributed randomly among the instances.

        Config conf = new Config();
        // A local in-process cluster for testing.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("topology", conf, builder.createTopology());

        // In the end, these should be called to stop Storm:
        //cluster.killTopology("topology");
        //cluster.shutdown();
    }
}
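
The LocalCluster above runs everything inside a single JVM, which is handy for testing. On a real cluster the topology is typically packaged as a jar and submitted with StormSubmitter instead; a minimal sketch (the worker count is illustrative, and main must declare the checked exceptions StormSubmitter throws):

Config conf = new Config();
conf.setNumWorkers(2); // worker JVM processes across the cluster
StormSubmitter.submitTopology("topology", conf, builder.createTopology());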

Enjoy Storm!
