263-3010-00: Big Data
Section 10
Generic Dataflow Management
Swiss Federal Institute of Technology Zurich
Eidgenössische Technische Hochschule Zürich
Last Edit Date: 11/12/2024
Disclaimer and Terms of Use:
We do not guarantee the accuracy and completeness of the summary content. Some of the course material may not be included, and some of the content in the summary may not be correct. You should use this file properly and legally. We are not responsible for any results from using this file.
This personal note is adapted from Professor Ghislain Fourny's lecture material. Please contact us to delete this file if you think your rights have been violated.
This work is licensed under a Creative Commons Attribution 4.0 International License.
MapReduce is very simple and generic, but many more complex uses involve not just one, but a sequence of several MapReduce jobs. Furthermore, the MapReduce API is low-level, and most users need higher level interfaces, either in the form of APIs or query languages.
This is why, after MapReduce, another generation of distributed processing technologies was invented. The most popular one is the open-source Apache Spark.
A more general dataflow model¶
MapReduce consists of a map phase, followed by shuffling, followed by a reduce phase. In fact, the map phase and the reduce phase are not so different: both involve the computation of tasks in parallel on slots.
One could exaggerate a bit by considering that MapReduce is in fact a map phase, then a shuffle, then a map phase (the reduce phase on the already shuffled data) again:
Or, if we abstract away the partitions and consider the input, intermediate input and output as black boxes that these phases act on:
Resilient distributed datasets¶
The first idea behind generic dataflow processing is to allow the dataflow to be arranged in any directed acyclic graph (DAG), like so:
All the rectangular nodes in the above graph correspond to intermediate data. They are called resilient distributed datasets, or in short, RDDs. Resilient means that they remain in memory or on disk on a best effort basis, and can be recomputed if need be. Distributed means that, just like the collections of key-value pairs in MapReduce, they are partitioned and spread over multiple machines.
A major difference with MapReduce, though, is that RDDs need not be collections of pairs. In fact, RDDs can be (ordered) collections of just about anything: strings, dates, integers, objects, arrays, arbitrary JSON values, XML nodes, etc. The only constraint is that the values within the same RDD share the same static type, which does not exclude the use of polymorphism.
Since a key-value pair is a particular example of possible value, RDDs are a generalization of the MapReduce model for input, intermediate input and output.
The RDD lifecycle¶
Creation¶
RDDs undergo a lifecycle. First, they get created. RDDs can be created by reading a dataset from the local disk, or from cloud storage, or from a distributed file system, or from a database source, or directly on the fly from a list of values residing in the memory of the client using Apache Spark:
Transformation¶
Then, RDDs can be transformed into other RDDs.
Mapping or reducing, in this model, become two very specific cases of transformations. However, Spark allows for many, many more kinds of transformations. This also includes transformations with several RDDs as input (think of joins or unions in the relational algebra, for example).
Action¶
RDDs can also undergo a final action leading to making an output persistent. This can be by outputting the contents of an RDD to the local disk, to cloud storage, to a distributed file system, to a database system, or directly to the screen of the user (assuming the amount of data is small enough to display).
Lazy evaluation¶
Another important aspect of the RDD lifecycle is that the evaluation is lazy: in fact, creations and transformations on their own do nothing. It is only with an action that the entire computation pipeline is put into motion, leading to the computation of all the necessary intermediate RDDs, all the way down to the final output corresponding to the action.
A simple example¶
Let us give a very simple example of RDD lifecycle. One easy way to start with Spark is by using a shell. Let us use a scala shell.
First, we can create an RDD with a simple list of two strings, and assign it to a variable, with the parallelize creation function, like so:
val rdd1 = sc.parallelize(
List("Hello, World!", "Hello, there!")
)
Note, for completeness, that it would also be possible to read from an input dataset (let us assume a sharded dataset in some directory in HDFS configured as the default file system) with
val rdd1 = sc.textFile("/user/hadoop/directory/data-*.txt")
Note that, at this point, nothing actually gets set into motion. But, for pedagogical purposes, this is what will be generated when the computation starts:
Next, we can transform the RDD by tokenizing the strings based on spaces with a flatMap transformation. We assign the resulting RDD to another variable.
val rdd2 = rdd1.flatMap(
value => value.split(" ")
)
Note that, at this point, still nothing actually gets set into motion, but rdd2 would correspond to:
Finally, we invoke the action countByValue(), which will return a (local) map structure (object, dict...) associating each value with a count. This is when the computations are actually triggered:
rdd2.countByValue()
The result will be displayed on the screen of the shell:
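For completeness, below is a minimal PySpark sketch of the same pipeline (assuming, as in the standard PySpark shell, a SparkSession bound to the variable spark); the expected counts follow directly from the two input strings.

# Hypothetical PySpark equivalent of the Scala shell example above.
rdd1 = spark.sparkContext.parallelize(
    ["Hello, World!", "Hello, there!"]
)
rdd2 = rdd1.flatMap(lambda value: value.split(" "))

# Only this action triggers the actual computation.
result = rdd2.countByValue()
# Expected counts: {'Hello,': 2, 'World!': 1, 'there!': 1}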
Transformations¶
Transformations, in Spark, are like a large restaurant menu to pick from. We now go through the most important transformations.
Unary transformations¶
Let us start with unary transformations, i.e., those that take a single RDD as their input.
The filter transformation, provided a predicate function taking a value and returning a Boolean, returns the subset of its input that satisfies the predicate, preserving the relative order.
The map transformation, provided a function taking a value and returning another value, returns the list of values obtained by applying this function to each value in the input.
The flatMap transformation, provided a function taking a value and returning zero, one or more values, returns the list of values obtained by applying this function to each value in the input, flattening the obtained values (i.e., the information on which values came from the same input value is lost). The flatMap transformation, in fact, corresponds to the MapReduce map phase (in the special case that the RDDs involved are RDDs of key-value pairs).
The distinct transformation eliminates duplicates in the input. It is either possible to supply a comparison function, or to make sure that the class (type) of the input values implements the appropriate Comparable interface.
The sample transformation samples the input to a smaller list. It is possible to specify the sampling percentage.
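As a hedged sketch (the sample data and variable names below are hypothetical, and a SparkSession bound to spark is assumed), these unary transformations could be written as follows in PySpark:

numbers = spark.sparkContext.parallelize([1, 2, 2, 3, 4, 5, 6])

evens    = numbers.filter(lambda n: n % 2 == 0)   # keep only values satisfying the predicate
squares  = numbers.map(lambda n: n * n)           # exactly one output value per input value
expanded = numbers.flatMap(lambda n: range(n))    # zero or more output values per input value, flattened
deduped  = numbers.distinct()                     # eliminate duplicates
sampled  = numbers.sample(False, 0.5, seed=42)    # sample roughly 50% of the values, without replacement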
Binary transformations¶
There are also transformations that take two RDDs as input.
Three of them correspond to set operations in the relational algebra: for example, taking the union of two RDDs (with the same value type).
It is also possible to take the intersection.
It is also possible to take the subtraction, i.e., only keep the elements from the left RDD that do not appear in the right RDD.
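A hedged PySpark sketch of these binary transformations, on hypothetical data:

left  = spark.sparkContext.parallelize(["a", "b", "c"])
right = spark.sparkContext.parallelize(["b", "c", "d"])

both      = left.union(right)         # "a", "b", "c", "b", "c", "d" (duplicates are kept)
common    = left.intersection(right)  # "b", "c"
left_only = left.subtract(right)      # "a"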
Pair transformations¶
Since RDDs can have values of any type, they can also in particular be pairs. Spark has transformations specifically tailored for RDDs of key-value pairs. Note that all other transformations also work on RDDs of key-value pairs, as they accept any values.
The keys transformation returns a new RDD with only the keys:
The values transformation returns a new RDD with only the values:
The reduceByKey transformation, given a (normally associative and commutative) binary operator under which the input data type is closed, invokes and chains this operator on all values of the input RDD that share the same key. For example, if values $v_1, v_2, \dots, v_n$ are each associated with key $k$ in their own input pair, then the pair $(k, v_1 + v_2 + \dots + v_n)$ is output, assuming $+$ is the operator.
It is normally also required to tell Spark what the neutral element is (e.g., 0 for addition, 1 for multiplication, etc) so that it can use it for empty partitions during the parallel computation.
The reduceByKey transformation corresponds to the reduce phase of MapReduce.
The groupByKey transformation groups all key-value pairs by key, and outputs a single key-value for each key, where the value is an array (or list) of all the values that were associated with this key in the input. For example, if the input contains pairs $(k, a)$ and $(k, b)$ and there is no other pair with key k in the input, then the output value corresponding to key $k$ will be $(k, [a, b])$.
The sortByKey transformation, given the specification of an order or comparison operator on the key type, outputs the same pairs as in the input RDD, but reordered by key.
The mapValues transformation is similar to the map transformation, except that the map function is only applied to the value (and the key is kept).
The join transformation works on two input RDDs of key-value pairs. It matches the pairs on both sides that have the same key and outputs, for each match, a pair with that shared key and a tuple with the two values from each side. If there are multiple values with the same key on any side (or both), then all possible combinations are output.
The subtractByKey transformation also takes two input RDDs of key-value pairs. It outputs all pairs of the left RDD, except those whose key is present in the right RDD.
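A hedged PySpark sketch of the pair transformations above, with hypothetical keys and values:

pairs = spark.sparkContext.parallelize([("k1", 1), ("k2", 2), ("k1", 3)])
other = spark.sparkContext.parallelize([("k1", "a"), ("k3", "b")])

ks      = pairs.keys()                           # "k1", "k2", "k1"
vs      = pairs.values()                         # 1, 2, 3
summed  = pairs.reduceByKey(lambda a, b: a + b)  # ("k1", 4), ("k2", 2)
grouped = pairs.groupByKey()                     # ("k1", [1, 3]), ("k2", [2])
by_key  = pairs.sortByKey()                      # same pairs, reordered by key
doubled = pairs.mapValues(lambda v: 2 * v)       # ("k1", 2), ("k2", 4), ("k1", 6)
joined  = pairs.join(other)                      # ("k1", (1, "a")), ("k1", (3, "a"))
remains = pairs.subtractByKey(other)             # ("k2", 2)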
Actions¶
Let us now go through a few actions.
Gathering output locally¶
The collect action downloads all values of an RDD on the client machine and outputs them as a (local) list. It is important to only call this action on an RDD that is known to be small enough (e.g., after filtering) to avoid a memory overflow.
The count action computes (in parallel) the total number of values in the input RDD. This one is safe even for RDDs with billions of values, as it returns a simple integer to the client.
The countByValue action computes (efficiently, in parallel) the total number of occurrences of each distinct value in the input RDD. It is important to only call this action on an RDD that is known to have a small enough number of distinct values (e.g., after filtering) to avoid a memory overflow.
The take action returns, as a local list, the first n (where n can be chosen) values in the input RDD.
The top action returns, as a local list, the largest n (where n can be chosen) values from the input RDD, according to their order.
The takeSample action returns, as a local list, n randomly picked values from the input RDD.
The reduce action, given a (normally associative and commutative) binary operator under which the input data type is closed, invokes this operator on all values of the input RDD ($v_1 + v_2 + ... + v_n$ if $+$ is the operator) and outputs the resulting value. It is normally also required to tell Spark what the neutral element is (e.g., 0 for addition, 1 for multiplication, etc) so that it can use it for empty subsets.
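A hedged PySpark sketch of these actions, on a small hypothetical RDD:

values = spark.sparkContext.parallelize([4, 1, 3, 2, 5])

everything = values.collect()                    # [4, 1, 3, 2, 5], as a local list (use with care!)
how_many   = values.count()                      # 5
histogram  = values.countByValue()               # {4: 1, 1: 1, 3: 1, 2: 1, 5: 1}
first_two  = values.take(2)                      # [4, 1]
largest    = values.top(2)                       # [5, 4]
sampled    = values.takeSample(False, 2)         # two randomly picked values
total      = values.reduce(lambda a, b: a + b)   # 15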
Writing to sharded datasets¶
There is also an action called saveAsTextFile that can save the entire RDD to a sharded dataset on Cloud storage (S3, Azure blob storage) or a distributed file system (HDFS). It is very natural in Spark, like in MapReduce, to output sharded datasets, because this means the slots can write their own shards independently and without interfering or synchronizing with each other.
Binary outputs can be saved with saveAsObjectFile. Note that Spark, at least in its RDD API, is not aware of any particular format or syntax, i.e., it is up to the user to parse and serialize values to the appropriate text or bytes, typically with code in the host programming language (Python, Java, Scala ...).
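A minimal sketch (the output path is hypothetical): each value is serialized to a line of text by the user, here with json.dumps, and each slot then writes its own shard (part-00000, part-00001, ...) under the output directory.

import json

records = spark.sparkContext.parallelize([{"n": 1}, {"n": 2}, {"n": 3}])
records.map(json.dumps).saveAsTextFile("hdfs:///user/hadoop/output-directory")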
Actions on Pair RDDs¶
There are actions specifically working on RDDs of key-value pairs. Note that all other actions will also work on RDDs of key-value pairs, as pairs are values, too.
The countByKey action outputs, locally, each key together with the number of values in the input that are associated with this key. This results in a local list of key-value pairs. This requires care if there is a large number of distinct keys.
The lookup action outputs, locally, the value or values associated with a specific key.
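A hedged sketch of these two pair actions, on hypothetical data:

pairs = spark.sparkContext.parallelize([("k1", 1), ("k2", 2), ("k1", 3)])

counts  = pairs.countByKey()   # {'k1': 2, 'k2': 1}, as a local map
matches = pairs.lookup("k1")   # [1, 3], as a local list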
Physical architecture¶
Let us now take a look at the physical architecture of Spark.
Narrow-dependency transformations¶
There are two kinds of transformations: narrow-dependency transformations and wide-dependency transformations.
In a narrow-dependency transformation, the computation of each output value involves a single input value.
Thus, a narrow-dependency transformation is comparable to the map phase of MapReduce, and is easily parallelizable: if the input is partitioned and spread across nodes in the cluster, then the output can also be computed in partitions with no need for the nodes to communicate.
Examples of narrow-dependency transformation include map, flatMap, filter, etc.
By default, if the transformation is applied to an RDD freshly created from reading a dataset from HDFS, each partition will correspond to an HDFS block. Thus, the computation of the narrow-dependency transformation mostly involves local reads by short-circuiting HDFS.
One can look at these transformations on the logical layer (as a transformation taking one RDD as input and returning one RDD, which is one edge in the DAG of RDDs), or on the physical layer, as the parallel computation on separate partitions.
In fact, the way this works is quite similar to MapReduce: the sequence of calls of the transformation function (whichever it is: a filter predicate, a map function, a flatMap function, etc.) on each input value within a single partition is called a task. And just like MapReduce, the tasks are assigned to slots. These slots correspond to cores within YARN containers. YARN containers used by Spark are called executors. Below is an example, assuming that each executor has one core.
As you can see, the processing of the tasks is sequential within each executor, and tasks are executed in parallel across executors. And like in MapReduce, a queue of unprocessed tasks is maintained, and every time a slot is done, it gets a new task. When all tasks have been assigned, the slots that are done become idle and wait for all others to complete.
Below, we show the same, but with two cores per executor, i.e., two slots per executor.
Chains of narrow-dependency transformations¶
Let us now consider the case of a chain of several narrow-dependency transformations, executed in turn.
Naively, one could expect the execution to look like this (oversimplifying and assuming there is one task per slot):
However, this would be very inefficient because it requires shipping intermediate data over the network. But if all transformations are narrow-dependency transformations, it is possible to chain them without having data leaving the machines:
In fact, on the physical level, the calls of the underlying map/filter/etc. functions are directly chained on each input value to produce the corresponding final output value, meaning that the intermediate RDDs are not even materialized anywhere and exist purely logically. This means in particular that there is a single set of tasks, one for each partition of the input, for the entire chain of transformations.
Such a chain of narrow-dependency transformations executed efficiently as a single set of tasks is called a stage, which would correspond to what is called a phase in MapReduce.
Physical parameters¶
Users can parameterize how many executors there are, how many cores there are per executor and how much memory per executor (remember that these then correspond to YARN container requests). In the most recent versions of Spark, the number of executors can also be dynamically allocated by the cluster.
Below is an example of command to execute a Spark job (supplied within a jar file) with 42 executors, each having 2 cores and 3 GB of memory.
spark-submit \
  --num-executors 42 \
  --executor-cores 2 \
  --executor-memory 3G \
  my-application.jar
Shuffling¶
What about wide-dependency transformations? They involve a shuffling of the data over the network, so that the data necessary to compute each partition of the output RDD is physically at the same location.
Thus, at the level of a whole job, the physical execution consists of a sequence of stages, with shuffling happening every time a new stage begins:
Even though one can imagine a physical implementation in which two stages (corresponding to different, independent parts of the DAG of RDDs) are executed at the same time on two sub-parts of the cluster (each with their own executors), a more typical setting is a linear succession of stages on the physical level.
Given a DAG, it is always possible to list the nodes in a linear order compatible with the partial order relation underlying the DAG; this is called a linearization of the DAG. In the above example, Stage 1 can be executed on the entire cluster, then Stage 2, then Stage 3, then Stage 4.
Optimizations¶
Some understanding and knowledge of the internals of Spark can be useful in order to get more performance from it. There are in particular two aspects worthy of a mention.
Pinning RDDs¶
Every time an action is triggered, all the computations in its reverse transitive closure (i.e., all the way up the DAG through the reversed edges) are set into motion. In some cases, several actions might share subgraphs of the DAG. It makes sense, then, to pin the shared intermediate RDD by persisting it. It is possible to ask for persistence in memory and/or on disk. In some circumstances, persistence is not possible because of a lack of resources, in which case the full computation can be retriggered.
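A minimal PySpark sketch of pinning a shared intermediate RDD (the data and variable names are hypothetical):

from pyspark import StorageLevel

lines  = spark.sparkContext.parallelize(["Hello, World!", "Hello, there!"])
tokens = lines.flatMap(lambda value: value.split(" "))

# Pin the intermediate RDD (in memory, spilling to disk if needed) so that the
# two actions below share its computation instead of each re-triggering it.
tokens.persist(StorageLevel.MEMORY_AND_DISK)

total     = tokens.count()
histogram = tokens.countByValue()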
Pre-partitioning¶
Shuffle is needed to bring together the data that is needed to jointly contribute to individual output values. If, however, Spark knows that the data is already located where it should be, then shuffling is not needed. A simple example is when data is sorted before being grouped with the same keys as for sorting: then, Spark has the knowledge that the groups are already consistent with their physical location, and that no additional shuffling is needed between sorting and grouping.
Users can control how the data is partitioned by asking Spark to repartition, or pre-partition, the data in a certain way.
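A hedged sketch of pre-partitioning a pair RDD by key (the number of partitions and the data are hypothetical); per-key transformations downstream can then reuse this partitioning instead of shuffling again:

pairs = spark.sparkContext.parallelize([("k1", 1), ("k2", 2), ("k1", 3)])

# Hash-partition by key once and pin the result.
prepartitioned = pairs.partitionBy(8).persist()

summed  = prepartitioned.reduceByKey(lambda a, b: a + b)  # no further shuffle needed
grouped = prepartitioned.groupByKey()                     # no further shuffle needed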
DataFrame in Spark¶
Data independence¶
RDDs, the primary citizens in Spark's model and the nodes in the dataflow DAG, are very flexible. In particular, the values can be of any (static) type, while it is possible to use polymorphism in the host language to allow for different dynamic types. Thus, Spark has no problem dealing with heterogeneous and even nested datasets.
However, the complexity of dealing with these datasets is a burden that is placed on the user, because the consequence of this flexible model is that it is also a low-level model. This causes a lot of extra work for the end user: unlike a relational database that has everything right off-the-shelf, with RDDs, the user has to reimplement all the primitives they need. For example, when reading a JSON Lines dataset, the user must manually read the input as an RDD of strings, then invoke a JSON parsing function, and only then work with the language primitives. An example is shown below.
import json

rdd = spark.sparkContext.textFile("hdfs:///dataset.json")
rdd2 = rdd.map(lambda l: json.loads(l))                  # parse each line as a JSON object
rdd3 = rdd2.filter(lambda o: o["key"] == 0)
rdd4 = rdd3.map(lambda o: (o["key"], o["otherfield"]))
result = rdd4.countByKey()
The reader might remember that we discussed data independence before. What is described above is a breach of the data independence principle. This was, of course, on the minds of the developers behind Spark, because they addressed this issue in a subsequent version of Spark by extending the model with support for DataFrames and Spark SQL, bringing back a widely established and popular declarative, high-level language into the ecosystem.
A specific kind of RDD¶
A DataFrame can be seen as a specific kind of RDD: it is an RDD of rows (equivalently: tuples, records) that has relational integrity, domain integrity, but not necessarily (as the name Row would otherwise fallaciously suggest) atomic integrity. We also saw before that DataFrames can also be characterized as homogeneous collections of valid JSON objects (which are the rows) against a schema. It is thus only logical that, in Spark, every DataFrame has a schema.
The immediate consequence is that, in the particular case of flat rows, an RDD of (flat) rows is a relational table. Thus, it is very natural to think of SQL as the natural language to query them. This also was on the minds of the developers, who added support for a dialect of SQL, Spark SQL, for querying DataFrames.
Performance impact¶
DataFrames are not only useful because of the higher, more convenient, level of abstraction that they provide to the user, enhancing their productivity. DataFrames also have a positive impact on performance, because Spark can do a much better job of optimizing the memory footprint and the processing, leveraging decades of knowledge on the matter. In particular, DataFrames are stored column-wise in memory, meaning that the values that belong to the same column are stored together. Furthermore, since there is a known schema, the names of the attributes need not be repeated in every single row, as would be the case with raw RDDs. DataFrames are thus considerably more compact in memory than raw RDDs.
Generally, Spark converts Spark SQL to internal DataFrame transformations and eventually to a physical query plan. An optimizer known as Catalyst is then able to find many ways of making the execution faster, as knowing the DataFrame schema is invaluable information for an optimizer, as opposed to generic RDDs of which one knows little statically. In fact, an optimizer like Catalyst contains more than 50 years of knowledge on optimizing relational queries.
As an example, since the data is stored by columns, whenever Spark knows that some columns are not used by subsequent transformations and actions, it can silently drop these unused columns with no consequence. This is called projecting away and it is one of the most important optimizations in large-scale databases. Projecting away can even be done already at the disk level, i.e., when reading a Parquet file, it is possible to read only the columns that are actually needed, which significantly reduces the I/O bottleneck and accelerates the job.
Input formats¶
Below is an example of code in which a dataset is directly read as JSON, a view is created, and a SQL query is then evaluated.
df = spark.read.json("hdfs:///dataset.json")
df.createOrReplaceTempView("dataset")
df2 = spark.sql("SELECT * FROM dataset "
                "WHERE guess = target "
                "ORDER BY target ASC, country DESC, date DESC")
result = df2.take(10)
The first 10 rows are then collected. Note that Spark automatically infers the schema by discovering the JSON Lines file, which adds a performance overhead that does not exist for raw RDDs: there is no free lunch.
CSV files also come with schema discovery, although this one is optional (as, by default, all values can be treated as strings).
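For instance, a hedged sketch of opting into CSV schema discovery (the path and the header option are hypothetical for this dataset):

df = (spark.read
           .option("header", True)
           .option("inferSchema", True)
           .csv("hdfs:///dir/*.csv"))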
Some formats like Parquet, however, come with a declared schema, making this schema discovery step superfluous.
Builtin input formats can be specified, when creating a DataFrame from a dataset, with a simple method call after the read command. Non-builtin formats are supplied as a string parameter to a format() method. This is extensible and it is possible to make a format builtin via extension libraries.
df = spark.read.json("hdfs:///dataset.json")
df = spark.read.parquet("hdfs:///dataset.parquet")
df = spark.read.csv("hdfs:///dir/*.csv")
df = spark.read.text("hdfs:///dataset[0-7].txt")
df = spark.read.jdbc(
"jdbc:postgresql://localhost/test?user=fred&password=secret",
...
)
df = spark.read.format("avro").load("hdfs:///dataset.avro")
It is also possible to create DataFrames on the fly by validating RDDs; however, this is very complex, in particular when not using Scala, the native Spark language. The opposite, converting a DataFrame to an RDD of rows, is relatively easy.
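A hedged PySpark sketch of both directions (the row contents are hypothetical):

from pyspark.sql import Row

# RDD of Row objects -> DataFrame (Spark validates the rows and infers a schema).
rows = spark.sparkContext.parallelize([
    Row(name="Einstein", year=1905),
    Row(name="Curie", year=1903),
])
df = spark.createDataFrame(rows)

# DataFrame -> RDD of Row objects (the easy direction).
rdd_of_rows = df.rdd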
DataFrame transformations¶
It is also possible, instead of Spark SQL, to use a transformation API similar to (but distinct from) the RDD transformation API. This API can be seen as a lower-level, query-plan-like counterpart to Spark SQL. It is also similar to a Python library known as Pandas. This API is more complex to manipulate than Spark SQL and is typically more suitable to power users or data engineers who implement libraries and engines, exposing a higher-level API or language to their end users.
from pyspark.sql.functions import asc, desc

df = spark.read.json("hdfs:///dataset.json")
df2 = df.filter(df["name"] == "Einstein")
df3 = df2.sort(asc("theory"), desc("date"))
df4 = df3.select("year")
result = df4.take(10)
Unlike the RDD transformation API, there is no guarantee that the execution will happen as written, as the optimizer is free to reorganize the actual computations.
DataFrame column types¶
Let us now discuss the types that DataFrame columns can have. There are atomic and structured types.
Atomic types include:
numbers: byte, short, int, long, float, double, decimal
non-number types: string, Boolean, binary data, date, timestamps
Below is a correspondence table mapping these types to two host languages, Java and Python.
DataFrames also support, unsurprisingly, the three structured types we covered before: arrays, structs, and maps. As a reminder, structs have string keys and arbitrary value types and correspond to generic JSON objects, while in maps, all values have the same type. Thus, structs are more common than maps. Arrays correspond to JSON arrays.
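As a hedged illustration (all field names are hypothetical), such a schema could be declared explicitly in PySpark as follows:

from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, ArrayType, MapType)

# A nested struct column, an array column, and a map column whose values all share one type.
schema = StructType([
    StructField("name", StringType()),
    StructField("address", StructType([
        StructField("city", StringType()),
        StructField("zip", StringType()),
    ])),
    StructField("phones", ArrayType(StringType())),
    StructField("scores", MapType(StringType(), IntegerType())),
])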
Note, to conclude, that there may and will be impedance mismatches with the various input and output formats, in the sense that the type mappings might not be perfect and information can be lost (e.g., an output format might not support the full decimal value space, or an input format might have a time type with no direct DataFrame counterpart).
Mapping types with each other when interfacing technologies is an important skill for data engineers and it is always a good idea to carefully document type mappings when interconnecting technologies.
The Spark SQL dialect¶
We mentioned that Spark SQL is a dialect of SQL. Spark SQL has a few limitations (e.g., there is no OFFSET clause) but also comes with a few convenient extensions, in particular to deal with nested data.
In particular, it is straightforward to get started with Spark SQL as a full-blown query with the usual clauses in the usual order will work with the exception, as we said, of the OFFSET clause, which is not implemented but, we hope, will be one day.
SELECT first_name, last_name
FROM persons
WHERE age >= 65
GROUP BY country
HAVING COUNT(*) >= 1000
ORDER BY country DESC NULLS FIRST
LIMIT 100
Note, en passant, that both GROUP BY and ORDER BY will trigger a shuffle in the system, even though this can be optimized as the grouping key and the sorting key are the same.
Let us now look at extensions. The SORT BY clause can sort rows within each partition, but not across partitions, i.e., does not induce any shuffling.
SELECT first_name, last_name
FROM persons
WHERE age >= 65
GROUP BY country
HAVING COUNT(*) >= 1000
SORT BY country DESC NULLS FIRST
The DISTRIBUTE BY clause forces a repartition by putting all rows with the same value (for the specified field(s)) into the same new partition.
SELECT first_name, last_name
FROM persons
WHERE age >= 65
GROUP BY country
HAVING COUNT(*) >= 1000
DISTRIBUTE BY country
It is possible to use both SORT and DISTRIBUTE:
SELECT first_name, last_name
FROM persons
WHERE age >= 65
GROUP BY country
HAVING COUNT(*) >= 1000
SORT BY country DESC NULLS FIRST
DISTRIBUTE BY country
but this is equivalent to the use of another clause, CLUSTER BY:
SELECT first_name, last_name
FROM persons
WHERE age >= 65
GROUP BY country
HAVING COUNT(*) >= 1000
CLUSTER BY country
A word of warning must be given on SORT, DISTRIBUTE and CLUSTER clauses: they are, in fact, a breach of data independence, because they expose partitions, which goes against the fundamental principle set by Edgar Codd in 1970, according to which the physical layer should not be seen by the end user. In the real world, though, data independence is difficult to strictly enforce because optimization is a difficult and complex problem. In fact, it is even undecidable. As a consequence, it is desirable to leave some control to the end user on the physical execution, to compensate for the (current) inability of optimizers to make the system truly data independent. This might change in the future as technology continues to evolve.
Spark SQL also comes with language features to deal with nested arrays and objects.
First, nested arrays can be navigated with the explode() function, like so:
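(A hedged sketch, assuming a hypothetical view persons with an array-typed column phones has been registered.)

# Duplicate each row once per item of the nested array "phones".
exploded = spark.sql("""
    SELECT name, explode(phones) AS phone
    FROM persons
""")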
The effect of the explode() function would probably have been called a spooky action at a distance, with a smile, by Albert Einstein, as it is not just a function call. Its effect is that each row is duplicated into as many rows as there are items in the array, and one particular item of the array is placed on each duplicated row in the corresponding column. Thus, in the case of a single level of nestedness, explode() can be seen as normalizing the table by bringing it into first normal form, at the cost of data duplication.
Explode() cannot deal with all use cases, though, which is why there is another, more generic construct known as LATERAL VIEW:
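(Again a hedged sketch, on the same hypothetical persons view.)

# Same effect as explode(), expressed as a lateral view: each row is joined
# with the items of its own "phones" array.
lv = spark.sql("""
    SELECT name, phone
    FROM persons
    LATERAL VIEW explode(phones) p AS phone
""")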
A lateral view can be intuitively described this way: the array mentioned in the LATERAL VIEW clause is turned into a second, virtual table with which the rest of the original table is joined. The other clauses can then refer to columns in both the original and the second, virtual table.
Lateral views are more powerful and generic than just an explode() because they give more control, and they can also be used to go down several levels of nesting, like so:
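(A hedged sketch, assuming a hypothetical column orders whose elements are structs that themselves contain an array items.)

# Two chained lateral views unnest two levels: the orders, then the items of each order.
nested = spark.sql("""
    SELECT name, item
    FROM persons
    LATERAL VIEW explode(orders) o AS ord
    LATERAL VIEW explode(ord.items) i AS item
""")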
It is also possible to navigate and unnest nested structs (objects) using dots, like so:
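(A hedged sketch, assuming a hypothetical struct-typed column address with a nested field city.)

# Dots navigate into nested structs.
cities = spark.sql("""
    SELECT name, address.city
    FROM persons
""")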
Does Spark SQL solve the problem of querying denormalized data? In fact, only partially. First, dealing with nested structures is easy if there is not too much nesting, but the verbosity of the query quickly becomes overwhelming with more nesting and more complex use cases. SQL was designed for normalized tables and was never meant to be used on denormalized data. Using SQL-with-dots languages helps a bit but does not fully address the issue.
Second, DataFrames and Spark SQL do support data that is not in first normal form, but they do not support at all data that does not fulfill relational integrity or domain integrity. It is possible to read as input a JSON Lines dataset with inconsistent value types in the same columns; however, the schema discovery phase will simply result in a column with type string, which puts all the burden of dealing with the polymorphism of the column on the end user, leading to many additional lines of code in the host language. Sadly, messy data is commonly found in the real world and it is one of the top challenges to clean up and prepare data for machine learning pipelines.
We will see, in subsequent chapters, that this problem can be solved by using query languages that are more adequate because they were specifically designed for denormalized data, whether validated (like DataFrames) or not.