Cluster Management
A computing cluster comprises numerous interconnected machines, linked by a high-speed network, often referred to as a fabric. These clusters typically range from 100 to 10,000 machines. Users schedule their jobs on these machines.
Cluster Jobs
Clusters are versatile platforms capable of executing a wide array of jobs such as services, batch-processing jobs, and scientific computations.
Services
Users frequently deploy long-running services on clusters for high availability and scalability through replication across multiple machines. Numerous essential systems, such as distributed file systems, lock services, and distributed databases, operate as a set of smaller, interconnected services known as micro-services. Each service is a set of replicated processes running on the cluster. Each replica of a service is called a task.
Batch Processing
The paradigm of processing large datasets in parallel through phases (such as map-shuffle-reduce) was established with the advent of MapReduce. In this model, a user-submitted data processing job is decomposed into a series of smaller processing phases. Each of these phases is executed as a distinct cluster job (it's important to distinguish this "cluster job" from the higher-level "data processing job" submitted by the user). Furthermore, each job is broken down into individual tasks, with each task running as a separate process on the cluster. A task is responsible for processing a specific slice of the overall data.
Note: In the diagram above, mappers and reducers may run as a single job, called workers, consisting of a mix of map and reduce tasks.
Scientific Computations
Clusters are indispensable for tackling complex scientific problems, for instance, climate modeling, drug discovery, computational fluid dynamics, and astrophysics simulations. Clusters specifically designed for scientific computations are often High-Performance Computing (HPC) clusters, equipped with enhanced hardware like GPUs and high-speed, low-latency networking technologies such as RDMA (Remote Direct Memory Access) and RoCE (RDMA over Converged Ethernet).
Containers
The fundamental requirement of a task running in a cluster is the ability to run on any machine. To ensure this, a task must carry all of its dependencies with it. This is commonly achieved through containerization using technologies like Docker.
Cluster Orchestrator
Clusters represent a powerful computing infrastructure, demanding significant investment for setup. Even tech giants like Google and Microsoft typically operate a limited number of clusters. Given the diverse range of jobs that need to be executed, efficient sharing of cluster resources among multiple users is important.
The finite machine resources within a cluster necessitate a mechanism for fair and effective allocation across different users. One initial approach, static partitioning (dividing machines permanently among users), proves to be inflexible and frequently leads to underutilization of resources. A cluster orchestrator shares the cluster dynamically among users and is typically responsible for:
- Task scheduling based on user requests.
- Task status tracking and reporting to users.
- Machine maintenance, including OS upgrades and software management.
- Resource usage monitoring and reporting.
Examples
Several sophisticated cluster orchestrators have been developed to manage and streamline resource allocation in large-scale computing environments. Examples include:
- Borg: Google's internal cluster management system, designed to handle a vast array of workloads across its data centers. It's highly scalable and efficient, managing diverse applications, from long-running services to batch-processing jobs. Borg's design heavily influenced the development of Kubernetes.
- Kubernetes (K8s): An open-source container orchestration system, originally designed by Google, that automates the deployment, scaling, and management of containerized applications. It has become the industry standard for container orchestration. Kubernetes is especially popular for managing Docker containers, but it can also handle other container runtimes.
- Docker Swarm: Docker's native clustering and orchestration solution. It allows users to create and manage a cluster of Docker nodes, enabling the deployment and scaling of containerized applications.
- Omega: A general-purpose cluster management system developed by Google, designed to address some of the limitations of Borg.
- HashiCorp Nomad: A simple orchestrator that can deploy and manage both containerized and non-containerized applications.
Scheduler
The scheduler is the core component of an orchestrator, responsible for assigning tasks to available machines.
There are several challenges that cluster schedulers need to solve:
- Colocation Interference: Occurs when multiple tasks running on the same machine interfere with each other's performance, leading to unpredictable behavior.
- Noisy Neighbors: A specific type of colocation interference where one resource-intensive task negatively impacts the performance of other tasks sharing the same hardware. This is a widely-known problem faced by several programs running on clusters. See The Tail at Scale by Jeff Dean.
- Machine Heterogeneity: Dealing with clusters composed of machines with varying hardware specifications and capabilities, making resource allocation and scheduling complex.
These issues have historically limited the adoption rate of cluster orchestrators in general. Notably, only Google's Borg has achieved exceptional machine utilization rates, ranging from 60% to 80%.
Types of Cluster Schedulers
Cluster schedulers come in various architectures:
Monolithic Scheduler:
Example: Google's Borg. Early big data systems employed monolithic schedulers, where a single component handled all task placement decisions. Drawbacks include inflexible policies and rapid code growth, making maintenance challenging.
Two-Level Scheduler:
Example: Apache Mesos. This architecture splits scheduling between two components:
- The schedulers, responsible for making the actual task (container) placement decisions.
- The resource manager, which manages cluster resources and offers them to the schedulers.
Shared-State Scheduler:
Examples: Google Omega, HashiCorp Nomad. This model evolved from the two-level architecture to address its limitations. A key issue with two-level schedulers is that individual schedulers may have a limited view of the cluster, hindering optimal resource allocation. The shared-state model utilizes optimistic concurrency, where the cluster's state is modified through transactions. While this approach improves flexibility, it introduces the possibility of scheduling conflicts.
Misc:
- Fully Distributed Scheduler: Often employs simple random placement algorithms. Results in suboptimal placement and is primarily of academic interest.
- Hybrid Placement: Schedulers that combine aspects of the other scheduler types to try to achieve the best result.
Agent
An agent is a process running on every machine in the cluster. It launches the tasks assigned to that machine and reports task status and resource usage back to the orchestrator.
Resource Allocation
Tasks consume several kinds of resources, which the orchestrator must allocate among them:
- CPU cycles - We measure the number of instructions that the task executes.
- RAM - We measure the amount of memory used by the task.
- Network usage - We measure the amount of bytes sent/received over the wire.
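As a rough illustration (the type and field names below are hypothetical), an orchestrator might represent a task's resource request as a simple record mirroring the list above:

```python
from dataclasses import dataclass

@dataclass
class ResourceRequest:
    """A task's resource demands; fields mirror the list above."""
    cpu_cycles: int      # instructions the task is expected to execute
    ram_bytes: int       # memory the task needs
    network_bytes: int   # bytes the task will send/receive over the wire

# A task expecting 10 billion instructions, 4 GB of RAM, 100 MB of traffic.
request = ResourceRequest(cpu_cycles=10_000_000_000,
                          ram_bytes=4 * 1024**3,
                          network_bytes=100 * 1024**2)
```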
Fair Allocation
This algorithm divides the resource equally among all requests: with n requests and a total quantity x, each request receives x/n regardless of how much it asked for.
Weighted Fair Allocation
This algorithm allocates resources proportionally to the requested amounts. Given allocation requests {x1, x2, ..., xn} for a resource with total quantity x, the resource allocated to job i is calculated as:
allocation_i = (xi / Σxj) * x
where Σxj represents the sum of all allocation requests.
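A minimal sketch of both policies on a single divisible resource; `requests` and `total` follow the notation above:

```python
def fair_allocation(requests, total):
    """Plain fair allocation: every request gets an equal share."""
    share = total / len(requests)
    return [share] * len(requests)

def weighted_fair_allocation(requests, total):
    """Weighted fair allocation: request i gets total * x_i / sum(x_j)."""
    denom = sum(requests)
    return [total * x / denom for x in requests]

print(fair_allocation([2, 3, 4, 5], 12))           # [3.0, 3.0, 3.0, 3.0]
print(weighted_fair_allocation([2, 3, 4, 5], 12))  # [1.71.., 2.57.., 3.42.., 4.28..]
```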
Max-Min Fair Resource Allocation
Given allocation requests {x1, x2, ..., xn} for a resource x, an allocation is considered max-min fair if increasing any individual allocation xi necessitates decreasing another allocation xj that is already smaller or equal (xj ≤ xi).
Max-min fair allocation prioritizes smaller demands, ensuring they receive their requested resources. Any remaining capacity is then distributed equally among larger demands.
Max-min allocation happens in rounds. Consider requests {2, 3, 4, 5} with a total capacity of 12.
- Round 1: Initial equal distribution {3, 3, 3, 3}.
- Round 2: Smallest request is satisfied {2, 3.33, 3.33, 3.33} - the slack from the first request is allocated to others.
- Round 3: Smallest request is satisfied {2, 3, 3.5, 3.5} - the slack from the second request is allocated to others.
- Round 4: Final max-min fair allocation {2, 3, 3.5, 3.5}.
Max-min fair sharing typically leads to improved resource utilization.
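A short sketch of the progressive-filling procedure described above; it reproduces the {2, 3, 3.5, 3.5} allocation for requests {2, 3, 4, 5} and capacity 12.

```python
def max_min_fair(requests, capacity):
    """Progressive filling: fully satisfy the smallest demands first, then
    split whatever is left equally among the remaining (larger) demands."""
    allocation = [0.0] * len(requests)
    unsatisfied = sorted(range(len(requests)), key=lambda i: requests[i])
    remaining = capacity
    while unsatisfied:
        share = remaining / len(unsatisfied)
        i = unsatisfied[0]
        if requests[i] <= share:
            # The smallest demand fits within the equal share: give it exactly
            # what it asked for and return the slack to the pool.
            allocation[i] = requests[i]
            remaining -= requests[i]
            unsatisfied.pop(0)
        else:
            # No remaining demand fits: split the rest equally and stop.
            for j in unsatisfied:
                allocation[j] = share
            break
    return allocation

print(max_min_fair([2, 3, 4, 5], 12))  # [2, 3, 3.5, 3.5]
```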
Scheduling
Resource allocation deals with the distribution of available resources among competing tasks. Following allocation, scheduling determines the temporal order of execution. Essentially, allocation answers what resources each job receives, while scheduling addresses when those resources are utilized.
Consider two jobs, each requiring 10,000 CPU cycles for completion. Resource allocation would assign 10,000 cycles to each job. Then, scheduling dictates the sequence of execution (which job gets to use the resource first).
Common non-preemptive scheduling algorithms, such as First-In, First-Out (FIFO) and Last-In, First-Out (LIFO), can be employed. However, non-preemptive approaches can lead to critical issues:
- Starvation: A long-running task can monopolize the CPU, preventing shorter tasks from executing for extended periods, or even indefinitely.
- Inefficient Resource Utilization: If a task encounters an I/O wait or other blocking event, the CPU remains idle until the task resumes, leading to wasted processing capacity.
- Lack of Priority Handling: Non-preemptive algorithms typically do not allow for prioritizing critical tasks, potentially causing delays in essential functions.
The Time Dimension in Resource Allocation
By incorporating time as a resource dimension, we can more accurately model both resource allocation and scheduling simultaneously.
Instead of simply allocating a quantity of resources, we allocate a resource usage rate over a specific time period. For instance, CPU resources can be represented as <CPU rate, time> instead of raw CPU cycles, and network usage as <bandwidth, time> instead of total bytes transferred/received.
CPU rate is essentially the clock rate (roughly, each clock cycle executes one instruction). It is the same for all the CPUs on a machine, so <CPU rate, time> boils down to <Num CPUs, time>. A resource allocation of <2 CPUs, 5s> means that a job can use 2 CPUs for 5 seconds.
If we fix the time period to one second, the allocated CPU count directly corresponds to the CPU usage within that second. It can be fractional as well. For example, if the resource allocation is <0.2 CPU>, the job can execute 0.2 CPUs' worth of instructions in a second. Assuming a clock rate of 1 GHz, the job can execute 200M cycles' worth of instructions per second. Similarly, the allocated bandwidth within a second reflects the network usage in that second.
Modern schedulers allocate CPU resources as a rate instead of a number of cycles. Cloud providers, such as Google Cloud, offer virtual CPUs (vCPUs), which behave like physical CPUs; in heterogeneous clusters with varying CPU clock rates, vCPUs provide a standardized abstraction for CPU capacity.
An analogy can be drawn with the Joule (energy consumed) and the Watt (rate of energy consumption, i.e., Joules per second):
- Joule: CPU cycles :: Watt: CPU/vCPU
- Joule: Network usage :: Watt: Bandwidth
When resource allocation is defined in terms of rate, resource allocation algorithms naturally become scheduling mechanisms, such as:
- Fair Scheduling: Distributes available CPU or bandwidth equally among jobs. For CPU, this means each job receives an equal portion of the total available CPU. For bandwidth, it means an equal share of the network's capacity. This is essentially round-robin.
- Weighted Fair Scheduling: Allocates resources proportionally based on assigned weights. Jobs with higher weights receive a larger share of the CPU or bandwidth. This is essentially weighted round-robin.
- Max-Min Fair Scheduling: Aims to maximize the minimum allocation received by any job, ensuring fairness and preventing starvation. It prioritizes jobs with lower allocations, bringing them up to a fair share before distributing excess resources.
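As an illustrative sketch (not any particular scheduler's implementation), the weighted fair policy above can be approximated by weighted round-robin on a single CPU: within each scheduling period, every job gets a time slice proportional to its weight.

```python
def weighted_round_robin(jobs, period_ms=100):
    """jobs: mapping of job name -> weight.
    Returns the time slice (ms) each job receives within one period."""
    total_weight = sum(jobs.values())
    return {name: period_ms * weight / total_weight
            for name, weight in jobs.items()}

# Job "a" has twice the weight of "b", so it gets twice the CPU time per period.
print(weighted_round_robin({"a": 2, "b": 1}))  # {'a': 66.66..., 'b': 33.33...}
```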
Delay Scheduling
In distributed data processing, there is one more resource that needs to be considered for scheduling: the data being processed. Allocating resources without considering data locality leads to tasks being scheduled on nodes where their required data is not readily available. This necessitates data transfer across the network, significantly impacting performance due to increased latency and network bandwidth consumption.
To mitigate this issue, delay scheduling is employed (initially introduced for Hadoop). This technique introduces a deliberate delay before scheduling a task. Instead of immediately assigning the task to the first available resource, the scheduler waits for a short period, hoping that a node with the task's preferred data (i.e., local data) becomes available. If a local resource becomes free within the delay window, the task is scheduled there. Otherwise, it is scheduled on a remote node.
In practice, delay scheduling substantially improves performance by reducing data transfer and lowering latency.
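A condensed sketch of the idea, assuming a hypothetical `task.preferred_nodes` attribute (the nodes holding the task's input data) and a per-task wait counter; real implementations such as the Hadoop fair scheduler are more nuanced.

```python
MAX_DELAY_ROUNDS = 3  # how many scheduling rounds a task may wait for locality

def pick_node(task, free_nodes, waited):
    """Return a node for the task, or None to skip this round and keep waiting.

    `waited` maps task -> number of rounds it has already been skipped.
    `task.preferred_nodes` are the nodes holding the task's input data.
    """
    local = [n for n in free_nodes if n in task.preferred_nodes]
    if local:
        return local[0]                      # data-local slot available: take it
    if waited.get(task, 0) < MAX_DELAY_ROUNDS:
        waited[task] = waited.get(task, 0) + 1
        return None                          # wait a little longer for locality
    return free_nodes[0] if free_nodes else None  # give up and run remotely
```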
Bin Packing Problem
The bin packing problem is a classic NP-hard optimization challenge: given a set of items with varying sizes and a finite number of bins (or containers), each with a fixed given capacity, the goal is to pack all items into the fewest possible bins. This problem arises frequently in computer science and other fields.
Examples
- Memory Allocation: Allocating fixed-size pages of RAM to processes, minimizing the number of physical memory pages used.
- Resource Scheduling in Cloud Computing: Packing tasks with varying resource demands (CPU, memory, storage) into cloud instances to optimize resource utilization and reduce costs.
- File Storage: Storing files of different sizes on storage devices with fixed capacities, minimizing the number of storage devices required.
- Network Packet Packing: Packing network packets of variable sizes into fixed size frames for transmission.
Approximation Algorithms
- First-Fit: Places each item into the first bin where it fits.
- Best-Fit: Places each item into the bin with the smallest remaining capacity that can accommodate it.
- Worst-Fit: Places each item into the bin with the largest remaining capacity.
- Almost-Worst-Fit: Similar to worst fit, but avoids the absolute worst fit bin.
- Next-Fit: Keeps only the most recently opened bin; places each item there if it fits, otherwise closes it and opens a new bin.
- Next-k-Fit: A variation of Next-Fit, that checks the last k bins.
These algorithms provide approximate solutions with certain error bounds, indicating how far the solution might be from the optimal:
- Next-Fit: Has an error ratio of 2, meaning it may use up to twice the optimal number of bins.
- First-Fit, Best-Fit, Almost-Worst-Fit: Have an error ratio of approximately 1.7.
- Worst-Fit: Has an error ratio of 2.
These error bounds represent worst-case scenarios, and in practice, these algorithms often perform significantly better. The choice of algorithm depends on the specific application and the trade-off between solution quality and computational cost.
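A sketch of First-Fit and Best-Fit, with items and bins represented as plain sizes and capacities:

```python
def first_fit(items, capacity):
    """Place each item into the first open bin with enough free space."""
    bins = []  # each entry is the remaining free space of an open bin
    for item in items:
        for i, free in enumerate(bins):
            if item <= free:
                bins[i] -= item
                break
        else:
            bins.append(capacity - item)  # no bin fits: open a new one
    return len(bins)

def best_fit(items, capacity):
    """Place each item into the open bin that leaves the least space over."""
    bins = []
    for item in items:
        candidates = [i for i, free in enumerate(bins) if item <= free]
        if candidates:
            i = min(candidates, key=lambda i: bins[i] - item)
            bins[i] -= item
        else:
            bins.append(capacity - item)
    return len(bins)

items = [0.5, 0.7, 0.5, 0.2, 0.4, 0.2, 0.5, 0.1, 0.6]
print(first_fit(items, 1.0), best_fit(items, 1.0))
```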
Hadoop v/s Spark
Hadoop, launched in 2006, emerged as the open-source counterpart to Google's MapReduce, mirroring its core functionality of breaking down big data processing jobs into map and reduce tasks. Notably, the Hadoop project also developed the Hadoop Distributed File System (HDFS), drawing inspiration from Google File System (GFS).
In contrast, Spark was specifically conceived with machine learning algorithms, such as logistic regression, in mind. The original whitepaper elaborates on this motivation, which can be summarized as follows:
Consider the iterative nature of logistic regression. A fundamental step involves computing the following linear combination in the map stage:
β + W1 * x1 + W2 * x2 + ... + Wn * xn
Here, β, W1, W2 ... Wn are model parameters and x1, x2, ... xn are the input features from each record. This calculation feeds into a sigmoid function to produce a predicted probability between 0 and 1. The deviation of this predicted probability from the true label is then calculated as the error. In the reduce stage, these errors are aggregated across the entire dataset to determine the overall error of the current model.
Logistic regression is inherently iterative. The algorithm repeatedly refines the model parameters in each round until the error converges to a minimum. The final set of weights constitutes the trained logistic regression model. It's worth noting that this entire process mirrors a single perceptron (or logistic regression unit) within a deeper neural network architecture, where multiple such units are arranged in layers.
Spark's key innovation lies in its use of Resilient Distributed Datasets (RDDs). RDDs are fault-tolerant, immutable, distributed collections of data that can be stored in memory for the duration of a job. RDDs can originate directly from the input dataset or be the result of applying map or reduce operations to existing RDDs. By keeping RDDs in memory (with mechanisms to spill over to disk in case of memory pressure), Spark avoids the overhead of repeatedly reading data from storage. Due to the deterministic nature of map and reduce functions, RDDs can always be recomputed on demand if necessary.
This in-memory processing of RDDs is the primary differentiator between Spark and Hadoop. In Hadoop, each iteration of a machine learning training process necessitates reading the input data from storage and processing it anew. Conversely, Spark allows the input data to reside in memory as RDDs, readily available for subsequent processing rounds, leading to significant performance gains for iterative algorithms like logistic regression.
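A rough PySpark-style sketch of this iterative pattern, loosely following the example in the original Spark paper; the dataset path, input format, and constants are placeholders.

```python
import numpy as np
from pyspark import SparkContext

# Illustrative constants; real values depend on the dataset and model.
NUM_FEATURES = 10
ITERATIONS = 10
LEARNING_RATE = 0.1

sc = SparkContext(appName="logreg-sketch")

def parse_point(line):
    # Hypothetical input format: "label f1 f2 ... fn" per line.
    values = [float(v) for v in line.split()]
    return values[0], np.array(values[1:])

# The parsed dataset is cached in memory once and reused by every iteration.
points = sc.textFile("hdfs://namenode/path/to/training_data.txt") \
           .map(parse_point) \
           .cache()

w = np.zeros(NUM_FEATURES)
for _ in range(ITERATIONS):
    # Map: per-record gradient of the logistic loss; Reduce: sum over the dataset.
    gradient = points.map(
        lambda p: (1.0 / (1.0 + np.exp(-p[1].dot(w))) - p[0]) * p[1]
    ).reduce(lambda a, b: a + b)
    w -= LEARNING_RATE * gradient
```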
Message Passing Interface (MPI)
Message passing is a programming model where independent processes coordinate by exchanging data through messages. Each process operates within its own memory space, and communication happens via explicit sending and receiving of data.
MPI is a standardized library interface that implements this model efficiently. Key benefits of MPI include:
- MPI libraries are highly optimized to deliver excellent performance.
- MPI leverages the fastest available communication method:
- Using shared memory for communication between processes on the same machine.
- Using TCP/IP for communication between processes on different nodes.
- Taking advantage of RDMA, if available, for highly efficient data transfer between nodes.
- MPI typically guarantees reliable delivery of messages and preserves the order in which they are sent.
The primary application of MPI lies in building parallel programs deployed on HPC clusters. An MPI parallel program launches multiple processes that communicate with each other through message passing. Each of these processes represents a task that needs to be scheduled on the cluster's machines. The MPI scheduler is responsible for managing this task placement and execution.
Example
A popular example of an MPI program is Strassen's matrix multiplication (divide and conquer). Each task runs on a smaller slice of the matrix, and the results are merged together.
Indeed, many parallel algorithms can be written in MPI and executed in a distributed environment such as an HPC cluster that is optimized for communication.
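A minimal sketch of the message-passing model using mpi4py (not the Strassen code itself): each process computes a partial sum over its own slice of the data, and the results are combined on rank 0.

```python
# Run with, e.g.: mpirun -n 4 python partial_sum.py
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Each process owns its own memory and works on its own slice of the data.
data = np.arange(1_000_000, dtype=np.float64)
chunk = np.array_split(data, size)[rank]
partial = chunk.sum()

# Explicit message passing: combine the partial results on rank 0.
total = comm.reduce(partial, op=MPI.SUM, root=0)
if rank == 0:
    print("total =", total)
```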
MPI v/s MapReduce
MPI targets tightly coupled parallel programs with frequent, low-latency communication between processes, typically on HPC clusters, and leaves fault tolerance to the application. MapReduce targets loosely coupled, data-parallel batch jobs on commodity clusters, handling task scheduling, data locality, and fault tolerance (by re-executing failed tasks) on behalf of the programmer.
Mesos
Mesos is a two-level scheduler designed to run different workloads, referred to as frameworks (such as Hadoop and MPI), within a single cluster.
Each framework runs certain user-submitted jobs on the cluster, and each job spawns several tasks. For instance, consider a Hadoop framework executing a word count job submitted by a user; this would necessitate scheduling numerous map-reduce tasks. Similarly, consider an MPI framework running a ray tracing job, which requires several sub-tasks to operate concurrently.
Use Cases
Mesos is optimized for short-lived tasks, such as those common in data processing or scientific computations. It is not intended for long-running services.
Consequently, downstream frameworks are incentivized to utilize:
- Short tasks: That is, tasks that run to completion within a short time.
- Elastic scaling: Frameworks capable of elastic scaling (i.e., maintaining operation regardless of available resource fluctuations) tend to perform more effectively within the Mesos environment.
Architecture
Mesos features a replicated master for high availability, with mastership coordinated through ZooKeeper. There are multiple schedulers, one for each framework. Each scheduler registers its resource requirements with the master.
In addition to the master and schedulers, an executor (akin to an agent) runs on each machine in the cluster. These executors are responsible for receiving and executing the tasks assigned to them.
Mesos supports multiple frameworks concurrently. For instance, Hadoop and MPI frameworks can run simultaneously on the same cluster, with each framework having its own dedicated scheduler.
The Resource Broker
Essentially, the Mesos master acts as a resource broker, facilitating the allocation of resources between the schedulers and the executors. Executors report the available resources on their respective machines back to the Mesos master in the form of resource offers. The master then employs a resource allocator to distribute these resources to the registered schedulers.
The schedulers then assign tasks to these resources, and the assignments are forwarded to the executors.
Resource Allocation
Each scheduler specifies a filter defining the resources it will accept. Filters can be of two types:
- Minimum Resources - Only offer nodes with at least R resource free. Any allocation below this threshold will be rejected by the scheduler.
- Preference List - Only offer nodes from a list L. Any allocation on nodes outside of the list will be rejected. This is used for specifying preferences (e.g. for data locality).
The resource allocation algorithm allocates resources to the schedulers. The algorithm is pluggable and so can be replaced with a custom one. By default, a variant of max-min fairness called the Dominant Resource Fairness (DRF) algorithm is used to allocate resources subject to the filters. Only the allocations passing the filters are sent to the scheduler.
Mesos can also reclaim resources by revoking tasks. To limit this, Mesos has a concept of guaranteed allocation for a framework: tasks are not revoked from a framework whose utilization is below its guaranteed allocation.
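A simplified sketch of Dominant Resource Fairness, ignoring filters and revocation: resources are repeatedly offered to the framework with the lowest dominant share, i.e., its largest fractional share of any single resource. The example numbers follow the classic one from the DRF paper.

```python
def drf_allocate(total, demands, rounds):
    """A toy Dominant Resource Fairness allocator.

    total:   {resource: cluster capacity}
    demands: {framework: {resource: per-task demand}}
    rounds:  maximum number of task launches to simulate
    """
    allocated = {f: {r: 0.0 for r in total} for f in demands}
    used = {r: 0.0 for r in total}

    def dominant_share(f):
        # A framework's dominant share is its largest fractional share
        # of any single resource.
        return max(allocated[f][r] / total[r] for r in total)

    for _ in range(rounds):
        # Offer resources to frameworks in order of increasing dominant share.
        for f in sorted(demands, key=dominant_share):
            if all(used[r] + demands[f][r] <= total[r] for r in total):
                for r in total:
                    used[r] += demands[f][r]
                    allocated[f][r] += demands[f][r]
                break
        else:
            break  # no framework's next task fits any more
    return allocated

# 9 CPUs and 18 GB of memory; framework A needs <1 CPU, 4 GB> per task,
# framework B needs <3 CPUs, 1 GB> per task.
print(drf_allocate({"cpu": 9.0, "mem": 18.0},
                   {"A": {"cpu": 1.0, "mem": 4.0},
                    "B": {"cpu": 3.0, "mem": 1.0}},
                   rounds=10))
# A ends up with 3 tasks (3 CPUs, 12 GB) and B with 2 tasks (6 CPUs, 2 GB).
```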
Isolation
Tasks from different frameworks running on the same machine are isolated from one another using pluggable, OS-level mechanisms such as Linux containers, which limit each executor's CPU and memory usage.
Behavior Analysis
This paper stands out by including a dedicated section on detailed behavior analysis, which is rarely found in other works. The paper only provides the results; the detailed derivations are presented in the accompanying technical report.
For the analysis, the authors differentiate between two framework types:
- Elastic: These frameworks can dynamically adjust their resource allocation. Increased resources lead to faster processing, but the framework remains functional even with fewer resources. Hadoop exemplifies this type.
- Rigid: These frameworks require a fixed amount of resources to execute. MPI, which needs all resources to run parallel programs, falls into this category.
For behavior analysis purposes, the cluster is conceptually divided into N equivalent slots. For instance, if each machine has 5 CPUs and 10 GB of RAM and is divided into 5 slots, each slot represents resources equivalent to <1 CPU, 2 GB RAM>. The authors assume each task occupies a single slot, implying uniform resource requirements across tasks.
The analysis focuses on three key metrics:
- Framework ramp-up time: The duration required to acquire all necessary resources for a job.
- Job completion time: The total time a framework takes to complete all its jobs.
- System utilization: The overall resource utilization of the cluster.
The authors also consider two types of task distribution:
- Constant: All tasks within a job have the same completion time.
- Exponential: Task completion times vary according to an exponential distribution.
Homogeneous Case
In the homogeneous scenario, all frameworks exhibit the same task distribution (either constant or exponential).
Consider a framework f allocated k slots. If the total computation required for framework f is β * k * T, where T is the mean task duration, and k slots are available, the computation will complete in β * T time.
The detailed derivations are available in the technical report and summarized in Table 2. Intuitively:
- For a constant distribution, the ramp-up time is T (the time for each task). This is because after time T, all existing slots will be freed, including the k slots for new task allocation. For an exponential distribution, the ramp-up time is longer (approximately T ln k) as longer tasks would take more time to release their slots.
- For an elastic framework, the completion time is at most T + βT: the initial T accounts for the time to acquire the necessary slots after the first task is scheduled (see the technical report for a detailed derivation), and βT is the time for the computation itself once the slots are held. For rigid frameworks, the completion time is higher as no task can begin till ramp-up is complete.
- Rigid frameworks exhibit lower utilization because their tasks cannot commence until the entire ramp-up phase is complete.
In essence, elastic frameworks and constant task distributions lead to better performance. Rigid frameworks with exponential task distributions perform the worst.
Heterogeneous Tasks
When task distributions are heterogeneous, such as a bimodal distribution with both short and long tasks, short tasks can experience starvation behind long-running tasks. The authors suggest the following scheduler strategies to mitigate this:
- Random task assignment: Probabilistically increases the chance of scheduling short tasks. The probability depends on the fraction φ of long tasks to total tasks. The probability that at least one short task will be scheduled on a machine with S slots is (1 - φ^S); a quick numeric check follows this list.
- Reserve slots for short tasks: Similar to creating separate queues in HPC.
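A quick numeric check of the 1 - φ^S expression from the first strategy above: with φ = 0.5 (half the tasks are long) and S = 8 slots per machine, at least one short task lands on a machine with probability about 99.6%.

```python
phi = 0.5  # fraction of long tasks in the workload
S = 8      # slots per machine

# Each slot independently holds a long task with probability phi, so the
# probability that every slot on a machine is long is phi**S.
p_at_least_one_short = 1 - phi ** S
print(p_at_least_one_short)  # 0.99609375
```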
Placement Preferences
Each scheduler can have a preferred list of slots (essentially, preferred nodes). However, not all scheduler frameworks can be guaranteed their preferred slots. In such instances, Mesos can employ lottery scheduling to distribute preferred slots randomly.
Lottery scheduling is an instantaneous form of weighted fair allocation. In this context, the weight is the total intended allocation of a framework (si).
Implementation
- Hadoop's fine-grained map and reduce tasks align well with Mesos's resource management model. Notably, Hadoop already provides a pluggable API for developing schedulers. To enhance data locality, delay scheduling was implemented within these schedulers themselves.
- For MPI integration, the Torque scheduler was adapted for Mesos. When MPI jobs are submitted to the queue, the Torque scheduler begins accepting resource offers from Mesos. Upon accepting an offer on a specific node, tasks from the queued jobs are executed there. Given that Torque tasks are resource-intensive, Torque employs guaranteed allocation and avoids accepting resources beyond its reservation to prevent unnecessary task revocations.
- Finally, Spark was also integrated with Mesos. Spark executors store the results of map-reduce-like operations (i.e., RDDs) in memory. These cached RDDs can be reused in subsequent iterations, significantly accelerating processing.
Evaluation
- In the macrobenchmark section, Mesos is compared against static partitioning. The results demonstrate that Mesos achieves superior cluster utilization of CPU resources when Hadoop, Spark, and Torque are run concurrently.
- Torque's execution time takes a hit due to its non-elastic nature.
- Hadoop, particularly with a large mix of jobs, exhibits the most significant performance gains due to its elasticity.
- However, smaller Hadoop jobs experience a performance degradation. This is attributed to the resource broker layer within Mesos, which introduces additional overhead.
- Mesos enables frameworks to scale up dynamically by leveraging resources when other frameworks have lower demands.
- While Mesos introduces a slight overhead, the impact is minimal. For instance, an MPI job took 50.9 seconds without Mesos and 51.8 seconds with Mesos, while Hadoop execution times were 160 seconds versus 166 seconds, respectively. In both cases, the overhead is less than 4%. Nevertheless, this overhead becomes substantial for small Hadoop jobs.
- Mesos performs effectively with Hadoop jobs that benefit from data locality. Employing delay scheduling reduces the running time by half. With a 5-second delay in scheduling, 100% data locality is achieved.
- The Spark framework, a key integration within Mesos, demonstrates significantly better performance than Hadoop. While the first iteration yields comparable results to Hadoop, subsequent iterations show substantial improvements, with a tenfold reduction in running time.
- Mesos exhibits low overhead even at a scale of 50,000 nodes, indicating high scalability.
- The Mean Time To Recovery (MTTR) for the master node is approximately 4-8 seconds.