
Paper Insights #28 - Mesos: A Platform for Fine-Grained Resource Sharing in the Data Center

This paper, presented at NSDI in 2011, comes from the UC Berkeley Systems Lab, with authorship by influential figures like Matei Zaharia (Spark's creator and Databricks co-founder), Ali Ghodsi (CEO of Databricks), Scott Shenker, and Ion Stoica. UC Berkeley's Systems Lab is a powerhouse in computer systems research, and their current work on Sky Computing—envisioning a cloud computing marketplace—is truly groundbreaking.

The concepts explored in these papers are closely intertwined with the development of several influential projects at UC Berkeley, including Spark, Delay Scheduling, Dominant Resource Fairness, and the technical report detailing Mesos – all of which were being actively researched and built concurrently by the same authors.


Let's begin with some basic concepts (and there are many of them).

Cluster Management

A computing cluster comprises numerous interconnected machines, linked by a high-speed network, often referred to as a fabric. These clusters typically range from 100 to 10,000 machines. Users schedule their jobs on these machines.

Cluster Jobs

Clusters are versatile platforms capable of executing a wide array of jobs such as services, batch-processing jobs, and scientific computations.

Services

Users frequently deploy long-running services on clusters for high availability and scalability through replication across multiple machines. Numerous essential systems, such as distributed file systems, lock services, and distributed databases, operate as a set of smaller, interconnected services known as micro-services. Each service is a set of replicated processes running on the cluster. Each replica of a service is called a task.

Services represent the most common type of jobs scheduled by users in cloud computing environments.

Batch Processing

The rise of Big Data has necessitated distributed processing as datasets outgrow the capacity of single machines. Consequently, most modern data processing applications are executed across distributed machines within a cluster. Typically, specialized frameworks (such as Hadoop) run as services on these clusters, accepting data processing jobs as input and orchestrating their execution.

The paradigm of processing large datasets in parallel through phases (such as map-shuffle-reduce) was established with the advent of MapReduce. In this model, a user-submitted data processing job is decomposed into a series of smaller processing phases. Each of these phases is executed as a distinct cluster job (it's important to distinguish this "cluster job" from the higher-level "data processing job" submitted by the user). Furthermore, each job is broken down into individual tasks, with each task running as a separate process on the cluster. A task is responsible for processing a specific slice of the overall data.


Note: Mappers and reducers may also run within a single job (often called workers), consisting of a mix of map and reduce tasks.

Scientific Computations

Clusters are indispensable for tackling complex scientific problems, such as climate modeling, drug discovery, computational fluid dynamics, and astrophysics simulations. Clusters specifically designed for scientific computations are often High-Performance Computing (HPC) clusters, equipped with enhanced hardware like GPUs and high-speed, low-latency networking technologies such as RDMA (Remote Direct Memory Access) and RoCE (RDMA over Converged Ethernet).

These HPC clusters are particularly prevalent in academic research environments.

Containers

A fundamental requirement for tasks running in a cluster is the ability to run on any machine. To ensure this, tasks must ship with all of their dependencies. This is commonly achieved through containerization using technologies like Docker.

Cluster Orchestrator

Clusters represent a powerful computing infrastructure, demanding significant investment for setup. Even tech giants like Google and Microsoft typically operate a limited number of clusters. Given the diverse range of jobs that need to be executed, efficient sharing of cluster resources among multiple users is important.

The finite machine resources within a cluster necessitate a mechanism for fair and effective allocation across different users. One initial approach, static partitioning (dividing machines permanently among users), proves to be inflexible and frequently leads to underutilization of resources.

Consequently, a cluster orchestrator is essential for efficient cluster management. This orchestrator performs several key functions:
  1. Task scheduling based on user requests.
  2. Task status tracking and reporting to users.
  3. Machine maintenance, including OS upgrades and software management.
  4. Resource usage monitoring and reporting.


Examples

Several sophisticated cluster orchestrators have been developed to manage and streamline resource allocation in large-scale computing environments. Examples include:

  • Borg: Google's internal cluster management system, designed to handle a vast array of workloads across its data centers. It's highly scalable and efficient, managing diverse applications, from long-running services to batch-processing jobs. Borg's design heavily influenced the development of Kubernetes.
  • Kubernetes (K8s): An open-source container orchestration system, originally designed by Google, that automates the deployment, scaling, and management of containerized applications. It has become the industry standard for container orchestration. Kubernetes is especially popular for managing Docker containers, but it can also handle other container runtimes. 
  • Docker Swarm: Docker's native clustering and orchestration solution. It allows users to create and manage a cluster of Docker nodes, enabling the deployment and scaling of containerized applications. 
  • Omega: A general-purpose cluster management system developed by Google, designed to address some of the limitations of Borg.
  • HashiCorp Nomad: A simple orchestrator that can deploy and manage both containerized and non-containerized applications.

Scheduler

The scheduler is the core component of an orchestrator, responsible for assigning tasks to available machines.

There are several challenges that cluster schedulers need to solve:

  • Colocation Interference: Occurs when multiple tasks running on the same machine interfere with each other's performance, leading to unpredictable behavior.
  • Noisy Neighbors: A specific type of colocation interference where one resource-intensive task negatively impacts the performance of other tasks sharing the same hardware. This is a well-known problem for programs running on shared clusters. See The Tail at Scale by Jeff Dean.
  • Machine Heterogeneity: Dealing with clusters composed of machines with varying hardware specifications and capabilities, making resource allocation and scheduling complex.

These issues have historically limited the adoption rate of cluster orchestrators in general. Notably, only Google's Borg has achieved exceptional machine utilization rates, ranging from 60% to 80%.

Types of Cluster Schedulers

Cluster schedulers come in various architectures:

Monolithic Scheduler

Example: Google's Borg. Early big data systems employed monolithic schedulers, where a single component handled all task placement decisions. Drawbacks include inflexible policies and rapid code growth, making maintenance challenging.

Two-Level Scheduler

Example: Apache Mesos. This model separates concerns into two distinct levels:
  • The schedulers, responsible for making the actual task (container) placement decisions.
  • The resource manager, which manages cluster resources and offers them to the schedulers.

Shared-State Scheduler

Examples: Google Omega, HashiCorp Nomad. This model evolved from the two-level architecture to address its limitations. A key issue with two-level schedulers is that individual schedulers may have a limited view of the cluster, hindering optimal resource allocation. The shared-state model utilizes optimistic concurrency, where the cluster's state is modified through transactions. While this approach improves flexibility, it introduces the possibility of scheduling conflicts.


Misc:

  • Fully Distributed Scheduler: Often employs simple random placement algorithms. Results in suboptimal placement and is primarily of academic interest.
  • Hybrid Placement: Schedulers that combine aspects of the other scheduler types to try to achieve the best result. 

Agent

Apart from the scheduler, a cluster orchestrator has an agent running on each machine which receives tasks from the scheduler and executes them, reporting back their status and health.

Resource Allocation

Common examples of resources allocated to tasks include:
  • CPU cycles - We measure the number of CPU cycles a task consumes.
  • RAM - We measure the amount of memory used by the task.
  • Network usage - We measure the number of bytes sent/received over the wire.
There are also other resources, like GPU cycles and disk spindles, but we will focus our discussion on these three.

All these resources are scarce on cluster machines and it is important for tasks to share them. When a resource X must be shared among multiple jobs, a variety of algorithms can be employed to distribute it effectively.

Fair Allocation

This algorithm divides the resource equally among all requests.

Weighted Fair Allocation

This algorithm allocates resources proportionally to the requested amounts. Given allocation requests {x1, x2, ..., xn} for a resource with total quantity x, the resource allocated to job i is calculated as:

x * (xi / Σxj)

where Σxj represents the sum of all allocation requests.
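To make this concrete, here is a minimal plain-Python sketch of weighted fair allocation (my own illustration, not from the paper):

```python
def weighted_fair_allocation(requests, capacity):
    """Split `capacity` proportionally to the requested amounts: x * (xi / sum(xj))."""
    total_requested = sum(requests)
    return [capacity * r / total_requested for r in requests]

# Example: requests {2, 3, 4, 5} sharing a total capacity of 12.
print(weighted_fair_allocation([2, 3, 4, 5], 12))
# -> [1.71..., 2.57..., 3.43..., 4.29...]
```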

Max-Min Fair Resource Allocation

Given allocation requests {x1, x2, ..., xn} for a resource x, an allocation is considered max-min fair if increasing the allocation of any request xi necessarily requires decreasing the allocation of some other request xj whose allocation is already no larger (xj ≤ xi).

Max-min fair allocation prioritizes smaller demands, ensuring they receive their requested resources. Any remaining capacity is then distributed equally among larger demands.

Max-min allocation proceeds in rounds. Consider requests {2, 3, 4, 5} with a total capacity of 12.
  • Round 1: Start with an equal split {3, 3, 3, 3}.
  • Round 2: The smallest request needs only 2, so its slack is redistributed {2, 3.33, 3.33, 3.33}.
  • Round 3: The next request needs only 3, so its slack is redistributed {2, 3, 3.5, 3.5}.
  • Round 4: No remaining demand can be fully satisfied; the final max-min fair allocation is {2, 3, 3.5, 3.5}.

Max-min fair sharing typically leads to improved resource utilization.
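As an illustration, here is a small plain-Python sketch (my own, under the single-resource assumption above) of max-min fair allocation via progressive filling:

```python
def max_min_fair_allocation(demands, capacity):
    """Progressive filling: fully satisfy the smallest demands first, then
    split the remaining capacity equally among the unsatisfied ones."""
    allocation = [0.0] * len(demands)
    remaining = sorted(range(len(demands)), key=lambda i: demands[i])
    cap_left = float(capacity)
    while remaining:
        share = cap_left / len(remaining)
        i = remaining[0]
        if demands[i] <= share:
            allocation[i] = demands[i]   # smallest demand fits: satisfy it fully
            cap_left -= demands[i]
            remaining.pop(0)
        else:
            for j in remaining:          # nobody else can be fully satisfied
                allocation[j] = share
            break
    return allocation

# The example from the text: demands {2, 3, 4, 5} with a capacity of 12.
print(max_min_fair_allocation([2, 3, 4, 5], 12))  # -> [2, 3, 3.5, 3.5]
```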

Scheduling

Resource allocation deals with the distribution of available resources among competing tasks. Following allocation, scheduling determines the temporal order of execution. Essentially, allocation answers what resources each job receives, while scheduling addresses when those resources are utilized.

Consider two jobs, each requiring 10,000 CPU cycles for completion. Resource allocation would assign 10,000 cycles to each job. Then, scheduling dictates the sequence of execution (which job gets to use the resource first).

Common non-preemptive scheduling algorithms, such as First-In, First-Out (FIFO) and Last-In, First-Out (LIFO), can be employed. However, non-preemptive approaches can lead to critical issues:

  • Starvation: A long-running task can monopolize the CPU, preventing shorter tasks from executing for extended periods, or even indefinitely.
  • Inefficient Resource Utilization: If a task encounters an I/O wait or other blocking event, the CPU remains idle until the task resumes, leading to wasted processing capacity.
  • Lack of Priority Handling: Non-preemptive algorithms typically do not allow for prioritizing critical tasks, potentially causing delays in essential functions.
Round-robin scheduling is a preemptive algorithm where each task is allocated a fixed time slice, or quantum. Tasks execute for their assigned time slice and then yield the resources to the next task in the queue. This process repeats cyclically until all tasks complete execution.
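Here is a tiny round-robin simulation in Python (illustrative only; the task names and work units are made up):

```python
from collections import deque

def round_robin(tasks, quantum):
    """Each task runs for `quantum` units, then yields to the next task in
    the queue; returns the order in which tasks complete."""
    queue = deque(tasks.items())
    order = []
    while queue:
        name, remaining = queue.popleft()
        remaining -= quantum
        if remaining > 0:
            queue.append((name, remaining))  # preempted, back of the queue
        else:
            order.append(name)               # finished within this time slice
    return order

print(round_robin({"A": 3, "B": 9, "C": 5}, quantum=2))  # -> ['A', 'C', 'B']
```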

The Time Dimension in Resource Allocation

By incorporating time as a resource dimension, we can more accurately model both resource allocation and scheduling simultaneously.

Instead of simply allocating a quantity of resources, we allocate a resource usage rate over a specific time period. For instance, CPU resources can be represented as <CPU rate, time> instead of raw CPU cycles, and network usage as <bandwidth, time> instead of total bytes transferred/received. 

CPU rate is essentially the clock rate (roughly one instruction per clock cycle) and is the same for all CPUs on a given machine. So <CPU rate, time> effectively boils down to <Num CPUs, time>. An allocation of <2 CPUs, 5 s> means that a job can use 2 CPUs for 5 seconds.

If we fix the time period to one second, the allocated CPU count directly corresponds to the CPU usage within that second. The count can be fractional as well: for example, with an allocation of <0.2 CPU>, the job can execute 0.2 CPUs' worth of instructions per second. Assuming a clock rate of 1 GHz, that amounts to 200M cycles' worth of instructions per second. Similarly, the allocated bandwidth reflects the network usage within that second.

Modern schedulers allocate CPU resources as a rate rather than a number of cycles. Cloud providers such as Google Cloud offer virtual CPUs (vCPUs), which behave like physical CPUs; in heterogeneous clusters with varying clock rates, vCPUs provide a standardized abstraction for CPU capacity.

An analogy can be drawn with Joule (energy consumed) and Watt (rate of energy consumption):

  • Joule: CPU cycles :: Watt: CPU/vCPU
  • Joule: Network usage :: Watt: Bandwidth

When resource allocation is defined in terms of rate, resource allocation algorithms naturally become scheduling mechanisms, such as:

  • Fair Scheduling: Distributes available CPU or bandwidth equally among jobs. For CPU, this means each job receives an equal portion of the total available CPU. For bandwidth, it means an equal share of the network's capacity. This is essentially round-robin.
  • Weighted Fair Scheduling: Allocates resources proportionally based on assigned weights. Jobs with higher weights receive a larger share of the CPU or bandwidth. This is essentially weighted round-robin.
  • Max-Min Fair Scheduling: Aims to maximize the minimum allocation received by any job, ensuring fairness and preventing starvation. It prioritizes jobs with lower allocations, bringing them up to a fair share before distributing excess resources.
Note: RAM is allocated as a static quantity and not as a rate. This is because a job's RAM remains occupied regardless of its execution state - running or preempted (blocked).

Delay Scheduling

In distributed data processing, there is one more factor to consider when scheduling: the location of the data being processed. Allocating resources without considering data locality leads to tasks being scheduled on nodes where their required data is not readily available. This forces data transfers across the network, significantly impacting performance through increased latency and network bandwidth consumption.

To mitigate this issue, delay scheduling is employed (initially introduced for Hadoop). This technique introduces a deliberate delay before scheduling a task. Instead of immediately assigning the task to the first available resource, the scheduler waits for a short period, hoping that a node with the task's preferred data (i.e., local data) becomes available. If a local resource becomes free within the delay window, the task is scheduled there. Otherwise, it is scheduled on a remote node.

In practice, delay scheduling substantially improves performance by reducing data transfer and lowering latency.
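A minimal sketch of the idea (plain Python; MAX_DELAY and the Task fields are my own illustrative names, not Hadoop's actual API):

```python
from dataclasses import dataclass

@dataclass
class Task:
    submit_time: float
    preferred_nodes: set   # nodes that already hold this task's input data

MAX_DELAY = 5.0  # seconds; an assumed threshold, tuned per workload

def place(task, free_nodes, now):
    """Prefer a data-local node; if none is free, defer (return None) until
    MAX_DELAY has elapsed, then accept any available node."""
    local = [n for n in free_nodes if n in task.preferred_nodes]
    if local:
        return local[0]                               # data-local placement
    if now - task.submit_time < MAX_DELAY:
        return None                                   # wait, hoping for locality
    return free_nodes[0] if free_nodes else None      # fall back to a remote node
```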

Bin Packing Problem

The bin packing problem is a classic NP-hard optimization challenge: given a set of items with varying sizes and a finite number of bins (or containers), each with a fixed given capacity, the goal is to pack all items into the fewest possible bins. This problem arises frequently in computer science and other fields.

Examples

  • Memory Allocation: Allocating fixed-size pages of RAM to processes, minimizing the number of physical memory pages used.
  • Resource Scheduling in Cloud Computing: Packing tasks with varying resource demands (CPU, memory, storage) into cloud instances to optimize resource utilization and reduce costs.
  • File Storage: Storing files of different sizes on storage devices with fixed capacities, minimizing the number of storage devices required.
  • Network Packet Packing: Packing network packets of variable sizes into fixed size frames for transmission.

Approximation Algorithms

Even though the problem is NP-hard, there are several approximation algorithms to solve the problem:
  • First-Fit: Places each item into the first bin where it fits.
  • Best-Fit: Places each item into the bin with the smallest remaining capacity that can accommodate it.
  • Worst-Fit: Places each item into the bin with the largest remaining capacity.
  • Almost-Worst-Fit: Similar to worst fit, but avoids the absolute worst fit bin.
  • Next-Fit: Places each item into the last bin it checked. If the item doesn't fit, a new bin is opened.
  • Next-k-Fit: A variation of Next-Fit, that checks the last k bins.

These algorithms provide approximate solutions with certain error bounds, indicating how far the solution might be from the optimal:

  • Next-Fit: Has an error ratio of 2, meaning it may use up to twice the optimal number of bins.
  • First-Fit, Best-Fit, Almost-Worst-Fit: Have an error ratio of approximately 1.7.
  • Worst-Fit: Has an error ratio of 2.

These error bounds represent worst-case scenarios, and in practice, these algorithms often perform significantly better. The choice of algorithm depends on the specific application and the trade-off between solution quality and computational cost.
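As an illustration, here is a minimal first-fit sketch in Python (my own example, not tied to any particular system):

```python
def first_fit(items, bin_capacity):
    """Place each item into the first bin with enough room, opening a new
    bin whenever none of the existing bins fits."""
    bins = []  # each bin is a list of item sizes
    for item in items:
        for b in bins:
            if sum(b) + item <= bin_capacity:
                b.append(item)
                break
        else:
            bins.append([item])  # no existing bin fits: open a new one
    return bins

print(first_fit([4, 8, 1, 4, 2, 1], bin_capacity=10))
# -> [[4, 1, 4, 1], [8, 2]]  (two bins, which happens to be optimal here)
```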

Hadoop v/s Spark

Hadoop, launched in 2006, emerged as the open-source counterpart to Google's MapReduce, mirroring its core functionality of breaking down big data processing jobs into map and reduce tasks. Notably, the Hadoop project also developed the Hadoop Distributed File System (HDFS), drawing inspiration from Google File System (GFS).

In contrast, Spark was specifically conceived with machine learning algorithms, such as logistic regression, in mind. The original whitepaper elaborates on this motivation, which can be summarized as follows:

Consider the iterative nature of logistic regression. A fundamental step involves computing the following linear combination in the map stage:

β + W1 * x1 + W2 * x2 + ... + Wn * xn

Here, β, W1, W2 ... Wn are model parameters and x1, x2, ... xn are the input features from each record. This calculation feeds into a sigmoid function to produce a probability, which will be close to either 0 or 1. The deviation of this predicted probability from the true label is then calculated as the error. In the reduce stage, these errors are aggregated across the entire dataset to determine the overall error of the current model.

Logistic regression is inherently iterative. The algorithm repeatedly refines the model parameters in each round until the error converges to a minimum. The final set of weights constitutes the trained logistic regression model. It's worth noting that this entire process mirrors a single perceptron (or logistic regression unit) within a deeper neural network architecture, where multiple such units are arranged in layers.

Spark's key innovation lies in its use of Resilient Distributed Datasets (RDDs). RDDs are fault-tolerant, immutable, distributed collections of data that can be stored in memory for the duration of a job. RDDs can originate directly from the input dataset or be the result of applying map or reduce operations to existing RDDs. By keeping RDDs in memory (with mechanisms to spill over to disk in case of memory pressure), Spark avoids the overhead of repeatedly reading data from storage. Due to the deterministic nature of map and reduce functions, RDDs can always be recomputed on demand if necessary.

This in-memory processing of RDDs is the primary differentiator between Spark and Hadoop. In Hadoop, each iteration of a machine learning training process necessitates reading the input data from storage and processing it anew. Conversely, Spark allows the input data to reside in memory as RDDs, readily available for subsequent processing rounds, leading to significant performance gains for iterative algorithms like logistic regression.
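To make the iterative map-reduce structure concrete, here is a small plain-Python sketch (not Spark's or Hadoop's API; the toy dataset and hyperparameters are made up). The "map" step computes a per-record gradient contribution, the "reduce" step sums them, and the key point is that `data` stays in memory across iterations, much like a cached RDD:

```python
import math

def train_logistic_regression(data, num_features, iterations=10, lr=0.1):
    """`data` is a list of (features, label) pairs kept in memory across
    iterations; each iteration is one map (per-record gradient) plus one
    reduce (sum of gradients) pass. The bias is folded in as a constant
    feature of 1.0."""
    w = [0.0] * num_features
    for _ in range(iterations):
        # "Map": per-record gradient contribution.
        def gradient(record):
            x, y = record
            p = 1.0 / (1.0 + math.exp(-sum(wi * xi for wi, xi in zip(w, x))))
            return [(p - y) * xi for xi in x]
        # "Reduce": sum the gradients across the whole dataset.
        total = [0.0] * num_features
        for g in map(gradient, data):
            total = [t + gi for t, gi in zip(total, g)]
        w = [wi - lr * t / len(data) for wi, t in zip(w, total)]
    return w

# Toy dataset: the label is 1 when the first feature is large.
data = [([x, 1.0], 1 if x > 0.5 else 0) for x in [0.1, 0.2, 0.7, 0.9]]
print(train_logistic_regression(data, num_features=2))
```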

Message Passing Interface (MPI)

Message passing is a programming model where independent processes coordinate by exchanging data through messages. Each process operates within its own memory space, and communication happens via explicit sending and receiving of data.

MPI is a standardized interface for this model, with several highly optimized library implementations. Key benefits of MPI include:

  • MPI libraries are highly optimized to deliver excellent performance.
  • MPI leverages the fastest available communication method:
    • Using shared memory for communication between processes on the same machine.
    • Using TCP/IP for communication between processes on different nodes.
    • Taking advantage of RDMA, if available, for highly efficient data transfer between nodes.
  • MPI typically guarantees reliable delivery of messages and preserves the order in which they are sent.

The primary application of MPI lies in building parallel programs deployed on HPC clusters. An MPI parallel program launches multiple processes that communicate with each other through message passing. Each of these processes represents a task that needs to be scheduled on the cluster's machines. The MPI scheduler is responsible for managing this task placement and execution.
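A minimal message-passing sketch using mpi4py (this assumes an MPI runtime and the mpi4py package are installed; the partial-sum computation is just a placeholder):

```python
# Run with something like: mpiexec -n 4 python partial_sums.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()   # this process's id
size = comm.Get_size()   # total number of processes

# Each process computes a partial result on its own slice of the work...
partial = sum(range(rank * 100, (rank + 1) * 100))

# ...and the partial results are combined through an explicit message exchange.
total = comm.reduce(partial, op=MPI.SUM, root=0)
if rank == 0:
    print("total =", total)
```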

Example

A popular example of an MPI program is Strassen's matrix multiplication (a divide-and-conquer algorithm). Each task operates on a smaller slice of the matrix, and the results are then merged.

Indeed, many parallel algorithms can be written in MPI and executed on HPC clusters that are optimized for communication.

MPI v/s MapReduce

Many computations that MPI can perform can also be executed in a distributed MapReduce environment. However, MapReduce has significant communication overhead. MPI, on the other hand, is optimized for communication and hence should be the preferred choice for computationally heavy programs that require substantial communication. Many programs that are well-suited for parallel programming are also better suited for MPI than MapReduce.

Mesos

Mesos is presented as a two-level scheduler designed to share a cluster among different workloads, referred to as frameworks, such as Hadoop and MPI.

Each framework runs user-submitted jobs on the cluster, and each job spawns several tasks. For instance, consider a Hadoop framework executing a word-count job submitted by a user; this would necessitate scheduling numerous map and reduce tasks. Similarly, an MPI framework running a ray-tracing job requires several tasks to run concurrently.

Use Cases

Mesos is optimized for short-lived tasks, such as those common in data processing or scientific computations. It is not intended for long-running services.

Consequently, downstream frameworks are incentivized to utilize:

  • Short tasks: That is, tasks that run to completion within a short time.
  • Elastic scaling: Frameworks capable of elastic scaling (i.e., maintaining operation regardless of available resource fluctuations) tend to perform more effectively within the Mesos environment.

Architecture

Please note that this paper is a bit older, and I apologize for the use of some potentially sensitive terms within it. To avoid any misinterpretations, I will adhere to the original terminology used in the paper.

Mesos features a replicated master for high availability, with mastership coordinated through ZooKeeper. There are multiple schedulers, one for each framework. Each scheduler registers its resource requirements with the master.

In addition to the master and schedulers, an executor (akin to an agent) runs on each machine in the cluster. These executors are responsible for receiving and executing the tasks assigned to them.

Mesos supports multiple frameworks concurrently. For instance, Hadoop and MPI frameworks can run simultaneously on the same cluster, with each framework having its own dedicated scheduler.

The Resource Broker

Essentially, the Mesos master acts as a resource broker, facilitating the allocation of resources between the schedulers and the executors. Executors report the available resources on their respective machines back to the Mesos master in the form of resource offers. The master then employs a resource allocator to distribute these resources to the registered schedulers.

The schedulers then assign tasks to these resources, and the assignments are forwarded to the executors.

Resource Allocation

Each scheduler specifies filters defining which resources it will accept. Filters can be of two types:

  • Minimum Resources - Only offer nodes with at least R resource free. Any allocation below this threshold will be rejected by the scheduler.

  • Preference List - Only offer nodes from a list L. Any allocation on nodes outside of the list will be rejected. This is used for specifying preferences (e.g. for data locality).

The resource allocation algorithm distributes resources among the schedulers. The algorithm is pluggable, so it can be replaced with a custom one. By default, a variant of max-min fairness called Dominant Resource Fairness (DRF) is used. Only offers that pass a scheduler's filters are sent to that scheduler.
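A rough sketch of the DRF idea (my own simplification, not Mesos's actual allocator module): a framework's dominant share is its largest fraction of any single resource, and the next offer goes to the framework with the smallest dominant share.

```python
def pick_next_framework(usage, cluster_capacity):
    """Return the framework with the smallest dominant share, i.e. the one
    DRF would offer resources to next."""
    def dominant_share(u):
        return max(u[r] / cluster_capacity[r] for r in cluster_capacity)
    return min(usage, key=lambda f: dominant_share(usage[f]))

cluster = {"cpu": 9.0, "mem_gb": 18.0}
usage = {
    "hadoop": {"cpu": 3.0, "mem_gb": 3.0},   # dominant share = 3/9  ≈ 0.33
    "mpi":    {"cpu": 1.0, "mem_gb": 8.0},   # dominant share = 8/18 ≈ 0.44
}
print(pick_next_framework(usage, cluster))   # -> 'hadoop' gets the next offer
```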

Mesos can also reclaim resources by revoking tasks. To temper this, Mesos has a concept of guaranteed allocation for each framework: tasks are not revoked as long as a framework's utilization stays below its guaranteed allocation.

Isolation

Multiple frameworks can run their jobs on the same machine; they are isolated using Linux containers, which can limit CPU, memory, and network bandwidth.

Behavior Analysis

This paper stands out by including a dedicated section on detailed behavior analysis, something rarely found in other works. The paper only states the results; the detailed derivations are presented in the accompanying technical report.

For the analysis, the authors differentiate between two framework types:

  1. Elastic: These frameworks can dynamically adjust their resource allocation. Increased resources lead to faster processing, but the framework remains functional even with fewer resources. Hadoop exemplifies this type.
  2. Rigid: These frameworks require a fixed amount of resources to execute. MPI, which needs all resources to run parallel programs, falls into this category.

For behavior analysis purposes, the cluster is conceptually divided into N equivalent slots. For instance, if each machine has 5 CPUs and 10 GB of RAM and is divided into 5 slots, each slot represents resources equivalent to <1 CPU, 2 GB RAM>. The authors assume each task occupies a single slot, implying uniform resource requirements across tasks.

The analysis focuses on three key metrics:

  1. Framework ramp-up time: The duration required to acquire all necessary resources for a job.
  2. Job completion time: The total time a framework takes to complete all its jobs.
  3. System utilization: The overall resource utilization of the cluster.

The authors also consider two types of task distribution:

  1. Constant: All tasks within a job have the same completion time.
  2. Exponential: Task completion times vary according to an exponential distribution.

Homogeneous Case

In the homogeneous scenario, all frameworks exhibit the same task distribution (either constant or exponential).

Consider a framework f that is allocated k slots. If the total computation required by f is β * k * T, where T is the mean task duration, then with all k slots available the computation completes in β * T time.

The detailed derivations are available in the technical report and summarized in Table 2. Intuitively:

  • For a constant distribution, the ramp-up time is T (the duration of a task). This is because after time T, all currently running tasks will have completed, freeing slots (including the k slots needed) for the new framework. For an exponential distribution, the ramp-up time is longer (approximately T ln k), as longer tasks take more time to release their slots.
  • For an elastic framework, the completion time is at most (1 + β)T; the initial T accounts for the time to acquire the necessary slots (see the technical report for a detailed derivation). For rigid frameworks, the completion time is higher because no task can begin until ramp-up is complete.
  • Rigid frameworks exhibit lower utilization because their tasks cannot commence until the entire ramp-up phase is complete.

In essence, elastic frameworks and constant task distributions lead to better performance. Rigid frameworks with exponential task distributions perform the worst.

Heterogeneous Tasks

When task distributions are heterogeneous, such as a bimodal distribution with both short and long tasks, short tasks can experience starvation behind long-running tasks. The authors suggest the following scheduler strategies to mitigate this:

  1. Random task assignment: Probabilistically increases the chance of scheduling short tasks. If φ is the fraction of long tasks, the probability that at least one short task is scheduled on a machine with S slots is 1 - φ^S. For example, with φ = 0.5 and S = 4 slots, this probability is 1 - 0.5^4 ≈ 94%.
  2. Reserve slots for short tasks: Similar to creating separate queues in HPC.

Placement Preferences

Each scheduler can have a preferred list of slots (essentially, preferred nodes). However, not all scheduler frameworks can be guaranteed their preferred slots. In such instances, Mesos can employ lottery scheduling to distribute preferred slots randomly.

Lottery scheduling is an instantaneous form of weighted fair allocation. In this context, the weight is the total intended allocation of a framework (si).

Implementation

  • Hadoop's fine-grained map and reduce tasks align well with Mesos's resource management model. Notably, Hadoop already provides a pluggable API for developing schedulers. To enhance data locality, delay scheduling was implemented within these schedulers themselves.
  • For MPI integration, the Torque scheduler was adapted for Mesos. When MPI jobs are submitted to the queue, the Torque scheduler begins accepting resource offers from Mesos. Upon accepting an offer on a specific node, tasks from the queued jobs are executed there. Given that Torque tasks are resource-intensive, Torque employs guaranteed allocation and avoids accepting resources beyond its reservation to prevent unnecessary task revocations.
  • Finally, Spark was also integrated with Mesos. Spark executors store the results of map-reduce-like operations (i.e., RDDs) in memory. These cached RDDs can be reused in subsequent iterations, significantly accelerating processing.

Evaluation

The paper presents a remarkably thorough evaluation, a level of detail rarely encountered in research publications.

  • In the macrobenchmark section, Mesos is compared against static partitioning. The results demonstrate that Mesos achieves superior cluster utilization of CPU resources when Hadoop, Spark, and Torque are run concurrently. 
    • Torque's execution time takes a hit due to its non-elastic nature.
    • Hadoop, particularly with a large mix of jobs, exhibits the most significant performance gains due to its elasticity.
    • However, smaller Hadoop jobs experience a performance degradation. This is attributed to the resource broker layer within Mesos, which introduces additional overhead.
    • Mesos enables frameworks to scale up dynamically by leveraging resources when other frameworks have lower demands.
  • While Mesos introduces a slight overhead, the impact is minimal. For instance, an MPI job took 50.9 seconds without Mesos and 51.8 seconds with Mesos, while Hadoop execution times were 160 seconds versus 166 seconds, respectively. In both cases, the overhead is less than 4%. Nevertheless, this overhead becomes substantial for small Hadoop jobs.
  • Mesos performs effectively with Hadoop jobs that benefit from data locality. Employing delay scheduling reduces the running time by half. With a 5-second delay in scheduling, 100% data locality is achieved.
  • The Spark framework, a key integration within Mesos, demonstrates significantly better performance than Hadoop. While the first iteration yields comparable results to Hadoop, subsequent iterations show substantial improvements, with a tenfold reduction in running time.
  • Mesos exhibits low overhead even at a scale of 50,000 nodes, indicating high scalability.
  • The Mean Time To Recovery (MTTR) for the master node is approximately 4-8 seconds.

Paper Review

The authors' deep expertise in large-scale processing systems is evident throughout this paper, offering valuable insights into the challenges and solutions encountered in this domain. While my early experiences with program execution in college only hinted at the complexities addressed here, it's clear that both Big Data processing and scientific computations are critical for industry. Consequently, the system developed by the authors represents a truly significant achievement in computing. I wholeheartedly recommend reading this paper as an excellent introduction to the types of problems that are well-suited for cluster computing environments.
