General Architecture

(figure: Spark’s general architecture, from a book)

Before we start

This post is totally theoretical; I have no practical experience in this topic. I want that practical experience, and that’s why I am writing this blog 😉. It’s copied mainly from the documentation, and the picture above is from a book.

Let’s start

Where should and shouldn’t we use Apache Spark?

Spark wasn’t made with online transaction processing (OLTP) applications in mind (fast, numerous, atomic transactions). It’s better suited for online analytical processing (OLAP): batch jobs and data mining.

How does Spark address Hadoop’s shortcomings?

Hadoop has one basic shortcoming (though not a total one): to distribute data it reads from and writes to hard disk, where the maximum speed is about 160 MB/s (about 300 MB/s for an SSD), whereas DRAM reaches about 20420 MB/s. So, by design, Spark is a lot faster. Spark’s core concept is an in-memory execution model that enables caching job data in memory instead of fetching it from disk every time, as MapReduce does.

Spark Core.

RDD

It represents a collection of elements that is immutable, resilient, and distributed. RDDs don’t handle fault tolerance the way Hadoop does, by replicating data, but by logging the transformations used to build a dataset: if a node fails, only the subset of the dataset that resided on the failed node needs to be recomputed. An RDD is a fairly abstract concept; it looks quite simple from the top, but there is a lot going on beneath the hood.

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
  • One important parameter for parallel collections is the number of partitions to cut the dataset into.
  • Spark tries to set the number of partitions automatically based on your cluster.
  • You can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)).
Spark Context

The Spark context sets up internal services and establishes a connection to a Spark execution environment. A Spark context is essentially a client of Spark’s execution environment and acts as the master of your Spark application. Text file RDDs can be created using SparkContext’s textFile method. This method takes a URI for the file (either a local path on the machine, or a hdfs://, s3a://, etc. URI) and reads it as a collection of lines.

val distFile = sc.textFile("data.txt")
  • The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value.
  • RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects (a short sketch follows).
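
A minimal sketch of that round trip, reusing distFile from above (the output path data_obj is just illustrative):

distFile.saveAsObjectFile("data_obj")             // writes the RDD as serialized Java objects
val restored = sc.objectFile[String]("data_obj")  // reads them back as an RDD[String]
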
RDD operation

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

  • The first line defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: lines is merely a pointer to the file. The second line defines lineLengths as the result of a map transformation.
  • Again, lineLengths is not immediately computed, due to laziness.
  • Finally, we run reduce, which is an action. If we also wanted to use lineLengths again later, we could add lineLengths.persist().

There are some other functions like flatMap, union, groupByKey, etc.; a quick sketch follows. Visit-All
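
A minimal sketch of a few of these transformations (assuming a live SparkContext sc; the data is made up for illustration):

val greetings = sc.parallelize(Seq("hello world", "hello spark"))
val tokens = greetings.flatMap(_.split(" "))               // flatMap: one input line can yield many words
val extended = tokens.union(sc.parallelize(Seq("again")))  // union: concatenates two RDDs
val byWord = tokens.map(w => (w, 1)).groupByKey()          // groupByKey: groups values by key (causes a shuffle)
byWord.collect()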

Spark SQL

Spark SQL provides functions for manipulating large sets of distributed, structured data using an SQL subset supported by Spark and Hive SQL (HiveQL). Operations on DataFrames and Datasets at some point translate to operations on RDDs and execute as ordinary Spark jobs. Spark SQL provides a query optimization framework called Catalyst that can be extended with custom optimization rules.

Spark Streaming

Spark Streaming is a framework for ingesting real-time streaming data from various sources. The supported streaming sources include HDFS, Kafka, Flume, Twitter, ZeroMQ, and custom ones.

When to use Kafka Streams and when to use Spark Streaming?

Kafka Streams is still best used in a “Kafka > Kafka” context, while Spark Streaming could be used for a “Kafka > Database” or “Kafka > Data science model” type of context. Ref

Spark MLlib

Supported algorithms include logistic regression, naïve Bayes classification, support vector machines (SVMs), decision trees, random forests, linear regression, and k-means clustering.

Spark GraphX

Graphs are data structures comprising vertices and the edges connecting them. GraphX provides functions for building graphs, represented as graph RDDs: EdgeRDD and VertexRDD. GraphX contains implementations of the most important algorithms of graph theory, such as PageRank, connected components, shortest paths, SVD++, and others. It also provides the Pregel message-passing API, the same API for large-scale graph processing implemented by Apache Giraph.

How does the DAG work in Spark?

At a high level, when any action is called on an RDD, Spark creates the DAG and submits it to the DAG scheduler.

  • The DAG scheduler divides operators into stages of tasks. A stage comprises tasks based on partitions of the input data. The DAG scheduler pipelines operators together; for example, many map operators can be scheduled in a single stage. The final result of the DAG scheduler is a set of stages.
  • The stages are passed on to the task scheduler. The task scheduler launches tasks via the cluster manager (Spark Standalone/YARN/Mesos). The task scheduler doesn’t know about the dependencies between stages.
  • The workers execute the tasks on the slaves. At a high level, there are two kinds of transformations that can be applied to RDDs, namely narrow transformations and wide transformations. Wide transformations basically result in stage boundaries.

A narrow transformation doesn’t require the data to be shuffled across the partitions (for example, map, filter, etc.), while a wide transformation does require a shuffle (for example, reduceByKey). This sequence of commands implicitly defines a DAG of RDD objects (the RDD lineage) that will be used later when an action is called. Each RDD maintains a pointer to one or more parents along with metadata about what type of relationship it has with the parent. For example, when we call val b = a.map() on an RDD, the RDD b keeps a reference to its parent a; that’s lineage. To display the lineage of an RDD, Spark provides the debug method toDebugString(), as sketched below. Ref
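
A minimal sketch of inspecting lineage and stage boundaries (assuming a live SparkContext sc; the data is made up for illustration):

val a = sc.parallelize(1 to 10)
val b = a.map(_ * 2)                               // narrow: stays in the same stage as a
val c = b.map(x => (x % 3, x)).reduceByKey(_ + _)  // wide: the shuffle starts a new stage
println(c.toDebugString)                           // prints the RDD lineage, indented by stage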

Lazy Evaluation

Transformations are lazy in nature, meaning that when we call some operation on an RDD, it does not execute immediately. Spark maintains a record of which operations have been called (through the DAG). We can think of a Spark RDD as the data that we build up through transformations. Since transformations are lazy, we can execute them at any time by calling an action on the data. Hence, with lazy evaluation, data is not loaded until it is necessary. Ref
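
A tiny sketch of this behaviour (assuming a live SparkContext sc in local mode, so the println output shows up on the console):

val nums = sc.parallelize(1 to 5)
val doubled = nums.map { x => println(s"computing $x"); x * 2 }  // nothing is printed yet
doubled.collect()  // the action triggers the computation; the println side effects run now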

Commands, Configuration, and other stuff

I will try to restrict the domain to Spark itself rather than going into MLlib, H2O, or GraphX.

Shuffle.

  • The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations.
  • Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.
  • Certain shuffle operations can consume significant amounts of heap memory since they employ in-memory data structures to organize records before or after transferring them. Specifically, reduceByKey and aggregateByKey create these structures on the map side, and other ‘ByKey’ operations generate them on the reduce side. When data does not fit in memory, Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection. A short sketch follows this list.
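
A minimal sketch contrasting two shuffle-inducing operations (assuming a live SparkContext sc; the data is made up for illustration):

val kv = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val grouped = kv.groupByKey(4)        // shuffles every value; 4 reduce-side partitions
val summed  = kv.reduceByKey(_ + _)   // combines on the map side first, so less data crosses the network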

Caching

  • You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes.
  • In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), or replicate it across nodes. These levels are set by passing a StorageLevel object (Scala, Java, Python) to persist(). The cache() method is a shorthand for the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). There are various storage levels like MEMORY_ONLY and MEMORY_AND_DISK; a short sketch follows.
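
A minimal sketch of persisting an RDD with an explicit storage level (assuming a live SparkContext sc and a file data.txt, as in the earlier example):

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("data.txt")
lines.persist(StorageLevel.MEMORY_AND_DISK)  // spill partitions to disk if they don't fit in memory
lines.count()      // the first action computes the RDD and caches it
lines.count()      // subsequent actions read from the cache
lines.unpersist()  // drop it from the cache when no longer needed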

Broadcast Variables.

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.

    val broadcastVar = sc.broadcast(Array(1, 2, 3))
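
Tasks read the broadcast value through its value method instead of receiving a fresh copy per task; a tiny sketch:

sc.parallelize(1 to 3).map(x => x * broadcastVar.value(0)).collect()  // each task reads the cached copy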
    

Accumulators.

Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. A numeric accumulator can be created by calling SparkContext.longAccumulator() or SparkContext.doubleAccumulator() to accumulate values of type Long or Double, respectively.

val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
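
Tasks can only add to an accumulator; only the driver can read its value. A quick check after the action above:

println(accum.value)  // prints 10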

Spark SQL

The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder():

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
val df = spark.read.json("examples/src/main/resources/people.json")
df.show()
df.printSchema()
df.select("name").show()
df.select($"name", $"age" + 1).show()

There are a lot of other functions. Reference

case class Person(name: String, age: Long)
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()

There is a lot of other functionality for Hive, Parquet, etc. Ref

Spark Streaming

  • Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window.
  • Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.

  • Internally, it works as follows. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

  • Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data.
  • DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))

A Duration object (Seconds(1)) specifies the time interval at which Spark Streaming should split the input stream data and create mini-batch RDDs. Instead of Seconds, you can use the Milliseconds and Minutes objects to specify the duration.
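
The earlier example stops at words; a minimal sketch of completing the classic word count, following the docs’ NetworkWordCount example (pairs is reused in the window example later):

val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()      // print the first ten elements of each batch
ssc.start()             // start the computation
ssc.awaitTermination()  // wait for the computation to terminate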

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
  • The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local[*]” string to run in local mode.
  • In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there.
  • However, for local testing and unit tests, you can pass “local[*]” to run Spark Streaming in-process (detects the number of cores in the local system). Note that this internally creates a SparkContext (starting point of all Spark functionality) which can be accessed as ssc.sparkContext.
DStreams

Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from a source or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs. Any operation applied on a DStream translates to operations on the underlying RDDs. In the word count example, the flatMap operation is applied on each RDD in the lines DStream to generate the RDDs of the words DStream.

Input DStreams and Receivers.

Input DStreams are DStreams representing the stream of input data received from streaming sources. Every input DStream (except file streams, discussed later in this section) is associated with a Receiver object which receives the data from a source and stores it in Spark’s memory for processing.

Tips and Pitfalls
  • If you want to receive multiple streams of data in parallel in your streaming application, you can create multiple input DStreams. This will create multiple receivers which will simultaneously receive multiple data streams. But note that a Spark worker/executor is a long-running task, hence it occupies one of the cores allocated to the Spark Streaming application. Ref
  • When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, where n > number of receivers to run. For more ref
  • The number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process it.

There are two types of sources :

Basic Sources

  • Streaming from file.
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
streamingContext.textFileStream(dataDirectory)

Spark Streaming will monitor the directory dataDirectory and process any files created in that directory. All files must be in the same data format. A file is considered part of a time period based on its modification time, not its creation time. Once processed, changes to a file within the current window will not cause the file to be reread; that is, updates are ignored. “Full” filesystems such as HDFS tend to set the modification time on their files as soon as the output stream is created. When a file is opened, even before data has been completely written, it may be included in the DStream, after which updates to the file within the same window will be ignored; that is, changes may be missed and data omitted from the stream.

Streams based on Custom Receivers

DStreams can be created with data streams received through custom receivers. Ref

Queue of RDDs as a Stream

We can create a DStream based on a queue of RDDs, using streamingContext.queueStream(queueOfRDDs). Each RDD pushed into the queue will be treated as a batch of data in the DStream and processed like a stream. Ref
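
A minimal sketch of a queue-backed stream, which is handy for testing (assuming a live StreamingContext ssc):

import scala.collection.mutable
import org.apache.spark.rdd.RDD

val rddQueue = mutable.Queue[RDD[Int]]()
val inputStream = ssc.queueStream(rddQueue)
inputStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()
rddQueue += ssc.sparkContext.parallelize(1 to 100)  // each pushed RDD becomes one batch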

Advanced Sources

This category of sources requires interfacing with external non-Spark libraries, some of them with complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts of dependencies, the functionality to create DStreams from these sources has been moved to separate libraries that can be linked to explicitly when necessary. For integrating with Kafka, see this. Ref

Custom Sources

Input DStreams can also be created out of custom data sources. All you have to do is implement a user-defined receiver (see next section to understand what that is) that can receive data from the custom sources and push it into Spark. Ref
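
A minimal sketch of such a user-defined receiver (an unreliable one, since it never sends acknowledgments; assuming a live StreamingContext ssc, with toy data for illustration):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class ToyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  def onStart(): Unit = {
    // Receive data on a separate thread and push it into Spark with store()
    new Thread("Toy Receiver") {
      override def run(): Unit = (1 to 3).foreach(i => store(s"record-$i"))
    }.start()
  }
  def onStop(): Unit = {}  // nothing to clean up in this sketch
}

val customStream = ssc.receiverStream(new ToyReceiver)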

Types of receivers

  • Reliable Receiver - A reliable receiver correctly sends acknowledgment to a reliable source when the data has been received and stored in Spark with replication.
  • Unreliable Receiver - An unreliable receiver does not send acknowledgment to a source. This can be used for sources that do not support acknowledgment, or even for reliable sources when one does not want or need to go into the complexity of acknowledgment.

Transform operation

The functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API. However, you can easily use transform to do this. This enables very powerful possibilities. For example, one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it.

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}

Window operation

Spark Streaming also provides windowed computations, which allow you to apply transformations over a sliding window of data. (Figure: a window of length 3 sliding by 2 batch intervals over a DStream.)

  • window length - The duration of the window (3 in the figure).
  • sliding interval - The interval at which the window operation is performed (2 in the figure).
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
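
The docs also describe an incremental form that adds the values entering the window and subtracts the ones leaving it; it requires checkpointing to be enabled (a sketch; the checkpoint directory name is illustrative):

ssc.checkpoint("checkpoint-dir")  // required for the inverse-function form below
val incrementalCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b, Seconds(30), Seconds(10))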

Reference for the sliding window functions

That’s all, folks

There are a lot of other intricacies dealt with in the documentation, like how to handle broadcast variables, MLlib, fault tolerance, setting the right batch interval, data serialization, connections, performance tuning, etc. I have left those parts out and will deal with them later.