diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/FakeApiService.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/FakeApiService.java similarity index 96% rename from google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/FakeApiService.java rename to google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/FakeApiService.java index 9be4e8e49..48998d7d9 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/FakeApiService.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/FakeApiService.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.google.cloud.pubsublite; +package com.google.cloud.pubsublite.internal; import com.google.api.core.AbstractApiService; diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/SinglePartitionSubscriberTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/SinglePartitionSubscriberTest.java index 88b7b2f95..63a7eef24 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/SinglePartitionSubscriberTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/SinglePartitionSubscriberTest.java @@ -27,7 +27,6 @@ import com.google.api.core.ApiService.Listener; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.cloud.pubsublite.FakeApiService; import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.MessageTransformer; import com.google.cloud.pubsublite.Offset; @@ -35,6 +34,7 @@ import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms; import com.google.cloud.pubsublite.cloudpubsub.NackHandler; +import com.google.cloud.pubsublite.internal.FakeApiService; import com.google.cloud.pubsublite.internal.StatusExceptionMatcher; import com.google.cloud.pubsublite.internal.wire.Subscriber; import com.google.cloud.pubsublite.internal.wire.SubscriberFactory; diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedReader.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedReader.java index e3affcdf3..9c9f410f4 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedReader.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedReader.java @@ -23,6 +23,7 @@ import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.ExtractStatus; +import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.common.collect.ImmutableMap; import com.google.errorprone.annotations.concurrent.GuardedBy; @@ -33,11 +34,14 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; import java.util.Queue; +import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; @@ -52,17 +56,51 @@ class PubsubLiteUnboundedReader extends UnboundedReader @GuardedBy("monitor.monitor") private final ImmutableMap subscriberMap; + private final CommitterProxy committerProxy; + @GuardedBy("monitor.monitor") private final Queue messages = new ArrayDeque<>(); @GuardedBy("monitor.monitor") private Optional permanentError = Optional.empty(); + private static class CommitterProxy extends ProxyService { + private final Consumer permanentErrorSetter; + + CommitterProxy( + Collection states, Consumer permanentErrorSetter) + throws StatusException { + this.permanentErrorSetter = permanentErrorSetter; + addServices(states.stream().map(state -> state.committer).collect(Collectors.toList())); + } + + @Override + protected void start() {} + + @Override + protected void stop() {} + + @Override + protected void handlePermanentError(StatusException error) { + permanentErrorSetter.accept(error); + } + } + public PubsubLiteUnboundedReader( UnboundedSource source, - ImmutableMap subscriberMap) { + ImmutableMap subscriberMap) + throws StatusException { this.source = source; this.subscriberMap = subscriberMap; + this.committerProxy = + new CommitterProxy( + subscriberMap.values(), + error -> { + try (CloseableMonitor.Hold h = monitor.enter()) { + permanentError = Optional.of(permanentError.orElse(error)); + } + }); + this.committerProxy.startAsync().awaitRunning(); } @Override @@ -188,7 +226,6 @@ public Instant getCurrentTimestamp() throws NoSuchElementException { public void close() { try (CloseableMonitor.Hold h = monitor.enter()) { for (SubscriberState state : subscriberMap.values()) { - state.committer.stopAsync().awaitTerminated(); try { state.subscriber.close(); } catch (Exception e) { @@ -196,6 +233,7 @@ public void close() { } } } + committerProxy.stopAsync().awaitTerminated(); } @Override diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedReaderTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedReaderTest.java index 9812aaf94..80e237777 100644 --- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedReaderTest.java +++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedReaderTest.java @@ -30,6 +30,7 @@ import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.beam.PubsubLiteUnboundedReader.SubscriberState; +import com.google.cloud.pubsublite.internal.FakeApiService; import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -48,6 +49,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.MockitoAnnotations; +import org.mockito.Spy; @RunWith(JUnit4.class) public class PubsubLiteUnboundedReaderTest { @@ -57,8 +60,10 @@ public class PubsubLiteUnboundedReaderTest { @SuppressWarnings("unchecked") private final PullSubscriber subscriber8 = mock(PullSubscriber.class); - private final Committer committer5 = mock(Committer.class); - private final Committer committer8 = mock(Committer.class); + abstract static class CommitterFakeService extends FakeApiService implements Committer {} + + @Spy private CommitterFakeService committer5; + @Spy private CommitterFakeService committer8; @SuppressWarnings("unchecked") private final UnboundedSource source = mock(UnboundedSource.class); @@ -78,6 +83,7 @@ private static Instant toInstant(Timestamp timestamp) { } public PubsubLiteUnboundedReaderTest() throws StatusException { + MockitoAnnotations.initMocks(this); SubscriberState state5 = new SubscriberState(); state5.subscriber = subscriber5; state5.committer = committer5; @@ -208,6 +214,5 @@ public void checkpointMarkFinalizeCommits() throws Exception { when(committer5.commitOffset(Offset.of(10))).thenReturn(ApiFutures.immediateFuture(null)); mark.finalizeCheckpoint(); verify(committer5).commitOffset(Offset.of(10)); - verifyNoMoreInteractions(committer5, committer8); } }