Project author: mtumilowicz
Project description:
Introduction to Spark Batch processing.
Language: Scala
Project URL: git://github.com/mtumilowicz/big-data-scala-spark-batch-workshop.git


big-data-scala-spark-batch-workshop
preface
- goals of this workshop:
- introduction to Spark and its architecture
- peek under the hood: data representation and optimizations
- practicing rudimentary use-cases
spark
- unified engine designed for large-scale distributed data processing
- provides in-memory storage for intermediate computations
- faster than Hadoop MapReduce
- libraries
- machine learning (MLlib)
- SQL for interactive queries (Spark SQL)
- stream processing (Structured Streaming) based on micro-batch processing engine
- graph processing (GraphX)
- typical Spark scenario
- Ingestion of raw data
- DQ: data quality
- example: ensure that all birth dates are in the past
- example: obfuscate Social Security numbers (SSNs)
- Transformation
- example: join with other datasets, perform aggregations
- Publication
- example: load the data into a data warehouse, save in a file on S3
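A minimal sketch of such a scenario in Scala (the file paths and the birthDate, state and revenue columns are hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object TypicalScenario extends App {
  val spark = SparkSession.builder()
    .appName("typical-batch-scenario")
    .master("local[*]") // local run; on a real cluster the master comes from spark-submit
    .getOrCreate()

  // ingestion of raw data
  val raw = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("data/orders.csv")

  // data quality: keep only rows with a birth date in the past
  val clean = raw.filter(col("birthDate") < current_date())

  // transformation: aggregate revenue per state
  val aggregated = clean
    .groupBy(col("state"))
    .agg(sum("revenue").as("totalRevenue"))

  // publication: save as Parquet (an "s3a://bucket/..." path would work the same way)
  aggregated.write.mode("overwrite").parquet("output/orders_by_state")

  spark.stop()
}
```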
components overview
- components and architecture

- SparkSession
- a single unified entry point to all of Spark’s functionality
- use cases
- defining DataFrames and Datasets
- reading from data sources
- writing to data lakes
- accessing catalog metadata
- issuing Spark SQL queries
- there is a unique SparkSession for your application, no matter how many nodes it runs on (see the sketch after this list)
- Spark driver
- process running the main() function of the application
- instantiates SparkSession
- communicates with the cluster manager
- requests resources (CPU, memory, etc.) from the cluster manager for Spark’s executors (JVMs)
- plans and coordinates the execution: transforms operations into directed acyclic graph (DAG)
and schedules them
- keeps track of available resources to execute tasks
- schedules tasks to run “close” to the data where possible (data locality)
- distributes operations execution across the Spark executors
- once the resources are allocated, it communicates directly with the executors
- resides on master node
- Spark master & Cluster manager
- Cluster Manager can be separate from the Master process
- Spark master requests resources and makes resources available to the Driver
- monitors the status and health of resources
- is not involved in the execution of the application and the coordination of its tasks and stages
- Cluster manager is a process responsible for monitoring the Worker nodes and reserving resources
on these nodes upon request by the Master
- Master then makes these cluster resources available to the Driver in the form of Executors
- Worker node
- any node that can run application code in the cluster
- may not share filesystems with one another
- Spark executor
- is a process that executes tasks on the workers often in parallel
- communicates with the driver program
- workers can hold many executors, for many applications
- Job
- is a sequence of Stages, triggered by an action (ex. .count())
- Stage
- a sequence of Tasks that can all be run in parallel without a shuffle
- example: .read.map.filter
- Task
- unit of work that will be sent to one executor
- is a single operation (ex. .map or .filter) applied to a single Partition
- is executed as a single thread in an Executor
- example: 2 Partitions, operation: filter() => 2 Tasks, one for each Partition
- Shuffle
- operation where data is re-partitioned across a cluster
- having all the data needed for a computation on a single node reduces the shuffle
overhead (serialization and network traffic)
- costly operation - a lot of data travels via the network
- example: join
- Partition
- enable parallelization: data is split into Partitions so that each Executor can operate
on a single part
- once a job has been submitted to the cluster, each partition is sent to a
specific executor for further processing
- the more partitions, the more the work is distributed across executors; with fewer
partitions the work is done in larger pieces
- every machine in a spark cluster contains one or more partitions
- a single partition does not span multiple machines
- good starting point: number of partitions equal to the number of cores
- example
- 4 cores and 5 partitions
- processing of each partition takes 5 minutes
- total time: 10 minutes (4 partitions in parallel for the first 5 minutes, then the remaining 1 for another 5 minutes)
- deployment on Kubernetes
- Spark driver: Runs in a Kubernetes pod
- Spark executor: Each worker runs within its own pod
- Cluster manager: Kubernetes Master
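A minimal sketch in Scala tying the pieces above together: creating the single SparkSession and inspecting how the data is split into partitions (the partition count of 4 is only an assumption matching a 4-core local run):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("components-demo")
  .master("local[4]") // 4 cores => up to 4 tasks run in parallel
  .getOrCreate()

// the single entry point: define Datasets, read sources, query the catalog, run SQL
val df = spark.range(0, 1000000)

// a common starting point: number of partitions equal to the number of cores
println(df.rdd.getNumPartitions)
val repartitioned = df.repartition(4) // repartitioning involves a full shuffle

// an action such as count() triggers a job, split into stages and tasks (one task per partition)
println(repartitioned.count())

spark.stop()
```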
data representation
- RDD (Resilient Distributed Datasets)
- fundamental data structure of Spark
- immutable distributed collection of data
- data itself is in partitions
- Dataset and DataFrame
- schemas
- defines the column names and associated data types for a DataFrame
- inferring a schema (plus inferring data types)
- requires a separate job that reads a large portion of the file to infer the schema
- can be expensive and time-consuming
- no error detection if the data doesn't match the expected schema
- defining a schema
- programmatically:
val schema = StructType(Array(StructField("author", StringType, false)))
- DDL:
val schema = "author STRING, title STRING, pages INT"
data import / export
- typical use case: read from on-premise database and push to cloud storage (Amazon S3)
- data source types
- file (CSV, JSON, XML, Avro, Parquet, and ORC, etc)
- relational and nonrelational database
- other data provider: (REST) service, etc
- DataFrameReader
- core construct for reading data into a DataFrame
- supports many formats such as JSON, CSV, Parquet, Text, Avro, ORC, etc.
- DataFrameWriter
- it saves data to a data source
- after the files have been successfully exported, Spark will add a _SUCCESS file to the
directory
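A minimal reader/writer sketch (the paths, the s3a bucket, and the state column are hypothetical):

```scala
// DataFrameReader: format, options, then load
val ordersDf = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data/orders.csv")

// DataFrameWriter: Parquet output partitioned by a column, overwriting previous output;
// on success Spark adds a _SUCCESS marker file to the output directory
ordersDf.write
  .format("parquet")
  .mode("overwrite")
  .partitionBy("state")
  .save("s3a://my-bucket/orders")
```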
aggregations
DataFrame API
Dataset<Row> apiDf = df
    .groupBy(col("firstName"), col("lastName"), col("state"))
    .agg(
        sum("quantity"),
        sum("revenue"),
        avg("revenue"));
is equivalent to
df.createOrReplaceTempView("orders");
String sqlStatement = "SELECT " +
" firstName, " +
" lastName, " +
" state, " +
" SUM(quantity), " +
" SUM(revenue), " +
" AVG(revenue) " +
" FROM orders " +
" GROUP BY firstName, lastName, state";
Dataset<Row> sqlDf = spark.sql(sqlStatement);
joins
- Broadcast Hash Join (map-side-only join)
- used when joining small with large
- small = fitting in the driver’s and executor’s memory
- smaller data set is broadcasted by the driver to all Spark executors
- by default if the smaller data set is less than 10 MB
- does not involve any shuffle of the data set
- Shuffle Sort Merge Join
- used when joining two large data sets
- default join algorithm
- pre-requirement: partitions have to be co-located
- all rows having the same value for the join key should be stored in the same partition
- otherwise, there will be shuffle operations to co-locate the data
- has two phases
- sort phase
- sorts each data set by its desired join key
- merge phase
- iterates over each key in the row from each data set and merges
the rows if the two keys match
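A minimal sketch of forcing a broadcast hash join with a hint (largeOrdersDf, smallCountriesDf and the countryId key are hypothetical):

```scala
import org.apache.spark.sql.functions.broadcast

// broadcast the small side explicitly; without the hint Spark broadcasts automatically
// when the smaller side is below spark.sql.autoBroadcastJoinThreshold (10 MB by default)
val joined = largeOrdersDf.join(broadcast(smallCountriesDf), Seq("countryId"))

// larger joins fall back to the shuffle sort merge join; explain() shows which one was chosen
joined.explain()
```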
sql
- tables
ids.write
  .option("path", "/tmp/five_ids")
  .saveAsTable("five_ids")
- each table is associated with its relevant metadata (the schema, partitions, physical location
where the actual data resides, etc.)
- metadata is stored in a central metastore
- by default: Apache Hive metastore
- Catalog is the interface for managing a metastore
- spark.catalog.listDatabases()
- spark.catalog.listTables()
- spark.catalog.listColumns("us_delay_flights_tbl")
- location: /user/hive/warehouse
- two types of tables
- managed
- Spark manages metadata and the data
- example: local filesystem, HDFS, Amazon S3
- note that a SQL command such as DROP TABLE deletes both the metadata and the data
- with an unmanaged table, the same command will delete only the metadata
- unmanaged
- Spark only manages the metadata
- example: Cassandra
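A minimal sketch of both table types, reusing the ids DataFrame from the snippet above (the table names are hypothetical):

```scala
// managed table: Spark owns both the metadata and the data under the warehouse location
ids.write.saveAsTable("five_ids_managed")

// unmanaged (external) table: Spark only manages the metadata, the data stays at the given path
ids.write
  .option("path", "/tmp/five_ids")
  .saveAsTable("five_ids_unmanaged")

// inspect the metastore through the Catalog interface
spark.catalog.listTables().show()
spark.catalog.listColumns("five_ids_unmanaged").show()

// drops data and metadata for the managed table; for the unmanaged one only the metadata would go
spark.sql("DROP TABLE five_ids_managed")
```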
views
optimizations
- at the core of the Spark SQL engine are the Catalyst optimizer and Project Tungsten
- note that a lot of issues can come from key skew: the data is so unevenly distributed across
partitions that a join operation becomes very slow
tungsten
- focuses on enhancing three key areas
- memory management and binary processing
- manage memory explicitly
- off-heap binary memory management
- eliminate the overhead of JVM object model and garbage collection
- Java objects have large overheads — header info, hashcode, Unicode info, etc.
- instead use binary in-memory data representation aka Tungsten row format
- cache-aware computation
- algorithms and data structures to exploit memory hierarchy (L1, L2, L3)
- cache-aware computations with cache-aware layout for high cache hit rates
- code generation
- exploit modern compilers and CPUs
- generates JVM bytecode that accesses Tungsten-managed memory structures directly, which gives very fast access
- uses the Janino compiler - super-small, super-fast Java compiler
catalyst
- similar concept to RDBMS query optimizer
- converts a computational query into an execution plan

- Phase 1: Analysis
- Spark SQL engine generates an abstract syntax tree (AST) for the SQL or DataFrame query
- Phase 2: Logical optimization
- Catalyst constructs a set of multiple plans and, using its cost-based
optimizer, assigns a cost to each plan
- Phase 3: Physical planning
- Spark SQL generates an optimal physical plan for the selected logical plan
- Phase 4: Code generation
- generating efficient Java bytecode to run on each machine
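A minimal way to look at the output of these phases is explain(true), which prints the analyzed and optimized logical plans plus the physical plan (the query itself is just an assumption):

```scala
import org.apache.spark.sql.functions.col

val query = spark.range(0, 1000)
  .filter(col("id") % 2 === 0)
  .select((col("id") * 2).as("doubled"))

// extended explain: analyzed logical plan, optimized logical plan, physical plan;
// generated code shows up as WholeStageCodegen stages in the physical plan
query.explain(true)
```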
caching
- Spark maintains a history of all transformations you may apply to a DataFrame or RDD
- this enables Spark to be fault tolerant
- however, Spark programs take a huge performance hit when a fault occurs,
as the entire set of transformations on the RDD has to be recomputed
- if you reuse a dataframe for different analyses, it is a good idea to cache it
- otherwise these steps are re-executed each time you run the analytics pipeline
- example: DataFrames commonly used during iterative machine learning training
- caching vs persistence
- persist() provides more control over how and where data is stored
- DISK_ONLY, OFF_HEAP, etc.
- both are lazy transformations
- immediately after calling the function nothing happens with the data
but the query plan is updated by the Cache Manager by adding a
new operator — InMemoryRelation
- caching vs checkpointing
- in case of failure, re-calculating the lost partitions is the most expensive operation
- best strategy is to start from some checkpoint in case of failure
- checkpoint() method will truncate the logical plan (DAG) and save the content of the dataframe to disk
- re-computations need not start all the way from the beginning; instead, the checkpoint is used
as the starting point of the re-calculation
- cache will be cleaned when the session ends
- checkpoints will stay on disk
- example where checkpointing would be preferred over caching
- DataFrame of taxes for a previous year - they are unlikely to change once calculated so
it would be much better to checkpoint and save them forever so that they can be consistently
reused in the future
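A minimal sketch of the three mechanisms, assuming a hypothetical taxes dataset and checkpoint directory:

```scala
import org.apache.spark.storage.StorageLevel

// DataFrame reused by several analyses
val taxes = spark.read.parquet("data/taxes_2020")

// persist() with an explicit storage level; cache() is persist() with the default level;
// both are lazy, the data is only materialized by the first action
taxes.persist(StorageLevel.DISK_ONLY)
taxes.count()

// checkpoint() truncates the lineage (DAG) and writes the content to reliable storage;
// unlike the cache, it is not cleaned up when the session ends
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
val taxesCheckpointed = taxes.checkpoint()
```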
user-defined functions
val cubed = (s: Long) => { s * s * s } // define function
spark.udf.register("cubed", cubed) // register UDF
spark.sql("SELECT id, cubed(id) AS id_cubed FROM ...") // use
- excellent choice for performing data quality rules
- UDF’s internals are not visible to Catalyst
- UDF is treated as a black box for the optimizer
- Spark won’t be able to analyze the context where the UDF is called
- if you make DataFrame API calls before or after, Catalyst can't optimize
the full transformation
- UDFs should therefore be placed at the beginning or at the end of the chain of transformations
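The same UDF can also be used through the DataFrame API (df and its id column are hypothetical):

```scala
import org.apache.spark.sql.functions.{col, udf}

// wrap the Scala function so it can be applied to Columns
val cubedUdf = udf((s: Long) => s * s * s)
df.select(col("id"), cubedUdf(col("id")).as("id_cubed")).show()
```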