Skip to content
Merged
2 changes: 1 addition & 1 deletion pubsub/v2/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func TestIntegration_CancelReceive(t *testing.T) {
return
default:
publisher.Publish(ctx, &Message{Data: []byte("some msg")})
time.Sleep(time.Second)
time.Sleep(10 * time.Second)
}
}
}()
Expand Down
41 changes: 29 additions & 12 deletions pubsub/v2/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {

var rmsgs []*pb.ReceivedMessage
var err error
// This is a blocking call, because reading from the stream blocks.
// We want to make sure this is canceled.
rmsgs, err = it.recvMessages()
// If stopping the iterator results in the grpc stream getting shut down and
// returning an error here, treat the same as above and return EOF.
Expand Down Expand Up @@ -359,9 +361,9 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {

// If exactly once is enabled, we should wait until modack responses are successes
// before attempting to process messages.
it.sendModAck(ackIDs, deadline, false, true)
ctx := context.Background()
it.sendModAck(ctx, ackIDs, deadline, false, true)
for ackID, ar := range ackIDs {
ctx := context.Background()
_, err := ar.Get(ctx)
if err != nil {
delete(pendingMessages, ackID)
Expand Down Expand Up @@ -508,16 +510,16 @@ func (it *messageIterator) sender() {
}
if sendNacks {
// Nack indicated by modifying the deadline to zero.
it.sendModAck(nacks, 0, false, false)
it.sendModAck(context.Background(), nacks, 0, false, false)
}
if sendModAcks {
it.sendModAck(modAcks, dl, true, false)
it.sendModAck(context.Background(), modAcks, dl, true, false)
}
if sendPing {
it.pingStream()
}
if sendReceipt {
it.sendModAck(receipts, dl, true, true)
it.sendModAck(context.Background(), receipts, dl, true, true)
}
}
}
Expand Down Expand Up @@ -559,7 +561,7 @@ type ackFunc = func(ctx context.Context, subName string, ackIds []string) error
type ackRecordStat = func(ctx context.Context, toSend []string)
type retryAckFunc = func(toRetry map[string]*ipubsub.AckResult)

func (it *messageIterator) sendAckWithFunc(m map[string]*AckResult, ackFunc ackFunc, retryAckFunc retryAckFunc, ackRecordStat ackRecordStat) {
func (it *messageIterator) sendAckWithFunc(ctx context.Context, m map[string]*AckResult, ackFunc ackFunc, retryAckFunc retryAckFunc, ackRecordStat ackRecordStat) {
ackIDs := make([]string, 0, len(m))
for ackID := range m {
ackIDs = append(ackIDs, ackID)
Expand All @@ -575,9 +577,7 @@ func (it *messageIterator) sendAckWithFunc(m map[string]*AckResult, ackFunc ackF
go func(toSend []string) {
defer wg.Done()
ackRecordStat(it.ctx, toSend)
// Use context.Background() as the call's context, not it.ctx. We don't
// want to cancel this RPC when the iterator is stopped.
cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
cctx, cancel2 := context.WithTimeout(ctx, 60*time.Second)
defer cancel2()
err := ackFunc(cctx, it.subName, toSend)
if exactlyOnceDelivery {
Expand All @@ -602,7 +602,8 @@ func (it *messageIterator) sendAckWithFunc(m map[string]*AckResult, ackFunc ackF
// sendAck is used to confirm acknowledgement of a message. If exactly once delivery is
// enabled, we'll retry these messages for a short duration in a goroutine.
func (it *messageIterator) sendAck(m map[string]*AckResult) {
it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIDs []string) error {
ctx := context.Background()
it.sendAckWithFunc(ctx, m, func(ctx context.Context, subName string, ackIDs []string) error {
// For each ackID (message), setup links to the main subscribe span.
// If this is a nack, also remove it from active spans.
// If the ackID is not found, don't create any more spans.
Expand Down Expand Up @@ -667,7 +668,7 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) {
// percentile in order to capture the highest amount of time necessary without
// considering 1% outliers. If the ModAck RPC fails and exactly once delivery is
// enabled, we retry it in a separate goroutine for a short duration.
func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid, isReceipt bool) {
func (it *messageIterator) sendModAck(ctx context.Context, m map[string]*AckResult, deadline time.Duration, logOnInvalid, isReceipt bool) {
deadlineSec := int32(deadline / time.Second)
isNack := deadline == 0
var spanName, eventStart, eventEnd string
Expand All @@ -680,7 +681,7 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur
eventStart = eventModackStart
eventEnd = eventModackEnd
}
it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIDs []string) error {
it.sendAckWithFunc(ctx, m, func(ctx context.Context, subName string, ackIDs []string) error {
if it.enableTracing {
// For each ackID (message), link back to the main subscribe span.
// If this is a nack, also remove it from active spans.
Expand Down Expand Up @@ -1016,3 +1017,19 @@ func processResults(errorStatus *status.Status, ackResMap map[string]*AckResult,
}
return completedResults, retryResults
}

// nackInventory nacks all the current messages being held by the iterator.
// This does not stop the existing callbacks, and does not try to remove
// messages from the scheduler. This is used specifically for when the
// user configured ShutdownOptions is set to NackImmediately
func (it *messageIterator) nackInventory(ctx context.Context) {
it.mu.Lock()
defer it.mu.Unlock()

toNack := make(map[string]*ipubsub.AckResult)
for ackID := range it.keepAliveDeadlines {
// Use a dummy AckResult since we don't propagate nacks back to the user.
toNack[ackID] = newSuccessAckResult()
}
it.sendModAck(ctx, toNack, 0, false, false)
}
57 changes: 57 additions & 0 deletions pubsub/v2/shutdown.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pubsub

import "time"

// ShutdownOptions configures the shutdown behavior of the subscriber.
// When ShutdownOptions is nil, the client library will
// assume disabled/infinite timeout.
//
// Warning: The interaction between Timeout and Behavior might be surprising.
// Read about the interaction of these below to ensure you
// get the desired behavior.
type ShutdownOptions struct {
// Timeout specifies the time the subscriber should wait
// before forcefully shutting down..
// In ShutdownBehaviorNackImmediately mode, this configures the timeout
// for message nacks before shutting down.
//
// Set to zero to immediately shutdown.
// Set to a negative value to disable timeout.
// Both zero and negative values overrides the ShutdownBehavior.
Timeout time.Duration
Comment thread
hongalex marked this conversation as resolved.

// Behavior defines the strategy the subscriber should use when
// shutting down (wait or nack messages).
// When ShutdownOptions is set, but Timeout is unspecified, the default zero-value
// will result in immediate shutdown. When needing a specific a behavior,
// always set a non-zero Timeout.
Behavior ShutdownBehavior
}

// ShutdownBehavior defines the strategy the subscriber should take when
// shutting down. Current options are graceful shutdown vs nacking messages.
type ShutdownBehavior int

const (
// ShutdownBehaviorWaitForProcessing means the subscriber client will wait for
// outstanding messages to be processed.
ShutdownBehaviorWaitForProcessing = iota
Comment thread
hongalex marked this conversation as resolved.

// ShutdownBehaviorNackImmediately means the subscriber client will nack all
// outstanding messages before closing.
ShutdownBehaviorNackImmediately
)
157 changes: 157 additions & 0 deletions pubsub/v2/shutdown_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pubsub

import (
"context"
"sync"
"testing"
"time"

pb "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func TestShutdown_NackImmediately(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client, srv := newFake(t)
defer client.Close()
defer srv.Close()

topic := mustCreateTopic(t, client, "projects/p/topics/t")
sub := mustCreateSubConfig(t, client, &pb.Subscription{
Name: "projects/p/subscriptions/s",
Topic: topic.String(),
})

// Part of this test: pretend to extend the min duration quite a bit so we can test
// if the message has been properly nacked.
sub.ReceiveSettings.MinDurationPerAckExtension = 10 * time.Minute
sub.ReceiveSettings.ShutdownOptions = &ShutdownOptions{
Behavior: ShutdownBehaviorNackImmediately,
Timeout: 1 * time.Minute,
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, err := topic.Publish(ctx, &Message{Data: []byte("m1")}).Get(ctx)
if err != nil {
t.Errorf("Publish().Get() got err: %v", err)
}
}()
wg.Wait()

cctx, ccancel := context.WithCancel(ctx)
go sub.Receive(cctx, func(ctx context.Context, m *Message) {
// First time receiving, cancel the context to trigger shutdown.
// Don't cancel away to avoid race condition with fake.
time.AfterFunc(2*time.Second, ccancel)
})

// Wait for the message to be redelivered.
time.Sleep(5 * time.Second)

var received int
var receiveLock sync.Mutex
ctx2, cancel := context.WithTimeout(ctx, 30*time.Second)
err := sub.Receive(ctx2, func(ctx context.Context, m *Message) {
receiveLock.Lock()
defer receiveLock.Unlock()
received++
m.Ack()
cancel()
})
if err != nil {
t.Errorf("got err from recv: %v", err)
}
if received != 1 {
t.Errorf("expected 1 delivery, got %d", received)
}
}

func TestShutdown_WaitForProcessing(t *testing.T) {
t.Parallel()
tests := []struct {
name string
shutdownTimeout time.Duration
expectedTimeout time.Duration
minTime time.Duration
}{
{
name: "BailImmediately",
shutdownTimeout: 0 * time.Second,
expectedTimeout: 5 * time.Second,
},
{
name: "WithTimeout",
shutdownTimeout: 5 * time.Second,
expectedTimeout: 6 * time.Second,
minTime: 4 * time.Second,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client, srv := newFake(t)
defer client.Close()
defer srv.Close()

topic := mustCreateTopic(t, client, "projects/p/topics/t")
sub := mustCreateSubConfig(t, client, &pb.Subscription{
Name: "projects/p/subscriptions/s",
Topic: topic.String(),
})
sub.ReceiveSettings.ShutdownOptions = &ShutdownOptions{
Behavior: ShutdownBehaviorWaitForProcessing,
Timeout: tc.shutdownTimeout,
}
processingTime := 1 * time.Hour

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, err := topic.Publish(ctx, &Message{Data: []byte("m1")}).Get(ctx)
if err != nil {
t.Errorf("Publish().Get() got err: %v", err)
}
}()
wg.Wait()

cctx, cancel2 := context.WithCancel(ctx)
defer cancel2()
start := time.Now()
sub.Receive(cctx, func(ctx context.Context, m *Message) {
cancel()
// Simulate a long processing message that we want to cancel right away.
// The message should never be acked since we expect the client to bail early.
time.Sleep(processingTime)
m.Ack()
})

elapsed := time.Since(start)
if elapsed > tc.expectedTimeout {
t.Errorf("expected quick cancellation, elapsed: %v, want less than: %v", elapsed, tc.expectedTimeout)
}
if tc.minTime > 0 && elapsed < tc.minTime {
t.Errorf("expected to wait for shutdown, elapsed: %v, want greater than: %v", elapsed, tc.minTime)
}
})
}
}
4 changes: 3 additions & 1 deletion pubsub/v2/streaming_pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func testStreamingPullIteration(t *testing.T, client *Client, server *mockServer
}
}
server.wait()
server.mu.Lock()
defer server.mu.Unlock()
for i := 0; i < len(msgs); i++ {
id := msgs[i].AckId
if i%2 == 0 {
Expand Down Expand Up @@ -183,8 +185,8 @@ func TestStreamingPullCancel(t *testing.T) {
}

func TestStreamingPullRetry(t *testing.T) {
// Check that we retry on io.EOF or Unavailable.
t.Parallel()
// Check that we retry on io.EOF or Unavailable.
client, server := newMock(t)
defer server.srv.Close()
defer client.Close()
Expand Down
Loading
Loading