lro

v2.8.0
Published: Apr 11, 2026 License: Apache-2.0 Imports: 25 Imported by: 0

README

LRO v2

go.alis.build/lro/v2 provides a client for long-running operations that are stored in Google Cloud Spanner and resumed through Cloud Tasks.

What this package does

  • Persists google.longrunning.Operation records in Spanner.
  • Exposes a standard google.longrunning.Operations gRPC server.
  • Registers HTTP callback routes for resumable handlers.
  • Stores private resumable workflow state alongside operation metadata.
  • Requeues unfinished work through Cloud Tasks.

Installation

go get go.alis.build/lro/v2

Infrastructure contract

Provision the backing Spanner resources before creating the client. For a service following the ALIS module layout, keep the LRO resources in infra/modules/alis.lro.v2 and wire that module from infra/main.tf.

The client expects:

  • A Spanner table named ${replace(project, "-", "_")}_${replace(neuron, "-", "_")}_Operations
  • A Cloud Tasks queue named {neuron}-operations
  • A callback host that matches either the explicit Host config or the default Cloud Run URL inferred by NewFromEnv
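The table and queue naming rules above can be checked in plain Go. This sketch reproduces the Terraform `replace(..., "-", "_")` expression and the `{neuron}-operations` pattern. `operationsTableName` and `operationsQueueName` are hypothetical helpers for illustration, not part of the lro API; the client derives the real table name from its configured project and neuron.

```go
package main

import (
	"fmt"
	"strings"
)

// operationsTableName mirrors the Terraform expression
// ${replace(project, "-", "_")}_${replace(neuron, "-", "_")}_Operations.
// Hypothetical helper, for illustration only.
func operationsTableName(project, neuron string) string {
	return strings.ReplaceAll(project, "-", "_") + "_" +
		strings.ReplaceAll(neuron, "-", "_") + "_Operations"
}

// operationsQueueName mirrors the {neuron}-operations Cloud Tasks queue rule.
func operationsQueueName(neuron string) string {
	return neuron + "-operations"
}

func main() {
	fmt.Println(operationsTableName("my-project", "launchpad-v1")) // my_project_launchpad_v1_Operations
	fmt.Println(operationsQueueName("launchpad-v1"))               // launchpad-v1-operations
}
```

Hyphens are replaced because they are not valid in unquoted Spanner table names, while Cloud Tasks queue IDs accept them, so the queue keeps the neuron name unchanged.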

The full Terraform example and schema live in docs.go.

Startup flow

Create one shared client at startup, register resumable handlers once, mount the HTTP callback routes, and register the Operations API:

mux := http.NewServeMux()

client, err := lro.NewFromEnv(ctx, "launchpad-v1")
if err != nil {
	return err
}
defer client.Close()

if err := client.AddResumableHandlers(
	lro.ResumableHandler{Path: "create-agent", Handler: createAgentHandler},
); err != nil {
	return err
}
client.RegisterHTTP(mux) // RegisterHTTPHandlers is deprecated in favour of RegisterHTTP

longrunningpb.RegisterOperationsServer(grpcServer, client.OperationsServer())

In an RPC, create the operation, save any private state you need for resumes, and schedule the first callback:

op, err := client.NewOperation(ctx, "operations/"+uuid.NewString(), metadata)
if err != nil {
	return nil, err
}
if err := op.SavePrivateState(&CreateAgentState{Owner: "users/123"}); err != nil {
	return nil, err
}
if err := op.ResumeViaTasks("create-agent", 0); err != nil {
	return nil, err
}
return op.OperationPb(), nil

See example_test.go and docs.go for the end-to-end flow.

Documentation

Overview

Package lro provides a client for managing long-running operations stored in Spanner and resumed via Cloud Tasks.

Before using this package, provision the backing Spanner table for the target `neuron`. The client expects a table named:

${replace(project, "-", "_")}_${replace(neuron, "-", "_")}_Operations

The required schema is:

  • `key` `STRING` primary key, computed/stored from `Operation.name`
  • `Operation` `PROTO<google.longrunning.Operation>` required
  • `State` `BYTES` nullable
  • `ResumePoint` `STRING` nullable
  • `UpdateTime` `TIMESTAMP` required

For ALIS-managed services, wire this the same way as the other infra modules:

```
infra/
  main.tf
  apis.tf
  cloudrun.tf
  spanner.tf
  variables.tf
  modules/
    alis.lro.v2/
      main.tf
```

`infra/main.tf` should wire the module like this:

```hcl

module "alis_lro_v2" {
  source = "./modules/alis.lro.v2"

  alis_os_project               = var.ALIS_OS_PROJECT
  alis_region                   = var.ALIS_REGION
  alis_project_nr               = var.ALIS_PROJECT_NR
  alis_managed_spanner_project  = var.ALIS_MANAGED_SPANNER_PROJECT
  alis_managed_spanner_instance = var.ALIS_MANAGED_SPANNER_INSTANCE
  alis_managed_spanner_db       = var.ALIS_MANAGED_SPANNER_DB
  neuron                        = local.neuron

  depends_on = [google_project_service.environment]
}

```

Inside `modules/alis.lro.v2/main.tf`, provision the table and TTL policy that match the client contract:

```hcl

resource "alis_google_spanner_table" "operations" {
  project         = var.alis_managed_spanner_project
  instance        = var.alis_managed_spanner_instance
  database        = var.alis_managed_spanner_db
  name            = "${replace(var.alis_os_project, "-", "_")}_${replace(var.neuron, "-", "_")}_Operations"
  prevent_destroy = true

  schema = {
    columns = [
      {
        name            = "key"
        is_computed     = true
        computation_ddl = "Operation.name"
        is_stored       = true
        type            = "STRING"
        is_primary_key  = true
        required        = true
        unique          = true
      },
      {
        name          = "Operation"
        type          = "PROTO"
        proto_package = "google.longrunning.Operation"
        required      = true
      },
      {
        name     = "State"
        type     = "BYTES"
        required = false
      },
      {
        name     = "ResumePoint"
        type     = "STRING"
        required = false
      },
      {
        name     = "UpdateTime"
        type     = "TIMESTAMP"
        required = true
      },
    ]
  }
}

resource "alis_google_spanner_table_ttl_policy" "operations" {
  project  = alis_google_spanner_table.operations.project
  instance = alis_google_spanner_table.operations.instance
  database = alis_google_spanner_table.operations.database
  table    = alis_google_spanner_table.operations.name
  column   = "UpdateTime"
  ttl      = 90
}

```

That module should line up with the runtime values passed to `lro.New` or derived by `lro.NewFromEnv`:

  • `Project` / `CloudTasksProject` -> `ALIS_OS_PROJECT`
  • `SpannerProject` -> `ALIS_MANAGED_SPANNER_PROJECT`
  • `SpannerInstance` -> `ALIS_MANAGED_SPANNER_INSTANCE`
  • `SpannerDatabase` -> `ALIS_MANAGED_SPANNER_DB`
  • `CloudTasksQueue` -> `{neuron}-operations`
  • `CloudTasksServiceAccount` -> `alis-build@${ALIS_OS_PROJECT}.iam.gserviceaccount.com`
  • `Host` -> `https://{neuron}-{ALIS_PROJECT_NR}.{ALIS_REGION}.run.app` unless overridden
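The mapping above can be sketched as a plain Go function. `envConfig` is a local mirror of the listed `lro.Config` fields (it is not the real type) and `configFromEnv` is hypothetical; `lro.NewFromEnv` performs the actual derivation. `CloudTasksLocation` is left out because the list does not say which variable feeds it.

```go
package main

import (
	"fmt"
	"os"
)

// envConfig mirrors only the lro.Config fields listed in the mapping.
// It is NOT lro.Config; it exists to illustrate the env-var mapping.
type envConfig struct {
	Project                  string
	SpannerProject           string
	SpannerInstance          string
	SpannerDatabase          string
	CloudTasksProject        string
	CloudTasksQueue          string
	CloudTasksServiceAccount string
	Host                     string
}

// configFromEnv applies the documented mapping, reading values through getenv
// (for example os.Getenv). Hypothetical helper for illustration.
func configFromEnv(neuron string, getenv func(string) string) envConfig {
	project := getenv("ALIS_OS_PROJECT")
	return envConfig{
		Project:                  project,
		SpannerProject:           getenv("ALIS_MANAGED_SPANNER_PROJECT"),
		SpannerInstance:          getenv("ALIS_MANAGED_SPANNER_INSTANCE"),
		SpannerDatabase:          getenv("ALIS_MANAGED_SPANNER_DB"),
		CloudTasksProject:        project,
		CloudTasksQueue:          neuron + "-operations",
		CloudTasksServiceAccount: "alis-build@" + project + ".iam.gserviceaccount.com",
		// Default Cloud Run host; NewFromEnv lets WithHost override it.
		Host: fmt.Sprintf("https://%s-%s.%s.run.app",
			neuron, getenv("ALIS_PROJECT_NR"), getenv("ALIS_REGION")),
	}
}

func main() {
	fmt.Printf("%+v\n", configFromEnv("launchpad-v1", os.Getenv))
}
```

Passing `getenv` as a function keeps the mapping testable with a plain map instead of real process environment variables.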

The v2 API uses explicit client configuration and explicit HTTP binding:

client, err := lro.New(ctx, lro.Config{
	Neuron:                   "launchpad-v1",
	Project:                  "my-project",
	SpannerProject:           "my-spanner-project",
	SpannerInstance:          "my-spanner-instance",
	SpannerDatabase:          "my-spanner-db",
	CloudTasksProject:        "my-project",
	CloudTasksLocation:       "europe-west1",
	CloudTasksQueue:          "launchpad-v1-operations",
	CloudTasksServiceAccount: "alis-build@my-project.iam.gserviceaccount.com",
	Host:                     "https://launchpad-backend.example.com",
})
if err != nil {
	return err
}
if err := client.AddResumableHandlers(
	lro.ResumableHandler{Path: "create-agent", Handler: createAgentHandler},
); err != nil {
	return err
}
client.RegisterHTTP(mux)
client.RegisterGRPC(grpcServer)

op, err := client.NewOperation(ctx, "operations/123", metadata)
if err != nil {
	return err
}
if err := op.ResumeViaTasks("create-agent", 0); err != nil {
	return err
}

Services that use ALIS-managed infrastructure can construct the client from env:

client, err := lro.NewFromEnv(ctx, "launchpad-v1")
if err != nil {
	return err
}
if err := client.AddResumableHandlers(
	lro.ResumableHandler{Path: "create-agent", Handler: createAgentHandler},
); err != nil {
	return err
}
client.RegisterHTTP(mux)
client.RegisterGRPC(grpcServer)

`NewFromEnv` infers the callback host from the Cloud Run URL pattern and these env vars:

  • `ALIS_PROJECT_NR`
  • `ALIS_REGION`

The inferred host has this form, where `{service}` is the neuron passed to `NewFromEnv`:

https://{service}-{project-number}.{region}.run.app

That host can be overridden when needed:

client, err := lro.NewFromEnv(ctx, "launchpad-v1", lro.WithHost("https://launchpad-backend.example.com"))
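`Option` is documented as `func(*options)`, which is the functional-options pattern. The self-contained sketch below shows how a `WithHost` override can take precedence over the inferred host. The local `options` struct, its `host` field, and `resolveHost` are assumptions for illustration, not the package's source.

```go
package main

import "fmt"

// options and Option follow the documented shape `type Option func(*options)`.
// The host field is an assumed name.
type options struct {
	host string
}

type Option func(*options)

// WithHost records an explicit callback host (local re-implementation).
func WithHost(host string) Option {
	return func(o *options) { o.host = host }
}

// resolveHost applies opts in order and falls back to the inferred host
// when no override was supplied. Hypothetical helper.
func resolveHost(inferred string, opts ...Option) string {
	o := options{}
	for _, opt := range opts {
		opt(&o)
	}
	if o.host != "" {
		return o.host
	}
	return inferred
}

func main() {
	inferred := "https://launchpad-v1-123456.europe-west1.run.app"
	fmt.Println(resolveHost(inferred))
	fmt.Println(resolveHost(inferred, WithHost("https://launchpad-backend.example.com")))
}
```

Because options are applied in order, passing `WithHost` twice would leave the last value in effect.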

Importing the package never validates env vars or panics.

Here is a typical implementation flow:

  1. Create a shared client once during process startup:

    client, err := lro.NewFromEnv(ctx, "launchpad-v1")
    if err != nil {
    	return err
    }

  2. Register resumable handlers during startup, not when the RPC is called. Use AddResumableHandler for a single handler or AddResumableHandlers to register several at once, depending on the service setup:

    Register one or more handlers in one place:

    if err := client.AddResumableHandlers(
    	lro.ResumableHandler{Path: "publish-agent", Handler: publishAgentHandler},
    	lro.ResumableHandler{Path: "submit-agent-feedback", Handler: submitAgentFeedbackHandler},
    	lro.ResumableHandler{Path: "create-agent-from-content", Handler: createAgentFromContentHandler},
    ); err != nil {
    	return err
    }

  3. Expose both the Operations API and the resumable HTTP callback routes:

    Typically, add the following in server.go:

    client.RegisterGRPC(grpcServer)
    client.RegisterHTTP(mux)

  4. In the RPC method, create the operation, attach metadata for clients, save private state for the resumable workflow, and schedule the first callback:

    op, err := client.NewOperation(ctx, "operations/"+uuid.NewString(), metadata)
    if err != nil {
    	return nil, err
    }
    if err := op.SavePrivateState(&MyState{
    	UserID: userID,
    	Stream: streamName,
    }); err != nil {
    	return nil, err
    }
    if err := op.ResumeViaTasks("create-agent-from-content", 0); err != nil {
    	return nil, err
    }
    return op.OperationPb(), nil

  5. In the resumable handler, restore private state, optionally unmarshal and update metadata, then either requeue or finish the operation:

    func createAgentFromContentHandler(op *lro.Operation) {
    	state := &MyState{}
    	if err := op.DecodePrivateState(state); err != nil {
    		op.Fail("decode private state: %v", err)
    		return
    	}

    	meta := &pb.CreateAgentFromContentMetadata{}
    	if _, err := lro.UnmarshalMetadata(op, meta); err != nil {
    		op.Fail("unmarshal metadata: %v", err)
    		return
    	}

    	meta.StatusMessage = "Waiting for content processing..."
    	if err := op.SaveMetadata(meta); err != nil {
    		op.Fail("save metadata: %v", err)
    		return
    	}

    	// stillWaiting and response are placeholders for workflow-specific logic.
    	if stillWaiting {
    		if err := op.ResumeViaTasks("create-agent-from-content", 2*time.Second); err != nil {
    			op.Fail("reschedule task: %v", err)
    		}
    		return
    	}

    	_ = op.Complete(response)
    }

This split is intentional:

  • operation metadata is the client-visible status surface
  • private state is for workflow-only data such as user IDs, upstream resource names, poll counts, or serialized requests

For Cloud Tasks-driven handlers that call other services, Launchpad also uses `context.WithoutCancel(ctx)` before creating outbound RPC metadata. That avoids propagating the Cloud Tasks dispatch deadline to downstream services that may schedule their own async work.

Mental model for building an RPC or method that returns an LRO:

  1. At service startup, add a resumable handler for that workflow and register the HTTP handlers on your mux.
  2. In the RPC that creates the operation, create the LRO, persist any private state needed to continue later, and call `ResumeViaTasks(path, delay)`.
  3. In the resumable handler, reload state and metadata from the operation, advance the workflow, and either:
       • call `ResumeViaTasks(path, nextDelay)` again to continue later, or
       • call `Complete(...)` / `Fail(...)` to finish the operation.
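The three steps can be simulated in memory to show how requeueing and finishing interact. Everything here (`fakeOp`, `scheduledCall`, `handler`, `runTasks`, `createAgent`) is hypothetical: the map lookup stands in for the route registered at startup, returning a `scheduledCall` stands in for `ResumeViaTasks`, and setting `done` stands in for `Complete`.

```go
package main

import (
	"fmt"
	"time"
)

// fakeOp is a hypothetical in-memory stand-in for an operation row.
type fakeOp struct {
	pollCount int    // private state, invisible to clients
	status    string // client-visible metadata
	done      bool
}

// scheduledCall stands in for a Cloud Tasks task: which path to call, and when.
type scheduledCall struct {
	path  string
	delay time.Duration // ignored by the synchronous runner below
}

// handler returns the next call to schedule, or nil once the operation is finished.
type handler func(*fakeOp) *scheduledCall

// createAgent requeues twice, then finishes on its third invocation.
func createAgent(op *fakeOp) *scheduledCall {
	op.pollCount++
	if op.pollCount < 3 {
		op.status = fmt.Sprintf("waiting (poll %d)", op.pollCount)
		return &scheduledCall{path: "create-agent", delay: 2 * time.Second} // like ResumeViaTasks
	}
	op.status = "finished"
	op.done = true // like Complete
	return nil
}

// runTasks dispatches calls one at a time to the handler registered for each
// path and returns how many handler invocations ran.
func runTasks(handlers map[string]handler, op *fakeOp, first scheduledCall) int {
	calls := 0
	for next := &first; next != nil; {
		h, ok := handlers[next.path]
		if !ok {
			op.status = "no handler registered for " + next.path
			return calls
		}
		calls++
		next = h(op)
	}
	return calls
}

func main() {
	handlers := map[string]handler{"create-agent": createAgent} // step 1: at startup
	op := &fakeOp{status: "queued"}                              // step 2: in the RPC
	n := runTasks(handlers, op, scheduledCall{path: "create-agent"})
	fmt.Println(n, op.status, op.done) // 3 finished true
}
```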

The important design rule is that the resumable handler must be registered at startup. Do not rely on scheduling time to create HTTP routes, because a future Cloud Tasks callback may land on a fresh instance that never executed the original scheduling request.

Example (ResumeViaTasks)
package main

import (
	"context"
	"net/http"
	"time"

	"github.com/google/uuid"
	lro "go.alis.build/lro/v2"
	"google.golang.org/protobuf/types/known/emptypb"
	"google.golang.org/protobuf/types/known/structpb"
)

type CreateAgentState struct {
	Owner     string
	PollCount int
}

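func createAgentHandler(op *lro.Operation) {
	state := &CreateAgentState{}
	if err := op.DecodePrivateState(state); err != nil {
		// Fail the operation so callers are not left polling forever.
		_ = op.Fail("decode private state: %v", err)
		return
	}

	meta := &structpb.Struct{}
	if _, err := lro.UnmarshalMetadata(op, meta); err != nil {
		_ = op.Fail("unmarshal metadata: %v", err)
		return
	}

	state.PollCount++
	if err := op.SavePrivateState(state); err != nil {
		_ = op.Fail("save private state: %v", err)
		return
	}
	_ = op.Complete(&emptypb.Empty{})
}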

func main() {
	// Provision the backing Spanner table before creating the client.
	// For neuron "launchpad-v1" the table name is:
	//   ${replace(project, "-", "_")}_launchpad_v1_Operations
	// See the package docs for the Terraform snippet that provisions the required table and TTL policy.
	//
	// Launchpad creates the client once, registers resumable handlers at startup,
	// registers the HTTP callback routes on the mux, and then creates/schedules
	// operations from RPC methods.
	mux := http.NewServeMux()
	client, err := lro.New(context.Background(), lro.Config{
		Neuron:                   "launchpad-v1",
		Project:                  "example-project",
		SpannerProject:           "example-spanner-project",
		SpannerInstance:          "example-instance",
		SpannerDatabase:          "example-db",
		CloudTasksProject:        "example-project",
		CloudTasksLocation:       "europe-west1",
		CloudTasksQueue:          "launchpad-v1-operations",
		CloudTasksServiceAccount: "alis-build@example-project.iam.gserviceaccount.com",
		Host:                     "https://launchpad-backend.example.com",
	})
	if err != nil {
		return
	}
	defer client.Close()
	if err := client.AddResumableHandlers(
		lro.ResumableHandler{Path: "create-agent", Handler: createAgentHandler},
	); err != nil {
		return
	}
	client.RegisterHTTP(mux)

	ctx := context.Background()
	op, err := client.NewOperation(ctx, "operations/"+uuid.NewString(), &structpb.Struct{
		Fields: map[string]*structpb.Value{
			"target":         structpb.NewStringValue("streams/abc"),
			"status_message": structpb.NewStringValue("Processing content..."),
		},
	})
	if err != nil {
		return
	}

	if err := op.SavePrivateState(&CreateAgentState{
		Owner:     "users/123",
		PollCount: 0,
	}); err != nil {
		return
	}

	_ = op.ResumeViaTasks("create-agent", 5*time.Second)
}

Constants

This section is empty.

Variables

This section is empty.

Functions

func MustUnmarshalMetadata

func MustUnmarshalMetadata[MdT proto.Message](op *Operation, md MdT) MdT

MustUnmarshalMetadata unmarshals operation metadata into md and fails fatally on error instead of returning it.

func RegisterGRPC added in v2.6.0

func RegisterGRPC(registrar grpc.ServiceRegistrar, client *Client, opts ...OperationsServerOption)

RegisterGRPC wires the standard google.longrunning.Operations service into a gRPC server or any other ServiceRegistrar.

func UnmarshalMetadata

func UnmarshalMetadata[MdT proto.Message](op *Operation, md MdT) (MdT, error)

UnmarshalMetadata unmarshals operation metadata into md.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client manages long-running operations for a specific neuron.

func New

func New(ctx context.Context, cfg Config) (*Client, error)

New constructs a new LRO client from explicit configuration.

func NewFromEnv added in v2.1.0

func NewFromEnv(ctx context.Context, neuron string, opts ...Option) (*Client, error)

NewFromEnv constructs a client using ALIS-managed environment variables.

The default callback host is inferred from the Cloud Run URL pattern `https://{service}-{project-number}.{region}.run.app` using `ALIS_PROJECT_NR` and `ALIS_REGION`. Use WithHost to override that value when required.

func (*Client) AddResumableHandler added in v2.2.0

func (c *Client) AddResumableHandler(path string, handler ResumeHandler) error

AddResumableHandler associates a callback path with the handler that resumes that operation.

func (*Client) AddResumableHandlers added in v2.2.0

func (c *Client) AddResumableHandlers(handlers ...ResumableHandler) error

AddResumableHandlers adds multiple resumable handlers to the client.

func (*Client) Close

func (c *Client) Close() error

Close closes the underlying Spanner and Cloud Tasks clients.

func (*Client) GetOperation

func (c *Client) GetOperation(ctx context.Context, operationName string) (*Operation, error)

GetOperation retrieves an operation wrapper by operation name.

func (*Client) GetOperationPb

func (c *Client) GetOperationPb(ctx context.Context, name string) (*longrunningpb.Operation, error)

GetOperationPb retrieves the underlying protobuf operation by name.

func (*Client) NewOperation

func (c *Client) NewOperation(ctx context.Context, operationName string, md proto.Message) (*Operation, error)

NewOperation creates a new operation row and stores its initial metadata.

func (*Client) OperationsServer added in v2.3.0

func (c *Client) OperationsServer(opts ...OperationsServerOption) *OperationsServer

OperationsServer returns a google.longrunning.Operations server backed by the client.

func (*Client) RegisterGRPC added in v2.7.0

func (c *Client) RegisterGRPC(registrar grpc.ServiceRegistrar, opts ...OperationsServerOption)

RegisterGRPC wires the standard google.longrunning.Operations service into a gRPC server or any other ServiceRegistrar.

func (*Client) RegisterHTTP added in v2.6.0

func (c *Client) RegisterHTTP(mux *http.ServeMux)

RegisterHTTP registers the default resumable callback routes on mux.

func (*Client) RegisterHTTPAtPrefix added in v2.6.0

func (c *Client) RegisterHTTPAtPrefix(mux *http.ServeMux, prefix string)

RegisterHTTPAtPrefix registers resumable operation callbacks using the supplied path prefix.

func (*Client) RegisterHTTPHandlers deprecated added in v2.1.0

func (c *Client) RegisterHTTPHandlers(mux *http.ServeMux)

RegisterHTTPHandlers registers the default resumable callback routes on mux.

Deprecated: use Client.RegisterHTTP.

func (*Client) RegisterHTTPHandlersAtPrefix deprecated added in v2.1.0

func (c *Client) RegisterHTTPHandlersAtPrefix(mux *http.ServeMux, prefix string)

RegisterHTTPHandlersAtPrefix registers resumable operation callbacks using the supplied path prefix.

Deprecated: use Client.RegisterHTTPAtPrefix.

type Config added in v2.1.0

type Config struct {
	Neuron string

	// Project is the owning project used to derive the operations table name.
	Project string

	SpannerProject  string
	SpannerInstance string
	SpannerDatabase string
	DatabaseRole    string

	CloudTasksProject        string
	CloudTasksLocation       string
	CloudTasksQueue          string
	CloudTasksServiceAccount string

	Host string
}

Config configures a Client created with New.

type Operation

type Operation struct {
	Ctx context.Context
	// contains filtered or unexported fields
}

Operation is a long-running operation managed by a Client.

func (*Operation) Complete

func (o *Operation) Complete(resp proto.Message) error

Complete marks the operation done with a successful response and persists it.

func (*Operation) DecodePrivateState

func (o *Operation) DecodePrivateState(state any) error

DecodePrivateState decodes the operation's private state into state.

func (*Operation) Fail

func (o *Operation) Fail(reason string, args ...any) error

Fail marks the operation done with an error message and persists it.

func (*Operation) OperationPb

func (o *Operation) OperationPb() *longrunningpb.Operation

OperationPb returns the underlying protobuf operation.

func (*Operation) ResumePoint

func (o *Operation) ResumePoint() string

ResumePoint returns the operation's current resume point.

func (*Operation) ResumeViaTasks

func (o *Operation) ResumeViaTasks(path string, waitDuration time.Duration) error

ResumeViaTasks saves the operation and schedules the registered resume path to run later via Cloud Tasks.

RegisterHTTP is only required in processes that actually need to serve the resumable callback route. Local callers can also resume directly without binding HTTP handlers.

func (*Operation) Save

func (o *Operation) Save() error

Save persists the current operation row.

func (*Operation) SaveMetadata

func (o *Operation) SaveMetadata(md proto.Message) error

SaveMetadata updates the operation metadata and persists the operation.

func (*Operation) SavePrivateState

func (o *Operation) SavePrivateState(state any) error

SavePrivateState updates the private state and persists the operation.

func (*Operation) SaveResumePoint

func (o *Operation) SaveResumePoint(resumePoint string) error

SaveResumePoint updates the resume point and persists the operation.

func (*Operation) SetMetadata

func (o *Operation) SetMetadata(md proto.Message) error

SetMetadata updates the operation metadata without persisting the operation.

func (*Operation) SetPrivateState

func (o *Operation) SetPrivateState(state any) error

SetPrivateState updates the private state without persisting the operation.

func (*Operation) SetResumePoint

func (o *Operation) SetResumePoint(resumePoint string)

SetResumePoint updates the resume point without persisting the operation.
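The `Set*` methods change the in-memory operation while the `Save*` methods also persist it, so several changes can be grouped into one write followed by `Save`. The toy below counts writes on a hypothetical `fakeRow`; treating each persist as exactly one row write is an assumption, and the real `SetMetadata` and `SetPrivateState` also return errors that this sketch omits.

```go
package main

import "fmt"

// fakeRow is a hypothetical stand-in for lro.Operation that counts persists.
type fakeRow struct {
	metadata, privateState, resumePoint string
	writes                              int
}

// Set* methods only mutate memory.
func (r *fakeRow) SetMetadata(md string)    { r.metadata = md }
func (r *fakeRow) SetPrivateState(s string) { r.privateState = s }
func (r *fakeRow) SetResumePoint(p string)  { r.resumePoint = p }

// Save persists the whole row once.
func (r *fakeRow) Save() { r.writes++ }

// Save* methods set a field and persist immediately.
func (r *fakeRow) SaveMetadata(md string)    { r.SetMetadata(md); r.Save() }
func (r *fakeRow) SavePrivateState(s string) { r.SetPrivateState(s); r.Save() }
func (r *fakeRow) SaveResumePoint(p string)  { r.SetResumePoint(p); r.Save() }

func main() {
	eager := &fakeRow{}
	eager.SaveMetadata("step 2")
	eager.SavePrivateState("poll=2")
	eager.SaveResumePoint("step-2")

	batched := &fakeRow{}
	batched.SetMetadata("step 2")
	batched.SetPrivateState("poll=2")
	batched.SetResumePoint("step-2")
	batched.Save()

	fmt.Println(eager.writes, batched.writes) // 3 1
}
```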

type OperationRow

type OperationRow struct {
	Operation *longrunningpb.Operation
	// Internal state to track about the operation.
	// Use the operation's metadata for information that should be visible to clients.
	State []byte
	// Checkpoint to resume from.
	ResumePoint string
	// The last time this operation was updated. Automatically set when writing to Spanner.
	UpdateTime time.Time
}

OperationRow is the Spanner-backed representation of an operation.

type OperationsServer added in v2.3.0

type OperationsServer struct {
	longrunningpb.UnimplementedOperationsServer
	// contains filtered or unexported fields
}

OperationsServer serves the standard google.longrunning.Operations RPCs using an LRO client.

func NewOperationsServer added in v2.3.0

func NewOperationsServer(client *Client, opts ...OperationsServerOption) *OperationsServer

NewOperationsServer constructs a google.longrunning.Operations server backed by the client.

func (*OperationsServer) GetOperation added in v2.3.0

GetOperation retrieves the latest state of a long-running operation.

func (*OperationsServer) WaitOperation added in v2.3.0

WaitOperation waits until the operation is done or the effective timeout elapses.

type OperationsServerOption added in v2.3.0

type OperationsServerOption func(*OperationsServer)

OperationsServerOption configures an OperationsServer.

func WithDefaultWaitTimeout added in v2.3.0

func WithDefaultWaitTimeout(timeout time.Duration) OperationsServerOption

WithDefaultWaitTimeout sets the default timeout used when WaitOperation does not specify one.

func WithWaitPolling added in v2.3.0

func WithWaitPolling(initialDelay, maxDelay time.Duration) OperationsServerOption

WithWaitPolling configures the initial and maximum polling intervals for WaitOperation.

type Option

type Option func(*options)

Option configures env-derived client construction.

func WithHost

func WithHost(host string) Option

WithHost overrides the callback host used by NewFromEnv.

type ResumableHandler added in v2.2.0

type ResumableHandler struct {
	// Path is the callback path segment registered under the resumable HTTP prefix.
	// For example, if Path is "create-agent" and the prefix is "/resume-operation/",
	// the callback route becomes "/resume-operation/create-agent".
	Path string
	// Handler resumes the operation when the registered callback path is invoked.
	Handler ResumeHandler
}

ResumableHandler associates a callback path with the handler that resumes that operation.

type ResumeHandler added in v2.1.1

type ResumeHandler func(*Operation)

ResumeHandler resumes a previously scheduled long-running operation.
