About Apache Spark, Lightning-fast cluster computing (Big Data)
Abhishek Choudhary
Data Infrastructure Engineering in RWE/RWD | Healthtech DhanvantriAI
Apache Spark is a fast and general engine for large-scale data processing.
Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.
If still you are not impressed with Apache Spark , let me tell you one more thing Spark officially sets a new record in large-scale sorting, check https://sortbenchmark.org/
Well I am not here to compare Spark vs Hadoop, but still the figure is interesting
Now I believe if you are still reading it , then I am here trying to share few specific concepts in Apache spark -
Apache Spark is based on RDD and its entire framework is based on Directed Acyclic Graph [DAG], so an important datastructure to hold the data and same can be used to iterate & operate data efficiently.
Preprocessing of data like reading data, filter and merge the data, these all can be done using just a single line, well aren't you missing long lines of algorithm to do the same in hadoop :-) .
In Spark Standalone mode, number of executors are always static , so if you started with 5, it'll remain 5 but this can can be saved using YARN, which allows you for Dynamic allocation.
All Spark Master is doing is deciding and scheduling where each executor will run and Worker JVM will only start the worker JVM , so they actually dont need much memory, worker restarts the crashed JVM as well
In Spark if among nodes, if 1 is more efficient , that particular node can be assigned with more potential to work by using SPARK_WORKER_CORES in spark-env.sh file
SPARK_WORKER_CORES - How many core a worker JVM can give out to its underlying executor
Use Zookeeper to make spark Highly Available
Main standalone Mode settings-
Apps will always work in FIFO mode
spark.cores.max : max amount of CPU cores to request for the application from across the cluster
(Currently you cannot define number of executors in StandAlone Model but can define how many cores)
spark.executors.memory: How much memory you want to allocate to each executor , if you want your particular application to use all the memory available , just use this command. Sometimes if you define this low , even it will show your available memory as 4GB , it will use only 1 GB , because of avoiding this particular setting
Local Mode actually doesn’t need any Worker JVM
YARN Mode-
Bonus Point YARN Workflow
Once a client submit the application < resource Manager is being informed first and Resource Manager will look for any available Node Manager which can handle the client request but this depends on particular Application’s Application Master needs.Then Application master actually doesn’t do anything , its like similar to Job Scheduling of MR. App master immediately contacts Resource Manager to let it know about the required containers required by Application.Then Resource Manager based on containers demand, passes Node Manager address which have containers and there Keys.Then App Master contacts particular Nodemanager with container key to interact and once that happens ,and once container agreed to do the job, it registers back with the App master and immediately App master directly interacts with client for Monitoring Matrix. Make it clear , now Resource Manager doesn’t know anything about client interaction. Its all App master who handles that. So even if Resource Manager crashes , your application will still run but due to crash your running application can’t negotiate for any resource management, thats all.
Resource manager has a scheduler which decides where applications will be run and scheduled.Resource Manager as well have a Application MasterS , which handles the crashing or interruption of any running container application.
Running Spark in YARN-
Client Mode -
The Container , which constitutes Spark Executor(RDD) directly communicates with Driver i.e. with Client.Like n standalone mode, where worker starts the executor , in this mode NodeManager starts the executor but concept of actual YARN remains same like App master and other job.
Resource Manager scheduler in YARN just triggers the job in JVM Executor.
There is a internal scheduler in spark which decides the Driver. Which handles how RDD will be handled.
Cluster Mode -
When you don’t want your client i.e. Driver to get shut down if you turn off your computer. Here client submits the application including the driver and Driver will run within the application master.
Some yarn Settings-
- —num-executor : How many executors will be allocated
- —executor-memory: RAM for each executor
- —executor-cores: CPU cores for each executor
Dynamic Allocation is something which can be submitted in YARN during runtime of Spark application execution.
How dynamic Allocation Works -
- spark.dynamicAllocation.enabled = True
- spark.dynamicAllocation.minExecutors - 1
- spark.dynamicAllocation.maxExecutors - N
When job first started and dynamic Allocation is enabled , it’ll check for a flag spark.dynamicAllocation.sustainedSchedulerBacklogTimeout, when this timeout is hit , it will check is there any backlog of tasks are waiting to be run, if its there then it will increase the number of executors < maxExecutor to some value (not very high). The above timeout will run only Once.Then it will again run and then it will check for other timeout spark.dynamicAllocation..schedulerBacklogTimeout and if still backlogs are there , it’ll increase the number of executors to Exponential so it may increase from 10 executors to direct 70.
After it hits the max, t may be required to release the executors, it will check for spark.dynamicAllocation.executorIdleTimeout and once that is triggered, then if no task is being scheduled fir some amount of time, it will turn down the executors from max.
So Now Process Scheduling can be done-
Caching:
If you cache something using cache() method, its internally persists method and if your memory goes beyond the size of RDD partitions, then those partitions will be dropped simply not even persists to disk.You cannot priorities RDD to be cache, its actually LRU cache (Least Recently used) cached.
.persists(MEMORY_AND_DISK) it will save in memory , and if it doesn’t fit in memory, it will be saved to DISK, so oldest partition will move down to disk not the new one. it will always serialised when it will be down to disk.
LINEAGE:
Since spark is based on DAG , so we can follow a chain from child to parent to fetch any value, so its like tree traversal.
narrow transformation: where each partition of the parent RDD is used by at most one partition of the child RDDSo you can do map->filter with just using narrow dependencies.So if you consider to do map and filter on a set of data , so entire process of map to filter will happen inside only one Thread and i.e. known as Pipelining .
wide transformation: it needed shuffle , like groupbykey , we need to collect all key with value 15 into a single partition. It is where multiple child partitions may depend on it.So if we misses any RDD because of in-memory full (LRU) , so to recreate that particular RDD , it will have to go through entire previous block of RDDs which can be very expensive sometimes.
Building tooling that streamline the frontend development process
9 年Great Stuff