263-3010-00: Big Data
Section 8
Massive Parallel Processing
Swiss Federal Institute of Technology Zurich
Eidgenössische Technische Hochschule Zürich
Last Edit Date: 11/05/2024
Disclaimer and Terms of Use:
We do not guarantee the accuracy and completeness of the summary content. Some of the course material may not be included, and some of the content in the summary may not be correct. You should use this file properly and legally. We are not responsible for any results from using this file.
This personal note is adapted from Professor Ghislain Fourny. Please contact us to delete this file if you think your rights have been violated.
This work is licensed under a Creative Commons Attribution 4.0 International License.
Now that we know how and where to store datasets; and we know how to model nested, heterogeneous datasets; and we know how to validate them; and we know how to build data frames (when applicable); and we know how they can be stored in various optimized binary formats, it is time to finally move on to the truly awesome part of Big Data: actually processing gigantic amounts of data.
Counting cats¶
The inputs¶
This is where Erwin, who lived in Zurich, comes into play. Erwin has hundreds of cats of ten different kinds; specifically, he has:
Persian cats
Siamese cats
Bengal cats
Scottish folds
American shorthairs
Maine coons
British shorthairs
Sphynx cats
Ragdolls
Norwegian forest cats
However, Erwin lost track of counts, and would like to know how many cats of each kind he has with him. He could, of course, count them one by one, but Erwin thinks there has to be a better way.
The map¶
Fortunately, his 22 grandchildren are visiting him this weekend, and Erwin has a plan.
First, he will distribute the cats across all 17 rooms of his large mansion. Then he will send 17 of his grandchildren to the rooms, one per room, and ask them to count the cats of the room they are assigned to, by kind, and to then write the count of each kind on a piece of paper. This is an example for one room:
Cat Breed | Number |
---|---|
Persian cats | 12 |
Siamese cats | 23 |
Bengal cats | 14 |
Scottish folds | 23 |
American shorthairs | 36 |
Maine coons | 3 |
British shorthairs | 5 |
Sphynx cats | 54 |
Ragdolls | 2 |
Norwegian forest cats | 63 |
The shuffle¶
Meanwhile, the other 5 grandchildren have each been assigned several kinds of cats. Each kind of cat will be processed by one, and exactly one, of these 5 grandchildren. Mathias, who is the oldest grandchild, will process 3 kinds of cats. Leane, Anna and Elliot will process 2 kinds each, and Cleo, the youngest one, will process 1 kind, making it 10 in total.
Cat Breed | Assigned Grandchild |
---|---|
Persian cats | Mathias |
Siamese cats | Anna |
Bengal cats | Cleo |
Scottish folds | Mathias |
American shorthairs | Leane |
Maine coons | Mathias |
British shorthairs | Elliot |
Sphynx cats | Leane |
Ragdolls | Anna |
Norwegian forest cats | Elliot |
By the time this is done, the 17 grandchildren are coming back with their sheets of paper. Erwin gives them scissors and asks them to cut these sheets of paper into stripes: each stripe with a kind of cat and its count. Then, the group of 17 and the group of 5 start walking in all directions, and each stripe of paper is handed over by the grandchild (from the group of 17) who created it to the one supposed to process it (from the group of 5).
Needless to say, the mansion is quite noisy at that time, and it takes quite a while until all stripes of paper are finally in the hands of Mathias, Leane, Anna, Elliot, and Cleo. Cleo received 16 stripes with Bengal cat counts; indeed, in one of the 17 rooms, there were no Bengal cats at all. Mathias received 17 stripes of Persian cat counts, 15 stripes of Scottish fold counts, and 14 stripes of Maine coon counts, and so on.
The reduce¶
Now, Erwin asks the 5 grandchildren to add the counts for each kind of cat. Cleo has one big addition to do with all the numbers on her stripes:
Bengal cats | 12 |
Bengal cats | 13 |
Bengal cats | 23 |
Bengal cats | 3 |
Bengal cats | 1 |
Bengal cats | 6 |
Bengal cats | 13 |
Bengal cats | 6 |
Bengal cats | 14 |
Bengal cats | 9 |
Bengal cats | 7 |
Bengal cats | 7 |
Bengal cats | 9 |
Bengal cats | 11 |
Bengal cats | 12 |
Bengal cats | 11 |
Cleo proudly creates a new stripe with her grand total:
Bengal cats | 157 |
The output¶
Mathias, Anna, Elliot and Leane do the same and give their newly created stripes back to Erwin, who now has the overview of his cat counts:
Cat Breed | Number |
---|---|
Persian cats | 412 |
Siamese cats | 233 |
Bengal cats | 157 |
Scottish folds | 233 |
American shorthairs | 36 |
Maine coons | 351 |
British shorthairs | 153 |
Sphynx cats | 236 |
Ragdolls | 139 |
Norwegian forest cats | 176 |
What just happened? This is simply an instance of a MapReduce computation. The first phase, with the 17 separate rooms, is called the Map phase. Then came the Shuffle phase, when the stripes were handed over. And finally came the Reduce phase, with the computation of the final grand totals.
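If you prefer code to prose, here is a minimal single-machine sketch in Java of the same idea (the per-room numbers are made up and the class name is our own): grouping the stripes by breed plays the role of the shuffle, and the per-breed summation plays the role of the reduce.
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class CatCountSketch {
    public static void main(String[] args) {
        // Map phase: each room's sheet is a set of (breed, count) pairs.
        List<Map<String, Integer>> sheets = List.of(
                Map.of("Persian cats", 12, "Bengal cats", 14),
                Map.of("Bengal cats", 7, "Siamese cats", 23));
        // Shuffle + reduce: group the stripes by breed, then sum the counts per breed.
        Map<String, Integer> totals = sheets.stream()
                .flatMap(sheet -> sheet.entrySet().stream())
                .collect(Collectors.groupingBy(Map.Entry::getKey,
                        Collectors.summingInt(Map.Entry::getValue)));
        totals.forEach((breed, count) -> System.out.println(breed + ": " + count));
    }
}
Of course, the whole point of MapReduce is that the three phases are spread over many machines rather than executed in a single process, as the rest of this section explains.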
Patterns of large-scale query processing¶
Textual input¶
We saw that the data we want to query can take many forms. First, it can be billions of lines of text (this excerpt is from Sherlock Holmes, which came into the public domain a few years ago, which in turn explains why there are so many movies and series about Sherlock Holmes these days):
In the year 1878 I took my degree of Doctor of Medicine of the University of London, and proceeded to Netley to go through the course prescribed for surgeons in the army. Having completed my studies there, I was duly attached to the Fifth Northumberland Fusiliers as Assistant Surgeon. The regiment was stationed in India at the time, and before I could join it, the second Afghan war had broken out. On landing at Bombay, I learned that my corps had advanced through the passes, and was already deep in the enemy's country. I followed, however, with many other officers who were in the same situation as myself, and succeeded in reaching Candahar in safety, where I found my regiment, and at once entered upon my new duties.
It can also be plenty of CSV lines (these will likely be familiar to residents of Zurich):
Year,Date,Duration,Guest
2022,2019-04-25,00:37:59,UR
2021,2019-04-19,00:12:57,UR
2020,NULL,NULL,NULL
2019,2019-04-08,00:17:44,Strassburg
2018,2018-04-16,00:20:31,BS
2017,2017-04-24,00:09:56,GL
2016,2016-04-18,00:43:34,LU
...
Or maybe some use-case-specific textual format (common for weather data, for example):
20222019-04-2500:37:59UR
20212019-04-1900:12:57UR
20200000-00-0000:00:0000
20192019-04-0800:17:44FR
20182018-04-1600:20:31BS
20172017-04-2400:09:56GL
20162016-04-1800:43:34LU
...
Such a format can also be made of a key-value pattern, here, with key and value separated by a space character:
2022 00:37:59.000000
2021 00:12:57.000000
2020 00:00:00.000000
2019 00:17:44.000000
2018 00:20:31.000000
2017 00:09:56.000000
2016 00:43:34.000000
...
A popular format that is more machine readable is the JSON Lines format, with one JSON object per line. Note that it can be heterogeneous:
{ "year": 2022, "date": "2022-04-25", "duration": "00:37:59", "canton": "UR" }
{ "year": 2021, "date": "2021-04-19", "duration": "00:12:57", "canton": "UR" }
{ "year": 2020, "duration": null }
{ "year": 2019, "date": "2019-04-08", "duration": "00:17:44", "city": "Strasbourg" }
{ "year": 2018, "date": "2018-04-16", "duration": "00:20:31", "canton": "BS" }
{ "year": 2017, "date": "2017-04-24", "duration":"00:09:56","canton":"GL"}
{ "year":2016, "date":"2016-04-18", "duration":"00:43:34","canton":"LU"}
Other input formats¶
Some other formats (e.g. Parquet, ...) can be binary, as we saw before. We humans are not able to decode them directly, but luckily computers can:
01010010101010101010010101101010010101010101
001010101010101011010011110101000101101001010
01010010101010101010010101101010010101010101
001010101010101011010011110101000101101001010
01010010101010101010010101101010010101010101
001010101010101011010011110101000101101001010
01010010101010101010010101101010010101010101
001010101010101011010011110101000101101001010
01010010101010101010010101101010010101010101
001010101010101011010011110101000101101001010
01010010101010101010010101101010010101010101
001010101010101011010011110101000101101001010
01010010101010101010010101101010010101010101
001010101010101011010011110101000101101001010
We also encountered HFiles in Chapter 6, which are lists of key-value pairs. In fact, Hadoop has another such key-value-based format called Sequence File, which is simply a list of key-value pairs, not necessarily sorted by key (although in a well-defined order) and with keys not necessarily unique. This is because, as we will see shortly, this format is used for intermediate data generated with MapReduce.
Sequence Files also come in a flavor in which the values are compressed, and another flavor in which their blocks (akin to what we called HBlocks in HFiles) are compressed.
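As a small sketch of how such a file can be written with the Hadoop API (the output path, key type, value type, and sample pairs below are arbitrary choices for illustration, not part of the course material):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
public class SequenceFileSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Append key-value pairs in arrival order: no sorting, duplicate keys allowed.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(new Path("cats.seq")),   // hypothetical output path
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(IntWritable.class))) {
            writer.append(new Text("Bengal cats"), new IntWritable(12));
            writer.append(new Text("Persian cats"), new IntWritable(23));
            writer.append(new Text("Bengal cats"), new IntWritable(7)); // duplicate key
        }
    }
}
The compressed flavors mentioned above can be selected with additional writer options; we do not show them here.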
Shards¶
How do we store Petabytes of data on online cloud storage, such as S3 or Azure Blob Storage, where the maximum size of a file is limited? Simply by spreading the data over many files. It is very common to have a dataset lying in a directory spread over 100 or 1000 files. Often, these files are named incrementally: part-0001, part-0002, etc. These files are often also called "shards" of the dataset.
What about HDFS? Technically, HDFS would make it possible to have a gigantic, single file, automatically partitioned into blocks. However, also with HDFS, it is common to have a pattern with a directory containing many files named incrementally. The size of these files is typically larger than that of a block, for example 10 GB files.
Note that the size of the files does not constrain parallelism: with HDFS, even with 10 GB files, the 128 MB blocks within the same file can still be processed in parallel. S3 is also compatible with intra-file parallelism. There are several practical motivations for the many-files pattern even in HDFS:
As we will see, it is much more convenient to output several shards from a framework like MapReduce than it is to create a single, big, final file (which would require a way to concatenate it properly).
It is considerably easier to download or upload datasets that are stored as many files. This is because network issues happen once in a while, and you can simply retry with only the files that failed. With a single 10 PB file, the transfer takes so long that a network issue is extremely likely to force you to start all over again, possibly repeatedly.
Of course, the data does not need to be stored directly as files on cloud storage or a distributed file system: it could be stored as a relational table in a relational database, or in a wide column store like HBase. In fact, it is quite common to store XML, HTML, or JSON data in HBase cells. MapReduce can also process data on these higher-level stores.
Querying pattern¶
Now that we have the data, what does querying it look like? At a very high level, a query converts some input to some output, like so:
However, because the underlying storage supports parallelism (via shards, blocks, regions, etc), the input as well as the output are partitioned:
So far, that does not really help us. But what if we are lucky and the query can, in fact, be reexpressed equivalently to simply map every input partition to an output partition, like so?
Sadly, in reality, what you might find instead is something that looks more like spaghetti:
Fortunately, what happens often is somewhere in the middle, with some data flow patterns:
Some places have data flowing in parallel (map-like):
While some others are more spaghetti-y (shuffle-like):
This is the motivation behind the standard MapReduce pattern: a map-like phase on the entire input, then a shuffle phase on the intermediate data, then another map-like phase (called reduce) producing the entire output:
This seems restrictive, but in reality it is extremely powerful and generic enough to accommodate many use cases, in particular if you chain MapReduce jobs with each other.
MapReduce model¶
In MapReduce, the input data, intermediate data, and output data are all made of a large collection of key-value pairs (with the keys not necessarily unique, and the pairs not necessarily sorted by key):
The types of the keys and values are known at compile time (statically), and they do not need to be the same across all three collections:
In practice, however, it is quite common that the type of the intermediate key-value pairs is the same as that of the output key-value pairs:
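In other words, using the generic type names that will reappear in the Java API below, the two user-supplied functions have the signatures $map: K_1 \times V_1 \to list(K_2 \times V_2)$ and $reduce: K_2 \times list(V_2) \to list(K_3 \times V_3)$, where $(K_1, V_1)$, $(K_2, V_2)$ and $(K_3, V_3)$ are the input, intermediate, and output key-value types. The common case mentioned above then simply corresponds to $(K_2, V_2) = (K_3, V_3)$.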
Let us now walk through a MapReduce query, but on the logical level for now. Everything starts with partitioning the input. MapReduce calls these partitions "splits":
Each key-value pair will be fed into a map function:
However, as you can imagine, it would be extremely inefficient to do this pair by pair; thus, the map function is called in batches, on each split:
The intermediate pairs can be seen, logically, as a single big collection:
This collection can be thought of as logically sorted:
And then partitioned again, making sure the pairs with the same key are always in the same partition (but a partition can have pairs with several keys):
The reduce function must then be called, for every unique key, on all the pairs with that key, outputting zero, one or more output pairs (here, we just drew one, which is the most common case):
Just like the map function, the reduce function is called in batches, on each intermediate partition (multiple calls, one per unique key):
If we recap, this is what the entire process looks like at a high (and logical) level:
MapReduce architecture¶
MapReduce can read its input from many places as we saw: a cloud storage service, HDFS, a wide column store, etc. The data can be Petabyte-sized, with thousands of machines involved.
On a cluster, the architecture is centralized, just like for HDFS and HBase. In the original version of MapReduce, the main node is called JobTracker, and the worker nodes are called TaskTrackers.
In fact, the JobTracker typically runs on the same machine as the NameNode (and HMaster), and the TaskTrackers run on the same machines as the DataNodes (and RegionServers). This is called "bring the query to the data." If using HDFS, then most of the time, for the map phase, things will be orchestrated in such a way that there is a replica of the block corresponding to the split on the same machine (since it is also a DataNode), meaning that reading it is a local read and not a network connection.
As the map phase progresses, there is a risk that the memory becomes full. But we have seen this before with HBase: the intermediate pairs on that machine are then sorted by key and flushed to disk into a Sequence File. And as more flushes happen, these Sequence Files can be compacted into fewer of them, very similarly to HBase's Log-Structured Merge Trees.
When the map phase is over, each TaskTracker runs an HTTP server listening for connections, so that they can connect to each other and ship the intermediate data over to create the intermediate partitions, ensuring that the same keys end up on the same machines (we will look at this again with more precise terminology later). This phase is called shuffling. Then, the reduce phase can start.
Note that shuffling can start before the map phase is over, but the reduce phase can only start after the map phase is over.
When the reduce phase is completed, each output partition will be output to a shard (as we saw, a file named incrementally) in the output destination (HDFS, S3, etc) and in the desired format.
MapReduce input and output formats¶
Impedance mismatch¶
Let us now get back to the input received by and the output produced by MapReduce. We looked at various examples of textual and binary formats before, and saw in particular that MapReduce can read its input from files lying in a data lake as well as directly from a database system such as HBase or a relational database management system.
As you may have noticed, MapReduce only reads and writes lists of key-value pairs, where keys may be duplicated and need not appear in any particular order. However, the inputs we considered are not key-value pairs. So we need an additional mechanism that allows MapReduce to interpret this input as key-value pairs.
For tables, whether relational or in a wide column store, this is relatively easy: indeed, tables have primary keys, consisting of either a single column or multiple columns. Thus, each tuple can be interpreted as a key-value pair, where the key is the (sub)tuple containing all the values associated with the columns that are part of the primary key, while the value is the (sub)tuple containing the values associated with all other columns.
Mapping files to pairs¶
Let us thus focus on files. How do we read a (possibly huge) text file as a list of key-value pairs? The most natural way to do so is to turn each line of text into a key-value pair: the value is the string corresponding to the entire line, while the key is an integer that expresses the position (as a number of characters), or offset, at which the line starts in the file currently being read, like so:
A small variation consists in reading N lines at a time, mapping them to a single key-value pair:
Another variation consists of treating a character (picked by the user) specially, as the separator between the key and the value:
Since MapReduce is not aware of JSON or any other syntax, a JSON Lines file will be read as text as explained above, and it is the responsibility of the user to parse this text as JSON. This is, of course, not very convenient, as it means MapReduce pushes this burden to the user, but we will see in subsequent chapters that there are additional layers that come on top of the technology stack and free the user from having to deal with these details.
Sequence Files are easier to handle: since they already contain lists of key-value pairs in a format native to MapReduce, their interpretation is straightforward. In fact, intermediate data spilled to disk will be written and read back in this format.
A few examples¶
Counting words¶
Let us use a concrete example and count the number of occurrences of each word in this document:
Lorem ipsum
dolor sit amet,
consectetur adipiscing elit,
sed
...
We already saw how this is interpreted as key-value pairs:
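Concretely, with the line-based interpretation described above (offsets counted in characters, assuming one character per line break), the first lines become the following key-value pairs:
(0, "Lorem ipsum")
(12, "dolor sit amet,")
(28, "consectetur adipiscing elit,")
(57, "sed")
...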
We can count the words within each line, similarly to our motivating example with the cats, by mapping each line's key-value pair to several key-value pairs, one per word and with a count of 1. This gives us our map function:
The reduce function is then obtained by summing the values with the same key, and keeping the same key:
The output will then consist of a list of unique key-value pairs, with one key for each word, and the number of its occurrences as the associated value.
Selecting¶
How about, say, filtering the lines containing a specific word? This is something easily done by having a map function that outputs a subset of its input, based on some predicate provided by the user:
Here we notice that the output of the map phase already gives us the desired result; we still need to provide a reduce function, which is trivially taken to be the identity function. This is not unusual (and there are also examples where the map function is trivial, and the reduce function does the actual processing):
In fact, what we have just implemented in MapReduce is nothing else than a selection operator from the relational algebra. So now, you know how to implement the following SQL query on text, seen as a one-column table, on top of MapReduce, and in a way that scales to billions of lines!
SELECT text
FROM input
WHERE text LIKE '%foobar%'
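As a minimal sketch of such a selection in Java (assuming the usual line-based input with byte offsets as keys, and the hypothetical search word "foobar"; the class name is our own), the map function simply re-emits the pairs that satisfy the predicate:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Emits only the lines that contain the search word, unchanged.
public class SelectionMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    public void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        if (line.toString().contains("foobar")) {
            context.write(offset, line);   // keep the pair as-is
        }
    }
}
The reduce function can then simply be the identity, as described above.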
Projecting¶
What about projection on some input in the JSON Lines format? Well, MapReduce does not know anything about attributes. So it is up to the user to parse, in their code, each line to a JSON object (e.g., if using Python, to a dict), like so:
Then, the map function can project this object to an object with fewer attributes:
And, like previously, we use the trivial identity function as the reduce function:
MapReduce will then output the results as one or several output files in the JSON Lines format. What we have just implemented on billions of records is the following equivalent SQL query:
SELECT id, country
FROM input
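As a sketch of what the parsing and projection could look like in Java (assuming a JSON library such as Jackson is available on the classpath, and using the hypothetical attributes id and country from the query above; class and variable names are our own):
import java.io.IOException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class ProjectionMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private final ObjectMapper json = new ObjectMapper();   // JSON parser (not a Hadoop Mapper)
    @Override
    public void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        JsonNode record = json.readTree(line.toString());   // parse the JSON Lines record
        ObjectNode projected = json.createObjectNode();
        projected.set("id", record.get("id"));               // keep only the projected attributes
        projected.set("country", record.get("country"));     // missing attributes become nulls here
        context.write(NullWritable.get(), new Text(projected.toString()));
    }
}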
Combine functions and optimization¶
Let us look again at the MapReduce phases:
Is there any way we can optimize things and run even faster? In fact, there is. In our counting example, we created an intermediate key-value pair for each occurrence of a word, with a value set to 1. But what if a word appears 5 times on the same line? In this case, we can replace the corresponding key-value pairs with just one pair, with the value 5. Doing so is called combining and happens during the map phase:
Thus, in addition to the map function and the reduce function, the user can supply a combine function. This combine function can then be called by the system during the map phase as many times as it sees fit to "compress" the intermediate key-value pairs. Strategically, the combine function is likely to be called at every flush of key-value pairs to a Sequence File on disk, and at every compaction of several Sequence Files into one.
However, there is no guarantee that the combine function will be called at all, and there is also no guarantee on how many times it will be called. Thus, if the user provides a combine function, it is important that they think carefully about a combine function that does not affect the correctness of the output data. In fact, in most cases, the combine function will be identical to the reduce function, which is generally possible if the intermediate key-value pairs have the same type as the output key-value pairs, and the reduce function is both associative and commutative. This is the case for summing or multiplying values, as well as for taking the maximum or the minimum, but not for an unweighted average. As a reminder, associativity means that $(a + b) + c = a + (b + c)$ and commutativity means that $a + b = b + a$.
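As a small worked example of why an unweighted average is not safe: with the three intermediate values $1, 2, 3$ for the same key, the true average is $2$, but if a combine call pre-averages $1$ and $2$ on one machine, the reduce call then computes $\frac{\frac{1+2}{2} + 3}{2} = 2.25 \neq 2$. Summing, by contrast, gives $(1+2)+3 = 1+(2+3) = 6$ no matter how the calls are grouped.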
In our example, the reduce function fulfills these criteria and can thus be used as a combine function as well:
MapReduce programming API¶
Let us now move to the concrete use of MapReduce in a computer program.
Mapper classes¶
In Java, the user needs to define a so-called Mapper class that contains the map function, and a Reducer class that contains the reduce function.
A map function takes in particular a key and a value. Note that it outputs key-value pairs via calls of the write method on the context, rather than with a return statement. That way, it can output zero, one or more key-value pairs. A Mapper class looks like this:
import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
public class MyOwnMapper extends Mapper<K1, V1, K2, V2> {
    public void map(K1 key, V1 value, Context context)
            throws IOException, InterruptedException {
        ...
        K2 newKey = ...;
        V2 newValue = ...;
        context.write(newKey, newValue);
        ...
    }
}
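As a concrete (hypothetical) instantiation of this template for the word-count example, using Hadoop's built-in Text and IntWritable types (class and variable names are our own), the Mapper could look like this:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Word-count map function: one intermediate pair (word, 1) per word on the line.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();
    @Override
    public void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        for (String token : line.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
}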
Reducer classes¶
A reduce function takes in particular a key and a list of values. Note that it outputs key-value pairs via calls of the write method on the context, rather than with a return statement. That way, it can output zero, one, or more key-value pairs. A Reducer class looks like this:
import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
public class MyOwnReducer extends Reducer<K2, V2, K3, V3> {
    public void reduce(K2 key, Iterable<V2> values, Context context)
            throws IOException, InterruptedException {
        ...
        K3 newKey = ...;
        V3 newValue = ...;
        context.write(newKey, newValue);
        ...
    }
}
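And the matching (hypothetical) word-count Reducer, again only a sketch with our own names, simply sums all the counts received for a given word:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// Word-count reduce function: sums all the 1s (or partial counts) for a given word.
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        context.write(word, new IntWritable(sum));
    }
}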
Running the job¶
Finally, a MapReduce job can be created and invoked by supplying the Mapper and Reducer classes to the job, like so:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyMapReduceJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setMapperClass(MyOwnMapper.class);
        job.setReducerClass(MyOwnReducer.class);
        FileInputFormat.addInputPath(job, ...);
        FileOutputFormat.setOutputPath(job, ...);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
A combine function can also be supplied with the setCombinerClass method (passing, for example, the same Reducer class).
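For the hypothetical word-count classes sketched above, the reduce function is associative and commutative, so the same class could also be registered as the combine function with a single additional line in the main method:
job.setCombinerClass(WordCountReducer.class);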
It is also possible to use Python rather than Java, via the so-called Streaming API. The Streaming API is the general way to invoke MapReduce jobs in languages other than Java (or other JVM languages, like Scala). This is done by creating two files, say, mapper.py and reducer.py. These two programs read input (or intermediate) key-value pairs from standard input and write intermediate (or output) key-value pairs to standard output. They are then invoked on the command line:
$ hadoop jar hadoop-streaming*.jar \
-files mapper.py,reducer.py \
-mapper mapper.py \
-reducer reducer.py \
-input input \
-output output
Without going into too much detail, the input and output formats can also be specified in all programming languages. In Java, this is done by picking Java classes inheriting from InputFormat (DBInputFormat for a relational database, TableInputFormat for HBase, KeyValueTextInputFormat, SequenceFileInputFormat, TextInputFormat, FixedLengthInputFormat, NLineInputFormat, ...) or OutputFormat (DBOutputFormat, TableOutputFormat, SequenceFileOutputFormat, TextOutputFormat, MapFileOutputFormat, ...).
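For instance, assuming we want to read plain text lines and write the output as a Sequence File, the corresponding classes can be registered on the Job object from the earlier example (this is only a fragment meant to be added to its main method):
// Additional imports:
// import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
// import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
job.setInputFormatClass(TextInputFormat.class);            // parse input as (offset, line) pairs
job.setOutputFormatClass(SequenceFileOutputFormat.class);  // write output pairs as a Sequence File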
Using correct terminology¶
A warning¶
Let us now add a word of warning: the terminology "Mapper" and "Reducer" should only be used in the context of naming classes and files, but never when describing the MapReduce architecture. Even less so with "Combiner."
This is because this terminology is very imprecise and makes it very difficult to comprehend the MapReduce architecture. Sadly, many resources in books and on the Web do refer to mappers, reducers, and combiners. We hope the reader will find what follows enlightening and helpful to truly understand what is going on in a MapReduce cluster.
Rather than "mapper," we encourage the reader to user "map function," "map task," "map slot," or "map phase" depending on what is meant. Rather than "reducer," we encourage the reader to use "reduce function," "read task," "reduce slot," or "reduce phase" depending on what is meant. Rather than "combiner," we encourage the reader to use "combine function" - there is no such thing as a combine task, a combine slot or a combine phase.
Functions¶
Let us start with functions.
A map function is a mathematical, or programmed, function that takes one input key-value pair and returns zero, one or more intermediate key-value pairs.
A reduce function is a mathematical, or programmed, function that takes one or more intermediate key-value pairs and returns zero, one or more output key-value pairs.
A combine function is a mathematical, or programmed, function that takes one or more intermediate key-value pairs and returns zero, one or more intermediate key-value pairs.
Tasks¶
Then, a map task is an assignment (or "homework," or "TODO") that consists of a (sequential) series of calls of the map function on a subset of the input. There is one map task for every input split, so that there are as many map tasks as partitions of the input.
A reduce task is an assignment that consists of a (sequential) series of calls of the reduce function on a subset of the intermediate data. There are as many reduce tasks as partitions of the list of intermediate key-value pairs.
We insist that the calls within a task are sequential, meaning that there is no parallelism at all within a task. You can think of it as a for loop calling the function repeatedly, with the size of the for loop being, in a typical setting, between 1,000 and 1,000,000 calls.
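Conceptually (this is only an illustration with our own names, not Hadoop code), a map task is nothing more than a plain loop over its split:
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
// Illustration only: a "map task" is a sequential loop calling the map function once
// per key-value pair of its split; parallelism only happens across tasks and slots.
public class MapTaskSketch {
    static <K1, V1, K2, V2> List<Map.Entry<K2, V2>> runMapTask(
            List<Map.Entry<K1, V1>> split,
            BiFunction<K1, V1, List<Map.Entry<K2, V2>>> mapFunction) {
        List<Map.Entry<K2, V2>> intermediatePairs = new ArrayList<>();
        for (Map.Entry<K1, V1> pair : split) {               // strictly sequential
            intermediatePairs.addAll(mapFunction.apply(pair.getKey(), pair.getValue()));
        }
        return intermediatePairs;
    }
}
A reduce task is analogous: a sequential loop over the unique keys of its intermediate partition, calling the reduce function once per key.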
There is no such thing as a combine task. Calls of the combine function are not planned as a task; instead, the combine function is called ad hoc during flushing and compaction.
Slots¶
The map tasks are processed thanks to compute and memory resources (CPU and RAM). These resources are called map slots. One map slot corresponds to one CPU core and some allocated memory. The number of map slots is limited by the number of available cores. Each map slot then processes one map task at a time, sequentially. This means that the same map slot can process several map tasks.
The resources used to process reduce tasks are called reduce slots. Again, one reduce slot corresponds to one CPU core and some allocated memory. The number of reduce slots is limited by the number of available cores. Each reduce slot then processes one reduce task at a time, sequentially. This means that the same reduce slot can process multiple reduce tasks.
So, there is no parallelism either within one map slot, or one reduce slot. In fact, parallelism happens across several slots. In a typical MapReduce job, there will be more tasks than slots. Initially, each slot will receive one task, and the other tasks are kept pending. Every time a slot is done processing a task, it receives a new task from the pending list, and so on, until no task is left: then, some slots will remain idle until all tasks have been processed. If a task fails, it can be reassigned to another slot.
Phases¶
The map phase thus consists of several map slots processing map tasks in parallel:
And the reduce phase consists of several reduce slots processing reduce tasks in parallel:
This is a summary of how functions, tasks, slots, and phases fit together and within cluster nodes:
In the very first version of MapReduce (with a JobTracker and TaskTrackers), map slots and reduce slots are all pre-allocated from the very beginning, which leaves parts of the cluster idle in both phases: the reduce slots sit idle during the map phase, and the map slots sit idle during the reduce phase. We will see in the next chapter that this can be improved.
Impedance mismatch: blocks vs. splits¶
We finish this section with a more in-depth discussion of how MapReduce and HDFS interact. Now that we have the necessary terminology to express it, we can say that the "bring the query to the data" paradigm means that the data belonging to the split that is the input of a map task resides on the same machine as the map slot processing this map task. How can this be? This is because the DataNode process of HDFS and the TaskTracker process of MapReduce run on the same machine. Thus, getting a replica of the block containing the data necessary for processing the task is as simple as a local read. This is called short-circuiting, the same name we gave to the local reads of HFiles in HBase.
But there is something important to consider: HDFS blocks have a size of (at most) 128 MB. In every file, all blocks but the last one have a size of exactly 128 MB. Splits, however, only contain full records: a key-value pair will only belong to one split (and thus be processed by one map task). This means that, while most key-value pairs will be fully contained in one block, the first and/or last key-value pair of a split may be spread across two blocks. Consequently, while most of the data is obtained locally, getting the first and/or last record in full will require a remote read over the HDFS protocol. This, in turn, is also the reason why the HDFS API gives the ability to read a block only partially.
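As a small numeric illustration (with made-up numbers): a 1 GB file occupies $\lceil 1024 / 128 \rceil = 8$ HDFS blocks. If a split corresponds to the third block, it covers bytes 256 MB through 384 MB. A record that starts 100 bytes before the 384 MB boundary but is 300 bytes long belongs entirely to this split, so the map task reads its last 200 bytes from the fourth block, typically over the network via a partial block read.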