Overview ¶
Package lro provides a client for managing long-running operations stored in Spanner and resumed via Cloud Tasks.
Before using this package, provision the backing Spanner table for the target `neuron`. The client expects a table named:
${replace(project, "-", "_")}_${replace(neuron, "-", "_")}_Operations
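The naming rule replaces every hyphen with an underscore in both the project ID and the neuron name, then appends `_Operations`. A minimal Go sketch of that rule, handy for checking which table a deployment will look for; `operationsTableName` is a hypothetical helper, not part of the package API:

```go
package main

import (
	"fmt"
	"strings"
)

// operationsTableName is a hypothetical helper that mirrors the documented
// naming rule: hyphens in project and neuron become underscores, and the
// result is suffixed with "_Operations".
func operationsTableName(project, neuron string) string {
	return strings.ReplaceAll(project, "-", "_") + "_" +
		strings.ReplaceAll(neuron, "-", "_") + "_Operations"
}

func main() {
	fmt.Println(operationsTableName("my-project", "launchpad-v1"))
}
```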
The required schema is:
- `key` `STRING` primary key; a stored generated column computed from `Operation.name`
- `Operation` `PROTO<google.longrunning.Operation>` required
- `State` `BYTES` nullable
- `ResumePoint` `STRING` nullable
- `UpdateTime` `TIMESTAMP` required
For ALIS-managed services, wire this the same way as the other infra modules:
```text
infra/
  main.tf
  apis.tf
  cloudrun.tf
  spanner.tf
  variables.tf
  modules/
    alis.lro.v2/
      main.tf
```
`infra/main.tf` should wire the module like this:
```hcl
module "alis_lro_v2" {
  source                        = "./modules/alis.lro.v2"
  alis_os_project               = var.ALIS_OS_PROJECT
  alis_region                   = var.ALIS_REGION
  alis_project_nr               = var.ALIS_PROJECT_NR
  alis_managed_spanner_project  = var.ALIS_MANAGED_SPANNER_PROJECT
  alis_managed_spanner_instance = var.ALIS_MANAGED_SPANNER_INSTANCE
  alis_managed_spanner_db       = var.ALIS_MANAGED_SPANNER_DB
  neuron                        = local.neuron
  depends_on                    = [google_project_service.environment]
}
```
Inside `modules/alis.lro.v2/main.tf`, provision the table and TTL policy that match the client contract:
```hcl
resource "alis_google_spanner_table" "operations" {
  project         = var.alis_managed_spanner_project
  instance        = var.alis_managed_spanner_instance
  database        = var.alis_managed_spanner_db
  name            = "${replace(var.alis_os_project, "-", "_")}_${replace(var.neuron, "-", "_")}_Operations"
  prevent_destroy = true
  schema = {
    columns = [
      {
        name            = "key"
        is_computed     = true
        computation_ddl = "Operation.name"
        is_stored       = true
        type            = "STRING"
        is_primary_key  = true
        required        = true
        unique          = true
      },
      {
        name          = "Operation"
        type          = "PROTO"
        proto_package = "google.longrunning.Operation"
        required      = true
      },
      {
        name     = "State"
        type     = "BYTES"
        required = false
      },
      {
        name     = "ResumePoint"
        type     = "STRING"
        required = false
      },
      {
        name     = "UpdateTime"
        type     = "TIMESTAMP"
        required = true
      },
    ]
  }
}

resource "alis_google_spanner_table_ttl_policy" "operations" {
  project  = alis_google_spanner_table.operations.project
  instance = alis_google_spanner_table.operations.instance
  database = alis_google_spanner_table.operations.database
  table    = alis_google_spanner_table.operations.name
  column   = "UpdateTime"
  ttl      = 90
}
```
That module should line up with the runtime values passed to `lro.New` or derived by `lro.NewFromEnv`:
- `Project` / `CloudTasksProject` -> `ALIS_OS_PROJECT`
- `SpannerProject` -> `ALIS_MANAGED_SPANNER_PROJECT`
- `SpannerInstance` -> `ALIS_MANAGED_SPANNER_INSTANCE`
- `SpannerDatabase` -> `ALIS_MANAGED_SPANNER_DB`
- `CloudTasksQueue` -> `{neuron}-operations`
- `CloudTasksServiceAccount` -> `alis-build@${ALIS_OS_PROJECT}.iam.gserviceaccount.com`
- `Host` -> `https://{neuron}-{ALIS_PROJECT_NR}.{ALIS_REGION}.run.app` unless overridden
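To make the mapping concrete, here is a sketch that derives those values from environment variables. It is not the package's `NewFromEnv` implementation: `derivedConfig` is a hypothetical local struct mirroring a subset of `lro.Config`, `lookup` stands in for `os.Getenv`, and `CloudTasksLocation` is left out because the list above does not state its source.

```go
package main

import "fmt"

// derivedConfig is a hypothetical stand-in for a subset of lro.Config fields.
type derivedConfig struct {
	Project                  string
	CloudTasksProject        string
	SpannerProject           string
	SpannerInstance          string
	SpannerDatabase          string
	CloudTasksQueue          string
	CloudTasksServiceAccount string
	Host                     string
}

// deriveConfig applies the documented mapping. lookup plays the role of
// os.Getenv so the mapping can be checked without a real environment.
func deriveConfig(neuron string, lookup func(string) string) derivedConfig {
	project := lookup("ALIS_OS_PROJECT")
	return derivedConfig{
		Project:                  project,
		CloudTasksProject:        project,
		SpannerProject:           lookup("ALIS_MANAGED_SPANNER_PROJECT"),
		SpannerInstance:          lookup("ALIS_MANAGED_SPANNER_INSTANCE"),
		SpannerDatabase:          lookup("ALIS_MANAGED_SPANNER_DB"),
		CloudTasksQueue:          neuron + "-operations",
		CloudTasksServiceAccount: "alis-build@" + project + ".iam.gserviceaccount.com",
		// Inferred Cloud Run host: https://{neuron}-{ALIS_PROJECT_NR}.{ALIS_REGION}.run.app
		Host: fmt.Sprintf("https://%s-%s.%s.run.app",
			neuron, lookup("ALIS_PROJECT_NR"), lookup("ALIS_REGION")),
	}
}

func main() {
	env := map[string]string{
		"ALIS_OS_PROJECT":               "my-project",
		"ALIS_PROJECT_NR":               "123456789",
		"ALIS_REGION":                   "europe-west1",
		"ALIS_MANAGED_SPANNER_PROJECT":  "my-spanner-project",
		"ALIS_MANAGED_SPANNER_INSTANCE": "my-spanner-instance",
		"ALIS_MANAGED_SPANNER_DB":       "my-spanner-db",
	}
	cfg := deriveConfig("launchpad-v1", func(k string) string { return env[k] })
	fmt.Printf("%+v\n", cfg)
}
```

In a real process you would pass `os.Getenv` as `lookup`; the injected function only keeps the sketch testable.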
The v2 API uses explicit client configuration and explicit HTTP binding:
```go
client, err := lro.New(ctx, lro.Config{
	Neuron:                   "launchpad-v1",
	Project:                  "my-project",
	SpannerProject:           "my-spanner-project",
	SpannerInstance:          "my-spanner-instance",
	SpannerDatabase:          "my-spanner-db",
	CloudTasksProject:        "my-project",
	CloudTasksLocation:       "europe-west1",
	CloudTasksQueue:          "launchpad-v1-operations",
	CloudTasksServiceAccount: "alis-build@my-project.iam.gserviceaccount.com",
	Host:                     "https://launchpad-backend.example.com",
})
if err != nil {
	return err
}
if err := client.AddResumableHandlers(
	lro.ResumableHandler{Path: "create-agent", Handler: createAgentHandler},
); err != nil {
	return err
}
client.RegisterHTTP(mux)
client.RegisterGRPC(grpcServer)

op, err := client.NewOperation(ctx, "operations/123", metadata)
if err != nil {
	return err
}
if err := op.ResumeViaTasks("create-agent", 0); err != nil {
	return err
}
```
Services that use ALIS-managed infrastructure can construct the client from env:
```go
client, err := lro.NewFromEnv(ctx, "launchpad-v1")
if err != nil {
	return err
}
if err := client.AddResumableHandlers(
	lro.ResumableHandler{Path: "create-agent", Handler: createAgentHandler},
); err != nil {
	return err
}
client.RegisterHTTP(mux)
client.RegisterGRPC(grpcServer)
```
`NewFromEnv` infers the callback host from the Cloud Run URL pattern and these env vars:
- `ALIS_PROJECT_NR`
- `ALIS_REGION`
The inferred host has this form:
https://{service}-{project-number}.{region}.run.app
That host can be overridden when needed:
```go
client, err := lro.NewFromEnv(ctx, "launchpad-v1", lro.WithHost("https://launchpad-backend.example.com"))
```
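`WithHost` is passed as a variadic option, which suggests the common Go functional-options pattern. The sketch below shows the general shape of that pattern, with an explicit override taking precedence over the inferred Cloud Run host. `settings`, `option`, `withHost`, and `resolveHost` are hypothetical names, not the package's own `Option` type or internals.

```go
package main

import "fmt"

// settings holds the values an option can change (hypothetical).
type settings struct {
	host string
}

// option mutates settings, following the functional-options idiom.
type option func(*settings)

// withHost returns an option that overrides the callback host.
func withHost(host string) option {
	return func(s *settings) { s.host = host }
}

// resolveHost starts from the inferred Cloud Run host and then applies options,
// so an explicit host always wins over the inferred one.
func resolveHost(service, projectNr, region string, opts ...option) string {
	s := &settings{host: fmt.Sprintf("https://%s-%s.%s.run.app", service, projectNr, region)}
	for _, opt := range opts {
		opt(s)
	}
	return s.host
}

func main() {
	fmt.Println(resolveHost("launchpad-v1", "123456", "europe-west1"))
	fmt.Println(resolveHost("launchpad-v1", "123456", "europe-west1",
		withHost("https://launchpad-backend.example.com")))
}
```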
Importing the package never validates env vars or panics.
Here is a typical implementation flow:
Create a shared client once during process startup:
```go
client, err := lro.NewFromEnv(ctx, "launchpad-v1")
if err != nil {
	return err
}
```
Register resumable handlers during startup, not when the RPC is called. Use `AddResumableHandler` for a single path, or `AddResumableHandlers` to register several paths in one place:
```go
if err := client.AddResumableHandlers(
	lro.ResumableHandler{Path: "publish-agent", Handler: publishAgentHandler},
	lro.ResumableHandler{Path: "submit-agent-feedback", Handler: submitAgentFeedbackHandler},
	lro.ResumableHandler{Path: "create-agent-from-content", Handler: createAgentFromContentHandler},
); err != nil {
	return err
}
```
Expose both the Operations API and the resumable HTTP callback routes. Typically, add the following to `server.go`:
```go
client.RegisterGRPC(grpcServer)
client.RegisterHTTP(mux)
```
In the RPC method, create the operation, attach metadata for clients, save private state for the resumable workflow, and schedule the first callback:
```go
op, err := client.NewOperation(ctx, "operations/"+uuid.NewString(), metadata)
if err != nil {
	return nil, err
}
if err := op.SavePrivateState(&MyState{
	UserID: userID,
	Stream: streamName,
}); err != nil {
	return nil, err
}
if err := op.ResumeViaTasks("create-agent-from-content", 0); err != nil {
	return nil, err
}
return op.OperationPb(), nil
```
In the resumable handler, restore private state, optionally unmarshal and update metadata, then either requeue or finish the operation:
```go
func createAgentFromContentHandler(op *lro.Operation) {
	state := &MyState{}
	if err := op.DecodePrivateState(state); err != nil {
		op.Fail("decode private state: %v", err)
		return
	}

	meta := &pb.CreateAgentFromContentMetadata{}
	if _, err := lro.UnmarshalMetadata(op, meta); err != nil {
		op.Fail("unmarshal metadata: %v", err)
		return
	}

	meta.StatusMessage = "Waiting for content processing..."
	if err := op.SaveMetadata(meta); err != nil {
		op.Fail("save metadata: %v", err)
		return
	}

	if stillWaiting {
		if err := op.ResumeViaTasks("create-agent-from-content", 2*time.Second); err != nil {
			op.Fail("reschedule task: %v", err)
		}
		return
	}

	_ = op.Complete(response)
}
```
This split is intentional:
- operation metadata is the client-visible status surface
- private state is for workflow-only data such as user IDs, upstream resource names, poll counts, or serialized requests
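Because the `State` column is `BYTES`, private state must be serialized before it is stored. The encoding `SavePrivateState` uses is not documented here, so this sketch assumes JSON purely for illustration. `encodeState` and `decodeState` are hypothetical helpers showing the round trip that `SavePrivateState` and `DecodePrivateState` perform conceptually, and `MyState` stands in for your workflow type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MyState is workflow-only data that clients should never see.
type MyState struct {
	UserID    string
	Stream    string
	PollCount int
}

// encodeState serializes private state into the bytes kept in the State
// column. JSON is an assumption made for this sketch.
func encodeState(state any) ([]byte, error) {
	return json.Marshal(state)
}

// decodeState restores private state from stored bytes; state must be a pointer.
func decodeState(b []byte, state any) error {
	return json.Unmarshal(b, state)
}

func main() {
	b, err := encodeState(&MyState{UserID: "users/123", Stream: "streams/abc"})
	if err != nil {
		panic(err)
	}
	restored := &MyState{}
	if err := decodeState(b, restored); err != nil {
		panic(err)
	}
	restored.PollCount++ // advance the workflow-only counter
	fmt.Printf("%+v\n", *restored)
}
```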
For Cloud Tasks-driven handlers that call other services, Launchpad also uses `context.WithoutCancel(ctx)` before creating outbound RPC metadata. This avoids propagating the Cloud Tasks dispatch deadline to downstream services that may schedule their own async work.
Mental model for building an RPC or method that returns an LRO:
- At service startup, add a resumable handler for that workflow and register the HTTP handlers on your mux.
- In the RPC that creates the operation, create the LRO, persist any private state needed to continue later, and call `ResumeViaTasks(path, delay)`.
- In the resumable handler, reload state and metadata from the operation, advance the workflow, and either:
  - call `ResumeViaTasks(path, nextDelay)` again to continue later, or
  - call `Complete(...)` / `Fail(...)` to finish the operation.
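The loop below simulates that lifecycle in memory so the control flow is easy to follow. Everything in it is hypothetical: `fakeOp` stands in for `*lro.Operation`, its `queue` slice stands in for Cloud Tasks, and the handler completes after a fixed number of polls instead of checking a real upstream.

```go
package main

import (
	"fmt"
	"time"
)

// fakeOp is a hypothetical in-memory stand-in for *lro.Operation.
type fakeOp struct {
	pollCount int
	done      bool
	queue     []time.Duration // delays "scheduled" on the fake task queue
}

// resumeViaTasks records another callback instead of creating a Cloud Task.
func (o *fakeOp) resumeViaTasks(delay time.Duration) { o.queue = append(o.queue, delay) }

// complete marks the operation done.
func (o *fakeOp) complete() { o.done = true }

// handler advances the workflow once per callback: it requeues itself
// until three polls have happened, then completes.
func handler(op *fakeOp) {
	op.pollCount++
	if op.pollCount < 3 {
		op.resumeViaTasks(2 * time.Second)
		return
	}
	op.complete()
}

// run plays the role of Cloud Tasks: deliver queued callbacks until none remain.
func run(op *fakeOp) {
	op.resumeViaTasks(0) // the creating RPC schedules the first callback
	for len(op.queue) > 0 {
		op.queue = op.queue[1:] // a real queue would wait for the delay here
		handler(op)
	}
}

func main() {
	op := &fakeOp{}
	run(op)
	fmt.Println(op.done, op.pollCount)
}
```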
The important design rule is that the resumable handler must be registered at startup. Do not rely on scheduling time to create HTTP routes, because a future Cloud Tasks callback may land on a fresh instance that never executed the original scheduling request.
Example (ResumeViaTasks) ¶
```go
package main

import (
	"context"
	"log"
	"net/http"
	"time"

	"github.com/google/uuid"
	lro "go.alis.build/lro/v2"
	"google.golang.org/protobuf/types/known/emptypb"
	"google.golang.org/protobuf/types/known/structpb"
)

type CreateAgentState struct {
	Owner     string
	PollCount int
}

func createAgentHandler(op *lro.Operation) {
	// Restore the workflow-only state saved by the RPC that created the operation.
	state := &CreateAgentState{}
	if err := op.DecodePrivateState(state); err != nil {
		op.Fail("decode private state: %v", err)
		return
	}

	// Read the client-visible metadata.
	meta := &structpb.Struct{}
	if _, err := lro.UnmarshalMetadata(op, meta); err != nil {
		op.Fail("unmarshal metadata: %v", err)
		return
	}

	state.PollCount++
	if err := op.SavePrivateState(state); err != nil {
		op.Fail("save private state: %v", err)
		return
	}
	_ = op.Complete(&emptypb.Empty{})
}

func main() {
	// Provision the backing Spanner table before creating the client.
	// For neuron "launchpad-v1" the table name is:
	// ${replace(project, "-", "_")}_launchpad_v1_Operations
	// See the package docs for the Terraform snippet that provisions the required table and TTL policy.
	//
	// Launchpad creates the client once, registers resumable handlers at startup,
	// registers the HTTP callback routes on the mux, and then creates/schedules
	// operations from RPC methods.
	ctx := context.Background()
	mux := http.NewServeMux()
	client, err := lro.New(ctx, lro.Config{
		Neuron:                   "launchpad-v1",
		Project:                  "example-project",
		SpannerProject:           "example-spanner-project",
		SpannerInstance:          "example-instance",
		SpannerDatabase:          "example-db",
		CloudTasksProject:        "example-project",
		CloudTasksLocation:       "europe-west1",
		CloudTasksQueue:          "launchpad-v1-operations",
		CloudTasksServiceAccount: "alis-build@example-project.iam.gserviceaccount.com",
		Host:                     "https://launchpad-backend.example.com",
	})
	if err != nil {
		log.Fatalf("create lro client: %v", err)
	}
	defer client.Close()

	if err := client.AddResumableHandlers(
		lro.ResumableHandler{Path: "create-agent", Handler: createAgentHandler},
	); err != nil {
		log.Printf("add resumable handlers: %v", err)
		return
	}
	client.RegisterHTTP(mux)

	op, err := client.NewOperation(ctx, "operations/"+uuid.NewString(), &structpb.Struct{
		Fields: map[string]*structpb.Value{
			"target":         structpb.NewStringValue("streams/abc"),
			"status_message": structpb.NewStringValue("Processing content..."),
		},
	})
	if err != nil {
		log.Printf("create operation: %v", err)
		return
	}
	if err := op.SavePrivateState(&CreateAgentState{
		Owner:     "users/123",
		PollCount: 0,
	}); err != nil {
		log.Printf("save private state: %v", err)
		return
	}
	if err := op.ResumeViaTasks("create-agent", 5*time.Second); err != nil {
		log.Printf("schedule resume: %v", err)
	}
}
```
Index ¶
- func MustUnmarshalMetadata[MdT proto.Message](op *Operation, md MdT) MdT
- func RegisterGRPC(registrar grpc.ServiceRegistrar, client *Client, ...)
- func UnmarshalMetadata[MdT proto.Message](op *Operation, md MdT) (MdT, error)
- type Client
- func (c *Client) AddResumableHandler(path string, handler ResumeHandler) error
- func (c *Client) AddResumableHandlers(handlers ...ResumableHandler) error
- func (c *Client) Close() error
- func (c *Client) GetOperation(ctx context.Context, operationName string) (*Operation, error)
- func (c *Client) GetOperationPb(ctx context.Context, name string) (*longrunningpb.Operation, error)
- func (c *Client) NewOperation(ctx context.Context, operationName string, md proto.Message) (*Operation, error)
- func (c *Client) OperationsServer(opts ...OperationsServerOption) *OperationsServer
- func (c *Client) RegisterGRPC(registrar grpc.ServiceRegistrar, opts ...OperationsServerOption)
- func (c *Client) RegisterHTTP(mux *http.ServeMux)
- func (c *Client) RegisterHTTPAtPrefix(mux *http.ServeMux, prefix string)
- func (c *Client) RegisterHTTPHandlers(mux *http.ServeMux) (deprecated)
- func (c *Client) RegisterHTTPHandlersAtPrefix(mux *http.ServeMux, prefix string) (deprecated)
- type Config
- type Operation
- func (o *Operation) Complete(resp proto.Message) error
- func (o *Operation) DecodePrivateState(state any) error
- func (o *Operation) Fail(reason string, args ...any) error
- func (o *Operation) OperationPb() *longrunningpb.Operation
- func (o *Operation) ResumePoint() string
- func (o *Operation) ResumeViaTasks(path string, waitDuration time.Duration) error
- func (o *Operation) Save() error
- func (o *Operation) SaveMetadata(md proto.Message) error
- func (o *Operation) SavePrivateState(state any) error
- func (o *Operation) SaveResumePoint(resumePoint string) error
- func (o *Operation) SetMetadata(md proto.Message) error
- func (o *Operation) SetPrivateState(state any) error
- func (o *Operation) SetResumePoint(resumePoint string)
- type OperationRow
- type OperationsServer
- type OperationsServerOption
- type Option
- type ResumableHandler
- type ResumeHandler
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MustUnmarshalMetadata ¶
func MustUnmarshalMetadata[MdT proto.Message](op *Operation, md MdT) MdT
MustUnmarshalMetadata unmarshals the operation metadata into md and treats any error as fatal.
func RegisterGRPC ¶ added in v2.6.0
func RegisterGRPC(registrar grpc.ServiceRegistrar, client *Client, opts ...OperationsServerOption)
RegisterGRPC wires the standard google.longrunning.Operations service into a gRPC server or any other ServiceRegistrar.
Types ¶
type Client ¶
type Client struct {
	// contains filtered or unexported fields
}
Client manages long-running operations for a specific neuron.
func NewFromEnv ¶ added in v2.1.0
NewFromEnv constructs a client using ALIS-managed environment variables.
The default callback host is inferred from the Cloud Run URL pattern `https://{service}-{project-number}.{region}.run.app` using `ALIS_PROJECT_NR` and `ALIS_REGION`. Use WithHost to override that value when required.
func (*Client) AddResumableHandler ¶ added in v2.2.0
func (c *Client) AddResumableHandler(path string, handler ResumeHandler) error
AddResumableHandler associates a callback path with the handler that resumes that operation.
func (*Client) AddResumableHandlers ¶ added in v2.2.0
func (c *Client) AddResumableHandlers(handlers ...ResumableHandler) error
AddResumableHandlers adds multiple resumable handlers to the client.
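func (*Client) GetOperation ¶
func (c *Client) GetOperation(ctx context.Context, operationName string) (*Operation, error)
GetOperation retrieves an operation wrapper by operation name.
func (*Client) GetOperationPb ¶
func (c *Client) GetOperationPb(ctx context.Context, name string) (*longrunningpb.Operation, error)
GetOperationPb retrieves the underlying protobuf operation by name.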
func (*Client) NewOperation ¶
func (c *Client) NewOperation(ctx context.Context, operationName string, md proto.Message) (*Operation, error)
NewOperation creates a new operation row and stores its initial metadata.
func (*Client) OperationsServer ¶ added in v2.3.0
func (c *Client) OperationsServer(opts ...OperationsServerOption) *OperationsServer
OperationsServer returns a google.longrunning.Operations server backed by the client.
func (*Client) RegisterGRPC ¶ added in v2.7.0
func (c *Client) RegisterGRPC(registrar grpc.ServiceRegistrar, opts ...OperationsServerOption)
RegisterGRPC wires the standard google.longrunning.Operations service into a gRPC server or any other ServiceRegistrar.
func (*Client) RegisterHTTP ¶ added in v2.6.0
func (c *Client) RegisterHTTP(mux *http.ServeMux)
RegisterHTTP registers the default resumable callback routes on mux.
func (*Client) RegisterHTTPAtPrefix ¶ added in v2.6.0
func (c *Client) RegisterHTTPAtPrefix(mux *http.ServeMux, prefix string)
RegisterHTTPAtPrefix registers resumable operation callbacks using the supplied path prefix.
func (*Client) RegisterHTTPHandlers ¶ deprecated, added in v2.1.0
func (c *Client) RegisterHTTPHandlers(mux *http.ServeMux)
RegisterHTTPHandlers registers the default resumable callback routes on mux.
Deprecated: use Client.RegisterHTTP.
func (*Client) RegisterHTTPHandlersAtPrefix ¶ deprecated, added in v2.1.0
func (c *Client) RegisterHTTPHandlersAtPrefix(mux *http.ServeMux, prefix string)
RegisterHTTPHandlersAtPrefix registers resumable operation callbacks using the supplied path prefix.
Deprecated: use Client.RegisterHTTPAtPrefix.
type Config ¶ added in v2.1.0
type Config struct {
	Neuron string
	// Project is the owning project used to derive the operations table name.
	Project                  string
	SpannerProject           string
	SpannerInstance          string
	SpannerDatabase          string
	DatabaseRole             string
	CloudTasksProject        string
	CloudTasksLocation       string
	CloudTasksQueue          string
	CloudTasksServiceAccount string
	Host                     string
}
Config configures a Client created with New.
type Operation ¶
Operation is a long-running operation managed by a Client.
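func (*Operation) Complete ¶
func (o *Operation) Complete(resp proto.Message) error
Complete marks the operation done with a successful response and persists it.
func (*Operation) DecodePrivateState ¶
func (o *Operation) DecodePrivateState(state any) error
DecodePrivateState decodes the operation's private state into state.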
func (*Operation) OperationPb ¶
func (o *Operation) OperationPb() *longrunningpb.Operation
OperationPb returns the underlying protobuf operation.
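func (*Operation) ResumePoint ¶
func (o *Operation) ResumePoint() string
ResumePoint returns the operation's current resume point.
func (*Operation) ResumeViaTasks ¶
func (o *Operation) ResumeViaTasks(path string, waitDuration time.Duration) error
ResumeViaTasks saves the operation and schedules the registered resume path to run later via Cloud Tasks.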
RegisterHTTP is only required in processes that actually need to serve the resumable callback route. Local callers can also resume directly without binding HTTP handlers.
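func (*Operation) SaveMetadata ¶
func (o *Operation) SaveMetadata(md proto.Message) error
SaveMetadata updates the operation metadata and persists the operation.
func (*Operation) SavePrivateState ¶
func (o *Operation) SavePrivateState(state any) error
SavePrivateState updates the private state and persists the operation.
func (*Operation) SaveResumePoint ¶
func (o *Operation) SaveResumePoint(resumePoint string) error
SaveResumePoint updates the resume point and persists the operation.
func (*Operation) SetMetadata ¶
func (o *Operation) SetMetadata(md proto.Message) error
SetMetadata updates the operation metadata without persisting the operation.
func (*Operation) SetPrivateState ¶
func (o *Operation) SetPrivateState(state any) error
SetPrivateState updates the private state without persisting the operation.
func (*Operation) SetResumePoint ¶
func (o *Operation) SetResumePoint(resumePoint string)
SetResumePoint updates the resume point without persisting the operation.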
type OperationRow ¶
type OperationRow struct {
	Operation *longrunningpb.Operation
	// Internal state to track about the operation.
	// Use the operation's metadata for information that should be visible to clients.
	State []byte
	// Checkpoint to resume from.
	ResumePoint string
	// The last time this operation was updated. Automatically set when writing to Spanner.
	UpdateTime time.Time
}
OperationRow is the Spanner-backed representation of an operation.
type OperationsServer ¶ added in v2.3.0
type OperationsServer struct {
	longrunningpb.UnimplementedOperationsServer
	// contains filtered or unexported fields
}
OperationsServer serves the standard google.longrunning.Operations RPCs using an LRO client.
func NewOperationsServer ¶ added in v2.3.0
func NewOperationsServer(client *Client, opts ...OperationsServerOption) *OperationsServer
NewOperationsServer constructs a google.longrunning.Operations server backed by the client.
func (*OperationsServer) GetOperation ¶ added in v2.3.0
func (s *OperationsServer) GetOperation(ctx context.Context, req *longrunningpb.GetOperationRequest) (*longrunningpb.Operation, error)
GetOperation retrieves the latest state of a long-running operation.
func (*OperationsServer) WaitOperation ¶ added in v2.3.0
func (s *OperationsServer) WaitOperation(ctx context.Context, req *longrunningpb.WaitOperationRequest) (*longrunningpb.Operation, error)
WaitOperation waits until the operation is done or the effective timeout elapses.
type OperationsServerOption ¶ added in v2.3.0
type OperationsServerOption func(*OperationsServer)
OperationsServerOption configures an OperationsServer.
func WithDefaultWaitTimeout ¶ added in v2.3.0
func WithDefaultWaitTimeout(timeout time.Duration) OperationsServerOption
WithDefaultWaitTimeout sets the default timeout used when WaitOperation does not specify one.
func WithWaitPolling ¶ added in v2.3.0
func WithWaitPolling(initialDelay, maxDelay time.Duration) OperationsServerOption
WithWaitPolling configures the initial and maximum polling intervals for WaitOperation.
type ResumableHandler ¶ added in v2.2.0
type ResumableHandler struct {
	// Path is the callback path segment registered under the resumable HTTP prefix.
	// For example, if Path is "create-agent" and the prefix is "/resume-operation/",
	// the callback route becomes "/resume-operation/create-agent".
	Path string
	// Handler resumes the operation when the registered callback path is invoked.
	Handler ResumeHandler
}
ResumableHandler associates a callback path with the handler that resumes that operation.
type ResumeHandler ¶ added in v2.1.1
type ResumeHandler func(*Operation)
ResumeHandler resumes a previously scheduled long-running operation.