
Paper Insights #29 - Autopilot: Workload Autoscaling at Google Scale

This paper from Google was presented at EuroSys 2020. It is heavy on statistics, but it tackles one of the most important concepts in cluster/cloud computing - scaling - and those concepts are well worth exploring in system design.

Paper Link

Recommended Read: Paper Insights - Mesos: A Platform for Fine-Grained Resource Sharing in the Data Center, where I introduced cluster computing and resource allocation.

Borg

Borg is a cluster orchestrator developed by Google for managing its clusters. The design of Kubernetes was significantly influenced by Borg.

Jobs and Tasks

A Borg cluster consists of roughly 100 to 10,000 physical machines connected through a high-speed network fabric. Users submit jobs for execution on these machines, and these jobs are categorized into several types:

  1. Services: These are long-duration jobs that frequently constitute components of larger systems, such as those employing a microservices architecture. A service is generally replicated, and an individual instance is called a task.
  2. Batch: These jobs are typically composed of short-lived data-processing tasks.

A task executes as a containerized process on the cluster machines.

Given Google's extensive infrastructure for internet-scale services, most jobs running within Borg are services. Services occupy the majority of machine capacity, with batch jobs utilizing the remaining available capacity.

Scaling

Every job running in a cluster/cloud requires resources. The most critical of these are:

  • CPU
  • RAM

Note that with the rise of data-intensive and compute-intensive applications (such as ML training), memory bandwidth is also becoming a crucial resource. Similarly, network bandwidth is critical for many network-intensive applications. However, this paper focuses solely on CPU and RAM, assuming an abundance of network and memory bandwidth. These assumptions will likely evolve with changes in the computing landscape.

When users submit jobs to a cluster/cloud, they need to specify resource requirements. However, these specifications are often inaccurate, either overestimating or underestimating the actual needs of the job. For instance, if a service experiences a 5x increase in user requests compared to the expected volume, its resources will need to scale up. Conversely, if the service receives 10x fewer requests than anticipated, its resources should be scaled down.

Types

Resource scaling for a job can be categorized into two primary types:

  • Vertical Scaling: Increasing or decreasing the resources allocated to each task.

  • Horizontal Scaling: Adding or removing tasks.

Which to prefer?

Numerous online articles discuss the optimal scenarios for each approach. However, based on my experience with large-scale distributed systems, I can confidently state that the decision is context-dependent. It hinges on numerous factors, and there is no universally ideal solution.

Cluster orchestrators often favor vertical scaling as it tends to keep the number of tasks lower, which can expedite the computation of optimal resource assignments. On the other hand, I've encountered applications that had to scale out horizontally because the NICs on individual machines couldn't handle the required network capacity.

Therefore, my answer to the question of preference is: experiment to determine what yields the best results for your specific use case.

Limits & Oversubscriptions

Even with resource scaling, establishing limits is crucial. For example, users should define upper bounds for the RAM and CPU that each task can consume. A user might set a task limit to <2 CPU, 4 GB RAM>.

These limits are often based on the worst-case observed scenario, which may not accurately reflect a task's actual, typical resource needs. The average resource utilization can be significantly lower than the defined limits. Many tasks exhibit diurnal usage patterns, with high usage during specific hours of a day and low usage at other times.

To address this discrepancy, cluster/cloud providers employ oversubscription on machines. Consider a machine with a capacity of 256 GB RAM and 64 CPU, hosting tasks with the following limits:

  • 20 CPU, 100 GB RAM
  • 40 CPU, 50 GB RAM
  • 40 CPU, 100 GB RAM

All these tasks can be scheduled on the same machine, with CPU resources being oversubscribed under the assumption that not all tasks will simultaneously utilize their maximum allocated CPU.

Oversubscription is vital; without it, machines often remain underutilized.

Furthermore, these limits are often soft limits. For example, if the first task uses 50 CPU while other tasks are idle, this is permissible (soft limiting). However, soft limiting transitions to hard limiting when there's resource contention. If other tasks begin to use their allocated 40 CPU, the first task will experience preemption to prevent it from exceeding its 20 CPU limit. The presence of priorities further complicates this dynamic.
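
To make the scheduling example above concrete, here is a toy admission check in Python. It assumes a hypothetical policy where RAM is never oversubscribed but CPU may be oversubscribed up to a fixed factor; Borg's actual scheduling policy is far more sophisticated.

def fits(machine_cpu, machine_ram_gb, tasks, cpu_oversub_factor=2.0):
    # tasks: list of (cpu_limit, ram_limit_gb) tuples.
    # RAM must fit within capacity; CPU may be oversubscribed up to the factor.
    total_cpu = sum(cpu for cpu, _ in tasks)
    total_ram = sum(ram for _, ram in tasks)
    return (total_ram <= machine_ram_gb and
            total_cpu <= machine_cpu * cpu_oversub_factor)

# The example above: a 64-CPU / 256 GB machine with three tasks.
print(fits(64, 256, [(20, 100), (40, 50), (40, 100)]))  # True (CPU oversubscribed: 100 > 64)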

Enforceable vs. Non-enforceable Resources

CPU is an enforceable resource. If a task exceeds its hard CPU limit, it will be descheduled to enforce adherence to the limit. In contrast, RAM limits are non-enforceable. Cluster machines typically have limited swap space (due to diskless architecture choices), so if a task surpasses a certain hard RAM limit, it must be Out-Of-Memory (OOM) terminated.

Chargeback Model

Users consuming resources on a cluster/cloud are typically charged for their usage, as operating the infrastructure isn't free. There are real costs associated with acquiring machines, setting up networks, and maintaining real estate, among other expenses.

A straightforward chargeback model could involve charging users based on the resource limits they set. However, this approach might not lead to competitive pricing. Charging users based on their actual resource usage might seem more equitable. Yet, even usage-based charging presents challenges, as a user's average usage can be significantly lower than their peak demand. For example, a user might average 1 GB of RAM usage but occasionally spike to 100 GB. Ultimately, cluster/cloud providers must provision for these peak demands. Therefore, charging solely on average usage would be unfavorable for service providers.

A more effective approach could be to charge users based on their usage plus a portion of the oversubscription cost, distributed proportionally among the tasks.

In the example above, if the actual average CPU usages are 10, 20, and 25, then:

Base charge for user 1: 10
Base charge for user 2: 20
Base charge for user 3: 25

Slack: 64 - (10 + 20 + 25) = 9

Oversubscription Tax for user 1: (20 / (20 + 40 + 40)) * 9 = 1.8
Oversubscription Tax for user 2: (40 / (20 + 40 + 40)) * 9 = 3.6
Oversubscription Tax for user 3: (40 / (20 + 40 + 40)) * 9 = 3.6
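
Here is a minimal sketch of this chargeback scheme in Python. The function and the idea of splitting the slack in proportion to the CPU limits follow the worked example above; a real provider's pricing model would of course be more involved.

def chargeback(machine_capacity, limits, usages):
    # Charge each user their average usage plus a share of the remaining
    # slack, split in proportion to their declared limits.
    slack = machine_capacity - sum(usages)
    total_limit = sum(limits)
    return [usage + slack * limit / total_limit
            for limit, usage in zip(limits, usages)]

# The CPU example above: a 64-CPU machine, limits 20/40/40, average usages 10/20/25.
print(chargeback(64, [20, 40, 40], [10, 20, 25]))  # [11.8, 23.6, 28.6]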

Autopilot

Autopilot is designed to automate the scaling of resources for jobs running on Borg.

The primary goal of Autopilot is to automate two key aspects of resources for a job:

  • Setting resource limits for individual tasks (vertical autoscaling) and
  • Adjusting the number of running tasks (horizontal autoscaling).

Automating the setting of resource limits is especially important because these limits directly determine the cost charged back to users. Optimizing this process can therefore lead to substantial cost reductions for users.

Architecture

Autopilot's architecture is based on a triple closed-loop control system. One loop manages horizontal scaling, while the other two independently control per-task CPU and memory resources.

Each job is treated as an isolated entity, meaning there is no learning or information sharing across different jobs.


The data flow within this closed-loop system is as follows:

  1. Resource usage data of a task is continuously logged and exported in the form of time series. For instance, the CPU utilization of a specific task over time.
  2. The resource usage time series for all tasks of a job are fed into recommender systems. These recommenders analyze the data and generate suggested job resource limits, which are then passed to the Autopilot service.
  3. The Autopilot service takes these recommended job limits and communicates them to an actuator component. The actuator then translates these job limits into task limits and task count, and sends them to the Borgmaster.
  4. Finally, the Borgmaster processes these updates and takes appropriate action, such as adjusting the resource allocation of the affected tasks or adjusting the number of tasks.
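
As a rough sketch in Python, one iteration of this loop might look like the following. The function and component names are hypothetical stand-ins, not Borg's or Autopilot's actual APIs.

def autopilot_iteration(job, monitoring, recommenders, actuator, borgmaster):
    # Step 1: read the per-task resource usage time series.
    usage_series = monitoring.per_task_usage(job)

    # Step 2: recommenders turn the time series into suggested job-level
    # limits and a suggested task count.
    cpu_limit = recommenders.cpu.recommend(usage_series)
    ram_limit = recommenders.ram.recommend(usage_series)
    task_count = recommenders.horizontal.recommend(usage_series)

    # Step 3: the actuator translates job-level limits into per-task limits.
    per_task_limits = actuator.to_task_limits(job, cpu_limit, ram_limit)

    # Step 4: the Borgmaster applies the new limits and the new task count.
    borgmaster.update(job, per_task_limits, task_count)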

Per-task Vertical Autoscaling

As previously mentioned, vertical autoscaling automates the setting of task resource limits, freeing users from manual configuration. The goal is to determine limits that prevent potential issues—such as performance degradation due to preemptions and OOM errors caused by overly restrictive RAM limits—while also optimizing resource utilization to reduce costs for users. Fundamentally, this is an optimization problem.

Autopilot addresses this challenge using two primary approaches:

  • Moving Window Recommenders
  • ML Recommenders

Irrespective of the approach used, the goal is to find suitable resource limits for the job.

Let's delve into the details of their operation. The paper explains the mathematical details comprehensively in the main text, whereas many research papers relegate such in-depth explanations to the appendices.

To understand the underlying mechanisms, we need to introduce some notation. The core of the system relies on time-series analysis.

Let:
  • ri[τ] represent the resource usage value exported by task i at time τ.
  • si[t] represent the distribution of all data points of ri within the time window [t - 5 minutes, t]. This distribution is essentially a histogram composed of discrete buckets.

If we denote the range of the kth bucket as [b[k−1], b[k]), then si[t][k] represents the frequency (count) of task i's resource usage data points that fall within [b[k−1], b[k]) during the window [t - 5 minutes, t].

For CPU usage, the monitoring system utilizes a fixed set of 400 buckets, and a task's CPU usage distribution (si[t]) can have non-zero frequencies across multiple buckets.

In contrast, for memory usage, the distribution si[t] typically has a significant non-zero frequency in only one bucket—the one corresponding to the highest observed memory usage. This is because memory usage tends to be relatively stable over short timeframes. Even when objects are deallocated, the underlying memory pages might remain assigned to the process due to memory allocators like TCMalloc, which cache pages for potential reuse.

We can derive the resource usage distribution for an entire job, s[t], by summing the individual task distributions:

s[t][k] = ∑i si[t][k]

Essentially, this involves summing the frequencies of the corresponding kth bucket across all tasks belonging to the job. It's important to note that the data collection interval (the spacing between successive τ values) is approximately 1 second. Consequently, each individual task distribution si[t] is derived from roughly 300 data points within the 5-minute window, and the job distribution s[t] incorporates approximately 300 × (number of tasks) data points.

Let's illustrate the mathematical formulation with an example. Consider a job comprising two tasks.

The first task exhibits the following CPU usage over a 5-minute window:

  • 10 CPU for 2.5 minutes (150 seconds)
  • 20 CPU for 2.5 minutes (150 seconds)

The second task shows the following CPU usage over the same 5-minute window:

  • 20 CPU for 2.5 minutes (150 seconds)
  • 30 CPU for 2.5 minutes (150 seconds)

Assume the CPU usage bucket boundaries are defined as {0, 10, 20, 30, 40}, resulting in the following buckets in a distribution: [0, 10), [10, 20), [20, 30), [30, 40). Then,

s0[t][0] = 0
s0[t][1] = 150 # number of seconds in 2.5 minutes.
s0[t][2] = 150
s0[t][3] = 0

s1[t][0] = 0
s1[t][1] = 0
s1[t][2] = 150
s1[t][3] = 150

Finally s[t] is [0, 150, 300, 150].
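
The following short Python sketch reproduces this example: it buckets each task's 1-second CPU samples into a 5-minute histogram and sums the task histograms into the job distribution. The helper function is mine; only the bucket boundaries and usage traces come from the example.

def histogram(usage_samples, bounds):
    # Count 1-second usage samples into buckets [bounds[k], bounds[k + 1]).
    counts = [0] * (len(bounds) - 1)
    for u in usage_samples:
        for k in range(len(counts)):
            if bounds[k] <= u < bounds[k + 1]:
                counts[k] += 1
                break
    return counts

bounds = [0, 10, 20, 30, 40]
task0 = [10] * 150 + [20] * 150   # 10 CPU for 2.5 min, then 20 CPU for 2.5 min
task1 = [20] * 150 + [30] * 150   # 20 CPU for 2.5 min, then 30 CPU for 2.5 min

s0 = histogram(task0, bounds)     # [0, 150, 150, 0]
s1 = histogram(task1, bounds)     # [0, 0, 150, 150]
s = [a + b for a, b in zip(s0, s1)]
print(s)                          # [0, 150, 300, 150]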

Moving Window Recommenders

The core idea behind moving window recommenders is to analyze the samples (s[τ]) to make a recommendation. Importantly, not all samples are treated equally; more recent samples are given higher significance. This is achieved using a weighting function:

W[τ] = 2^(−τ / t1/2)

where τ is the age of a sample and t1/2 (the half-life of the weight) is 12 hours for CPU.

With a 12-hour half-life, a fresh sample has a weight of 1, a 12-hour-old sample has a weight of 0.5, a 24-hour-old sample has a weight of 0.25, and so on: the weight halves every 12 hours.

The paper defines three distinct types of windowed recommendations, each applied based on the application's use case.

Peak

This is simply the maximum value observed across all buckets (i.e., the highest bucket with a non-zero count) in the last N samples. It is used to set memory limits for jobs with minimal OOM tolerance.

Weighted Average

First, we calculate the mean of the sample s[τ] at a given time τ. This is done using the standard formula for the mean of a distribution:

s[τ]mean = ∑j (b[j] * s[τ][j]) / ∑j (s[τ][j])

where b[j] is a representative value for the jth bucket and s[τ][j] is the frequency of that bucket.

To incorporate the historical trend captured by the moving window, a weighted average of the sample mean values is computed:

Weighted Average = ∑all samples (weight * sample mean) / ∑all samples (weight)

The above is a simplified formulation of equation (2) in section 3.2.2.

Weighted average is used for CPU limits of batch jobs, which typically tolerate CPU throttling during peak usage.
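
Putting the weight function and the sample means together, a simplified weighted-average recommender might look like this in Python. The bucket values and the two example samples are illustrative; only the structure of the computation follows the formulas above.

def sample_mean(counts, bucket_values):
    # Mean of a histogram, using a representative value b[j] per bucket.
    return sum(b * c for b, c in zip(bucket_values, counts)) / sum(counts)

def weighted_average(samples, ages_hours, bucket_values, half_life_hours=12):
    # Exponentially decaying weights: W = 2^(-age / half-life).
    weights = [2 ** (-age / half_life_hours) for age in ages_hours]
    means = [sample_mean(s, bucket_values) for s in samples]
    return sum(w * m for w, m in zip(weights, means)) / sum(weights)

bucket_values = [0, 10, 20, 30]            # bucket minima, as in the example above
samples = [[0, 150, 300, 150],             # fresh sample        (mean = 20)
           [0, 300, 300, 0]]               # 24-hour-old sample  (mean = 15)
print(weighted_average(samples, [0, 24], bucket_values))  # (1*20 + 0.25*15) / 1.25 = 19.0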

jth Percentile of Adjusted Usage

To calculate this, we first adjust the frequency of each bucket in the historical data by multiplying it with the corresponding weight and the bucket's value. This creates a load-adjusted distribution. We compute the jth percentile directly from this load-adjusted distribution. 

Adjusted frequency of bucket k = b[k] * ∑all samples (weight * sample[k])

The above is a simplified formulation of equation (3) in section 3.2.2. Let's understand what multiplying the frequency by b[k] means. Going by our previous example, say the buckets are [0, 10), [10, 20), [20, 30), [30, 40), and let the corresponding b be [0, 10, 20, 30], i.e., the minimum of each bucket. Say there is only one sample - the most recent one - and its distribution is [0, 150, 300, 150]. After load adjustment, we get a new distribution, [0, 1500, 6000, 4500]. The earlier numbers {0, 150, 300, 150} were pure frequencies (seconds spent in each bucket); the adjusted numbers {0, 1500, 6000, 4500} weight each bucket by the CPU usage it represents. Computing percentiles with linear interpolation inside the buckets, the p95 of the former distribution is 38, whereas the p95 of the load-adjusted distribution is 38.67. The latter is better suited because it represents the 95th percentile of the usage that would actually be observed.
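
The numbers 38 and 38.67 come out of that interpolation. A small Python sketch (the helper function is mine) reproduces them:

def percentile_from_histogram(bounds, counts, p):
    # Interpolated percentile of a bucketed distribution.
    # Bucket k spans [bounds[k], bounds[k + 1]) and has (possibly weighted) count counts[k].
    target = sum(counts) * p / 100.0
    cumulative = 0.0
    for k, count in enumerate(counts):
        if count and cumulative + count >= target:
            fraction = (target - cumulative) / count
            return bounds[k] + fraction * (bounds[k + 1] - bounds[k])
        cumulative += count
    return bounds[-1]

bounds = [0, 10, 20, 30, 40]
raw = [0, 150, 300, 150]                         # pure frequencies
adjusted = [b * c for b, c in zip(bounds, raw)]  # load-adjusted: [0, 1500, 6000, 4500]

print(percentile_from_histogram(bounds, raw, 95))       # 38.0
print(percentile_from_histogram(bounds, adjusted, 95))  # ≈ 38.67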

The jth percentile is used to set CPU limits for serving jobs (typically p95 or p90), RAM limits for jobs with low OOM tolerance (p98), and RAM limits for jobs with intermediate OOM tolerance (p60).

ML Recommenders

The ML recommender for vertical autoscaling involves several steps, but the underlying calculations are relatively straightforward. The system utilizes N different models (ensemble of models), each characterized by a decay rate (dm) and a safety margin (Mm).

Steps 1 to 4 determine the best limit for the job as found by each model. Steps 5 and 6 then select the model whose limit is applied.

Step 1: Calculate Overrun Cost

The overrun cost, o(L)[t], represents the cost incurred if the resource limit is set to L at time t, and the actual usage exceeds this limit. It's calculated as a decaying sum of past overruns:

o(L)[t] = (1 - dm)*o(L)[t−1] + dm*(∑j s[t][j] such that b[j] > L)

The first term, (1 - dm)*o(L)[t−1], represents the decayed overrun cost from the previous time step. The decay factor (1 - dm) ensures that older overrun costs have a diminishing impact.

The second term, dm*(∑j s[t][j] such that b[j] > L), accounts for the current overrun. It sums the frequencies s[t][j] of all buckets j where the bucket's value b[j] is greater than the proposed limit L. This sum is then weighted by the model's decay rate dm.

Step 2: Calculate Underrun Cost

The underrun cost, u(L)[t], represents the cost of underutilization if the limit is set to L at time t, meaning the allocated resources are more than the actual usage. The formula is analogous to the overrun cost:

u(L)[t] = (1 - dm)*u(L)[t−1] + dm*(∑j s[t][j] such that b[j] < L)

Step 3: Determine the Best Limit for Each Model

Each model m attempts to find a candidate limit Lm′[t] that minimizes a weighted sum of the current overrun cost, underrun cost, and the cost of changing the limit from the previous time step (Lm′[t−1]):

Lm′[t] = argminL (wo*o(L)[t] + wu*u(L)[t] + wΔL*Δ(L, Lm′[t−1]))

where:

wo is the weight for the overrun cost.
wu is the weight for the underrun cost.
wΔL is the weight for the cost of changing the limit.

Δ(L, Lm′[t−1]) is an indicator function that equals 1 if the proposed limit L is different from the previous best limit for model m (Lm′[t−1]), and 0 otherwise. This term penalizes frequent limit changes.

Step 4: Apply Safety Margin

After finding the optimal limit Lm′[t] based on cost minimization, each model adds its specific safety margin Mm to arrive at its final recommended limit Lm[t]:

Lm[t] = Lm′[t] + Mm

Step 5: Calculate the Cost of Each Model's Recommendation

To evaluate the performance of each model over time, a decaying cost cm[t] is calculated:

cm[t] = d * (wo*cost of overrun by model m 
+ wu*cost of underrun by model m 
+ wΔL*cost of change) 
+ (1 − d) * cm[t−1]

The cost of change would be 1 if the chosen limit Lm[t] is different from the previously applied limit, and 0 otherwise. The decay parameter d gives more weight to recent costs.

Step 6: Choose the Best Model and Set the Limit

Finally, the system selects the model m that minimizes a combined cost, taking into account the model's historical cost, the cost of switching models, and the cost of changing the limit:

L[t] = argminm (cm[t] + wΔm*Δ(m[t−1], m[t]) + wΔL*Δ(L[t−1], Lm[t]))

where:

wΔm is the weight for the cost of changing the model.

Δ(m[t−1], m[t]) is 1 if the chosen model at time t is different from the model chosen at the previous time step, and 0 otherwise. Δ(L[t−1], Lm[t]) is 1 if the limit Lm[t] proposed by the chosen model is different from the limit set at the previous time step (L[t−1]), and 0 otherwise.

The final resource limit L[t] is then set to the limit recommended by the chosen model (Lm[t]).
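
Below is a simplified Python sketch of the ensemble logic in steps 1 through 6, written directly from the formulas above. The candidate limits, bucket values, model parameters, and hyperparameter values are all illustrative placeholders, not the ones Google uses.

BUCKET_VALUES = [0, 10, 20, 30]       # b[k]: representative value of each bucket
CANDIDATE_LIMITS = [10, 20, 30, 40]   # limits L the recommender may choose from

class Model:
    def __init__(self, decay_rate, safety_margin):
        self.d = decay_rate                                   # dm
        self.margin = safety_margin                           # Mm
        self.overrun = {L: 0.0 for L in CANDIDATE_LIMITS}     # o(L)[t]
        self.underrun = {L: 0.0 for L in CANDIDATE_LIMITS}    # u(L)[t]
        self.limit = CANDIDATE_LIMITS[-1]                     # Lm'[t-1]

    def recommend(self, sample, w_o, w_u, w_dl):
        # Steps 1 & 2: decaying overrun/underrun costs for every candidate limit.
        for L in CANDIDATE_LIMITS:
            over = sum(c for b, c in zip(BUCKET_VALUES, sample) if b > L)
            under = sum(c for b, c in zip(BUCKET_VALUES, sample) if b < L)
            self.overrun[L] = (1 - self.d) * self.overrun[L] + self.d * over
            self.underrun[L] = (1 - self.d) * self.underrun[L] + self.d * under
        # Step 3: pick the candidate limit that minimizes the weighted cost,
        # including a penalty for changing this model's previous limit.
        def cost(L):
            return (w_o * self.overrun[L] + w_u * self.underrun[L]
                    + w_dl * (L != self.limit))
        self.limit = min(CANDIDATE_LIMITS, key=cost)
        # Step 4: add the model's safety margin.
        return self.limit + self.margin

def choose_limit(models, model_costs, prev_model, prev_limit, sample,
                 d=0.1, w_o=10.0, w_u=1.0, w_dl=0.5, w_dm=0.5):
    recs = []
    for i, m in enumerate(models):
        rec = m.recommend(sample, w_o, w_u, w_dl)
        recs.append(rec)
        # Step 5: decaying cost of the limit each model actually recommended.
        over = sum(c for b, c in zip(BUCKET_VALUES, sample) if b > rec)
        under = sum(c for b, c in zip(BUCKET_VALUES, sample) if b < rec)
        step = w_o * over + w_u * under + w_dl * (rec != prev_limit)
        model_costs[i] = d * step + (1 - d) * model_costs[i]
    # Step 6: pick the model, penalizing model switches and limit changes.
    best = min(range(len(models)),
               key=lambda i: (model_costs[i] + w_dm * (i != prev_model)
                              + w_dl * (recs[i] != prev_limit)))
    return best, recs[best]

# Example: an ensemble of two models fed the job sample from earlier.
models = [Model(decay_rate=0.05, safety_margin=5), Model(decay_rate=0.5, safety_margin=0)]
costs = [0.0, 0.0]
best, limit = choose_limit(models, costs, prev_model=0, prev_limit=40,
                           sample=[0, 150, 300, 150])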

Model and Hyperparameters

Model Parameters (dm, Mm): Each of the N models has its own fixed decay rate and safety margin. By creating a diverse set of models with different parameter values, the system can choose the one that best adapts to the workload's behavior.

Hyperparameters (d, wΔL, wΔm, wo, wu): These are higher-level parameters that control the learning process and the decision-making of the recommender system. They need to be tuned or learned through experimentation to optimize the overall performance of the autoscaling mechanism.

Horizontal Autoscaling

Horizontal autoscaling in Autopilot is triggered by the following metrics:

  • CPU Utilization: Similar to vertical autoscaling, users can define a lookback window (T). Within this window, a chosen percentile or the maximum CPU usage is taken as the required usage. Users also specify their desired average utilization. From the ratio of the required usage to the desired average utilization, Autopilot determines the necessary number of tasks for the job (a sketch follows this list).
  • Target Size (User-Defined Function): Alternatively, users can provide their own function to calculate the desired number of tasks. This function can take various input metrics, such as QPS, into account.
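
Here is a minimal Python sketch of the CPU-based rule, under the assumption that the required usage is divided by the CPU one task should carry at the target utilization. The parameter names are mine, and Autopilot's actual formula may differ in its details.

import math

def desired_task_count(cpu_usage_history, per_task_cpu_limit,
                       target_avg_utilization, percentile=95):
    # cpu_usage_history: job-level CPU usage samples over the lookback window T.
    samples = sorted(cpu_usage_history)
    idx = min(len(samples) - 1, int(len(samples) * percentile / 100))
    required_usage = samples[idx]                  # p95 (or max) over the window
    cpu_per_task = per_task_cpu_limit * target_avg_utilization
    return math.ceil(required_usage / cpu_per_task)

# Example: p95 job usage of 95 CPU, 10-CPU tasks targeted at 60% average utilization.
print(desired_task_count([60, 70, 80, 90, 95], 10, 0.6))  # 16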

Strategies

It's important to note that horizontal scaling carries inherent costs. Allocating new machines for additional tasks takes time. Furthermore, during the scaling process, fluctuations in the triggering metrics (like CPU utilization or QPS) can occur, potentially leading to instability and further unnecessary scaling actions. To mitigate these issues, Autopilot employs several strategies:

  1. Deferred Downscaling: Upscaling (adding tasks) is performed immediately when needed. However, downscaling (removing tasks) is delayed for a specific period. Once new tasks are started, they remain operational for a stabilization period. This is beneficial because a temporary load spike might recur shortly after it subsides, in which case the scaled-up capacity would already be in place. The trade-off is potentially higher resource consumption during the deferral period.

  2. Slow Decay: During downscaling, the number of tasks is reduced gradually rather than abruptly. This helps to avoid sudden drops in capacity and potential disruptions.

  3. Defer Small Changes: Horizontal scaling can have disruptive side effects on certain types of applications. For example, a distributed cache using consistent hashing to distribute data might experience significant re-sharding and temporary availability impacts even with the addition or removal of a single task. Therefore, small scaling changes are often deferred to avoid these disruptions.

  4. Limited Growth: Even during upscaling events, the maximum rate of task growth is limited. This prevents the rapid creation of a large number of new tasks, which could themselves become unavailable during their startup phase and exacerbate instability.

Below is an example of how these strategies play out.
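
The following Python sketch (not Autopilot's actual algorithm; all thresholds and factors are made up for illustration) layers the four strategies on top of a stream of raw recommendations:

def stabilized_task_count(current, raw_recommendation, recent_recommendations,
                          min_change=2, max_growth_factor=1.5):
    # Deferred downscaling: never shrink below the peak recommendation seen recently.
    target = max(raw_recommendation, max(recent_recommendations, default=0))

    # Defer small changes: ignore adjustments smaller than min_change tasks.
    if abs(target - current) < min_change:
        return current

    # Limited growth: never grow by more than max_growth_factor at once.
    if target > current:
        return min(target, int(current * max_growth_factor))

    # Slow decay: shrink gradually, here by at most half of the gap per step.
    return current - (current - target) // 2

count = 20
for raw, recent in [(40, [20, 25]), (40, [25, 40]), (10, [40, 40]), (10, [10, 12])]:
    count = stabilized_task_count(count, raw, recent)
    print(count)   # 30, 40, 40, 26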


Evaluation

The study evaluates the system's performance directly using real-world workloads. To track this, the authors employ the following metrics:

  • Footprint: Calculated as the normalized sum of average task limits for a job, expressed in the number of machines (where number of machines = RAM bytes / memory per machine).
  • Relative slack: Defined as (limit - usage) / limit.
  • Absolute slack: The difference between limit and usage, normalized to the number of machines.
  • Relative OOM rate: The ratio of the number of OOM events to the total number of tasks.
  • Number of job-days without OOMs.

The results show:

  • Non-autopiloted jobs exhibit a relative slack ranging from 46% to 60%, while autopiloted jobs show a tighter range of 23% to 31%, with ML recommenders demonstrating superior performance.
  • Autopilot significantly reduces absolute slack to approximately 500 machines compared to 12,000 machines for non-autopiloted jobs, resulting in estimated savings of tens of millions of USD for Google. Figure 3(c) shows that Autopilot is indeed responsible for the smaller footprints observed in autopiloted jobs (rather than sampling errors).
  • Migrating 500 previously non-autopiloted jobs led to substantial performance improvements, saving 1708 machines over a four-month period.
  • OOM events are infrequent in autopiloted jobs, with 99.5% experiencing no OOMs. Figure 5 is somewhat difficult to interpret. The y-axis is the percentage of days when no OOMs were observed. It indicates that all algorithms achieved zero OOMs on at least 97% of days, with the moving window approach exhibiting the fewest days with OOMs.
  • Figure 6 reveals an inverse relationship between memory slack and relative OOM rate. Moving window recommenders excel at managing OOMs while maintaining low slack, whereas ML recommenders tend to reduce limits too aggressively, leading to more OOMs.
  • Recommendations demonstrate stability, with no changes occurring on 70% of job-days and only 6 to 7 changes observed at the 99th percentile (Figure 7).
  • Long-running jobs exhibit lower slack compared to new jobs, suggesting that Autopilot cautiously adjusts limits for new jobs to mitigate the risk of OOMs.

Paper Review

The mathematical details driving the vertical autoscaling in this paper are central to its contribution. While the initial formulation might seem daunting, it becomes surprisingly accessible when broken down into its constituent parts. The fact that this approach operates successfully across Google's incredibly diverse and large-scale infrastructure highlights its robustness.

Sections 5 and 6 offer valuable insights into the strategies employed to ensure widespread accessibility and encourage adoption of the system. These lessons are often invaluable when designing infrastructure, highlighting the importance of demonstrating the system's value to client teams to drive impact.
