From f6cbeb44bb8fb4a263e57d78607c3e3929285691 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 5 Aug 2020 10:22:49 -0700 Subject: [PATCH 1/2] reduce partition size and expand cloud regions in IT --- .../main/java/pubsublite/CreateTopicExample.java | 6 +++--- .../src/test/java/pubsublite/QuickStartIT.java | 14 ++++++++++---- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java b/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java index dd1c44684..5ee7bb754 100644 --- a/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java +++ b/samples/snippets/src/main/java/pubsublite/CreateTopicExample.java @@ -38,7 +38,7 @@ public static void main(String... args) throws Exception { char zoneId = 'b'; String topicId = "your-topic-id"; long projectNumber = Long.parseLong("123456789"); - Integer partitions = 1; + int partitions = 1; createTopicExample(cloudRegion, zoneId, projectNumber, topicId, partitions); } @@ -67,11 +67,11 @@ public static void createTopicExample( RetentionConfig.newBuilder() // How long messages are retained. .setPeriod(Durations.fromDays(1)) - // Set storage per partition to 100 GiB. This must be 30 GiB-10 TiB. + // Set storage per partition to 30 GiB. This must be 30 GiB-10 TiB. // If the number of bytes stored in any of the topic's partitions grows // beyond this value, older messages will be dropped to make room for // newer ones, regardless of the value of `period`. - .setPerPartitionBytes(100 * 1024 * 1024 * 1024L)) + .setPerPartitionBytes(30 * 1024 * 1024 * 1024L)) .setName(topicPath.value()) .build(); diff --git a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java index 32829aec7..90680c82f 100644 --- a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java +++ b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java @@ -22,7 +22,9 @@ import com.google.common.collect.ImmutableList; import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.util.Arrays; import java.util.List; +import java.util.Random; import java.util.UUID; import org.junit.After; import org.junit.Before; @@ -35,17 +37,21 @@ public class QuickStartIT { private ByteArrayOutputStream bout; private PrintStream out; + private Random rand = new Random(); + private List cloudRegions = + Arrays.asList( + "us-central1", "europe-north1", "asia-east1", "australia-southeast1", "asia-northeast2"); private static final String GOOGLE_CLOUD_PROJECT_NUMBER = System.getenv("GOOGLE_CLOUD_PROJECT_NUMBER"); - private static final String CLOUD_REGION = "us-central1"; + private String CLOUD_REGION = cloudRegions.get(rand.nextInt(cloudRegions.size())); private static final char ZONE_ID = 'b'; private static final Long PROJECT_NUMBER = Long.parseLong(GOOGLE_CLOUD_PROJECT_NUMBER); private static final String SUFFIX = UUID.randomUUID().toString(); private static final String TOPIC_NAME = "lite-topic-" + SUFFIX; private static final String SUBSCRIPTION_NAME = "lite-subscription-" + SUFFIX; - private static final int PARTITIONS = 1; - private static final int MESSAGE_COUNT = 1; + private static final int PARTITIONS = 2; + private static final int MESSAGE_COUNT = 10; private static final List PARTITION_NOS = ImmutableList.of(0); private static void requireEnvVar(String varName) { @@ -84,7 +90,7 @@ public void testQuickstart() throws Exception { // Get a topic. GetTopicExample.getTopicExample(CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, TOPIC_NAME); assertThat(bout.toString()).contains(TOPIC_NAME); - assertThat(bout.toString()).contains("1 partition(s)."); + assertThat(bout.toString()).contains(String.format("%s partition(s).", PARTITIONS)); bout.reset(); // List topics. From c65de2f59bbc04c3045f35f5725d57f76c9769d7 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 5 Aug 2020 10:31:19 -0700 Subject: [PATCH 2/2] automatic subscriber partitions --- .readme-partials.yaml | 16 +++++----------- .../java/pubsublite/SubscriberExample.java | 19 +++++-------------- .../test/java/pubsublite/QuickStartIT.java | 7 ++++--- 3 files changed, 14 insertions(+), 28 deletions(-) diff --git a/.readme-partials.yaml b/.readme-partials.yaml index aec52a7b2..531742fea 100644 --- a/.readme-partials.yaml +++ b/.readme-partials.yaml @@ -40,11 +40,11 @@ custom_content: | RetentionConfig.newBuilder() // How long messages are retained. .setPeriod(Durations.fromDays(1)) - // Set storage per partition to 100 GiB. This must be 30 GiB-10 TiB. + // Set storage per partition to 30 GiB. This must be 30 GiB-10 TiB. // If the number of bytes stored in any of the topic's partitions grows // beyond this value, older messages will be dropped to make room for // newer ones, regardless of the value of `period`. - .setPerPartitionBytes(100 * 1024 * 1024 * 1024L)) + .setPerPartitionBytes(30 * 1024 * 1024 * 1024L)) .setName(topicPath.value()) .build(); @@ -210,7 +210,6 @@ custom_content: | String topicId = "your-topic-id"; // Choose an existing subscription. String subscriptionId = "your-subscription-id"; - List partitionNumbers = ImmutableList.of(0); SubscriptionPath subscriptionPath = SubscriptionPaths.newBuilder() @@ -229,11 +228,6 @@ custom_content: | .setMessagesOutstanding(1000L) .build(); - List partitions = new ArrayList<>(); - for (Integer num : partitionNumbers) { - partitions.add(Partition.of(num)); - } - MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> { System.out.println("Id : " + message.getMessageId()); @@ -244,7 +238,6 @@ custom_content: | SubscriberSettings subscriberSettings = SubscriberSettings.newBuilder() .setSubscriptionPath(subscriptionPath) - .setPartitions(partitions) .setReceiver(receiver) // Flow control settings are set at the partition level. .setPerPartitionFlowControlSettings(flowControlSettings) @@ -258,10 +251,11 @@ custom_content: | System.out.println("Listening to messages on " + subscriptionPath.value() + "..."); try { - // Wait 30 seconds for the subscriber to reach TERMINATED state. If it encounters + System.out.println(subscriber.state()); + // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters // unrecoverable errors before then, its state will change to FAILED and an // IllegalStateException will be thrown. - subscriber.awaitTerminated(30, TimeUnit.SECONDS); + subscriber.awaitTerminated(90, TimeUnit.SECONDS); } catch (TimeoutException t) { // Shut down the subscriber. This will change the state of the subscriber to TERMINATED. subscriber.stopAsync().awaitTerminated(); diff --git a/samples/snippets/src/main/java/pubsublite/SubscriberExample.java b/samples/snippets/src/main/java/pubsublite/SubscriberExample.java index a826b8c35..49e313749 100644 --- a/samples/snippets/src/main/java/pubsublite/SubscriberExample.java +++ b/samples/snippets/src/main/java/pubsublite/SubscriberExample.java @@ -46,19 +46,15 @@ public static void main(String... args) throws Exception { // Choose an existing subscription for the subscribe example to work. String subscriptionId = "your-subscription-id"; long projectNumber = Long.parseLong("123456789"); - // List of partitions to subscribe to. It can be all the partitions in a topic or - // a subset of them. A topic of N partitions has partition numbers [0~N-1]. - List partitionNumbers = ImmutableList.of(0); - subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, partitionNumbers); + subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId); } public static void subscriberExample( String cloudRegion, char zoneId, long projectNumber, - String subscriptionId, - List partitionNumbers) + String subscriptionId) throws StatusException { SubscriptionPath subscriptionPath = @@ -78,11 +74,6 @@ public static void subscriberExample( .setMessagesOutstanding(1000L) .build(); - List partitions = new ArrayList<>(); - for (Integer num : partitionNumbers) { - partitions.add(Partition.of(num)); - } - MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> { System.out.println("Id : " + message.getMessageId()); @@ -93,7 +84,6 @@ public static void subscriberExample( SubscriberSettings subscriberSettings = SubscriberSettings.newBuilder() .setSubscriptionPath(subscriptionPath) - .setPartitions(partitions) .setReceiver(receiver) // Flow control settings are set at the partition level. .setPerPartitionFlowControlSettings(flowControlSettings) @@ -107,10 +97,11 @@ public static void subscriberExample( System.out.println("Listening to messages on " + subscriptionPath.value() + "..."); try { - // Wait 30 seconds for the subscriber to reach TERMINATED state. If it encounters + System.out.println(subscriber.state()); + // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters // unrecoverable errors before then, its state will change to FAILED and an // IllegalStateException will be thrown. - subscriber.awaitTerminated(30, TimeUnit.SECONDS); + subscriber.awaitTerminated(90, TimeUnit.SECONDS); } catch (TimeoutException t) { // Shut down the subscriber. This will change the state of the subscriber to TERMINATED. subscriber.stopAsync().awaitTerminated(); diff --git a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java index 90680c82f..50d5b36e4 100644 --- a/samples/snippets/src/test/java/pubsublite/QuickStartIT.java +++ b/samples/snippets/src/test/java/pubsublite/QuickStartIT.java @@ -52,7 +52,6 @@ public class QuickStartIT { private static final String SUBSCRIPTION_NAME = "lite-subscription-" + SUFFIX; private static final int PARTITIONS = 2; private static final int MESSAGE_COUNT = 10; - private static final List PARTITION_NOS = ImmutableList.of(0); private static void requireEnvVar(String varName) { assertNotNull( @@ -163,9 +162,11 @@ public void testQuickstart() throws Exception { bout.reset(); // Subscribe. SubscriberExample.subscriberExample( - CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, SUBSCRIPTION_NAME, PARTITION_NOS); + CLOUD_REGION, ZONE_ID, PROJECT_NUMBER, SUBSCRIPTION_NAME); assertThat(bout.toString()).contains("Listening"); - assertThat(bout.toString()).contains("Data : message-0"); + for (int i = 0; i < MESSAGE_COUNT; ++i) { + assertThat(bout.toString()).contains(String.format("Data : message-%s", i)); + } assertThat(bout.toString()).contains("Subscriber is shut down: TERMINATED"); bout.reset();