Deep Web and MapReduce
 Author: Tao Yufei
Organization: Korea Advanced Institute of Science and Technology; Chinese University of Hong Kong
Publish: Journal of Computing Science and Engineering, Vol. 7, No. 3, pp. 147-158, 30 Sep 2013

ABSTRACT
This invited paper introduces results on Web science and technology obtained during work with the Korea Advanced Institute of Science and Technology. In the first part, we discuss algorithms for exploring the deep Web, which refers to the collection of Web pages that cannot be reached by conventional Web crawlers. In the second part, we discuss sorting algorithms on the MapReduce system, which has become a dominant paradigm for massive parallel computing.

KEYWORD
Web, Big data, MapReduce, Parallel computing, Algorithm, Theory

I. INTRODUCTION
From Sep 2011 to Jun 2013, I was a visiting professor under the World Class University program of the Korean government, at the Korea Advanced Institute of Science and Technology (KAIST). My affiliation at KAIST was with the Division of Web Science and Technology (WebST), a newly established division whose primary mission is to explore the relatively new but exciting area of Web science. In this invited paper, some of my research results contributing to that mission will be discussed. During my appointment at KAIST, I retained my professorship at the Chinese University of Hong Kong, and worked at both institutions alternately during the development of these results, which should therefore be credited to both institutions.

A. Deep Web
Existing search engines can reach only a small portion of the Internet. They crawl HTML pages interconnected with hyperlinks, which constitute what is known as the surface Web. An increasing number of organizations are bringing their data online, by allowing public users to query their backend databases through context-dependent Web interfaces. Data acquisition is performed by interacting with the interface at runtime, as opposed to following hyperlinks. As a result, the backend databases cannot be effectively crawled by a search engine with current technology, and are usually referred to as hidden databases.

Consider Yahoo! Autos (autos.yahoo.com), a popular Website for the online trading of automobiles. A potential buyer specifies filtering criteria through a form, as illustrated in Fig. 1. The query is submitted to the system, which runs it against the backend database, and returns the result to the user. What makes it nontrivial for a search engine to crawl the database is that setting all search criteria to ANY does not accomplish the task. The reason is that a system typically limits the number k of tuples returned (roughly 1,000 for Yahoo! Autos), and that repeating the same query may not retrieve new tuples, with the same k tuples always being returned.

The ability to crawl a hidden database comes with the appeal of enabling virtually any form of processing on the database's content. The challenge, however, is clear: how to obtain all the tuples, given that the system limits the number of tuples returned for each query. A naive solution is to issue a query for every single location in the data space (in Fig. 1, for example, the data space is the Cartesian product of the domains of MAKE, BODY STYLE, PRICE, and MILEAGE). While one may leverage knowledge of attribute dependencies (e.g., the fact that BMW does not sell trucks in the United States) to prune the data space into a subset of the Cartesian product, the subset is often still too large to enumerate. Hence, the number of queries needed can clearly be prohibitive. This gives rise to an interesting problem, as defined in the next subsection, where the objective is to minimize the number of queries.

1) Problem Definitions
Consider a data space with d attributes A_1, ..., A_d, each of which has a discrete domain. The domain of A_i is denoted by dom(A_i) for each i ∈ [1, d]. The data space is the Cartesian product of dom(A_1), ..., dom(A_d). We refer to each element of the Cartesian product as a point, representing one possible combination of values of all the dimensions.
A_i is a numeric attribute if there is a total ordering on dom(A_i). Otherwise, it is a categorical attribute. Our discussion distinguishes three types of data spaces:

Numeric: all d attributes are numeric.
Categorical: all d attributes are categorical. In this case, we use U_i to represent the size of dom(A_i), i.e., how many distinct values there are in dom(A_i).
Mixed: the first cat ∈ [1, d − 1] attributes A_1, ..., A_cat are categorical, whereas the other d − cat attributes are numeric. As before, let U_i = |dom(A_i)| for each i ∈ [1, cat].
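For concreteness, the data space is simply the Cartesian product of the attribute domains, and the naive crawler discussed above would issue one query per point of this product. A tiny sketch (the two toy domains below are our own hypothetical illustrations, not taken from Fig. 1):

```python
from itertools import product

# Hypothetical toy domains: one categorical attribute (MAKE, U_1 = 3)
# and one numeric attribute (PRICE, shrunk to 5 values for illustration).
dom_make = ["BMW", "Ford", "Toyota"]
dom_price = range(1000, 1005)

# Every point of the data space; the naive solution issues one query
# per point, which is prohibitive for realistic domain sizes.
points = list(product(dom_make, dom_price))
```

Even here the product has 3 × 5 = 15 points; with realistic domains for PRICE and MILEAGE, the count explodes, which is exactly why the number of queries must be minimized.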
To facilitate presentation, we consider the domain of a numeric A_i to be the set of all integers, whereas that of a categorical A_i is the set of integers from 1 to U_i. Keep in mind, however, that the ordering of these values is irrelevant to a categorical A_i.

Let D be the hidden database of a server, with each element of D being a point in the data space. To avoid ambiguity, we will always refer to elements of D as tuples. D is a bag (i.e., a multiset), and it may contain identical tuples.

The server supports queries on D. As shown in Fig. 1, each query specifies a predicate on each attribute. Specifically, if A_i is numeric, the predicate is a range condition of the form:

A_i ∈ [x, y]

where [x, y] is an interval in dom(A_i). For a categorical A_i, the predicate is:

A_i = x

where x is either a value in dom(A_i) or a wildcard ★. In particular, a predicate A_i = ★ means that A_i can take an arbitrary value in dom(A_i), which captures BODY STYLE = ANY in Fig. 1. If a hidden database server only allows single-value predicates on a numeric attribute (i.e., no range-condition support), we can simply treat that attribute as categorical.

Given a query
q, the bag of tuples in D qualifying all the predicates of q is denoted by q(D). The server does not necessarily return the entire q(D); it does so only when q(D) is small. Formally, the response of the server is:

If |q(D)| ≤ k: the entire q(D) is returned. In this case, we say that q is resolved.
Otherwise: only k tuples in q(D) are returned (in practice, usually the k tuples with the highest priorities, e.g., according to a ranking function, among all the tuples qualifying the query), together with a signal indicating that q(D) still has other tuples. In this case, we say that q overflows.
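For concreteness, this response model can be simulated with a toy one-dimensional server; the class `HiddenDB` and its rank-by-ascending-value policy are our own hypothetical choices, not part of the formal model:

```python
class HiddenDB:
    """Simulated hidden-database server (hypothetical; illustrative only).

    Holds a bag of 1-d integer tuples and answers range queries,
    returning at most k tuples per query, as in the response model above.
    """

    def __init__(self, tuples, k):
        self.tuples = sorted(tuples)  # assumed server-side ranking: ascending order
        self.k = k

    def query(self, lo, hi):
        """Return (tuples, overflow) for the range predicate A_1 in [lo, hi]."""
        result = [t for t in self.tuples if lo <= t <= hi]
        if len(result) <= self.k:
            return result, False      # query resolved: the entire q(D) is returned
        return result[:self.k], True  # query overflows: only k tuples plus a signal

# A query that overflows keeps returning the same k tuples:
db = HiddenDB([10, 20, 20, 30, 40, 55, 55, 55], k=4)
r1, over1 = db.query(0, 100)
r2, over2 = db.query(0, 100)
```

Note how repeating the overflowed query yields the identical response, which is precisely why naive repetition cannot extract the rest of q(D).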
The value of k is a system parameter (e.g., k = 1,000 for Yahoo! Autos, as mentioned earlier). It is important to note that, in the event that a query q overflows, repeatedly issuing the same q may always result in the same response from the server, and does not help to obtain the other tuples in q(D).

PROBLEM 1 (HIDDEN DATABASE CRAWLING). Retrieve the entire D while minimizing the number of queries.

Recall that D is a bag, and it may have duplicate tuples. We require that no point in the data space has more than k tuples in D; otherwise, Problem 1 has no solution at all. To see this, consider the existence of k + 1 tuples t_1, ..., t_{k+1} in D, all of which are equivalent to a point p. Then, whenever p satisfies a query, the server can always choose to leave t_{k+1} out of its response, making it impossible for any algorithm to extract the entire D. In Yahoo! Autos, this requirement essentially states that there cannot be k = 1,000 vehicles in the database with exactly the same values for all attributes, an assumption that is fairly realistic.

As mentioned in Problem 1, the cost of an algorithm is the number of queries issued. This metric is motivated by the fact that most systems have control over how many queries can be submitted by the same IP address within a period of time. Therefore, a crawler must minimize the number of queries to complete a task, in addition to minimizing the burden on the server.

We will use n to denote the number of tuples in D. It is clear that the number of queries needed to extract the entire D is at least n/k. Of course, this ideal cost may not always be achievable. Hence, there are two central technical questions that need to be answered. The first, on the upper bound side, is how to solve Problem 1 by performing only a small number of queries even in the worst case. The second, on the lower bound side, concerns how many queries are compulsory for solving the problem in the worst case.

2) Our Results
We have concluded a systematic study of hidden database crawling as defined in Problem 1. At a high level, our first contribution is a set of algorithms that are both provably fast in the worst case, and efficient on practical data. Our second contribution is a set of lower-bound results establishing the hardness of the problem. These results explicitly clarify how the hardness is affected by the underlying factors, and thus reveal valuable insights into the characteristics of the problem. Furthermore, the lower bounds also prove that our algorithms are asymptotically optimal, and cannot be improved by more than a constant factor. Our first main result is:

THEOREM 1. There is an algorithm for solving Problem 1 whose cost is the sum of a term for every attribute: O(n/k) for each numeric attribute, and a term depending on the domain size U_i for each categorical attribute (merely U_1 when cat = 1).

The above can be conveniently understood as follows: our algorithm pays an additive cost of O(n/k) for each numeric attribute A_i, whereas the cost it pays for each categorical A_i grows with the domain size U_i. The only exception is when cat = 1: in this scenario, we pay merely U_1 for the (only) categorical attribute A_1. The cost of each numeric attribute is irrelevant to its domain size. Our second main result complements the preceding one:

THEOREM 2. None of the bounds in Theorem 1 can be improved by more than a constant factor in the worst case.

Besides establishing the optimality of our upper bounds in Theorem 1, Theorem 2 has its own interesting implications. First, it indicates the unfortunate fact that, for all types of data spaces, the best achievable query cost in the worst case is much higher than the ideal cost of n/k. Nevertheless, Theorem 1 suggests that we can achieve this cost asymptotically when d is a constant and all attributes are numeric. Second, as the number cat of categorical attributes increases from 1 to 2, the discrepancy of the complexities in Theorem 1 is not an artifact; rather, it is due to an inherent leap in the hardness of the problem (which is true regardless of the number of numeric attributes). That is, while we pay only O(U_1) extra queries for the (sole) categorical attribute when cat = 1, as cat grows to 2 and beyond, the cost paid by any algorithm for each categorical A_i acquires an extra multiplicative term that grows with both the dataset size and the domain size. Given that the term is multiplicative, this finding implies (perhaps surprisingly) that, in the worst case, it may be infeasible to crawl a hidden database with a large size n and at least 2 categorical attributes such that at least one of them has a large domain.

In this paper, we prove Theorems 1 and 2 only for the case where the data space is numeric. The rest of the proof is available elsewhere [1].
B. MapReduce
We are in an era of information explosion, where industry, academia, and governments are accumulating data at an unprecedentedly high speed. This brings forward the urgent need for fast computation over colossal datasets whose sizes can reach the order of terabytes or higher. In recent years, the database community has responded to this challenge by building massive parallel computing platforms which use hundreds or even thousands of commodity machines. The most notable platform thus far is MapReduce, which has attracted a significant amount of attention in research.

Since its invention [2], MapReduce has gone through years of improvement into a mature paradigm. At a high level, a MapReduce system involves a number of share-nothing machines which communicate only by sending messages over the network. A MapReduce algorithm instructs these machines to perform a computational task collaboratively. Initially, the input dataset is distributed across the machines, typically in a non-replicated manner, with each object on one machine. The algorithm executes in rounds (sometimes also called jobs in the literature), each having three phases: map, shuffle, and reduce. The first two enable the machines to exchange data. In the map phase, each machine prepares the information to be delivered to other machines, while the shuffle phase takes care of the actual data transfer. No network communication occurs in the reduce phase, where each machine performs calculation with its local storage. The current round finishes after the reduce phase. If the computational task has not completed, another round starts.

As with traditional parallel computing, a MapReduce system aims to achieve a high degree of load balancing, as well as the minimization of space, CPU, I/O, and network costs at each individual machine. Although these principles have guided the design of MapReduce algorithms, previous practices have mostly been on a best-effort basis, paying relatively less attention to enforcing serious constraints on different performance metrics. Our work aims to remedy the situation by studying algorithms that promise outstanding efficiency in multiple aspects simultaneously.
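To make the round structure concrete, the following single-host simulation sketches one round with its three phases; the driver `mapreduce_round` and the word-count example are our own illustrative assumptions, not an actual MapReduce API:

```python
from collections import defaultdict

def mapreduce_round(partitions, map_fn, reduce_fn):
    """Simulate one MapReduce round (map, shuffle, reduce) on a single host.

    `partitions` is a list: partitions[i] holds the objects on machine i.
    Hypothetical sketch, for illustration only.
    """
    t = len(partitions)

    # Map phase: each machine emits (destination_machine, payload) pairs.
    emitted = [map_fn(i, part, t) for i, part in enumerate(partitions)]

    # Shuffle phase: deliver each payload to its destination machine.
    inboxes = [[] for _ in range(t)]
    for msgs in emitted:
        for dest, payload in msgs:
            inboxes[dest].append(payload)

    # Reduce phase: local computation only, no network communication.
    return [reduce_fn(i, inbox) for i, inbox in enumerate(inboxes)]

# Example: a distributed word count in one round.
def wc_map(i, words, t):
    return [(hash(w) % t, w) for w in words]  # route each word by hash

def wc_reduce(i, words):
    counts = defaultdict(int)
    for w in words:
        counts[w] += 1
    return dict(counts)

parts = [["a", "b", "a"], ["b", "c"]]
out = mapreduce_round(parts, wc_map, wc_reduce)
```

Because all copies of a word hash to the same machine, merging the per-machine dictionaries yields the global counts after a single round.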
1) Minimal MapReduce Algorithms
Let S be the set of input objects for the underlying problem. Let n be the problem cardinality, which is the number of objects in S, and t be the number of machines used in the system. Define m = n/t, which is the number of objects per machine when S is evenly distributed across the machines. Consider an algorithm for solving a problem on S. We say that the algorithm is minimal if it has all of the following properties:

Minimum footprint: at all times, each machine uses only O(m) space of storage.
Bounded net-traffic: in each round, every machine sends and receives at most O(m) words of information over the network.
Constant round: the algorithm must terminate after a constant number of rounds.
Optimal computation: every machine performs only O(T_seq/t) amount of computation in total (i.e., summing over all rounds), where T_seq is the time needed to solve the same problem on a single sequential machine. In other words, the algorithm should achieve a speedup of t by using t machines in parallel.
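To illustrate how these properties can be met, here is a single-host sketch of a two-round sample-partition sort, in the spirit of the TeraSort algorithm analyzed later in this section; the sample rate and the boundary-selection rule are our own illustrative assumptions:

```python
import bisect
import random

def sample_sort(partitions, sample_rate=0.2, seed=0):
    """Two-round sample-partition sort, simulated on one host (sketch).

    Round 1: sample the input and pick t - 1 boundary objects.
    Round 2: route every object to the machine owning its segment,
    then sort locally. The sample rate is an assumed tuning knob.
    """
    rng = random.Random(seed)
    t = len(partitions)

    # Round 1: draw a random sample and extract t - 1 boundaries from it.
    sample = sorted(x for part in partitions for x in part
                    if rng.random() < sample_rate)
    boundaries = ([sample[(i * len(sample)) // t] for i in range(1, t)]
                  if sample else [])

    # Round 2: each object goes to the machine owning its segment.
    machines = [[] for _ in range(t)]
    for part in partitions:
        for x in part:
            machines[bisect.bisect_right(boundaries, x)].append(x)
    for seg in machines:
        seg.sort()  # local sorting, no network communication
    return machines

parts = [list(range(0, 100, 3)), list(range(1, 100, 3)), list(range(2, 100, 3))]
machines = sample_sort(parts)
```

When the boundaries are well scattered, every machine receives roughly m objects, in line with the minimum-footprint and bounded net-traffic requirements; a poor sample can skew the partition, which is exactly the tuning issue discussed below.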
It is fairly intuitive why minimal algorithms are appealing. First, a minimum footprint ensures that each machine keeps O(1/t) of the dataset S at any moment. This effectively prevents partition skew, where some machines are forced to handle considerably more than m objects, which is a major cause of inefficiency in MapReduce [3].

Second, bounded net-traffic guarantees that the shuffle phase of each round transfers at most O(m · t) = O(n) words of network traffic overall. The duration of the phase roughly equals the time for a machine to send and receive O(m) words, because the data transfers between different machines proceed in parallel. Furthermore, this property is also useful when one wants to make an algorithm stateless for the purpose of fault tolerance.

The third property, constant round, is not new, as it has been the goal of many previous MapReduce algorithms. Importantly, this and the previous properties imply that there can be only O(n) words of network traffic during the entire algorithm. Finally, optimal computation echoes the very original motivation of MapReduce: to accomplish a computational task t times faster than leveraging only one machine.

2) Our Results
The core of this work comprises a neat minimal algorithm for:

Sorting. The input is a set S of n objects drawn from an ordered domain. When the algorithm terminates, all the objects must have been distributed across the t machines in a sorted fashion. That is, we can order the machines from 1 to t such that all objects in machine i precede those in machine j for all 1 ≤ i < j ≤ t.

Sorting can be settled in O(n log n) time on a sequential computer. There has been progress in developing MapReduce algorithms for this important operation. The state of the art is TeraSort [4], which won Jim Gray's benchmark contest in 2009. TeraSort comes close to being minimal when a crucial parameter is set appropriately. As will be made clear later, the algorithm requires manual tuning of this parameter, an improper choice of which can incur severe performance penalties.

Our work was initialized by an attempt to justify theoretically why TeraSort often achieves excellent sorting time with only 2 rounds. In the first round, the algorithm extracts a random sample set S_samp of the input S, and then picks t − 1 sampled objects as the boundary objects. Conceptually, these boundary objects divide S into t segments. In the second round, each of the t machines acquires all the objects in a distinct segment, and sorts them. The size of S_samp is the key to efficiency. If S_samp is too small, the boundary objects may be insufficiently scattered, which can cause partition skew in the second round. Conversely, an oversized S_samp entails expensive sampling overhead. In the standard implementation of TeraSort, the sample size is left as a parameter, although it always seems to admit a good choice that gives outstanding performance [4].

We provide a rigorous explanation of the above phenomenon. Our theoretical analysis clarifies how to set the size of S_samp to guarantee the minimality of TeraSort. In the meantime, we also remedy a conceptual drawback of TeraSort. Strictly speaking, this algorithm does not fit in the MapReduce framework, because it requires that, besides network messages, the machines be able to communicate by reading/writing a common distributed file. Once this is disabled, the algorithm requires one more round. We present an elegant fix so that the algorithm still terminates in 2 rounds while strictly adhering to MapReduce. Our findings on TeraSort have immediate practical significance, given the essential role of sorting in a large number of MapReduce programs.

It is worth noting that a minimal algorithm for sorting leads to minimal algorithms for several fundamental database problems, including ranking, group-by, semi-join, and skyline [5].

II. CRAWLING THE DEEP WEB
This section explains how to solve Problem 1 when the data space is numeric. In Section II-A, we first define some atomic operators, and present an algorithm that is intuitive, but has no attractive performance bounds. Then, in Sections II-B and II-C, we present another algorithm, which achieves the optimal performance, as proven in Section II-D.
A. Basic Operations and Baseline Algorithm
Recall that in a numeric data space, the predicate of a query q on each attribute is a range condition. Thus, q can be regarded as a d-dimensional (axis-parallel) rectangle, such that its result q(D) consists of the tuples of D covered by that rectangle. If the predicate of q on attribute A_i (i ∈ [1, d]) is A_i ∈ [x_1, x_2], we say that [x_1, x_2] is the extent of the rectangle of q along A_i. Henceforth, we may use the symbol q to refer to its rectangle as well, when no ambiguity can be caused. Clearly, settling Problem 1 is equivalent to determining the entire q(D) where q is the rectangle covering the whole data space.

Split. A fundamental idea for extracting all the tuples in q(D) is to refine q into a set S of smaller rectangles, such that each rectangle q' ∈ S can be resolved (i.e., q'(D) has at most k tuples). Note that this always happens as long as rectangle q' is sufficiently small. In the extreme case, when q' has degenerated into a point in the data space, the query q' is definitely resolved (otherwise, there would be at least k + 1 tuples of D at this point, contradicting our earlier requirement). Therefore, a basic operation in our algorithms for Problem 1 is the split.

Given a rectangle q, we may perform two types of splitting, depending on how many rectangles q is divided into:

1) 2-way split: Let [x_1, x_2] be the extent of q on A_i (for some i ∈ [1, d]). A 2-way split at a value x ∈ [x_1, x_2] partitions q into rectangles q_left and q_right, by dividing the A_i-extent of q at x. Formally, on any attribute other than A_i, q_left and q_right have the same extents as q. Along A_i, however, the extent of q_left is [x_1, x − 1], whereas that of q_right is [x, x_2]. Fig. 2a illustrates the idea by splitting on the horizontal attribute.

2) 3-way split: Let [x_1, x_2] be defined as above. A 3-way split at a value x ∈ [x_1, x_2] partitions q into rectangles q_left, q_mid, and q_right as follows. On any attribute other than A_i, they have the same extent as q. Along A_i, however, the extent of q_left is [x_1, x − 1], that of q_mid is [x, x], and that of q_right is [x + 1, x_2] (Fig. 2b).
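The two splitting operations can be sketched as follows, representing a rectangle as a list of per-attribute (lo, hi) extents (a hypothetical encoding chosen purely for illustration):

```python
def two_way_split(rect, i, x):
    """2-way split of rectangle `rect` on attribute i at value x.

    `rect` is a list of (lo, hi) integer extents, one per attribute.
    Returns (q_left, q_right) with extents [lo, x-1] and [x, hi]
    on attribute i, as in Fig. 2a; other attributes are untouched.
    """
    lo, hi = rect[i]
    assert lo <= x <= hi
    q_left, q_right = list(rect), list(rect)
    q_left[i] = (lo, x - 1)
    q_right[i] = (x, hi)
    return q_left, q_right

def three_way_split(rect, i, x):
    """3-way split on attribute i at x: extents [lo, x-1], [x, x], [x+1, hi] (Fig. 2b)."""
    lo, hi = rect[i]
    assert lo <= x <= hi
    q_left, q_mid, q_right = list(rect), list(rect), list(rect)
    q_left[i] = (lo, x - 1)
    q_mid[i] = (x, x)          # attribute i is exhausted on q_mid
    q_right[i] = (x + 1, hi)
    return q_left, q_mid, q_right
```

Note that in the 3-way case, attribute i is exhausted on q_mid, which is exactly the situation exploited by the rank-shrink algorithm below.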
In the sequel, a 2-way split will be abbreviated simply as a split. No confusion can arise, as long as we always say 3-way when referring to a 3-way split. The extent of a query q on an attribute A_i can become so short that it covers only a single value, in which case we say that A_i is exhausted on q. For instance, the horizontal attribute is exhausted on q_mid in Fig. 2b. It is easy to see that there is always a non-exhausted attribute on q, unless q has degenerated into a point.

Binary-shrink. Next, we describe a straightforward algorithm for solving Problem 1, which will serve as the baseline approach for comparison. This algorithm, named binary-shrink, repeatedly performs 2-way splits until a query is resolved. Specifically, given a rectangle q, binary-shrink runs the rectangle (by submitting its corresponding query to the server), and finishes if q is resolved. Otherwise, the algorithm splits q on an attribute A_i that has not been exhausted, by cutting the extent [x_1, x_2] of q along A_i into two equally long intervals (i.e., the split is performed at x = ⌈(x_1 + x_2)/2⌉). Let q_left, q_right be the queries produced by the split. The algorithm then recurses on q_left and q_right. It is clear that the cost of
binary-shrink (i.e., the number of queries issued) depends on the domain sizes of the numeric attributes of the data space, which can be unbounded. In the following subsections, we will improve this algorithm to optimality.

B. One-Dimensional Case
Before giving our ultimate algorithm for settling Problem 1 with any dimensionality d, in this subsection we first explain how it works for d = 1. This will clarify the rationale behind the algorithm's efficiency, and facilitate our analysis for a general d. It is worth mentioning that the presence of only one attribute removes the need to specify the split dimension when describing a split.

Rank-shrink. Our algorithm rank-shrink differs from binary-shrink in two ways. First, when performing a 2-way split, instead of cutting the extent of a query q in half, we aim at ensuring that at least k/4 tuples fall in each of the rectangles generated by the split. Such a split, however, may not always be possible, which can happen if many tuples are identical to each other. Hence, the second difference of rank-shrink is that it performs a 3-way split in such a scenario, which gives birth to a query (among the 3 created) that can be immediately resolved.

Formally, given a query q, the algorithm eventually returns q(D). It starts by issuing q to the server, which returns a bag R of tuples. If q is resolved, the algorithm terminates by reporting R. Otherwise (i.e., in the event that q overflows), we sort the tuples of R in ascending order, breaking ties arbitrarily. Let o be the (k/2)-th tuple in the sorted order, with its A_1-value being x. Now, we count the number c of tuples in R identical to o (i.e., R has c tuples with A_1-value x), and proceed as follows:

1) Case 1: c ≤ k/4. Split q at x into q_left and q_right, each of which must contain at least k/4 tuples in R. To see this for q_left (symmetric reasoning applies to q_right), note that there are at least k/2 − c ≥ k/4 tuples of R strictly smaller than x, all of which fall in q_left.

2) Case 2: c > k/4. Perform a 3-way split on q at x. Let q_left, q_mid, and q_right be the resulting rectangles (note that the ordering among them matters; see Section II-A). Observe that q_mid has degenerated into the point x, and therefore can immediately be resolved. As a technical remark, in Case 2, x might be the lower (resp. upper) bound of the extent of q (x cannot be both, because otherwise q would be a point and therefore could not have overflowed). If this happens, we simply discard q_left (resp. q_right), as it would have a meaningless extent.
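Putting the two cases together, the 1-d algorithm can be sketched as follows; `query(lo, hi)` stands for the server of the response model above, returning at most k tuples plus an overflow flag (all names and encodings here are our own illustrative choices):

```python
def rank_shrink_1d(query, k, lo, hi):
    """1-d rank-shrink (sketch). Returns all tuples of D in [lo, hi].

    `query(lo, hi)` models the server: it returns (R, overflow), where
    R holds at most k tuples and overflow signals that more exist.
    """
    if lo > hi:
        return []
    R, overflow = query(lo, hi)
    if not overflow:
        return list(R)                  # q is resolved
    R = sorted(R)
    x = R[k // 2 - 1]                   # A_1-value of the (k/2)-th tuple
    c = R.count(x)                      # copies of that value in R
    if c <= k // 4:
        # Case 1: 2-way split at x; both halves hold >= k/4 tuples of R.
        return (rank_shrink_1d(query, k, lo, x - 1) +
                rank_shrink_1d(query, k, x, hi))
    # Case 2: 3-way split at x; q_mid = [x, x] resolves with one query.
    mid, _ = query(x, x)
    return (rank_shrink_1d(query, k, lo, x - 1) +
            list(mid) +
            rank_shrink_1d(query, k, x + 1, hi))

# Demo on a tiny simulated server with k = 4 (no point holds > k tuples).
data = sorted([10, 20, 20, 55, 55, 55, 70, 90])
def server(lo, hi, k=4):
    r = [t for t in data if lo <= t <= hi]
    return (r, False) if len(r) <= k else (r[:k], True)
crawled = rank_shrink_1d(server, 4, 0, 100)
```

Empty sub-ranges resolve trivially, and each recursive call strictly shrinks its range, so the recursion terminates under the assumption that no point carries more than k tuples.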
In either case, we are left with at most two queries (i.e., q_left and q_right) to process further. The algorithm handles each of them recursively in the same manner.

Example. We use the dataset D in Fig. 3a to demonstrate the algorithm. Let k = 4. The first query is q_1 = (−∞, ∞). Suppose that the server responds by returning R_1 = {t_4, t_6, t_7, t_8} and a signal that q_1 overflows. The (k/2) = 2nd smallest tuple in R_1 is t_6 (after random tie breaking), whose value is x = 55. As R_1 has c = 3 tuples with value 55 and c > k/4 = 1, we perform a 3-way split on q_1 at 55, generating q_2 = (−∞, 54], q_3 = [55, 55], and q_4 = [56, ∞). As q_3 has degenerated into a point, it is resolved immediately, fetching t_6, t_7, and t_8. These tuples have already been extracted before, but this time they come with an extra fact: no more tuple can exist at point 55.

Consider q_2. Suppose that the server's response is R_2 = {t_1, t_2, t_4, t_5}, plus an overflow signal. Hence, x = 20 and c = 1. Thus, a 2-way split on q_2 at 20 creates q_5 = (−∞, 19] and q_6 = [20, 54]. Queries q_4, q_5, and q_6 are all resolved.

Analysis. The lemma below bounds the cost of rank-shrink.

LEMMA 1. When d = 1, rank-shrink requires O(n/k) queries.

Proof. The main tool used by our proof is a recursion tree T that captures the spawning relationships of the queries performed by rank-shrink. Specifically, each node of T represents a query. Node u is the parent of node u' if query u' is created by a 2-way or 3-way split of query u. Each internal node thus has 2 or 3 child nodes. Fig. 3b shows the recursion tree for the queries performed in our earlier example on Fig. 3a.

We focus on bounding the number of leaves in
T, because it dominates the number of internal nodes. Observe that each leaf v corresponds to a disjoint interval in dom(A_1), due to the way splits are carried out. There are three types of v:

Type-1: the query represented by v is immediately resolved in a 3-way split (i.e., q_mid in Case 2). The interval of v contains at least k/4 identical tuples in D.
Type-2: query v is not type-1, but also covers at least k/4 tuples in D.
Type-3: query v covers less than k/4 tuples in D.

For example, among the leaf nodes in Fig. 3, q_3 is of type-1, q_5 and q_6 are of type-2, and q_4 is of type-3.

As the intervals of various leaves cover disjoint bags of tuples, and each type-1 or type-2 leaf covers at least k/4 tuples, the number of type-1 and type-2 leaves is at most 4n/k. Each leaf of type-3 must have a sibling in T that is a type-1 leaf (in Fig. 3, such a sibling of q_4 is q_3). In contrast, a type-1 leaf has at most 2 siblings. It thus follows that there are at most twice as many type-3 leaves as type-1 ones, i.e., the number of type-3 leaves is no more than 8n/k.

This analysis implies that, quite loosely, T has no more than 4n/k + 8n/k = 12n/k leaves. Thus, there cannot be more than this number of internal nodes in T, which completes the proof. □

C. Rank-Shrink for Higher Dimensionality
We are now ready to extend rank-shrink to handle any d > 1. In addition to the ideas exhibited in the preceding subsection, we also apply an inductive approach, which converts the d-dimensional problem to several (d − 1)-dimensional ones. Our discussion below assumes that the (d − 1)-dimensional problem has already been settled by rank-shrink.

Given a query q, the algorithm (as in 1-d) sets out to solicit the server's response R, and finishes if q is resolved. Otherwise, it examines whether A_1 is exhausted on q, i.e., whether the extent of q on A_1 has only 1 value x in dom(A_1). If so, we can focus on attributes A_2, ..., A_d. This is a (d − 1)-dimensional version of Problem 1, in the (d − 1)-dimensional subspace covered by the extents of q on A_2, ..., A_d, eliminating A_1 by fixing it to x. Hence, we invoke rank-shrink to solve it.

Now consider that A_1 is not exhausted on q. Similar to the 1-d algorithm, we split q such that either every resulting rectangle covers at least k/4 tuples in R, or one of them can be immediately solved as a (d − 1)-dimensional problem. The splitting proceeds exactly as described in Cases 1 and 2 of Section II-B. The only difference is that the rectangle q_mid in Case 2 is not a point, but instead a rectangle on which A_1 has been exhausted. Hence, q_mid is processed as a (d − 1)-dimensional problem with rank-shrink. As in the 1-d case, the algorithm recurses on q_left and q_right (provided that they have not been discarded for having a meaningless extent on A_1).

Example. We demonstrate the algorithm using the 2-d dataset in Fig. 4, where D has 10 tuples t_1, ..., t_10. Let k = 4. The first query q_1 covers the entire data space. Suppose that the server responds with R_1 = {t_4, t_7, t_8, t_9} and an overflow signal. We split q_1 3-way at A_1 = 80 into q_2, q_3, and q_4, whose rectangles can be found in Fig. 4. The A_1-extents of q_2, q_3, and q_4 are (−∞, 79], [80, 80], and [81, ∞), respectively, while their A_2-extents are all (−∞, ∞). Note that A_1 is exhausted on q_3; alternatively, we can see that q_3 is equivalent to a 1-d query on the vertical line A_1 = 80. Hence, q_3 is recursively settled by our 1-d algorithm (requiring 3 queries, which can be verified easily).

Suppose that the server's response to q_2 is R_2 = {t_2, t_3, t_4, t_5} and an overflow signal. Accordingly, q_2 is split into q_5 and q_6 at A_1 = 40, whose rectangles are also shown in Fig. 4. Finally, q_4, q_5, and q_6 are all resolved.

Analysis. We have the lemma below for general d.

LEMMA 2. Rank-shrink performs O(dn/k) queries.

Proof. The case d = 1 has been proven in Lemma 1. Next, assuming that
rank-shrink issues at most α(d − 1)n/k queries for solving a (d − 1)-dimensional problem with n tuples (where α is a positive constant), we will show that the cost is at most αdn/k for dimensionality d.

Again, our argument leverages a recursion tree T. As before, each node of T is a query, such that node u parents node u' if query u' was created from splitting u. We make a query v a leaf of T as soon as one of the following occurs:

v is resolved. We associate v with a weight set to 1.
A_1 is exhausted on rectangle v. Recall that such a query is solved as a (d − 1)-dimensional problem. We associate v with a weight equal to the cost of rank-shrink on that problem.

For our earlier example in Fig. 4, the recursion tree T happens to be the same as the one in Fig. 3b. The difference is that each leaf now has a weight. Specifically, the weight of q_3 is 3 (i.e., the cost of solving the 1-d query at the vertical line A_1 = 80 in Fig. 4), and the weights of the other leaves are 1. The total cost of rank-shrink on the d-dimensional problem thus equals the total number of internal nodes in T, plus the total weight of all the leaves.

As the A_1-extents of the leaves' rectangles have no overlap, their rectangles cover disjoint tuples. Let us classify the leaves into types 1, 2, and 3, as in the proof of Lemma 1, by adapting the definition of type-1 in a straightforward fashion: v is of this type if it is the middle node q_mid from a 3-way split. Each type-3 leaf has weight 1 (as its corresponding query must be resolved). As proved in Lemma 1, the number of them is no more than 8n/k.

Let v_1, ..., v_β be all the type-1 and type-2 nodes (i.e., suppose the number of them is β). Assume that node v_i covers n_i tuples of D. It holds that Σ_{i=1}^{β} n_i ≤ n. The weight of v_i, by our inductive assumption, is at most α(d − 1)n_i/k. Hence, the total weight of all the type-1 and type-2 nodes does not exceed α(d − 1)n/k.

The same argument as in the proof of Lemma 1 shows that T has less than 12n/k internal nodes. Thus, summarizing the above analysis, the cost of d-dimensional rank-shrink is no more than:

12n/k + 8n/k + α(d − 1)n/k = 20n/k + α(d − 1)n/k.

To complete our inductive proof, we want this quantity to be bounded from above by αdn/k, which is true for any α ≥ 20. □

Remark. This concludes the proof of the first bullet of Theorem 1. When d is a fixed value (as is true in practice), the complexity in Lemma 2 becomes O(n/k), asymptotically matching the trivial lower bound n/k. A natural question at this point is whether an algorithm can still guarantee cost O(n/k) if d is not a constant. Next, we will show that this is impossible.

D. A Lower Bound
The objective of this subsection is to establish:
THEOREM 3. Let k, d, and m be arbitrary positive integers such that d ≤ k. There is a dataset D (in a numeric data space) with n = m(k + d) tuples such that any algorithm must use at least dm queries to solve Problem 1 on D.

It is therefore impossible to improve our algorithm rankshrink (see Lemma 2) by more than a constant factor in the worst case, as shown below:

COROLLARY 1. In a numeric data space, no algorithm can guarantee solving Problem 1 with o(dn/k) queries.

Proof. If there existed such an algorithm, let us use it on the inputs in Theorem 3. The cost is o(dn/k) = o(dm(k + d)/k) which, due to d ≤ k, is o(dm), causing a contradiction. □

We now proceed to prove Theorem 3 using a hard dataset
D, as illustrated in Fig. 5. The domain of each attribute is the set of integers from 1 to m + 1. D has m groups of d + k tuples. Specifically, the i-th group (1 ≤ i ≤ m) has k tuples at the point (i, ..., i), taking value i on all attributes. We call them diagonal tuples. Furthermore, for each j ∈ [1, d], group i also has a tuple that takes value i + 1 on attribute A_j, and i on all other attributes. Such a tuple is referred to as a non-diagonal tuple. Overall, D has km diagonal and dm non-diagonal tuples.

Let S be the set of dm points in the data space that correspond to the dm non-diagonal tuples in D, respectively (i.e., each point in S corresponds to a distinct non-diagonal tuple). As explained in Section II-A, each query can be regarded as an axis-parallel rectangle in the data space. With this correspondence in mind, we observe the following for any algorithm that correctly solves Problem 1 on D.

LEMMA 3. When the algorithm terminates, each point in S must be covered by a distinct resolved query already performed.

Proof. Every point p ∈ S must be covered by a resolved query. Otherwise, p is either never covered by any query, or covered by only overflowing queries. In the former case, the tuple of D at p could not have been retrieved, whereas in the latter, the algorithm could not rule out the possibility that D had more than one tuple at p. In neither case could the algorithm have terminated.

Next, we show that no resolved query q covers more than one point in S. Otherwise, assume that q contains p_1 and p_2 in S, in which case q fully encloses the minimum bounding rectangle, denoted as r, of p_1 and p_2. Without loss of generality, suppose that p_1 (p_2) is from group i (j) such that i ≤ j. If i = j, then r contains the point (i, ..., i), in which case at least k + 2 tuples satisfy q (i.e., p_1, p_2, and the k diagonal tuples from group i). Alternatively, consider i < j. In this scenario, the coordinate of p_1 is at most i + 1 ≤ j on all attributes, while the coordinate of p_2 is at least j on all attributes. Thus, r contains the point (j, ..., j), causing at least k + 2 tuples to satisfy q (i.e., p_1, p_2, and the k diagonal tuples from group j). Therefore, q must overflow in any case, which is a contradiction. □

The lemma indicates that at least |S| = dm queries must be performed, which validates Theorem 3.

III. MAPREDUCE
As explained earlier, a MapReduce algorithm proceeds in rounds, where each round has three phases: map, shuffle, and reduce. As all machines execute a program in the same way, next we focus on one specific machine M.

Map. In this phase, M generates a list of key-value pairs (k, v) from its local storage. While the key k is usually numeric, the value v can contain arbitrary information. The pair (k, v) will be transmitted to another machine in the shuffle phase, such that the recipient machine is determined solely by k, as will be clarified shortly.

Shuffle. Let L be the list of key-value pairs that all the machines produced in the map phase. The shuffle phase distributes L across the machines, adhering to the constraint that pairs with the same key must be delivered to the same machine. That is, if (k, v_1), (k, v_2), ..., (k, v_x) are the pairs in L having a common key k, all of them will arrive at an identical machine.

Reduce. M incorporates the key-value pairs received from the previous phase into its local storage. Then, it carries out whatever processing is needed on its local data. After all machines have completed the reduce phase, the current round terminates.

Discussion. It is clear that the machines communicate only in the shuffle phase, whereas in the other phases each machine executes the algorithm sequentially, focusing on its own storage. Overall, parallel computing happens mainly in the reduce phase. The major role of the map and shuffle phases is to swap data among the machines, so that computation can take place on different combinations of objects.

Simplified view of our algorithms. Let us number the t machines of the MapReduce system arbitrarily from 1 to t. In the map phase, all our algorithms will adopt the convention that M generates a key-value pair (k, v) if and only if it wants to send v to machine k. In other words, the key field is explicitly the ID of the recipient machine. This convention admits a conceptually simpler modeling.
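As a concrete (purely illustrative) rendering of one round under this convention, the following sketch simulates t machines in a single process; all names are ours, and the shuffle simply delivers each pair to the machine named by its key:

```python
def one_round(local_data, map_fn, reduce_fn):
    """Simulate one MapReduce round with t machines, following the
    convention that a pair (k, v) means 'send v to machine k'
    (machines are numbered 0..t-1 here for convenience)."""
    t = len(local_data)
    # Map phase: every machine emits (k, v) pairs from its local storage.
    pairs = [kv for data in local_data for kv in map_fn(data, t)]
    # Shuffle phase: pair (k, v) is delivered to machine k, so all pairs
    # with the same key necessarily arrive at the same machine.
    inboxes = [[] for _ in range(t)]
    for k, v in pairs:
        inboxes[k].append(v)
    # Reduce phase: each machine processes the values it received.
    return [reduce_fn(values) for values in inboxes]

# Usage: route each integer to machine (value mod t), then sum locally.
storage = [[3, 1, 4], [1, 5], [9, 2, 6]]
result = one_round(storage,
                   map_fn=lambda data, t: [(x % t, x) for x in data],
                   reduce_fn=sum)
assert result == [sum(x for row in storage for x in row if x % 3 == i)
                  for i in range(3)]
```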
In describing our algorithms, we will combine the map and shuffle phases into one called map-shuffle. In the map-shuffle phase, "M delivers v to machine k" means that M creates (k, v) in the map phase, which is then transmitted to machine k in the shuffle phase. The equivalence also explains why the simplification is only at the logical level; physically, all our algorithms are still implemented in the standard MapReduce paradigm.

Statelessness for fault tolerance. Some MapReduce implementations (e.g., Hadoop) require that at the end of a round, each machine should send all the data in its storage to a distributed file system (DFS), which, in our context, can be understood as a "disk in the cloud" that guarantees consistent storage (i.e., it never fails). The objective is to improve the system's robustness in the scenario where a machine collapses during the algorithm's execution. In such a case, the system can replace this machine with another one, ask the new machine to load the storage of the old machine at the end of the previous round, and redo the current round (where the machine failure occurred). Such a system is called stateless, because intuitively, no machine is responsible for remembering any state of the algorithm [6]. The four minimality conditions defined in Section I ensure efficient enforcement of statelessness. In particular, a minimum footprint guarantees that at each round, every machine sends O(m) words to the DFS, which is still consistent with bounded net-traffic.

In the sorting problem, the input is a set S of n objects from an ordered domain. For simplicity, we assume that objects are real values, because our discussion easily generalizes to other ordered domains. Let M_1, ..., M_t denote the machines in the MapReduce system. Initially, S is distributed across these machines, each storing O(m) objects, where m = n/t. At the end of sorting, all objects in M_i must precede those in M_j for any 1 ≤ i < j ≤ t.

> A. TeraSort
Parameterized by ρ ∈ (0, 1], TeraSort [4] runs as follows (the two rounds below restate the procedure using the notation of the analysis in Section III-B):

Round 1. In the map-shuffle phase, every machine samples each of its objects with probability ρ, and delivers all the sampled objects to M_1. In the reduce phase, M_1 sorts the set S_samp of received samples, and picks from it the boundary objects b_1 ≤ ... ≤ b_{t−1}, one every ⌈s/t⌉ consecutive samples, where s = |S_samp|.

Round 2. In the map-shuffle phase, every machine (assuming it knows b_1, ..., b_{t−1}) delivers each of its objects o to M_i, where i is determined by o ∈ (b_{i−1}, b_i] (with dummy boundaries b_0 = −∞ and b_t = ∞). In the reduce phase, each M_i sorts the objects it has received.

For convenience, the procedure above sometimes asks a machine M to send data to itself. Needless to say, such data "transfer" occurs internally in M, with no network transmission. Also, note the assumption at the map-shuffle phase of Round 2, which we call the broadcast assumption, and will deal with later in Section III-C. In O'Malley's study [4], ρ was left as an open parameter. Next, we analyze the setting of this value to make TeraSort a minimal algorithm.

> B. Choice of ρ
Define S_i = S ∩ (b_{i−1}, b_i], for 1 ≤ i ≤ t. In Round 2, all the objects in S_i are gathered by M_i, which sorts them in the reduce phase. For TeraSort to be minimal, the following must hold (where s denotes |S_samp|):

P1. s = O(m).
P2. |S_i| = O(m) for all 1 ≤ i ≤ t.
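To make the two properties tangible, here is a minimal single-process simulation of the two rounds (our own illustrative code, not the Hadoop implementation); it reports s = |S_samp| and max_i |S_i|, the quantities constrained by P1 and P2:

```python
import random
from bisect import bisect_left

def terasort_simulation(machines, rho, rng):
    """Simulate TeraSort on t machines (lists of real values).
    Returns the sorted partitions, s = |S_samp|, and max_i |S_i|."""
    t = len(machines)
    # Round 1: each object is sampled with probability rho; all samples
    # are gathered (and sorted) by machine 1, which picks t - 1 boundary
    # objects, one every ceil(s/t) consecutive samples.
    samp = sorted(o for data in machines for o in data if rng.random() < rho)
    s = len(samp)
    step = max(1, -(-s // t))                      # ceil(s/t)
    bounds = [samp[j] for j in range(step - 1, s, step)][:t - 1]
    # Round 2: object o is delivered to machine i with o in (b_{i-1}, b_i];
    # every machine then sorts its partition S_i locally.
    parts = [[] for _ in range(t)]
    for data in machines:
        for o in data:
            parts[bisect_left(bounds, o)].append(o)
    return [sorted(p) for p in parts], s, max(len(p) for p in parts)

rng = random.Random(7)
machines = [[rng.random() for _ in range(1000)] for _ in range(8)]
parts, s, biggest = terasort_simulation(machines, rho=0.05, rng=rng)
# Concatenating the partitions in machine order yields the sorted input.
flat = [o for p in parts for o in p]
assert flat == sorted(o for data in machines for o in data)
```

Here s plays the role of |S_samp| in P1, and the returned maximum partition size corresponds to max_i |S_i| in P2.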
Specifically, P1 is necessary because M_1 receives O(s) objects over the network in the map-shuffle phase of Round 1, which has to be O(m) to satisfy bounded net-traffic (see Section I). P2 is necessary because M_i must receive and store O(|S_i|) words in Round 2, which needs to be O(m) to qualify as bounded net-traffic with a minimum footprint.

We now establish an important fact about TeraSort:

THEOREM 4. When m ≥ t ln(nt), P1 and P2 hold simultaneously with a probability of at least 1 − 17/(8n).

Proof. We will consider t ≥ 9, because otherwise, m = Ω(n), in which case P1 and P2 hold trivially. Our proof is based on the Chernoff bound, together with an interesting bucketing argument. Let X_1, ..., X_n be independent Bernoulli variables with Pr[X_i = 1] = p_i, for 1 ≤ i ≤ n, and set X = X_1 + ... + X_n and μ = E[X]. The Chernoff bound states that (i) for any 0 < α < 1, Pr[X ≥ (1 + α)μ] ≤ exp(−α²μ/3) while Pr[X ≤ (1 − α)μ] ≤ exp(−α²μ/3), and (ii) Pr[X ≥ 6μ] ≤ 2^{−6μ}.

First, it is easy to see that
E[s] = mρt = t ln(nt). A simple application of the Chernoff bound (with α = 0.6) results in:

Pr[s ≥ 1.6 · t ln(nt)] ≤ exp(−0.12 · t ln(nt)) ≤ 1/n

where the last inequality uses the fact that t ≥ 9. This implies that P1 can fail with a probability of at most 1/n. Next, we analyze P2 under the event s < 1.6 · t ln(nt) = O(m). Imagine that
S has been sorted in ascending order. We divide the sorted list into ⌊t/8⌋ sublists as evenly as possible, and call each sublist a bucket. Each bucket has between 8n/t = 8m and 16m objects. We observe that P2 holds if every bucket covers at least one boundary object. To understand why, note that under this condition, no bucket can fall between two consecutive boundary objects, counting also the dummy ones (if a bucket did, it would not be able to cover any boundary object). Hence, every S_i, 1 ≤ i ≤ t, can contain objects in at most 2 buckets, i.e., |S_i| ≤ 32m = O(m).

A bucket β definitely includes a boundary object if β covers more than 1.6 ln(nt) > s/t samples (i.e., objects from S_samp), as a boundary object is taken every ⌈s/t⌉ consecutive samples. Let |β| ≥ 8m be the number of objects in β. Random variable x_j, 1 ≤ j ≤ |β|, is defined to be 1 if the j-th object in β is sampled, and 0 otherwise. Define:

X = x_1 + ... + x_{|β|}.

Clearly, E[X] ≥ 8mρ = 8 ln(nt). We have:

Pr[X ≤ 1.6 ln(nt)] ≤ Pr[X ≤ 0.2 · E[X]] ≤ exp(−(0.8)² · E[X]/3) ≤ exp(−1.7 · ln(nt)) ≤ 1/(nt).

We say that β fails if it covers no boundary object. The above derivation shows that β fails with a probability of at most 1/(nt). As there are at most t/8 buckets, the probability that at least one bucket fails is at most 1/(8n). Hence, P2 can be violated with a probability of at most 1/(8n) under the event s < 1.6 · t ln(nt), i.e., at most 9/(8n) overall.

Therefore,
P1 and P2 hold at the same time with a probability of at least 1 − 17/(8n). □

Discussion. For large n, the success probability 1 − O(1/n) in Theorem 4 is so high that the failure probability O(1/n) is negligible, i.e., P1 and P2 are almost never violated. The condition about m in Theorem 4 is tight within a logarithmic factor, because m ≥ t is an implicit condition for TeraSort to work, with both the reduce phase of Round 1 and the map-shuffle phase of Round 2 requiring a machine to store t − 1 boundary objects.

In reality, typically, m ≫ t, and the memory size of a machine is significantly greater than the number of machines. More specifically, m is on the order of at least 10^6 (this is using only a few megabytes per machine), while t is on the order of 10^4 or lower. Therefore, m ≥ t ln(nt) is a (very) reasonable assumption, which explains why TeraSort has excellent efficiency in practice.

Minimality. We now establish the minimality of TeraSort, temporarily ignoring how to fulfill the broadcast assumption. Properties P1 and P2 indicate that each machine needs to store only O(m) objects at any time, consistent with a minimum footprint. Regarding the network cost, a machine M in each round sends only objects that were already on M when the algorithm started. Hence, M sends O(m) network data per round. Furthermore, M_1 receives only O(m) objects by P1. Therefore, bounded net-traffic is fulfilled. Constant round is obviously satisfied. Finally, the computation time of each machine M_i (1 ≤ i ≤ t) is dominated by the cost of sorting S_i in Round 2, i.e., O(|S_i| log |S_i|) = O(m log m). As this is at most 1/t of the O(n log n) time of a sequential algorithm, optimal computation is also achieved.

> C. Removing the Broadcast Assumption
Before Round 2 of TeraSort, M_1 needs to broadcast the boundary objects b_1, ..., b_{t−1} to the other machines. We have to be careful because a naive solution would ask M_1 to send O(t) words to every other machine, and hence, incur O(t²) network traffic overall. This not only requires one more round, but also violates bounded net-traffic if t exceeds √m by a non-constant factor.
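The remedy developed in this subsection, namely giving every machine its own copy of S_samp so that each can derive the boundaries locally, can be sketched as follows. This is our own single-process illustration, with hypothetical names, of Round 1 of what the text below calls pure TeraSort:

```python
import random

def pure_round1(machines, rho, seed=0):
    """Round 1 with sample replication: every sampled object is delivered
    to ALL machines, so each machine ends up with an identical copy of
    S_samp and can pick the boundary objects by itself, without any
    broadcast from machine 1 (illustrative sketch only)."""
    t = len(machines)
    rng = random.Random(seed)
    sampled = [o for data in machines for o in data if rng.random() < rho]
    # Map-shuffle: each sample costs O(t) words, one copy per machine.
    inboxes = [sorted(sampled) for _ in range(t)]
    # Reduce: every machine picks boundaries from its own copy of S_samp,
    # taking one boundary object every ceil(s/t) consecutive samples.
    s = len(sampled)
    step = max(1, -(-s // t))
    per_machine = [[inbox[j] for j in range(step - 1, s, step)][:t - 1]
                   for inbox in inboxes]
    # All machines derive exactly the same boundaries.
    assert all(b == per_machine[0] for b in per_machine)
    return per_machine[0]
```

Since each object is sampled with probability ρ and each sample is replicated t times, a machine sends O(m · t · ρ) words in this phase, matching the expectation derived later in this subsection.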
In O'Malley's study [4], this issue was circumvented by assuming that all the machines can access a distributed file system. In this scenario, M_1 can simply write the boundary objects to a file on that system, after which each M_i, 2 ≤ i ≤ t, obtains them from the file. In other words, a brute-force file-accessing step is inserted between the two rounds. This is allowed by the current Hadoop implementation (on which TeraSort was based [4]).

Technically, however, the above approach destroys the elegance of TeraSort, because it requires that, besides sending key-value pairs to each other, the machines should also communicate via a distributed file. This implies that the machines are not share-nothing, because they are essentially sharing the file. Furthermore, as far as this paper is concerned, the artifact is inconsistent with the definition of minimal algorithms. As sorting underlies all the problems to be discussed later, we are motivated to remove the artifact to keep our analytical framework clean.

We now provide an elegant remedy, which allows TeraSort to still terminate in 2 rounds, and retain its minimality. The idea is to give all machines a copy of S_samp. Specifically, we modify Round 1 of TeraSort so that, in the map-shuffle phase, every machine delivers each of its sampled objects to all t machines, rather than only to M_1; then, in the reduce phase, every machine picks the boundary objects from the samples it has received. Round 2 still proceeds as before. The correctness follows from the fact that in the reduce phase, every machine picks boundary objects in exactly the same way from an identical S_samp. Therefore, all machines will obtain the same boundary objects, thus eliminating the need for broadcasting. Henceforth, we will call the modified algorithm pure TeraSort.

At first glance, the new map-shuffle phase of Round 1 may seem to require a machine
M to send out considerable data, because every sample necessitates O(t) words of network traffic (i.e., O(1) to every other machine). However, as every object is sampled with probability ρ = ln(nt)/m, the number of words sent by M is only O(m · t · ρ) = O(t ln(nt)) in expectation. The lemma below gives a much stronger fact:

LEMMA 4. With a probability of at least 1 − 1/n, every machine sends O(t ln(nt)) words over the network in Round 1 of pure TeraSort.

Proof. Consider an arbitrary machine M. Let random variable X be the number of objects sampled from M. Hence, E[X] = mρ = ln(nt). A straightforward application of the Chernoff bound gives:

Pr[X ≥ 6 ln(nt)] ≤ 2^{−6 ln(nt)} ≤ 1/(nt).

Hence, M sends more than O(t ln(nt)) words in Round 1 with a probability of at most 1/(nt). By the union bound, the probability that no machine does so, over all t machines, is at least 1 − 1/n. □

Combining the above lemma with Theorem 4 and the minimality analysis in Section III-B, we can see that
pure TeraSort is a minimal algorithm with a probability of at least 1 − O(1/n) when m ≥ t ln(nt).

We close this section by pointing out that the fix of TeraSort is mainly of theoretical concern. The fix serves the purpose of convincing the reader that the broadcast assumption is not a technical "loose end" in achieving minimality. In practice, TeraSort has nearly the same performance as our pure version, at least on Hadoop, where (as mentioned before) the brute-force approach of TeraSort is well supported.

IV. FINAL REMARKS

We have obtained a nontrivial glimpse of the results from research performed during my appointment with KAIST. Interested readers are referred to previous studies [1, 5] for additional details, including full surveys of the literature. Due to space constraints, other results have not been described in this paper, but they can be found online at http://www.cse.cuhk.edu.hk/~taoyf.

[Fig. 1.] Formbased querying of a hidden database.

[Fig. 2.] Splitting: (a) 2way and (b) 3way.

[Fig. 3.] Illustration of 1d rankshrink: (a) dataset D and queries and (b) recursion tree.

[Fig. 4.] Illustration of 2d rankshrink.

[Fig. 5.] A hard numeric dataset.