mattbutterfield.com

home | blog | music | photos

Running asynchronous tasks with Cloud Run, Go, and Cloud Tasks.

2021-12-11

It's been a few months since I decided to start using Cloud Run to host my personal web projects. For the most part it's been great, but I've had a hard time deciding how to handle asynchronous tasks and longer running processes. The official recommendation is to use Cloud Tasks, but as someone who isn't used to working with this kind of managed infrastructure, I had a hard time embracing it at first. Once I set it up though, I found it to be quite nice, so I decided to share my setup here as a simple example.

The scenario

My first use case is file uploads. The actual files get uploaded directly from the browser to Cloud Storage thanks to a signed upload url and some simple Javascript. Once the upload finishes, the browser makes a POST request to my server with some metadata about the file that was uploaded. At this point I want some extra processing to happen on the file that could take a while, so it needs to happen outside of a normal HTTP request.

What I'm used to doing is pushing a task message to a Redis queue managed by something like Sidekiq or RabbitMQ, but instead we'll use Cloud Tasks as our queue, and set up a separate Cloud Run service to process the tasks.

Creating the task handler service

Let's look at some code. Below is a Go HTTP handler that parses a JSON request and then performs some operations that could take some time.

    
type SaveSongRequest struct {
	AudioFileName string `json:"audioFileName"`
	CreatedDate   string `json:"createdDate"`
	SongName      string `json:"songName"`
	Description   string `json:"description"`
}

func SaveSong(w http.ResponseWriter, r *http.Request) {
	body := &SaveSongRequest{}
	err := json.NewDecoder(r.Body).Decode(body)
	if err != nil {
		handleError(err, w)
		return
	}
	client, err := storage.NewClient()
	if err != nil {
		handleError(err, w)
		return
	}
	bucket := client.Bucket(lib.FilesBucket)

	// ⚠️ time consuming operation
	err = convertAudio(bucket, body.AudioFileName)
	if err != nil {
		handleError(err, w)
		return
	}

	// etc...
}
    
  

This is exactly the kind of HTTP handler I am trying to avoid writing for my web application, but this will be triggered by Cloud Tasks instead of a user's web browser, so this is actually our asynchronous worker code and not a regular endpoint to be called by users.

Why is it even an HTTP handler? Because the only way to trigger Cloud Run is through an HTTP request. Luckily, Cloud Tasks can use HTTP as a target for handling tasks sent to its queues.

This all works well together. You can just add the handler to your favorite router:

    
r := mux.NewRouter()
r.HandleFunc("/save_song", SaveSong).Methods(http.MethodPost)
err := http.ListenAndServe(net.JoinHostPort("", "8000"), r)
    
  

...and this can now be run and deployed just like any other Go Cloud Run service - importantly with "Allow all traffic" selected for Ingress (Cloud Task HTTP targets must have a public IP), and "Require authentication. Manage authorized users with Cloud IAM." also selected. This will require us to specify a service account when creating the task.

Creating the tasks

There is minimal setup required for Cloud Tasks. After enabling the API in the web console, you can create a queue with the gcloud CLI tool, for example:

    gcloud tasks queues create save-song-uploads
  

Now that we have a queue and a task handler, we can enqueue a task from the HTTP handler that gets called by the user's web browser after the file upload is complete:

    
import (
	"cloud.google.com/go/cloudtasks/apiv2"
	"context"
	"encoding/json"
	"fmt"
	"github.com/m-butterfield/mattbutterfield.com/app/lib"
	"google.golang.org/genproto/googleapis/cloud/tasks/v2"
	"log"
	"net/http"
)

const (
	// URL of the task worker cloud run service
	workerURL  = "https://mattbutterfield-worker-12345.a.run.app/"
	projectID  = "sample-project"
	locationID = "us-central1"
)

func WebSaveSong(w http.ResponseWriter, r *http.Request) {
	body := &SaveSongRequest{}
	err := json.NewDecoder(r.Body).Decode(&body)
	if err != nil {
		handleError(err, w)
		return
	}

	task, err := createTask("save_song", "save-song-uploads", body)
	if err != nil {
		handleError(err, w)
		return
	} else {
		log.Println("Created task: " + task.Name)
	}
	w.WriteHeader(201)
}

func createTask(name, queue string, body SaveSongRequest) (*tasks.Task, error) {
	ctx := context.Background()
	client, err := cloudtasks.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("NewClient: %v", err)
	}
	defer client.Close()

	httpReq := &tasks.HttpRequest{
		HttpMethod: tasks.HttpMethod_POST,
		Url:        workerURL + name,
		AuthorizationHeader: &tasks.HttpRequest_OidcToken{
			OidcToken: &tasks.OidcToken{
				ServiceAccountEmail: t.serviceAccountEmail,
			},
		},
	}

	req := &tasks.CreateTaskRequest{
		Parent: fmt.Sprintf("projects/%s/locations/%s/queues/%s",
			projectID, locationID, queue),
		Task: &tasks.Task{
			MessageType: &tasks.Task_HttpRequest{
				HttpRequest: httpReq,
			},
		},
	}

	if message, err := json.Marshal(body); err != nil {
		return nil, err
	} else {
		req.Task.GetHttpRequest().Body = message
	}

	return client.CreateTask(ctx, req)
}
    
  

Now we have a full backend asynchronous queue/worker system with all the benefits that come with Cloud Run. One limitation to note is timeouts. HTTP tasks have a maximum duration of 30 minutes (10 minutes by default), but this should be plenty of time for most use cases.

I'm sure I'll need some cron-like functionality soon, and it looks like Cloud Scheduler will allow me to use my worker service for that as well. That's all for now!