263-3010-00: Big Data
Section 8
Massive Parallel Processing
Swiss Federal Institute of Technology Zurich
Eidgenössische Technische Hochschule Zürich
Last Edit Date: 11/05/2024
Disclaimer and Terms of Use:
We do not guarantee the accuracy and completeness of the summary content. Some of the course material may not be included, and some of the content in the summary may not be correct. You should use this file properly and legally. We are not responsible for any results arising from the use of this file.
This personal note is adapted from Professor Ghislain Fourny's lecture. Please contact us to delete this file if you think your rights have been violated.
This work is licensed under a Creative Commons Attribution 4.0 International License.
Now that we know how and where to store datasets; and we know how to model nested, heterogeneous datasets; and we know how to validate them; and we know how to build data frames (when applicable); and we know how they can be stored in various optimized binary formats, it is time to finally move on to the truly awesome part of Big Data: actually processing gigantic amounts of data.
Counting cats¶
The inputs¶
This is where Erwin, who lives in Zurich, comes into play. Erwin has hundreds of cats of ten different kinds; specifically, he has:
Persian cats
Siamese cats
Bengal cats
Scottish folds
American shorthairs
Maine coons
British shorthairs
Sphynx cats
Ragdolls
Norwegian forest cats
However, Erwin has lost track of the counts and would like to know how many cats of each kind he has. He could, of course, count them one by one, but Erwin thinks there has to be a better way.
The map¶
Fortunately, his 22 grandchildren are visiting him this weekend, and Erwin has a plan.
First, he will distribute the cats across all 17 rooms of his large mansion. Then he will send 17 of his grandchildren to the rooms, one per room, and ask them to count the cats, by kind, of the room they are assigned to, and to then write the counts of each kind on a piece of paper. This is an example for one room:
Cat Breed | Number |
---|---|
Persian cats | 12 |
Siamese cats | 23 |
Bengal cats | 14 |
Scottish folds | 23 |
American shorthairs | 36 |
Maine coons | 3 |
British shorthairs | 5 |
Sphynx cats | 54 |
Ragdolls | 2 |
Norwegian forest cats | 63 |
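In programming terms, each grandchild in a room executes a map task. Below is a minimal Python sketch of such a task, assuming a room is represented as a list of breed names (the data and names are purely illustrative):

```python
from collections import Counter

def map_room(cats_in_room):
    """Map task: count the cats of each kind in one room and
    emit one (breed, count) pair per kind -- one 'stripe' each."""
    counts = Counter(cats_in_room)
    return list(counts.items())

# One room, represented as a list of breed names (illustrative data).
room = ["Persian", "Siamese", "Persian", "Maine coon", "Persian"]
print(map_room(room))
# [('Persian', 3), ('Siamese', 1), ('Maine coon', 1)]
```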
The shuffle¶
Meanwhile, the other 5 grandchildren have each been assigned several kinds of cats. Each kind of cat will be processed by one, and exactly one, of these 5 grandchildren. Mathias, who is the oldest grandchild, will process 3 kinds of cats. Leane, Anna and Elliot will process 2 kinds each, and Cleo, the youngest one, will process 1 kind, making it 10 in total.
Cat Breed | Assigned Grandchild |
---|---|
Persian cats | Mathias |
Siamese cats | Anna |
Bengal cats | Cleo |
Scottish folds | Mathias |
American shorthairs | Leane |
Maine coons | Mathias |
British shorthairs | Elliot |
Sphynx cats | Leane |
Ragdolls | Anna |
Norwegian forest cats | Elliot |
By the time this is done, the 17 grandchildren are coming back with their sheets of paper. Erwin gives them scissors and asks them to cut these sheets of paper into stripes: each stripe with a kind of cat and its count. Then, the group of 17 and the group of 5 start walking in all directions, and each stripe of paper is handed over by the grandchild (from the group of 17) who created it to the one supposed to process it (from the group of 5).
Needless to say, the mansion is quite noisy at that time, and it takes quite a while until all stripes of paper are finally in the hands of Mathias, Leane, Anna, Elliot, and Cleo. Cleo received 16 stripes with Bengal cat counts; indeed, in one of the 17 rooms, there were no Bengal cats at all. Mathias received 17 stripes of Persian cat counts, 15 stripes of Scottish fold counts, 14 stripes of Maine coon counts, and so on.
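This handover of stripes is the shuffle: every stripe is routed to the single grandchild (reducer) responsible for its kind of cat. Here is a minimal Python sketch of this routing, assuming a hard-coded assignment table like Erwin's (shown for three of the ten kinds); a real framework would typically derive the assignment from a hash of the key instead:

```python
from collections import defaultdict

# Fixed key-to-reducer assignment, as in Erwin's table (excerpt); a real
# framework would typically use something like hash(key) % num_reducers.
ASSIGNMENT = {"Persian": "Mathias", "Siamese": "Anna", "Bengal": "Cleo"}

def shuffle(stripes):
    """Shuffle phase: route every (breed, count) stripe to the single
    grandchild (reducer) responsible for that breed."""
    per_reducer = defaultdict(list)
    for breed, count in stripes:
        per_reducer[ASSIGNMENT[breed]].append((breed, count))
    return per_reducer

stripes = [("Persian", 12), ("Bengal", 14), ("Persian", 9), ("Siamese", 23)]
print(dict(shuffle(stripes)))
# {'Mathias': [('Persian', 12), ('Persian', 9)],
#  'Cleo': [('Bengal', 14)], 'Anna': [('Siamese', 23)]}
```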
The reduce¶
Now, Erwin asks the 5 grandchildren to add the counts for each kind of cat. Cleo has one big addition to do with all the numbers on her stripes:
Cat Breed | Number |
---|---|
Bengal cats | 12 |
Bengal cats | 13 |
Bengal cats | 23 |
Bengal cats | 3 |
Bengal cats | 1 |
Bengal cats | 6 |
Bengal cats | 13 |
Bengal cats | 6 |
Bengal cats | 14 |
Bengal cats | 9 |
Bengal cats | 7 |
Bengal cats | 7 |
Bengal cats | 9 |
Bengal cats | 11 |
Bengal cats | 12 |
Bengal cats | 11 |
Cleo proudly creates a new stripe with her grand total:
Bengal cats | 157 |
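Cleo's big addition is exactly what a reduce task does: it receives all values for one key and aggregates them. A minimal Python sketch, using Cleo's 16 counts from above:

```python
def reduce_breed(breed, counts):
    """Reduce task: add up all the counts received for one kind of cat
    and produce a single output stripe."""
    return (breed, sum(counts))

# Cleo's 16 stripes of Bengal cat counts, as listed above.
bengal_counts = [12, 13, 23, 3, 1, 6, 13, 6, 14, 9, 7, 7, 9, 11, 12, 11]
print(reduce_breed("Bengal cats", bengal_counts))
# ('Bengal cats', 157)
```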
The output¶
Mathias, Anna, Elliot and Leane do the same and give back their newly created stripes to Erwin, who now has an overview of his cat counts:
Cat Breed | Number |
---|---|
Persian cats | 412 |
Siamese cats | 233 |
Bengal cats | 157 |
Scottish folds | 233 |
American shorthairs | 36 |
Maine coons | 351 |
British shorthairs | 153 |
Sphynx cats | 236 |
Ragdolls | 139 |
Norwegian forest cats | 176 |
What just happened? This is simply an instance of a MapReduce computation. The first phase, with the 17 separate rooms, is called the Map phase. Then came the Shuffle phase, when the stripes were handed over. And finally came the Reduce phase, with the computation of the final grand totals.
Patterns of large-scale query processing¶
Textual input¶
We saw that the data we want to query can take many forms. First, it can be billions of lines of text (this excerpt is from Sherlock Holmes, which entered the public domain a few years ago, which in turn explains why there are so many movies and series about Sherlock Holmes these days):
In the year 1878 I took my degree of Doctor of Medicine of the University of London, and proceeded to Netley to go through the course prescribed for surgeons in the army. Having completed my studies there, I was duly attached to the Fifth Northumberland Fusiliers as Assistant Surgeon. The regiment was stationed in India at the time, and before I could join it, the second Afghan war had broken out. On landing at Bombay, I learned that my corps had advanced through the passes, and was already deep in the enemy's country. I followed, however, with many other officers who were in the same situation as myself, and succeeded in reaching Candahar in safety, where I found my regiment, and at once entered upon my new duties.
It can also be plenty of CSV lines (these will likely be familiar to residents of Zurich):
Year,Date,Duration,Guest
2022,2019-04-25,00:37:59,UR
2021,2019-04-19,00:12:57,UR
2020,NULL,NULL,NULL
2019,2019-04-08,00:17:44,Strassburg
2018,2018-04-16,00:20:31,BS
2017,2017-04-24,00:09:56,GL
2016,2016-04-18,00:43:34,LU
...
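As a minimal sketch (assuming the literal string NULL marks a missing value, as in the lines above), such CSV input can be parsed line by line with Python's csv module:

```python
import csv
import io

# A few of the CSV lines from above; "NULL" marks a missing value here.
data = """Year,Date,Duration,Guest
2022,2019-04-25,00:37:59,UR
2020,NULL,NULL,NULL
"""

for row in csv.DictReader(io.StringIO(data)):
    # Replace the NULL marker with Python's None before processing.
    record = {k: (None if v == "NULL" else v) for k, v in row.items()}
    print(record)
```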
Or maybe some use-case-specific textual format (common for weather data, for example):
20222019-04-2500:37:59UR
20212019-04-1900:12:57UR
20200000-00-0000:00:0000
20192019-04-0800:17:44FR
20182018-04-1600:20:31BS
20172017-04-2400:09:56GL
20162016-04-1800:43:34LU
...
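Since every field in such a format sits at a fixed position, a record can be parsed by slicing, as in this minimal Python sketch (the field widths are read off the example lines above):

```python
def parse_fixed_width(line):
    """Split one fixed-width record into its fields by position:
    4 chars of year, 10 of date, 8 of duration, 2 of guest code."""
    return {
        "year": line[0:4],
        "date": line[4:14],
        "duration": line[14:22],
        "guest": line[22:24],
    }

print(parse_fixed_width("20222019-04-2500:37:59UR"))
# {'year': '2022', 'date': '2019-04-25', 'duration': '00:37:59', 'guest': 'UR'}
```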
Such a format can also follow a key-value pattern, here with key and value separated by a space character:
2022 00:37:59.000000
2021 00:12:57.000000
2020 00:00:00.000000
2019 00:17:44.000000
2018 00:20:31.000000
2017 00:09:56.000000
2016 00:43:34.000000
...
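Parsing such a record is simply a matter of splitting on the first occurrence of the separator, as in this short sketch:

```python
def parse_key_value(line, separator=" "):
    """Split one record into key and value on the first separator."""
    key, value = line.split(separator, 1)
    return key, value

print(parse_key_value("2022 00:37:59.000000"))
# ('2022', '00:37:59.000000')
```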
A popular format that is more machine-readable is the JSON Lines format, with one JSON object per line. Note that it can be heterogeneous:
{ "year": 2022, "date": "2022-04-25", "duration": "00:37:59", "canton": "UR" }
{ "year": 2021, "date": "2021-04-19", "duration": "00:12:57", "canton": "UR" }
{ "year": 2020, "duration": null }
{ "year": 2019, "date": "2019-04-08", "duration": "00:17:44", "city": "Strasbourg" }
{ "year": 2018, "date": "2018-04-16", "duration": "00:20:31", "canton": "BS" }
{ "year": 2017, "date": "2017-04-24", "duration":"00:09:56","canton":"GL"}
{ "year":2016, "date":"2016-04-18", "duration":"00:43:34","canton":"LU"}
Other input formats¶
Some other formats (e.g. Parquet, ...) can be binary, as we saw before. We humans are not able to decode them directly, but luckily computers can:
01010010101010101010010101101010010101010101
001010101010101011010011110101000101101001010
01010010101010101010010101101010010101010101
001010101010101011010011110101000101101001010
...
We also encountered HFiles in Chapter 6, which are lists of key-value pairs. In fact, Hadoop has another such key-value-based format called the Sequence File, which is simply a list of key-value pairs, not necessarily sorted by key (although ordered), and with keys not necessarily unique. This is because, as we will see shortly, this format is used for intermediate data generated by MapReduce.
Sequence Files also have a flavor in which values are compressed, and another flavor in which their blocks (akin to what we called HBlocks in HFiles) are compressed.
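To make the idea of a key-value file concrete, here is a toy Python sketch that length-prefixes each key and value. This is purely illustrative and not the actual binary layout of Hadoop Sequence Files:

```python
import struct

def write_records(path, records):
    """Toy key-value file: each record is (key length, key bytes,
    value length, value bytes). Illustrative only -- NOT the actual
    SequenceFile binary layout."""
    with open(path, "wb") as f:
        for key, value in records:
            for field in (key, value):
                data = field.encode("utf-8")
                f.write(struct.pack(">I", len(data)))  # 4-byte length prefix
                f.write(data)

def read_records(path):
    """Read the toy key-value file back, record by record."""
    with open(path, "rb") as f:
        while header := f.read(4):
            key = f.read(struct.unpack(">I", header)[0]).decode("utf-8")
            value_len = struct.unpack(">I", f.read(4))[0]
            yield key, f.read(value_len).decode("utf-8")

# Duplicate keys are allowed, as in a Sequence File.
write_records("pairs.bin", [("Persian", "12"), ("Persian", "9")])
print(list(read_records("pairs.bin")))
# [('Persian', '12'), ('Persian', '9')]
```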
Shards¶
How do we store Petabytes of data on online cloud storage, such as S3 or Azure Blob Storage, where the maximum size of a file is limited? Simply by spreading the data over many files. It is very common to have a dataset lying in a directory spread over 100 or 1,000 files. Often, these files are named incrementally: part-0001, part-0002, etc. These files are often also called "shards" of the dataset.
What about HDFS? Technically, HDFS would make it possible to have a gigantic, single file, automatically partitioned into blocks. However, also for HDFS, it is common to have a pattern with a directory containing many files named incrementally. The size of these files is typically larger than that of a block, for example 10 GB files.
Note that the size of the files does not constrain parallelism: with HDFS, even with 10 GB files, the 128 MB blocks within the same file can still be processed in parallel. S3 is also compatible with intra-file parallelism. There are several practical motivations for the many-files pattern even in HDFS:
As we will see, it is much more convenient to output several shards from a framework like MapReduce than it is to create a single, big, final file (which would require a way to concatenate it properly).
It is considerably easier to download or upload datasets that are stored as many files. This is because network issues happen once in a while, and you can simply retry with only the files that failed. With a single 10 PB file, the transfer takes so long that a network issue is extremely likely to force you to start all over, again and again.
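Reading such a sharded dataset is straightforward: iterate over the part files in order and treat their concatenation as one logical dataset. A minimal Python sketch, assuming a directory dataset/ containing files named part-0001, part-0002, and so on:

```python
import glob

def read_shards(directory):
    """Read a sharded dataset: iterate over all part-* files in order
    and yield their lines as if they formed one large file."""
    for shard in sorted(glob.glob(f"{directory}/part-*")):
        with open(shard) as f:
            yield from f

# Hypothetical directory layout: dataset/part-0001, dataset/part-0002, ...
for line in read_shards("dataset"):
    pass  # process each line
```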
Of course, the data does not need to be stored directly as files on cloud storage or a distributed file system: it could be stored as a relational table in a relational database, or in a wide column store like HBase. In fact, it is quite common to store XML, HTML, or JSON data in HBase cells. MapReduce can also process data on these higher-level stores.
Querying pattern¶
Now that we have the data, what does querying it look like? At a very high level, a query converts some input to some output, like so:
However, because the underlying storage supports parallelism (via shards, blocks, regions, etc.), the input as well as the output are partitioned:
So far, that does not really help us. But what if we are lucky and the query can, in fact, be reexpressed equivalently as simply mapping every input partition to an output partition, like so?
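Such a partition-to-partition mapping is embarrassingly parallel: each input partition can be processed independently of all the others. A toy Python sketch with multiprocessing (the uppercasing transformation is just a placeholder):

```python
from multiprocessing import Pool

def process_partition(partition):
    """Turn one input partition into one output partition;
    here, a toy transformation that uppercases every line."""
    return [line.upper() for line in partition]

if __name__ == "__main__":
    partitions = [["a", "b"], ["c", "d"], ["e", "f"]]  # toy input shards
    with Pool() as pool:
        # Each partition is processed independently, in parallel.
        print(pool.map(process_partition, partitions))
# [['A', 'B'], ['C', 'D'], ['E', 'F']]
```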
Sadly, in reality, you might instead find something that looks more like spaghetti:
Fortunately, what happens in practice is often somewhere in the middle, with recognizable data flow patterns:
Some places have data flowing in parallel (map-like):