Of course, we’d like to avoid paying for an idle machine. We’ll see that there’s a way to run zero machines and still have an automatic scaling system.
However, there is one prerequisite: we need a way of knowing when it’s time to scale up again.
At Adenlab we use a task queue (MRQ) for our recurring jobs. MRQ allows us to route our jobs to specific worker groups, depending on the queue and/or the task itself. Each worker group is linked to an ASG, so we have a clear metric here: for a given ASG, do we have any jobs queued?
With all this, we can use the following ASG structure:
A small (read: inexpensive) machine that will always be up (not auto-scaled)
One or more ASGs, without an auto-scaling policy
By creating an ASG without an auto-scaling policy, GCE lets us scale it down to zero machines if we want. On our small always-up machine, we’ll have a script that runs every 5 minutes and checks each MRQ worker group for jobs to process.
If there are any, the script creates an auto-scaling policy for the corresponding ASG and scales it up to 1 machine. The autoscaler can then add machines as required, based on the metric it is configured with (in our case, CPU utilization).
Let’s write an MRQ task for this script. Here are the necessary imports:
from mrq.task import Task
from google.oauth2 import service_account
from googleapiclient import discovery
import time
import re
Next, we need to configure a few GCP-related parameters:
class ScaleUp(Task):

    project = "name of your GCP project"
    zone = "europe-west1-c"  # adapt your zone here
    service_account_path = "service_account_credentials.json"

    # groups that are not concerned by this task
    groups_to_skip = ("group1",)
Add to groups_to_skip the worker groups that should not be auto-scaled: at the very least, add the group of the machine that will dequeue this task.
Now let’s write the main method of our task:
def run(self, params):
    credentials = service_account.Credentials.from_service_account_file(
        self.service_account_path
    )
    self.service = discovery.build(
        "compute",
        "v1",
        credentials=credentials,
        cache_discovery=False
    )
    # We need a way to know what we want our different
    # autoscaling policies to be.
    # We could store them in a DB and fetch them here,
    # so that they are shared with our Ansible playbooks for instance.
    # For simplicity, we'll just hardcode them here:
    self.autoscaler_configs = {
        "group2": {
            "min_replicas": 1,
            "max_replicas": 10,
            "cooldown": 180,
            "cpu_target": 0.80
        },
        "group3": {
            "min_replicas": 1,
            "max_replicas": 8,
            "cooldown": 180,
            "cpu_target": 0.90
        }
    }
    # First we need to fetch the existing ASGs
    self.fetch_groups()
    # Next we want to know which groups currently have an autoscaler
    self.fetch_autoscalers()
    # Check each group and see if we should scale it up
    self.check_groups()
Here’s how to retrieve ASG information:
def fetch_groups(self):
    self.groups = {}
    request = self.service.instanceGroupManagers().list(
        project=self.project,
        zone=self.zone
    )
    while request is not None:
        response = request.execute()
        for asg in response["items"]:
            group_name = asg["baseInstanceName"]
            self.groups[group_name] = {
                "base_name": group_name,
                "name": asg["name"],
                "size": asg["targetSize"],
                "link": asg["selfLink"]
            }
        request = self.service.instanceGroupManagers().list_next(
            previous_request=request,
            previous_response=response
        )
We hydrate self.groups with ASG information. For more info on ASG structure, see here.
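Our run() method also calls fetch_autoscalers(), so that we know which groups already have an autoscaler. It works just like fetch_groups(): it lists the zone’s autoscalers and indexes them by worker group, matching each autoscaler’s target URL against the selfLink we stored for each group. A minimal sketch:
def fetch_autoscalers(self):
    self.autoscalers = {}
    request = self.service.autoscalers().list(
        project=self.project,
        zone=self.zone
    )
    while request is not None:
        response = request.execute()
        for autoscaler in response.get("items", []):
            # An autoscaler's target is the URL of the instance group
            # manager it controls: match it against the group links
            for group_name, group in self.groups.items():
                if autoscaler["target"] == group["link"]:
                    self.autoscalers[group_name] = {
                        "name": autoscaler["name"],
                        "link": autoscaler["selfLink"]
                    }
        request = self.service.autoscalers().list_next(
            previous_request=request,
            previous_response=response
        )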
Now let’s take a look at the code we’ll use to create an autoscaler for a given worker group:
def create_autoscaler(self, group):
    autoscaler_config = self.autoscaler_configs[group]
    config = {
        "target": self.groups[group]["link"],
        "name": "%s-as" % group,
        "autoscalingPolicy": {
            "minNumReplicas": autoscaler_config["min_replicas"],
            "maxNumReplicas": autoscaler_config["max_replicas"],
            "coolDownPeriodSec": autoscaler_config["cooldown"],
            "cpuUtilization": {
                "utilizationTarget": autoscaler_config["cpu_target"]
            }
        }
    }
    operation = self.service.autoscalers().insert(
        project=self.project,
        zone=self.zone,
        body=config
    ).execute()
    wait_for_operation(operation)
You can find the wait_for_operation code in this example.
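If you’d rather keep everything in one file, the heart of that helper is just a polling loop on the zone operations API. Here is a minimal sketch with an explicit signature, adapted from that example (the one-argument calls above assume a small wrapper that fills in the service, project and zone for you):
def wait_for_operation(service, project, zone, operation):
    # Poll the zonal operation until GCE reports it as DONE
    while True:
        result = service.zoneOperations().get(
            project=project,
            zone=zone,
            operation=operation["name"]
        ).execute()
        if result["status"] == "DONE":
            # Surface any error reported by the operation
            if "error" in result:
                raise Exception(result["error"])
            return result
        time.sleep(1)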
The last thing we need is a method for scaling up an ASG:
def scale_up(self, group, size=1):
    if self.groups[group]["size"] > 0:
        # Already scaled up
        return
    # Make sure we have an autoscaler
    if not self.autoscalers.get(group):
        self.create_autoscaler(group)
    operation = self.service.instanceGroupManagers().resize(
        project=self.project,
        zone=self.zone,
        instanceGroupManager=self.groups[group]["name"],
        size=size
    ).execute()
    wait_for_operation(operation)
The final logic of our task is quite simple:
def check_groups(self):
    # Now we have everything we need for the actual task logic:
    for group in self.groups:
        if group in self.groups_to_skip:
            continue
        if self.should_scale_up(group):
            self.scale_up(group)
should_scale_up is the method that should contain your scale-up logic. We don’t provide it here, but remember that in our case it boils down to checking whether we have jobs waiting to be processed for that worker group.
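As an illustration, here is a minimal sketch of what it could look like. The queues_for_group mapping is hypothetical (it depends on how you route queues to worker groups), and it assumes your MRQ version exposes a Queue.size() helper:
from mrq.queue import Queue

def should_scale_up(self, group):
    # Hypothetical mapping: which MRQ queues each worker group consumes.
    # In practice, derive this from your own routing configuration.
    queues_for_group = {
        "group2": ["crawl", "crawl_low"],
        "group3": ["reports"]
    }
    # Scale up as soon as any of the group's queues has pending jobs
    # (Queue.size() assumes your MRQ version provides it)
    for queue_name in queues_for_group.get(group, []):
        if Queue(queue_name).size() > 0:
            return True
    return False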
This task is scheduled to run every 5 minutes. This is convenient for our use case, because even if there are no pending jobs, a team member can perform an action at any time, which will result in the creation of a new job.
We don’t want to have to wait too long for it to be started. Of course, for any user actions that create jobs and are supposed to get a quick response, we need a dedicated machine that’s always up.
In most cases, however, it’s best to avoid asynchronous tasks for user interactions that require feedback.
So now we can scale up a machine when we need it, and GCE will take over if we need more. But we also need a way to scale back down to zero when we no longer need it!
To do this, we have a second task, running every 30 minutes, which executes the same code as the first, except that it removes the autoscaler and scales to 0 machines if there are no pending jobs.
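Both tasks can be registered with MRQ’s scheduler, for instance through the SCHEDULER_TASKS setting of the worker configuration. A sketch with hypothetical module paths (check your MRQ version’s documentation for the exact format):
# mrq-config.py -- hypothetical module paths, interval is in seconds
SCHEDULER_TASKS = [
    # Check every 5 minutes whether a worker group needs machines
    {"path": "tasks.scaling.ScaleUp", "params": {}, "interval": 5 * 60},
    # Check every 30 minutes whether we can scale back down to zero
    {"path": "tasks.scaling.ScaleDown", "params": {}, "interval": 30 * 60},
]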
We can make our new task inherit from the previous one, so that it has all the methods we need to communicate with GCP:
class ScaleDown(ScaleUp):
We also need some new methods to scale down:
def delete_autoscaler(self, group):
    autoscaler = self.autoscalers[group]
    operation = self.service.autoscalers().delete(
        project=self.project,
        zone=self.zone,
        autoscaler=autoscaler["name"]
    ).execute()
    wait_for_operation(operation)
def scale_down(self, group):
    if self.groups[group]["size"] == 0:
        # Already scaled down
        return
    # Delete the autoscaler so that we can scale to zero machines
    if self.autoscalers.get(group):
        self.delete_autoscaler(group)
    operation = self.service.instanceGroupManagers().resize(
        project=self.project,
        zone=self.zone,
        instanceGroupManager=self.groups[group]["name"],
        size=0
    ).execute()
    wait_for_operation(operation)
def check_groups(self):
    for group in self.groups:
        if group in self.groups_to_skip:
            continue
        if self.should_scale_down(group):
            self.scale_down(group)
When we know there are no more pending jobs, we remove the autoscaler and scale down completely. Again, should_scale_down contains the scale-down logic to be implemented.
The upside of this approach is that we can combine several scaling criteria. For example, to avoid scaling up and down repeatedly in quick succession, we can also require that a certain amount of time has elapsed since the last job was created before scaling down.
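Here is a minimal sketch of should_scale_down, reusing the Queue helper and the hypothetical queues_for_group mapping from the should_scale_up sketch; the time-based criterion is left as a placeholder since it depends on how you track job creation:
def should_scale_down(self, group):
    # Hypothetical mapping, same as in the should_scale_up sketch
    queues_for_group = {
        "group2": ["crawl", "crawl_low"],
        "group3": ["reports"]
    }
    # Never scale down while jobs are still queued
    for queue_name in queues_for_group.get(group, []):
        if Queue(queue_name).size() > 0:
            return False
    # Additional criteria can be checked here, e.g. "no job has been
    # created for this group in the last N minutes"
    return True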