From 282d662d1c9df5dcbe3cceca24f9ede9f3c0e46b Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 1 Jul 2025 14:56:17 -0700 Subject: [PATCH 01/13] add initial settings --- pubsub/v2/shutdown.go | 29 +++++++++++++++++++++++++++++ pubsub/v2/subscriber.go | 11 +++++++++++ 2 files changed, 40 insertions(+) create mode 100644 pubsub/v2/shutdown.go diff --git a/pubsub/v2/shutdown.go b/pubsub/v2/shutdown.go new file mode 100644 index 000000000000..0c81b2e6404e --- /dev/null +++ b/pubsub/v2/shutdown.go @@ -0,0 +1,29 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pubsub + +// ShutdownBehavior defines the strategy the subscriber should take when +// shutting down. Current options are graceful shutdown vs nacking messages. +type ShutdownBehavior int + +const ( + // ShutdownBehaviorWaitForProcessing means the subscriber will wait for + // outstanding messages to be processed before nacking messages finally. + ShutdownBehaviorWaitForProcessing = iota + + // ShutdownBehaviorNackImmediately means the subscriber will nack all outstanding + // messages before closing. + ShutdownBehaviorNackImmediately +) diff --git a/pubsub/v2/subscriber.go b/pubsub/v2/subscriber.go index aa21b87d2c7d..dcb8a694e5f3 100644 --- a/pubsub/v2/subscriber.go +++ b/pubsub/v2/subscriber.go @@ -148,6 +148,17 @@ type ReceiveSettings struct { // function passed to Receive on them. 
To limit the number of messages being // processed concurrently, set MaxOutstandingMessages. NumGoroutines int + + // ShutdownTimeout specifies the time the subscriber should wait + // to shutdown before killing the process. + // Set to 0 to immediately shutdown. + // Set to negative value to disable a duration. + // Defaults to disabled. + ShutdownTimeout time.Duration + + // ShutdownBehavior defines the strategy the subscriber should use when + // shutting down (graceful shutdown vs nack messages). + ShutdownBehavior ShutdownBehavior } // DefaultReceiveSettings holds the default values for ReceiveSettings. From 3f8520764ed07b0e8884f6b9165295c3f4f298a3 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 8 Sep 2025 15:08:36 -0700 Subject: [PATCH 02/13] feat(pubsub/v2): add subscriber shutdown options --- pubsub/v2/iterator.go | 43 +++++++--- pubsub/v2/shutdown.go | 18 ++++ pubsub/v2/shutdown_test.go | 156 +++++++++++++++++++++++++++++++++++ pubsub/v2/subscriber.go | 120 +++++++++++++++++++++------ pubsub/v2/subscriber_test.go | 2 - 5 files changed, 298 insertions(+), 41 deletions(-) create mode 100644 pubsub/v2/shutdown_test.go diff --git a/pubsub/v2/iterator.go b/pubsub/v2/iterator.go index 72cbc4bd4b17..0ecb4ce7e87b 100644 --- a/pubsub/v2/iterator.go +++ b/pubsub/v2/iterator.go @@ -263,6 +263,8 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { var rmsgs []*pb.ReceivedMessage var err error + // This is a blocking call, because reading from the stream blocks. + // We want to make sure this is canceled. rmsgs, err = it.recvMessages() // If stopping the iterator results in the grpc stream getting shut down and // returning an error here, treat the same as above and return EOF. @@ -359,9 +361,9 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { // If exactly once is enabled, we should wait until modack responses are successes // before attempting to process messages. 
- it.sendModAck(ackIDs, deadline, false, true) + ctx := context.Background() + it.sendModAck(ctx, ackIDs, deadline, false, true) for ackID, ar := range ackIDs { - ctx := context.Background() _, err := ar.Get(ctx) if err != nil { delete(pendingMessages, ackID) @@ -508,16 +510,16 @@ func (it *messageIterator) sender() { } if sendNacks { // Nack indicated by modifying the deadline to zero. - it.sendModAck(nacks, 0, false, false) + it.sendModAck(context.Background(), nacks, 0, false, false) } if sendModAcks { - it.sendModAck(modAcks, dl, true, false) + it.sendModAck(context.Background(), modAcks, dl, true, false) } if sendPing { it.pingStream() } if sendReceipt { - it.sendModAck(receipts, dl, true, true) + it.sendModAck(context.Background(), receipts, dl, true, true) } } } @@ -559,7 +561,7 @@ type ackFunc = func(ctx context.Context, subName string, ackIds []string) error type ackRecordStat = func(ctx context.Context, toSend []string) type retryAckFunc = func(toRetry map[string]*ipubsub.AckResult) -func (it *messageIterator) sendAckWithFunc(m map[string]*AckResult, ackFunc ackFunc, retryAckFunc retryAckFunc, ackRecordStat ackRecordStat) { +func (it *messageIterator) sendAckWithFunc(ctx context.Context, m map[string]*AckResult, ackFunc ackFunc, retryAckFunc retryAckFunc, ackRecordStat ackRecordStat) { ackIDs := make([]string, 0, len(m)) for ackID := range m { ackIDs = append(ackIDs, ackID) @@ -575,9 +577,7 @@ func (it *messageIterator) sendAckWithFunc(m map[string]*AckResult, ackFunc ackF go func(toSend []string) { defer wg.Done() ackRecordStat(it.ctx, toSend) - // Use context.Background() as the call's context, not it.ctx. We don't - // want to cancel this RPC when the iterator is stopped. 
- cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) + cctx, cancel2 := context.WithTimeout(ctx, 60*time.Second) defer cancel2() err := ackFunc(cctx, it.subName, toSend) if exactlyOnceDelivery { @@ -602,7 +602,8 @@ func (it *messageIterator) sendAckWithFunc(m map[string]*AckResult, ackFunc ackF // sendAck is used to confirm acknowledgement of a message. If exactly once delivery is // enabled, we'll retry these messages for a short duration in a goroutine. func (it *messageIterator) sendAck(m map[string]*AckResult) { - it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIDs []string) error { + ctx := context.Background() + it.sendAckWithFunc(ctx, m, func(ctx context.Context, subName string, ackIDs []string) error { // For each ackID (message), setup links to the main subscribe span. // If this is a nack, also remove it from active spans. // If the ackID is not found, don't create any more spans. @@ -667,7 +668,7 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) { // percentile in order to capture the highest amount of time necessary without // considering 1% outliers. If the ModAck RPC fails and exactly once delivery is // enabled, we retry it in a separate goroutine for a short duration. 
-func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid, isReceipt bool) { +func (it *messageIterator) sendModAck(ctx context.Context, m map[string]*AckResult, deadline time.Duration, logOnInvalid, isReceipt bool) { deadlineSec := int32(deadline / time.Second) isNack := deadline == 0 var spanName, eventStart, eventEnd string @@ -680,7 +681,7 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur eventStart = eventModackStart eventEnd = eventModackEnd } - it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIDs []string) error { + it.sendAckWithFunc(ctx, m, func(ctx context.Context, subName string, ackIDs []string) error { if it.enableTracing { // For each ackID (message), link back to the main subscribe span. // If this is a nack, also remove it from active spans. @@ -823,7 +824,7 @@ func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New("modack retry failed")) } if logOnInvalid { - log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs) + ("automatic lease modack retry failed for following IDs: %v", ackIDs) } return } @@ -1016,3 +1017,19 @@ func processResults(errorStatus *status.Status, ackResMap map[string]*AckResult, } return completedResults, retryResults } + +// nackInventory nacks all the current messages being held by the iterator. +// This does not stop the existing callbacks, and does not try to remove +// messages from the scheduler. This is used specifically for when the +// user configured ShutdownOptions is set to NackImmediately +func (it *messageIterator) nackInventory(ctx context.Context) { + it.mu.Lock() + defer it.mu.Unlock() + + toNack := make(map[string]*ipubsub.AckResult) + for ackID := range it.keepAliveDeadlines { + // Use a dummy AckResult since we don't propagate nacks back to the user. 
+ toNack[ackID] = newSuccessAckResult() + } + it.sendModAck(ctx, toNack, 0, false, false) +} diff --git a/pubsub/v2/shutdown.go b/pubsub/v2/shutdown.go index 0c81b2e6404e..d58b8fa38bdb 100644 --- a/pubsub/v2/shutdown.go +++ b/pubsub/v2/shutdown.go @@ -14,6 +14,24 @@ package pubsub +import "time" + +// ShutdownOptions configures the shutdown behavior of the subscriber. +type ShutdownOptions struct { + // Timeout specifies the time the subscriber should wait + // to shutdown before killing the process. + // In nack mode, this specifies how long to wait + // for messages to be nacked after shutdown is initiated. + // + // Set to zero to immediately shutdown. + // Set to a negative value to disable timeout. + Timeout time.Duration + + // Behavior defines the strategy the subscriber should use when + // shutting down (wait or nack messages). + Behavior ShutdownBehavior +} + // ShutdownBehavior defines the strategy the subscriber should take when // shutting down. Current options are graceful shutdown vs nacking messages. type ShutdownBehavior int diff --git a/pubsub/v2/shutdown_test.go b/pubsub/v2/shutdown_test.go new file mode 100644 index 000000000000..24b8cb76a422 --- /dev/null +++ b/pubsub/v2/shutdown_test.go @@ -0,0 +1,156 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package pubsub + +import ( + "context" + "sync" + "testing" + "time" + + pb "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" +) + +func TestShutdown_NackImmediately(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + client, srv := newFake(t) + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "projects/p/topics/t") + sub := mustCreateSubConfig(t, client, &pb.Subscription{ + Name: "projects/p/subscriptions/s", + Topic: topic.String(), + }) + + // Part of this test: pretend to extend the min duration quite a bit so we can test + // if the message has been properly nacked. + sub.ReceiveSettings.MinDurationPerAckExtension = 10 * time.Minute + sub.ReceiveSettings.ShutdownOptions.Timeout = 1 * time.Minute + sub.ReceiveSettings.ShutdownOptions.Behavior = ShutdownBehaviorNackImmediately + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, err := topic.Publish(ctx, &Message{Data: []byte("m1")}).Get(ctx) + if err != nil { + t.Errorf("Publish().Get() got err: %v", err) + } + }() + wg.Wait() + + cctx, ccancel := context.WithCancel(ctx) + go sub.Receive(cctx, func(ctx context.Context, m *Message) { + // First time receiving, cancel the context to trigger shutdown. + // Don't cancel away to avoid race condition with fake. + time.Sleep(10 * time.Second) + ccancel() + }) + + // Wait for the message to be redelivered. 
+ time.Sleep(15 * time.Second) + + // call Receive again + var received int + var receiveLock sync.Mutex + ctx2, cancel := context.WithTimeout(ctx, 30*time.Second) + err := sub.Receive(ctx2, func(ctx context.Context, m *Message) { + receiveLock.Lock() + defer receiveLock.Unlock() + received++ + m.Ack() + cancel() + }) + if err != nil { + t.Errorf("got err from recv: %v", err) + } + if received != 1 { + t.Errorf("expected 1 delivery, got %d", received) + } +} + +func TestShutdown_WaitForProcessing(t *testing.T) { + tests := []struct { + name string + shutdownTimeout time.Duration + expectedTimeout time.Duration + minTime time.Duration + }{ + { + name: "BailImmediately", + shutdownTimeout: 0 * time.Second, + expectedTimeout: 5 * time.Second, + }, + { + name: "15 second timeout", + shutdownTimeout: 15 * time.Second, + expectedTimeout: 16 * time.Second, + minTime: 14 * time.Second, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + client, srv := newFake(t) + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "projects/p/topics/t") + sub := mustCreateSubConfig(t, client, &pb.Subscription{ + Name: "projects/p/subscriptions/s", + Topic: topic.String(), + }) + sub.ReceiveSettings.ShutdownOptions = &ShutdownOptions{ + Behavior: ShutdownBehaviorWaitForProcessing, + Timeout: tc.shutdownTimeout, + } + processingTime := 1 * time.Hour + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, err := topic.Publish(ctx, &Message{Data: []byte("m1")}).Get(ctx) + if err != nil { + t.Errorf("Publish().Get() got err: %v", err) + } + }() + wg.Wait() + + cctx, cancel2 := context.WithCancel(ctx) + defer cancel2() + start := time.Now() + sub.Receive(cctx, func(ctx context.Context, m *Message) { + cancel() + // Simulate a long processing message that we want to cancel right away. 
+ // The message should never be acked since we expect the client to bail early. + time.Sleep(processingTime) + m.Ack() + }) + + elapsed := time.Since(start) + if elapsed > tc.expectedTimeout { + t.Errorf("expected quick cancellation, elapsed: %v, want less than: %v", elapsed, tc.expectedTimeout) + } + if tc.minTime > 0 && elapsed < tc.minTime { + t.Errorf("expected to wait for shutdown, elapsed: %v, want greater than: %v", elapsed, tc.minTime) + } + }) + } +} diff --git a/pubsub/v2/subscriber.go b/pubsub/v2/subscriber.go index dcb8a694e5f3..0ec34dbc2ede 100644 --- a/pubsub/v2/subscriber.go +++ b/pubsub/v2/subscriber.go @@ -149,16 +149,9 @@ type ReceiveSettings struct { // processed concurrently, set MaxOutstandingMessages. NumGoroutines int - // ShutdownTimeout specifies the time the subscriber should wait - // to shutdown before killing the process. - // Set to 0 to immediately shutdown. - // Set to negative value to disable a duration. - // Defaults to disabled. - ShutdownTimeout time.Duration - - // ShutdownBehavior defines the strategy the subscriber should use when - // shutting down (graceful shutdown vs nack messages). - ShutdownBehavior ShutdownBehavior + // ShutdownOptions configures the shutdown behavior of the subscriber. + // If unset, the default behavior is to graceful shutdown with no timeout. + ShutdownOptions *ShutdownOptions } // DefaultReceiveSettings holds the default values for ReceiveSettings. 
@@ -169,6 +162,10 @@ var DefaultReceiveSettings = ReceiveSettings{ MaxOutstandingMessages: 1000, MaxOutstandingBytes: 1e9, // 1G NumGoroutines: 1, + ShutdownOptions: &ShutdownOptions{ + Behavior: ShutdownBehaviorWaitForProcessing, + Timeout: -1, + }, } var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscriber") @@ -234,6 +231,10 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa if minExtPeriod < 0 { minExtPeriod = DefaultReceiveSettings.MinDurationPerAckExtension } + shutdown := s.ReceiveSettings.ShutdownOptions + if shutdown == nil { + shutdown = DefaultReceiveSettings.ShutdownOptions + } var numGoroutines int switch { @@ -277,6 +278,11 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa ctx2, cancel2 := context.WithCancel(gctx) defer cancel2() + // This context is for forcefully shutting down callbacks if ShutdownTimeout + // is exceeded. + shutdownKillCtx, shutdownKillCancel := context.WithCancel(context.Background()) + defer shutdownKillCancel() + for i := 0; i < numGoroutines; i++ { // The iterator does not use the context passed to Receive. If it did, // canceling that context would immediately stop the iterator without @@ -301,22 +307,43 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa var maxToPull int32 // maximum number of messages to pull // If the context is done, don't pull more messages. select { - case <-ctx.Done(): + case <-ctx2.Done(): return nil default: } - msgs, err := iter.receive(maxToPull) - if errors.Is(err, io.EOF) { + // This channel's type is a slice, so it only needs to store 1 object. + // This is used to communicate the result of iter.receive. 
+ msgChan := make(chan []*Message, 1) + doneChan := make(chan struct{}) + go func() { + msgs, err := iter.receive(maxToPull) + if errors.Is(err, io.EOF) { + } + if err != nil { + } + msgChan <- msgs + close(doneChan) + close(msgChan) + }() + + // Make message pulling dependent on iterator for context cancellation + // If the context is cancelled while pulling messages, stop calling Receive early. + select { + case <-ctx2.Done(): return nil + case <-doneChan: } - if err != nil { - return err + + var msgs []*Message + for _, m := range <-msgChan { + msgs = append(msgs, m) } + // If context is done and messages have been pulled, // nack them. select { - case <-ctx.Done(): + case <-ctx2.Done(): for _, m := range msgs { m.Nack() } @@ -404,7 +431,19 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa } } defer fc.release(ctx, msgLen) - f(otelCtx, m) + + done := make(chan struct{}) + go func() { + defer close(done) + f(otelCtx, m) + }() + + select { + case <-done: + // Callback finished gracefully. + case <-shutdownKillCtx.Done(): + // Shutdown timeout exceeded, stop waiting for callback. + } }); err != nil { wg.Done() // TODO(hongalex): propagate these errors to an otel span. @@ -424,21 +463,50 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa } go func() { + // Detected cancellation (either user initiated or permanent error). <-ctx2.Done() - - // Wait for all iterators to stop. + // Stop all the pullstreams as the first thing we do to prevent new messages. for _, p := range pairs { - p.iter.stop() - p.wg.Done() + p.iter.ps.cancel() + } + + opts := s.ReceiveSettings.ShutdownOptions + // Once shutdown is initiated, start the timer for forceful shutdown. + if opts.Timeout == 0 { + shutdownKillCancel() // Immediate forceful shutdown. 
+ } else if opts.Timeout > 0 { + time.AfterFunc(opts.Timeout, shutdownKillCancel) + for _, p := range pairs { + p.iter.nackInventory(shutdownKillCtx) + } } - // This _must_ happen after every iterator has stopped, or some - // iterator will still have undelivered messages but the scheduler will - // already be shut down. - sched.Shutdown() + if opts.Timeout >= 0 { + go func() { + for _, p := range pairs { + // Since we aren't looking for graceful timeout, call wg.Done in + // order to free up the waitgroup sooner and exit sooner. + // In graceful timeout, iterators must stop before wg is done. + p.wg.Done() + p.iter.stop() + } + sched.Shutdown() + }() + } else if opts.Timeout < 0 { + // Either no options or specified or the user wants to graceful shutdown. + for _, p := range pairs { + p.iter.stop() + p.wg.Done() + } + // This _must_ happen after every iterator has stopped, or some + // iterator will still have undelivered messages but the scheduler will + // already be shut down. + sched.Shutdown() + } }() - return group.Wait() + err := group.Wait() + return err } type pullOptions struct { diff --git a/pubsub/v2/subscriber_test.go b/pubsub/v2/subscriber_test.go index 2d272cd7bd56..efa5d3cfc70c 100644 --- a/pubsub/v2/subscriber_test.go +++ b/pubsub/v2/subscriber_test.go @@ -18,7 +18,6 @@ import ( "context" "errors" "fmt" - "log" "testing" "time" @@ -251,7 +250,6 @@ func TestExactlyOnceDelivery_AckRetryDeadlineExceeded(t *testing.T) { // Override the default timeout here so this test doesn't take 10 minutes. 
exactlyOnceDeliveryRetryDeadline = 10 * time.Second err = s.Receive(ctx, func(ctx context.Context, msg *Message) { - log.Printf("received message: %v\n", msg) ar := msg.AckWithResult() s, err := ar.Get(ctx) if s != AcknowledgeStatusOther { From bfe941ad4220463f20419717456a6d6224e3a33a Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 8 Sep 2025 15:19:28 -0700 Subject: [PATCH 03/13] add log printf statement back --- pubsub/v2/iterator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/v2/iterator.go b/pubsub/v2/iterator.go index 0ecb4ce7e87b..eef85d2c17ef 100644 --- a/pubsub/v2/iterator.go +++ b/pubsub/v2/iterator.go @@ -824,7 +824,7 @@ func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New("modack retry failed")) } if logOnInvalid { - ("automatic lease modack retry failed for following IDs: %v", ackIDs) + log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs) } return } From 6f242bfb7aedad005cdc28c49efed690caf9022a Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 8 Sep 2025 16:38:57 -0700 Subject: [PATCH 04/13] propagate stream errors to Receive --- pubsub/v2/shutdown.go | 2 ++ pubsub/v2/subscriber.go | 18 +++++++++++------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/pubsub/v2/shutdown.go b/pubsub/v2/shutdown.go index d58b8fa38bdb..1b94d2958dab 100644 --- a/pubsub/v2/shutdown.go +++ b/pubsub/v2/shutdown.go @@ -17,6 +17,8 @@ package pubsub import "time" // ShutdownOptions configures the shutdown behavior of the subscriber. +// If not specified, the behavior will default to indefinite processing, +// that is graceful shutdown with no timeout. type ShutdownOptions struct { // Timeout specifies the time the subscriber should wait // to shutdown before killing the process. 
diff --git a/pubsub/v2/subscriber.go b/pubsub/v2/subscriber.go index 0ec34dbc2ede..77b1383de159 100644 --- a/pubsub/v2/subscriber.go +++ b/pubsub/v2/subscriber.go @@ -231,9 +231,8 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa if minExtPeriod < 0 { minExtPeriod = DefaultReceiveSettings.MinDurationPerAckExtension } - shutdown := s.ReceiveSettings.ShutdownOptions - if shutdown == nil { - shutdown = DefaultReceiveSettings.ShutdownOptions + if s.ReceiveSettings.ShutdownOptions == nil { + s.ReceiveSettings.ShutdownOptions = DefaultReceiveSettings.ShutdownOptions } var numGoroutines int @@ -315,16 +314,20 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa // This channel's type is a slice, so it only needs to store 1 object. // This is used to communicate the result of iter.receive. msgChan := make(chan []*Message, 1) + errChan := make(chan error, 1) doneChan := make(chan struct{}) go func() { + defer close(doneChan) + defer close(msgChan) + defer close(errChan) msgs, err := iter.receive(maxToPull) if errors.Is(err, io.EOF) { + errChan <- nil } if err != nil { + errChan <- err } msgChan <- msgs - close(doneChan) - close(msgChan) }() // Make message pulling dependent on iterator for context cancellation @@ -332,6 +335,8 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa select { case <-ctx2.Done(): return nil + case err := <-errChan: + return err case <-doneChan: } @@ -505,8 +510,7 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa } }() - err := group.Wait() - return err + return group.Wait() } type pullOptions struct { From 16d03f051704c62a485df7d66918326fdafb6d86 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 9 Sep 2025 16:02:01 -0700 Subject: [PATCH 05/13] fix error with channel closing --- pubsub/v2/subscriber.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git 
a/pubsub/v2/subscriber.go b/pubsub/v2/subscriber.go index 77b1383de159..5d6cd214f46b 100644 --- a/pubsub/v2/subscriber.go +++ b/pubsub/v2/subscriber.go @@ -306,7 +306,7 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa var maxToPull int32 // maximum number of messages to pull // If the context is done, don't pull more messages. select { - case <-ctx2.Done(): + case <-ctx.Done(): return nil default: } @@ -317,17 +317,19 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa errChan := make(chan error, 1) doneChan := make(chan struct{}) go func() { - defer close(doneChan) defer close(msgChan) defer close(errChan) msgs, err := iter.receive(maxToPull) if errors.Is(err, io.EOF) { errChan <- nil + return } if err != nil { errChan <- err + return } msgChan <- msgs + close(doneChan) }() // Make message pulling dependent on iterator for context cancellation From f6552328df15be75ffa3ecade00df781767fff2b Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 11 Sep 2025 15:00:44 -0700 Subject: [PATCH 06/13] simplify stream pulling channels --- pubsub/v2/subscriber.go | 45 +++++++++++++++++------------------------ 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/pubsub/v2/subscriber.go b/pubsub/v2/subscriber.go index 5d6cd214f46b..6c41662fbea6 100644 --- a/pubsub/v2/subscriber.go +++ b/pubsub/v2/subscriber.go @@ -277,7 +277,7 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa ctx2, cancel2 := context.WithCancel(gctx) defer cancel2() - // This context is for forcefully shutting down callbacks if ShutdownTimeout + // This context is for forcefully shutting down the application if ShutdownTimeout // is exceeded. 
shutdownKillCtx, shutdownKillCancel := context.WithCancel(context.Background()) defer shutdownKillCancel() @@ -311,14 +311,10 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa default: } - // This channel's type is a slice, so it only needs to store 1 object. // This is used to communicate the result of iter.receive. - msgChan := make(chan []*Message, 1) - errChan := make(chan error, 1) - doneChan := make(chan struct{}) + msgChan := make(chan []*Message) + errChan := make(chan error) go func() { - defer close(msgChan) - defer close(errChan) msgs, err := iter.receive(maxToPull) if errors.Is(err, io.EOF) { errChan <- nil @@ -329,28 +325,23 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa return } msgChan <- msgs - close(doneChan) }() // Make message pulling dependent on iterator for context cancellation // If the context is cancelled while pulling messages, stop calling Receive early. + var msgs []*Message select { - case <-ctx2.Done(): + case <-ctx.Done(): return nil case err := <-errChan: return err - case <-doneChan: - } - - var msgs []*Message - for _, m := range <-msgChan { - msgs = append(msgs, m) + case msgs = <-msgChan: } // If context is done and messages have been pulled, // nack them. select { - case <-ctx2.Done(): + case <-ctx.Done(): for _, m := range msgs { m.Nack() } @@ -477,30 +468,30 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa p.iter.ps.cancel() } - opts := s.ReceiveSettings.ShutdownOptions + shutdownOpts := s.ReceiveSettings.ShutdownOptions // Once shutdown is initiated, start the timer for forceful shutdown. - if opts.Timeout == 0 { + if shutdownOpts.Timeout == 0 { shutdownKillCancel() // Immediate forceful shutdown. 
- } else if opts.Timeout > 0 { - time.AfterFunc(opts.Timeout, shutdownKillCancel) + } else if shutdownOpts.Timeout > 0 { + time.AfterFunc(shutdownOpts.Timeout, shutdownKillCancel) for _, p := range pairs { p.iter.nackInventory(shutdownKillCtx) } } - if opts.Timeout >= 0 { + if shutdownOpts.Timeout >= 0 { // Timed shutdown go func() { for _, p := range pairs { - // Since we aren't looking for graceful timeout, call wg.Done in - // order to free up the waitgroup sooner and exit sooner. - // In graceful timeout, iterators must stop before wg is done. + // Since we aren't looking for graceful timeout, call + // each iterator.stop asynchronously. + go func() { + p.iter.stop() + }() p.wg.Done() - p.iter.stop() } sched.Shutdown() }() - } else if opts.Timeout < 0 { - // Either no options or specified or the user wants to graceful shutdown. + } else if shutdownOpts.Timeout < 0 { // Graceful shutdown for _, p := range pairs { p.iter.stop() p.wg.Done() From 1a9e6de66eeb651c0c8dbe72e9d787f6e9d42642 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Sun, 21 Sep 2025 10:06:50 -0700 Subject: [PATCH 07/13] revert recent changes --- pubsub/v2/integration_test.go | 4 +-- pubsub/v2/shutdown_test.go | 1 + pubsub/v2/streaming_pull_test.go | 5 ++-- pubsub/v2/subscriber.go | 45 +++++++++++++++++--------------- 4 files changed, 30 insertions(+), 25 deletions(-) diff --git a/pubsub/v2/integration_test.go b/pubsub/v2/integration_test.go index a1e0e194473e..541751a301d7 100644 --- a/pubsub/v2/integration_test.go +++ b/pubsub/v2/integration_test.go @@ -317,7 +317,7 @@ func TestIntegration_CancelReceive(t *testing.T) { return default: publisher.Publish(ctx, &Message{Data: []byte("some msg")}) - time.Sleep(time.Second) + time.Sleep(1 * time.Second) } } }() @@ -325,7 +325,7 @@ func TestIntegration_CancelReceive(t *testing.T) { go func() { err = sub.Receive(ctx, func(_ context.Context, msg *Message) { cancel() - time.AfterFunc(5*time.Second, msg.Ack) + 
msg.Ack() }) close(doneReceiving) }() diff --git a/pubsub/v2/shutdown_test.go b/pubsub/v2/shutdown_test.go index 24b8cb76a422..330627e613c4 100644 --- a/pubsub/v2/shutdown_test.go +++ b/pubsub/v2/shutdown_test.go @@ -84,6 +84,7 @@ func TestShutdown_NackImmediately(t *testing.T) { } func TestShutdown_WaitForProcessing(t *testing.T) { + t.Skip("skip") tests := []struct { name string shutdownTimeout time.Duration diff --git a/pubsub/v2/streaming_pull_test.go b/pubsub/v2/streaming_pull_test.go index 66dc6cfcba91..cc77fc2fbab2 100644 --- a/pubsub/v2/streaming_pull_test.go +++ b/pubsub/v2/streaming_pull_test.go @@ -68,7 +68,7 @@ func TestStreamingPullMultipleFetches(t *testing.T) { func testStreamingPullIteration(t *testing.T, client *Client, server *mockServer, msgs []*pb.ReceivedMessage) { sub := client.Subscriber("S") - gotMsgs, err := pullN(context.Background(), sub, len(msgs), 0, func(_ context.Context, m *Message) { + gotMsgs, err := pullN(context.Background(), sub, len(msgs), 0*time.Second, func(_ context.Context, m *Message) { id, err := strconv.Atoi(msgAckID(m)) if err != nil { t.Fatalf("pullN err: %v", err) @@ -114,6 +114,8 @@ func testStreamingPullIteration(t *testing.T, client *Client, server *mockServer } } server.wait() + server.mu.Lock() + defer server.mu.Unlock() for i := 0; i < len(msgs); i++ { id := msgs[i].AckId if i%2 == 0 { @@ -184,7 +186,6 @@ func TestStreamingPullCancel(t *testing.T) { func TestStreamingPullRetry(t *testing.T) { // Check that we retry on io.EOF or Unavailable. - t.Parallel() client, server := newMock(t) defer server.srv.Close() defer client.Close() diff --git a/pubsub/v2/subscriber.go b/pubsub/v2/subscriber.go index 6c41662fbea6..cf7826c5a2a9 100644 --- a/pubsub/v2/subscriber.go +++ b/pubsub/v2/subscriber.go @@ -280,7 +280,6 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa // This context is for forcefully shutting down the application if ShutdownTimeout // is exceeded. 
shutdownKillCtx, shutdownKillCancel := context.WithCancel(context.Background()) - defer shutdownKillCancel() for i := 0; i < numGoroutines; i++ { // The iterator does not use the context passed to Receive. If it did, @@ -328,7 +327,7 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa }() // Make message pulling dependent on iterator for context cancellation - // If the context is cancelled while pulling messages, stop calling Receive early. + // If the context is cancelled while pulling messages, stop reading from stream early. var msgs []*Message select { case <-ctx.Done(): @@ -350,7 +349,6 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa } for i, msg := range msgs { - msg := msg iter.eoMu.RLock() ackh, _ := msgAckHandler(msg, iter.enableExactlyOnceDelivery) iter.eoMu.RUnlock() @@ -429,19 +427,20 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa } } defer fc.release(ctx, msgLen) - - done := make(chan struct{}) - go func() { - defer close(done) - f(otelCtx, m) - }() - - select { - case <-done: - // Callback finished gracefully. - case <-shutdownKillCtx.Done(): - // Shutdown timeout exceeded, stop waiting for callback. - } + f(otelCtx, m) + + // done := make(chan struct{}) + // go func() { + // defer close(done) + // f(otelCtx, m) + // }() + + // select { + // case <-done: + // // Callback finished gracefully. + // case <-shutdownKillCtx.Done(): + // // Shutdown timeout exceeded, stop waiting for callback. + // } }); err != nil { wg.Done() // TODO(hongalex): propagate these errors to an otel span. @@ -463,16 +462,20 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa go func() { // Detected cancellation (either user initiated or permanent error). <-ctx2.Done() - // Stop all the pullstreams as the first thing we do to prevent new messages. 
- for _, p := range pairs { - p.iter.ps.cancel() - } shutdownOpts := s.ReceiveSettings.ShutdownOptions // Once shutdown is initiated, start the timer for forceful shutdown. if shutdownOpts.Timeout == 0 { + // Stop all the pullstreams as the first thing we do to prevent new messages. + for _, p := range pairs { + p.iter.ps.cancel() + } shutdownKillCancel() // Immediate forceful shutdown. } else if shutdownOpts.Timeout > 0 { + // Stop all the pullstreams as the first thing we do to prevent new messages. + for _, p := range pairs { + p.iter.ps.cancel() + } time.AfterFunc(shutdownOpts.Timeout, shutdownKillCancel) for _, p := range pairs { p.iter.nackInventory(shutdownKillCtx) @@ -491,7 +494,7 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa } sched.Shutdown() }() - } else if shutdownOpts.Timeout < 0 { // Graceful shutdown + } else { // Graceful shutdown for _, p := range pairs { p.iter.stop() p.wg.Done() From 95bed5325ad8115dba596614bd8661f73b59ebe7 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 22 Sep 2025 12:52:59 -0700 Subject: [PATCH 08/13] fix default options pointer bug, shorten tests --- pubsub/v2/shutdown_test.go | 23 +++++++++---------- pubsub/v2/subscriber.go | 45 ++++++++++++++++++++------------------ 2 files changed, 36 insertions(+), 32 deletions(-) diff --git a/pubsub/v2/shutdown_test.go b/pubsub/v2/shutdown_test.go index 330627e613c4..da2c55e02d2a 100644 --- a/pubsub/v2/shutdown_test.go +++ b/pubsub/v2/shutdown_test.go @@ -24,6 +24,7 @@ import ( ) func TestShutdown_NackImmediately(t *testing.T) { + t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() client, srv := newFake(t) @@ -39,9 +40,10 @@ func TestShutdown_NackImmediately(t *testing.T) { // Part of this test: pretend to extend the min duration quite a bit so we can test // if the message has been properly nacked. 
sub.ReceiveSettings.MinDurationPerAckExtension = 10 * time.Minute - sub.ReceiveSettings.ShutdownOptions.Timeout = 1 * time.Minute - sub.ReceiveSettings.ShutdownOptions.Behavior = ShutdownBehaviorNackImmediately - + sub.ReceiveSettings.ShutdownOptions = &ShutdownOptions{ + Behavior: ShutdownBehaviorNackImmediately, + Timeout: 1 * time.Minute, + } var wg sync.WaitGroup wg.Add(1) go func() { @@ -57,14 +59,13 @@ func TestShutdown_NackImmediately(t *testing.T) { go sub.Receive(cctx, func(ctx context.Context, m *Message) { // First time receiving, cancel the context to trigger shutdown. // Don't cancel away to avoid race condition with fake. - time.Sleep(10 * time.Second) + time.Sleep(2 * time.Second) ccancel() }) // Wait for the message to be redelivered. - time.Sleep(15 * time.Second) + time.Sleep(5 * time.Second) - // call Receive again var received int var receiveLock sync.Mutex ctx2, cancel := context.WithTimeout(ctx, 30*time.Second) @@ -84,7 +85,7 @@ func TestShutdown_NackImmediately(t *testing.T) { } func TestShutdown_WaitForProcessing(t *testing.T) { - t.Skip("skip") + t.Parallel() tests := []struct { name string shutdownTimeout time.Duration @@ -97,10 +98,10 @@ func TestShutdown_WaitForProcessing(t *testing.T) { expectedTimeout: 5 * time.Second, }, { - name: "15 second timeout", - shutdownTimeout: 15 * time.Second, - expectedTimeout: 16 * time.Second, - minTime: 14 * time.Second, + name: "WithTimeout", + shutdownTimeout: 5 * time.Second, + expectedTimeout: 6 * time.Second, + minTime: 4 * time.Second, }, } diff --git a/pubsub/v2/subscriber.go b/pubsub/v2/subscriber.go index cf7826c5a2a9..665928aecb67 100644 --- a/pubsub/v2/subscriber.go +++ b/pubsub/v2/subscriber.go @@ -162,10 +162,6 @@ var DefaultReceiveSettings = ReceiveSettings{ MaxOutstandingMessages: 1000, MaxOutstandingBytes: 1e9, // 1G NumGoroutines: 1, - ShutdownOptions: &ShutdownOptions{ - Behavior: ShutdownBehaviorWaitForProcessing, - Timeout: -1, - }, } var errReceiveInProgress = errors.New("pubsub: 
Receive already in progress for this subscriber") @@ -231,8 +227,17 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa if minExtPeriod < 0 { minExtPeriod = DefaultReceiveSettings.MinDurationPerAckExtension } - if s.ReceiveSettings.ShutdownOptions == nil { - s.ReceiveSettings.ShutdownOptions = DefaultReceiveSettings.ShutdownOptions + var shutdownOpts ShutdownOptions + if s.ReceiveSettings.ShutdownOptions != nil { + shutdownOpts = *s.ReceiveSettings.ShutdownOptions + } else { + // We can't store these in DefaultReceiveSettings because + // ShutdownOptions is a pointer, and editing one client's + /// ReceiveSettings will update the underlying ShutdownOptions value. + shutdownOpts = ShutdownOptions{ + Behavior: ShutdownBehaviorWaitForProcessing, + Timeout: -1, + } } var numGoroutines int @@ -427,20 +432,19 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa } } defer fc.release(ctx, msgLen) - f(otelCtx, m) - - // done := make(chan struct{}) - // go func() { - // defer close(done) - // f(otelCtx, m) - // }() - - // select { - // case <-done: - // // Callback finished gracefully. - // case <-shutdownKillCtx.Done(): - // // Shutdown timeout exceeded, stop waiting for callback. - // } + + cbDone := make(chan struct{}) + go func() { + defer close(cbDone) + f(otelCtx, m) + }() + + select { + case <-cbDone: + // Callback finished gracefully. + case <-shutdownKillCtx.Done(): + // Shutdown timeout exceeded, stop waiting for callback. + } }); err != nil { wg.Done() // TODO(hongalex): propagate these errors to an otel span. @@ -463,7 +467,6 @@ func (s *Subscriber) Receive(ctx context.Context, f func(context.Context, *Messa // Detected cancellation (either user initiated or permanent error). <-ctx2.Done() - shutdownOpts := s.ReceiveSettings.ShutdownOptions // Once shutdown is initiated, start the timer for forceful shutdown. 
if shutdownOpts.Timeout == 0 { // Stop all the pullstreams as the first thing we do to prevent new messages. From 345ae1a58807cf3df83266318af3bc8113002869 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 22 Sep 2025 12:56:11 -0700 Subject: [PATCH 09/13] revert unrelated changes to existing tests --- pubsub/v2/integration_test.go | 4 ++-- pubsub/v2/shutdown_test.go | 3 +-- pubsub/v2/streaming_pull_test.go | 3 ++- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pubsub/v2/integration_test.go b/pubsub/v2/integration_test.go index 541751a301d7..a1e0e194473e 100644 --- a/pubsub/v2/integration_test.go +++ b/pubsub/v2/integration_test.go @@ -317,7 +317,7 @@ func TestIntegration_CancelReceive(t *testing.T) { return default: publisher.Publish(ctx, &Message{Data: []byte("some msg")}) - time.Sleep(1 * time.Second) + time.Sleep(time.Second) } } }() @@ -325,7 +325,7 @@ func TestIntegration_CancelReceive(t *testing.T) { go func() { err = sub.Receive(ctx, func(_ context.Context, msg *Message) { cancel() - msg.Ack() + time.AfterFunc(5*time.Second, msg.Ack) }) close(doneReceiving) }() diff --git a/pubsub/v2/shutdown_test.go b/pubsub/v2/shutdown_test.go index da2c55e02d2a..96ad85a11217 100644 --- a/pubsub/v2/shutdown_test.go +++ b/pubsub/v2/shutdown_test.go @@ -59,8 +59,7 @@ func TestShutdown_NackImmediately(t *testing.T) { go sub.Receive(cctx, func(ctx context.Context, m *Message) { // First time receiving, cancel the context to trigger shutdown. // Don't cancel away to avoid race condition with fake. - time.Sleep(2 * time.Second) - ccancel() + time.AfterFunc(2*time.Second, ccancel) }) // Wait for the message to be redelivered. 
diff --git a/pubsub/v2/streaming_pull_test.go b/pubsub/v2/streaming_pull_test.go index cc77fc2fbab2..35116457512a 100644 --- a/pubsub/v2/streaming_pull_test.go +++ b/pubsub/v2/streaming_pull_test.go @@ -68,7 +68,7 @@ func TestStreamingPullMultipleFetches(t *testing.T) { func testStreamingPullIteration(t *testing.T, client *Client, server *mockServer, msgs []*pb.ReceivedMessage) { sub := client.Subscriber("S") - gotMsgs, err := pullN(context.Background(), sub, len(msgs), 0*time.Second, func(_ context.Context, m *Message) { + gotMsgs, err := pullN(context.Background(), sub, len(msgs), 0, func(_ context.Context, m *Message) { id, err := strconv.Atoi(msgAckID(m)) if err != nil { t.Fatalf("pullN err: %v", err) @@ -185,6 +185,7 @@ func TestStreamingPullCancel(t *testing.T) { } func TestStreamingPullRetry(t *testing.T) { + t.Parallel() // Check that we retry on io.EOF or Unavailable. client, server := newMock(t) defer server.srv.Close() From a186fbdfc0d4a65cc05e7888772c5e0e31f69ce3 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 22 Sep 2025 13:07:06 -0700 Subject: [PATCH 10/13] fix race in CancelReceive test --- pubsub/v2/integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/v2/integration_test.go b/pubsub/v2/integration_test.go index a1e0e194473e..66fdb8ed83f8 100644 --- a/pubsub/v2/integration_test.go +++ b/pubsub/v2/integration_test.go @@ -317,7 +317,7 @@ func TestIntegration_CancelReceive(t *testing.T) { return default: publisher.Publish(ctx, &Message{Data: []byte("some msg")}) - time.Sleep(time.Second) + time.Sleep(10 * time.Second) } } }() From 7b17c9f5df51ef47825dfd3bb2f1ed55e5d0c6bc Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 22 Sep 2025 16:03:26 -0700 Subject: [PATCH 11/13] make comments more clear --- pubsub/v2/shutdown.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git 
a/pubsub/v2/shutdown.go b/pubsub/v2/shutdown.go index 1b94d2958dab..06db1bc583cc 100644 --- a/pubsub/v2/shutdown.go +++ b/pubsub/v2/shutdown.go @@ -17,16 +17,18 @@ package pubsub import "time" // ShutdownOptions configures the shutdown behavior of the subscriber. -// If not specified, the behavior will default to indefinite processing, -// that is graceful shutdown with no timeout. +// If not specified, the default behavior is indefinite processing, +// aka graceful shutdown with no timeout. type ShutdownOptions struct { // Timeout specifies the time the subscriber should wait - // to shutdown before killing the process. - // In nack mode, this specifies how long to wait - // for messages to be nacked after shutdown is initiated. + // before forcefully shutting down.. + // In ShutdownBehaviorNackImmediately mode, this configures the timeout + // for message nacks before shutting down. // - // Set to zero to immediately shutdown. + // Set to zero to immediately shutdown (either modes) // Set to a negative value to disable timeout. + // When ShutdownOptions is not set, the client library will + // assume disabled/infinite timeout, which matches the current behavior. Timeout time.Duration // Behavior defines the strategy the subscriber should use when @@ -39,11 +41,11 @@ type ShutdownOptions struct { type ShutdownBehavior int const ( - // ShutdownBehaviorWaitForProcessing means the subscriber will wait for - // outstanding messages to be processed before nacking messages finally. + // ShutdownBehaviorWaitForProcessing means the subscriber client will wait for + // outstanding messages to be processed. ShutdownBehaviorWaitForProcessing = iota - // ShutdownBehaviorNackImmediately means the subscriber will nack all outstanding - // messages before closing. + // ShutdownBehaviorNackImmediately means the subscriber client will nack all + // outstanding messages before closing. 
ShutdownBehaviorNackImmediately ) From 779eb200476ba08188d1a13b0d8360236113f51f Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 22 Sep 2025 16:05:21 -0700 Subject: [PATCH 12/13] added warning to Timeout default --- pubsub/v2/shutdown.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pubsub/v2/shutdown.go b/pubsub/v2/shutdown.go index 06db1bc583cc..69013a82cc1f 100644 --- a/pubsub/v2/shutdown.go +++ b/pubsub/v2/shutdown.go @@ -27,8 +27,11 @@ type ShutdownOptions struct { // // Set to zero to immediately shutdown (either modes) // Set to a negative value to disable timeout. + // // When ShutdownOptions is not set, the client library will // assume disabled/infinite timeout, which matches the current behavior. + // When ShutdownOptions is set, but Timeout is unspecified, the default zero-value + // will result in immediate shutdown. Timeout time.Duration // Behavior defines the strategy the subscriber should use when From 6097da0bbd5734b95b05b1dce486f097124dcdde Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 25 Sep 2025 14:22:20 -0700 Subject: [PATCH 13/13] add better clarifying comments to shutdown behavior --- pubsub/v2/shutdown.go | 19 +++++++++++-------- pubsub/v2/subscriber.go | 3 ++- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/pubsub/v2/shutdown.go b/pubsub/v2/shutdown.go index 69013a82cc1f..98df0027dedc 100644 --- a/pubsub/v2/shutdown.go +++ b/pubsub/v2/shutdown.go @@ -17,25 +17,28 @@ package pubsub import "time" // ShutdownOptions configures the shutdown behavior of the subscriber. -// If not specified, the default behavior is indefinite processing, -// aka graceful shutdown with no timeout. +// When ShutdownOptions is nil, the client library will +// assume disabled/infinite timeout. +// +// Warning: The interaction between Timeout and Behavior might be surprising. 
+// Read about the interaction of these below to ensure you +// get the desired behavior. type ShutdownOptions struct { // Timeout specifies the time the subscriber should wait // before forcefully shutting down.. // In ShutdownBehaviorNackImmediately mode, this configures the timeout // for message nacks before shutting down. // - // Set to zero to immediately shutdown (either modes) + // Set to zero to immediately shutdown. // Set to a negative value to disable timeout. - // - // When ShutdownOptions is not set, the client library will - // assume disabled/infinite timeout, which matches the current behavior. - // When ShutdownOptions is set, but Timeout is unspecified, the default zero-value - // will result in immediate shutdown. + // Both zero and negative values override the ShutdownBehavior. Timeout time.Duration // Behavior defines the strategy the subscriber should use when // shutting down (wait or nack messages). + // When ShutdownOptions is set, but Timeout is unspecified, the default zero-value + // will result in immediate shutdown. When needing a specific behavior, + // always set a non-zero Timeout. Behavior ShutdownBehavior } diff --git a/pubsub/v2/subscriber.go b/pubsub/v2/subscriber.go index 665928aecb67..01fe4f40933b 100644 --- a/pubsub/v2/subscriber.go +++ b/pubsub/v2/subscriber.go @@ -150,7 +150,8 @@ type ReceiveSettings struct { NumGoroutines int // ShutdownOptions configures the shutdown behavior of the subscriber. - // If unset, the default behavior is to graceful shutdown with no timeout. + // Default: if unset / nil, the client library will wait + // indefinitely for all in-flight messages to be acked/nacked. ShutdownOptions *ShutdownOptions }