From f9e28d703185c65ed16cc4837533a383c49bdadc Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 15 Jun 2021 11:01:44 -0400 Subject: [PATCH 1/2] fix: ManagedChannel shutdown issues ManagedChannel emits a warning when it is marked for finalization; calling close() in the finalize() method is insufficient to remove this warning. Add ManagedBacklogReaderFactory to tie ManagedChannel shutdown to the SDF finalize method --- .../beam/ManagedBacklogReaderFactory.java | 32 +++++++++ .../beam/ManagedBacklogReaderFactoryImpl.java | 67 +++++++++++++++++++ .../beam/OffsetByteRangeTracker.java | 13 ++-- .../beam/PerSubscriptionPartitionSdf.java | 14 +++- .../pubsublite/beam/SubscribeTransform.java | 11 ++- .../beam/OffsetByteRangeTrackerTest.java | 11 +-- .../beam/PerSubscriptionPartitionSdfTest.java | 33 ++++++++- 7 files changed, 159 insertions(+), 22 deletions(-) create mode 100644 pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/ManagedBacklogReaderFactory.java create mode 100644 pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/ManagedBacklogReaderFactoryImpl.java diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/ManagedBacklogReaderFactory.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/ManagedBacklogReaderFactory.java new file mode 100644 index 000000000..c337047f1 --- /dev/null +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/ManagedBacklogReaderFactory.java @@ -0,0 +1,32 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.beam; + +import java.io.Serializable; + +/** + * A ManagedBacklogReaderFactory produces TopicBacklogReaders and tears down any produced readers + * when it is itself closed. + * + *

close() should never be called on produced readers. + */ +public interface ManagedBacklogReaderFactory extends AutoCloseable, Serializable { + TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition); + + @Override + void close(); +} diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/ManagedBacklogReaderFactoryImpl.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/ManagedBacklogReaderFactoryImpl.java new file mode 100644 index 000000000..df77cb804 --- /dev/null +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/ManagedBacklogReaderFactoryImpl.java @@ -0,0 +1,67 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.beam; + +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.concurrent.GuardedBy; +import org.apache.beam.sdk.transforms.SerializableFunction; + +public class ManagedBacklogReaderFactoryImpl implements ManagedBacklogReaderFactory { + private final SerializableFunction newReader; + + @GuardedBy("this") + private final Map readers = new HashMap<>(); + + ManagedBacklogReaderFactoryImpl( + SerializableFunction newReader) { + this.newReader = newReader; + } + + private static final class NonCloseableTopicBacklogReader implements TopicBacklogReader { + private final TopicBacklogReader underlying; + + NonCloseableTopicBacklogReader(TopicBacklogReader underlying) { + this.underlying = underlying; + } + + @Override + public ComputeMessageStatsResponse computeMessageStats(Offset offset) throws ApiException { + return underlying.computeMessageStats(offset); + } + + @Override + public void close() { + throw new IllegalArgumentException( + "Cannot call close() on a reader returned from ManagedBacklogReaderFactory."); + } + } + + @Override + public synchronized TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) { + return new NonCloseableTopicBacklogReader( + readers.computeIfAbsent(subscriptionPartition, newReader::apply)); + } + + @Override + public synchronized void close() { + readers.values().forEach(TopicBacklogReader::close); + } +} diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java index c6e227466..05c5373f4 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTracker.java @@ -42,7 +42,7 @@ * would return ProcessContinuation.resume(). */ class OffsetByteRangeTracker extends TrackerWithProgress { - private final TopicBacklogReader backlogReader; + private final TopicBacklogReader unownedBacklogReader; private final Duration minTrackingTime; private final long minBytesReceived; private final Stopwatch stopwatch; @@ -51,7 +51,7 @@ class OffsetByteRangeTracker extends TrackerWithProgress { public OffsetByteRangeTracker( OffsetByteRange range, - TopicBacklogReader backlogReader, + TopicBacklogReader unownedBacklogReader, Stopwatch stopwatch, Duration minTrackingTime, long minBytesReceived) { @@ -61,18 +61,13 @@ public OffsetByteRangeTracker( checkArgument( range.getByteCount() == 0L, "May only construct OffsetByteRangeTracker with an unbounded range with no progress."); - this.backlogReader = backlogReader; + this.unownedBacklogReader = unownedBacklogReader; this.minTrackingTime = minTrackingTime; this.minBytesReceived = minBytesReceived; this.stopwatch = stopwatch.reset().start(); this.range = range; } - @Override - public void finalize() { - this.backlogReader.close(); - } - @Override public IsBounded isBounded() { return IsBounded.UNBOUNDED; @@ -170,7 +165,7 @@ public void checkDone() throws IllegalStateException { @Override public Progress getProgress() { ComputeMessageStatsResponse stats = - this.backlogReader.computeMessageStats(Offset.of(nextOffset())); + this.unownedBacklogReader.computeMessageStats(Offset.of(nextOffset())); return Progress.from(range.getByteCount(), stats.getMessageBytes()); } } diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java index b681d6b94..5949d9e5e 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdf.java @@ -31,27 +31,35 @@ class PerSubscriptionPartitionSdf extends DoFn { private final Duration maxSleepTime; + private final ManagedBacklogReaderFactory backlogReaderFactory; private final SubscriptionPartitionProcessorFactory processorFactory; private final SerializableFunction offsetReaderFactory; - private final SerializableBiFunction + private final SerializableBiFunction trackerFactory; private final SerializableFunction committerFactory; PerSubscriptionPartitionSdf( Duration maxSleepTime, + ManagedBacklogReaderFactory backlogReaderFactory, SerializableFunction offsetReaderFactory, - SerializableBiFunction + SerializableBiFunction trackerFactory, SubscriptionPartitionProcessorFactory processorFactory, SerializableFunction committerFactory) { this.maxSleepTime = maxSleepTime; + this.backlogReaderFactory = backlogReaderFactory; this.processorFactory = processorFactory; this.offsetReaderFactory = offsetReaderFactory; this.trackerFactory = trackerFactory; this.committerFactory = committerFactory; } + @Teardown + public void teardown() { + backlogReaderFactory.close(); + } + @GetInitialWatermarkEstimatorState public Instant getInitialWatermarkState() { return Instant.EPOCH; @@ -103,7 +111,7 @@ public OffsetByteRange getInitialRestriction( @NewTracker public TrackerWithProgress newTracker( @Element SubscriptionPartition subscriptionPartition, @Restriction OffsetByteRange range) { - return trackerFactory.apply(subscriptionPartition, range); + return trackerFactory.apply(backlogReaderFactory.newReader(subscriptionPartition), range); } @GetSize diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java index 684b89643..5737cbe1a 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java @@ -85,12 +85,16 @@ private SubscriptionPartitionProcessor newPartitionProcessor( options.flowControlSettings()); } - private TrackerWithProgress newRestrictionTracker( - SubscriptionPartition subscriptionPartition, OffsetByteRange initial) { + private TopicBacklogReader newBacklogReader(SubscriptionPartition subscriptionPartition) { checkSubscription(subscriptionPartition); + return options.getBacklogReader(subscriptionPartition.partition()); + } + + private TrackerWithProgress newRestrictionTracker( + TopicBacklogReader backlogReader, OffsetByteRange initial) { return new OffsetByteRangeTracker( initial, - options.getBacklogReader(subscriptionPartition.partition()), + backlogReader, Stopwatch.createUnstarted(), options.minBundleTimeout(), LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10)); @@ -129,6 +133,7 @@ public PCollection expand(PBegin input) { new PerSubscriptionPartitionSdf( // Ensure we read for at least 5 seconds more than the bundle timeout. options.minBundleTimeout().plus(Duration.standardSeconds(5)), + new ManagedBacklogReaderFactoryImpl(this::newBacklogReader), this::newInitialOffsetReader, this::newRestrictionTracker, this::newPartitionProcessor, diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java index d54ab49dd..34439ab89 100644 --- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java +++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; @@ -36,6 +38,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.joda.time.Duration; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -48,7 +51,7 @@ public class OffsetByteRangeTrackerTest { private static final double IGNORED_FRACTION = -10000000.0; private static final long MIN_BYTES = 1000; private static final OffsetRange RANGE = new OffsetRange(123L, Long.MAX_VALUE); - private final TopicBacklogReader reader = mock(TopicBacklogReader.class); + private final TopicBacklogReader unownedBacklogReader = mock(TopicBacklogReader.class); @Spy Ticker ticker; private OffsetByteRangeTracker tracker; @@ -60,7 +63,7 @@ public void setUp() { tracker = new OffsetByteRangeTracker( OffsetByteRange.of(RANGE, 0), - reader, + unownedBacklogReader, Stopwatch.createUnstarted(ticker), Duration.millis(500), MIN_BYTES); @@ -70,7 +73,7 @@ public void setUp() { public void progressTracked() { assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(123), 10))); assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(124), 11))); - when(reader.computeMessageStats(Offset.of(125))) + when(unownedBacklogReader.computeMessageStats(Offset.of(125))) .thenReturn(ComputeMessageStatsResponse.newBuilder().setMessageBytes(1000).build()); Progress progress = tracker.getProgress(); assertEquals(21, progress.getWorkCompleted(), .0001); @@ -79,7 +82,7 @@ public void progressTracked() { @Test public void getProgressStatsFailure() { - when(reader.computeMessageStats(Offset.of(123))) + when(unownedBacklogReader.computeMessageStats(Offset.of(123))) .thenThrow(new CheckedApiException(Code.INTERNAL).underlying); assertThrows(ApiException.class, tracker::getProgress); } diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java index f0dddcce9..3adc35aa4 100644 --- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java +++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java @@ -74,8 +74,11 @@ public class PerSubscriptionPartitionSdfTest { @Mock SerializableFunction offsetReaderFactory; + @Mock ManagedBacklogReaderFactory backlogReaderFactory; + @Mock TopicBacklogReader backlogReader; + @Mock - SerializableBiFunction + SerializableBiFunction trackerFactory; @Mock SubscriptionPartitionProcessorFactory processorFactory; @@ -100,9 +103,11 @@ public void setUp() { when(trackerFactory.apply(any(), any())).thenReturn(tracker); when(committerFactory.apply(any())).thenReturn(committer); when(tracker.currentRestriction()).thenReturn(RESTRICTION); + when(backlogReaderFactory.newReader(any())).thenReturn(backlogReader); sdf = new PerSubscriptionPartitionSdf( MAX_SLEEP_TIME, + backlogReaderFactory, offsetReaderFactory, trackerFactory, processorFactory, @@ -128,7 +133,13 @@ public void getInitialRestrictionReadFailure() { @Test public void newTrackerCallsFactory() { assertSame(tracker, sdf.newTracker(PARTITION, RESTRICTION)); - verify(trackerFactory).apply(PARTITION, RESTRICTION); + verify(trackerFactory).apply(backlogReader, RESTRICTION); + } + + @Test + public void tearDownClosesBacklogReaderFactory() { + sdf.teardown(); + verify(backlogReaderFactory).close(); } @Test @@ -162,13 +173,29 @@ public void process() throws Exception { order2.verify(committer).awaitTerminated(); } + private static final class NoopManagedBacklogReaderFactory + implements ManagedBacklogReaderFactory { + @Override + public TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) { + return null; + } + + @Override + public void close() {} + } + @Test @SuppressWarnings("return.type.incompatible") public void dofnIsSerializable() throws Exception { ObjectOutputStream output = new ObjectOutputStream(new ByteArrayOutputStream()); output.writeObject( new PerSubscriptionPartitionSdf( - MAX_SLEEP_TIME, x -> null, (x, y) -> null, (x, y, z) -> null, (x) -> null)); + MAX_SLEEP_TIME, + new NoopManagedBacklogReaderFactory(), + x -> null, + (x, y) -> null, + (x, y, z) -> null, + (x) -> null)); } @Test From 59c7de49d6a346262dc807c6528601598d78821b Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 15 Jun 2021 20:51:36 -0400 Subject: [PATCH 2/2] fix: reformat --- .../cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java | 3 --- .../cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java | 3 +-- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java index 34439ab89..b7932a0a7 100644 --- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java +++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/OffsetByteRangeTrackerTest.java @@ -22,8 +22,6 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; @@ -38,7 +36,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.joda.time.Duration; -import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java index 3adc35aa4..00e39dc6d 100644 --- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java +++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PerSubscriptionPartitionSdfTest.java @@ -78,8 +78,7 @@ public class PerSubscriptionPartitionSdfTest { @Mock TopicBacklogReader backlogReader; @Mock - SerializableBiFunction - trackerFactory; + SerializableBiFunction trackerFactory; @Mock SubscriptionPartitionProcessorFactory processorFactory; @Mock SerializableFunction committerFactory;