diff --git a/.readme-partials.yaml b/.readme-partials.yaml index 16de3f9b2..6fc6560b3 100644 --- a/.readme-partials.yaml +++ b/.readme-partials.yaml @@ -110,11 +110,11 @@ custom_content: | futures.add(future); } } finally { - ArrayList metadata = new ArrayList<>(); + ArrayList metadata = new ArrayList<>(); List ackIds = ApiFutures.allAsList(futures).get(); for (String id : ackIds) { // Decoded metadata contains partition and offset. - metadata.add(PublishMetadata.decode(id)); + metadata.add(MessageMetadata.decode(id)); } System.out.println(metadata + "\nPublished " + ackIds.size() + " messages."); diff --git a/README.md b/README.md index a89d793b0..0c00cefb2 100644 --- a/README.md +++ b/README.md @@ -188,11 +188,11 @@ try { futures.add(future); } } finally { - ArrayList metadata = new ArrayList<>(); + ArrayList metadata = new ArrayList<>(); List ackIds = ApiFutures.allAsList(futures).get(); for (String id : ackIds) { // Decoded metadata contains partition and offset. - metadata.add(PublishMetadata.decode(id)); + metadata.add(MessageMetadata.decode(id)); } System.out.println(metadata + "\nPublished " + ackIds.size() + " messages."); diff --git a/google-cloud-pubsublite/clirr-ignored-differences.xml b/google-cloud-pubsublite/clirr-ignored-differences.xml index f652e80d9..4bf5160bf 100644 --- a/google-cloud-pubsublite/clirr-ignored-differences.xml +++ b/google-cloud-pubsublite/clirr-ignored-differences.xml @@ -1,6 +1,12 @@ + + + 8001 + com/google/cloud/pubsublite/PublishMetadata + * + 7013 diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/PublishMetadata.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/MessageMetadata.java similarity index 68% rename from google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/PublishMetadata.java rename to google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/MessageMetadata.java index 155bec0a1..b7fdb87a8 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/PublishMetadata.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/MessageMetadata.java @@ -24,32 +24,33 @@ import com.google.cloud.pubsublite.internal.CheckedApiException; /** - * Information about a successful publish operation. Can be encoded in the string returned by the - * Cloud Pub/Sub {@link com.google.cloud.pubsub.v1.Publisher#publish} api. + * Information about a message in Pub/Sub Lite. Can be encoded in the string returned by the Cloud + * Pub/Sub {@link com.google.cloud.pubsub.v1.Publisher#publish} api or the {@link + * com.google.pubsub.v1.PubsubMessage#getMessageId} field on received messages. */ @AutoValue -public abstract class PublishMetadata { +public abstract class MessageMetadata { /** The partition a message was published to. */ public abstract Partition partition(); /** The offset a message was assigned. */ public abstract Offset offset(); - /** Construct a PublishMetadata from a Partition and Offset. */ - public static PublishMetadata of(Partition partition, Offset offset) { - return new AutoValue_PublishMetadata(partition, offset); + /** Construct a MessageMetadata from a Partition and Offset. */ + public static MessageMetadata of(Partition partition, Offset offset) { + return new AutoValue_MessageMetadata(partition, offset); } - /** Decode a PublishMetadata from the Cloud Pub/Sub ack id. */ - public static PublishMetadata decode(String encoded) throws ApiException { + /** Decode a MessageMetadata from the Cloud Pub/Sub ack id. */ + public static MessageMetadata decode(String encoded) throws ApiException { String[] split = encoded.split(":"); - checkArgument(split.length == 2, "Invalid encoded PublishMetadata."); + checkArgument(split.length == 2, "Invalid encoded MessageMetadata."); try { Partition partition = Partition.of(Long.parseLong(split[0])); Offset offset = Offset.of(Long.parseLong(split[1])); return of(partition, offset); } catch (NumberFormatException e) { - throw new CheckedApiException("Invalid encoded PublishMetadata.", e, Code.INVALID_ARGUMENT) + throw new CheckedApiException("Invalid encoded MessageMetadata.", e, Code.INVALID_ARGUMENT) .underlying; } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/MessageTransforms.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/MessageTransforms.java index a88095afd..bfb08bdc3 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/MessageTransforms.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/MessageTransforms.java @@ -21,7 +21,9 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.MessageTransformer; +import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.common.collect.ImmutableListMultimap; @@ -72,15 +74,30 @@ private static String parseAttributes(Collection values) throws ApiE "Received an unparseable message with multiple values for an attribute."); ByteString attribute = values.iterator().next(); checkArgument( - attribute.isValidUtf8(), "Received an unparseable message with a non-utf8 attribute."); + attribute.isValidUtf8(), + String.format( + "Received an unparseable message with a non-utf8 attribute value: %s", + Base64.getEncoder().encodeToString(attribute.toByteArray()))); return attribute.toStringUtf8(); } + static MessageTransformer addIdCpsSubscribeTransformer( + Partition partition, MessageTransformer toWrap) { + return message -> { + PubsubMessage out = toWrap.transform(message); + checkArgument( + out.getMessageId().isEmpty(), + String.format("Received non-empty message id for PubsubMessage: %s", out)); + return out.toBuilder() + .setMessageId(MessageMetadata.of(partition, message.offset()).encode()) + .build(); + }; + } + public static MessageTransformer toCpsSubscribeTransformer() { return message -> { PubsubMessage.Builder outBuilder = toCpsPublishTransformer().transform(message.message()).toBuilder(); - outBuilder.setMessageId(Long.toString(message.offset().value())); outBuilder.setPublishTime(message.publishTime()); return outBuilder.build(); }; diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java index 7a22a3d20..923069ed1 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java @@ -88,7 +88,10 @@ public abstract class SubscriberSettings { */ abstract List partitions(); - /** The MessageTransformer to get PubsubMessages from Pub/Sub Lite wire messages. */ + /** + * The MessageTransformer to get PubsubMessages from Pub/Sub Lite wire messages. The messageId + * field must not be set on the returned message. + */ abstract Optional> transformer(); /** A provider for credentials. */ @@ -155,7 +158,10 @@ public abstract static class Builder { */ public abstract Builder setPartitions(List partition); - /** The MessageTransformer to get PubsubMessages from Pub/Sub Lite wire messages. */ + /** + * The MessageTransformer to get PubsubMessages from Pub/Sub Lite wire messages. The messageId + * field must not be set on the returned message. + */ public abstract Builder setTransformer( MessageTransformer transformer); @@ -243,7 +249,8 @@ Subscriber newPartitionSubscriber(Partition partition) throws CheckedApiExceptio return new SinglePartitionSubscriber( receiver(), - transformer().orElse(MessageTransforms.toCpsSubscribeTransformer()), + MessageTransforms.addIdCpsSubscribeTransformer( + partition, transformer().orElse(MessageTransforms.toCpsSubscribeTransformer())), new AckSetTrackerImpl(wireCommitter), nackHandler().orElse(new NackHandler() {}), messageConsumer -> wireSubscriberBuilder.setMessageConsumer(messageConsumer).build(), diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java index 394b9d19b..7f99a6937 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java @@ -22,8 +22,8 @@ import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.MessageTransformer; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.cloudpubsub.Publisher; import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.TrivialProxyService; @@ -31,13 +31,13 @@ import com.google.pubsub.v1.PubsubMessage; // A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant -// publisher. It encodes a PublishMetadata object in the response string. +// publisher. It encodes a MessageMetadata object in the response string. public class WrappingPublisher extends TrivialProxyService implements Publisher { - private final com.google.cloud.pubsublite.internal.Publisher wirePublisher; + private final com.google.cloud.pubsublite.internal.Publisher wirePublisher; private final MessageTransformer transformer; public WrappingPublisher( - com.google.cloud.pubsublite.internal.Publisher wirePublisher, + com.google.cloud.pubsublite.internal.Publisher wirePublisher, MessageTransformer transformer) throws ApiException { super(wirePublisher); @@ -58,7 +58,7 @@ public ApiFuture publish(PubsubMessage message) { } return ApiFutures.transform( wirePublisher.publish(wireMessage), - PublishMetadata::encode, + MessageMetadata::encode, MoreExecutors.directExecutor()); } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java index c4710de6d..b2a27e832 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java @@ -23,8 +23,8 @@ import com.google.api.core.ApiFutures; import com.google.api.core.ApiService; import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.ProxyService; @@ -39,23 +39,23 @@ import java.util.stream.LongStream; public class PartitionCountWatchingPublisher extends ProxyService - implements Publisher { + implements Publisher { private static final GoogleLogger log = GoogleLogger.forEnclosingClass(); private final PartitionPublisherFactory publisherFactory; private final RoutingPolicy.Factory policyFactory; private static class PartitionsWithRouting { - public final ImmutableMap> publishers; + public final ImmutableMap> publishers; private final RoutingPolicy routingPolicy; private PartitionsWithRouting( - ImmutableMap> publishers, + ImmutableMap> publishers, RoutingPolicy routingPolicy) { this.publishers = publishers; this.routingPolicy = routingPolicy; } - public ApiFuture publish(Message message) throws CheckedApiException { + public ApiFuture publish(Message message) throws CheckedApiException { try { Partition routedPartition = message.key().isEmpty() @@ -73,13 +73,13 @@ public ApiFuture publish(Message message) throws CheckedApiExce } public void cancelOutstandingPublishes() { - for (Publisher publisher : publishers.values()) { + for (Publisher publisher : publishers.values()) { publisher.cancelOutstandingPublishes(); } } public void flush() throws IOException { - for (Publisher publisher : publishers.values()) { + for (Publisher publisher : publishers.values()) { publisher.flush(); } } @@ -109,7 +109,7 @@ public void stop() { } @Override - public ApiFuture publish(Message message) { + public ApiFuture publish(Message message) { Optional partitions; try (CloseableMonitor.Hold h = monitor.enter()) { partitions = partitionsWithRouting; @@ -150,12 +150,12 @@ public void flush() throws IOException { partitions.get().flush(); } - private ImmutableMap> getNewPartitionPublishers( + private ImmutableMap> getNewPartitionPublishers( LongStream newPartitions) { - ImmutableMap.Builder> mapBuilder = ImmutableMap.builder(); + ImmutableMap.Builder> mapBuilder = ImmutableMap.builder(); newPartitions.forEach( i -> { - Publisher p = publisherFactory.newPublisher(Partition.of(i)); + Publisher p = publisherFactory.newPublisher(Partition.of(i)); p.addListener( new Listener() { @Override @@ -167,7 +167,7 @@ public void failed(State from, Throwable failure) { mapBuilder.put(Partition.of(i), p); p.startAsync(); }); - ImmutableMap> partitions = mapBuilder.build(); + ImmutableMap> partitions = mapBuilder.build(); partitions.values().forEach(ApiService::awaitRunning); return partitions; } @@ -189,12 +189,12 @@ private void handleConfig(long partitionCount) { partitionCount); return; } - ImmutableMap.Builder> mapBuilder = + ImmutableMap.Builder> mapBuilder = ImmutableMap.builder(); current.ifPresent(p -> p.publishers.forEach(mapBuilder::put)); getNewPartitionPublishers(LongStream.range(currentSize, partitionCount)) .forEach(mapBuilder::put); - ImmutableMap> newMap = mapBuilder.build(); + ImmutableMap> newMap = mapBuilder.build(); partitionsWithRouting = Optional.of( diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherSettings.java index 10f5d5808..8837faeb6 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherSettings.java @@ -54,7 +54,7 @@ public abstract static class Builder { public abstract PartitionCountWatchingPublisherSettings build(); } - public Publisher instantiate() throws ApiException { + public Publisher instantiate() throws ApiException { return new PartitionCountWatchingPublisher( publisherFactory(), DefaultRoutingPolicy::new, diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionPublisherFactory.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionPublisherFactory.java index 25caf08e9..ab3ff6beb 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionPublisherFactory.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionPublisherFactory.java @@ -17,10 +17,10 @@ package com.google.cloud.pubsublite.internal.wire; import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.internal.Publisher; public interface PartitionPublisherFactory { - Publisher newPublisher(Partition partition) throws ApiException; + Publisher newPublisher(Partition partition) throws ApiException; } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java index e1c62ffa1..ab912202b 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java @@ -23,8 +23,8 @@ import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.RoutingPolicy; @@ -32,12 +32,12 @@ import java.io.IOException; import java.util.Map; -public class RoutingPublisher extends TrivialProxyService implements Publisher { - private final Map> partitionPublishers; +public class RoutingPublisher extends TrivialProxyService implements Publisher { + private final Map> partitionPublishers; private final RoutingPolicy policy; RoutingPublisher( - Map> partitionPublishers, RoutingPolicy policy) + Map> partitionPublishers, RoutingPolicy policy) throws ApiException { super(partitionPublishers.values()); this.partitionPublishers = partitionPublishers; @@ -46,7 +46,7 @@ public class RoutingPublisher extends TrivialProxyService implements Publisher

publish(Message message) { + public ApiFuture publish(Message message) { try { Partition routedPartition = message.key().isEmpty() ? policy.routeWithoutKey() : policy.route(message.key()); @@ -65,14 +65,14 @@ public ApiFuture publish(Message message) { @Override public void cancelOutstandingPublishes() { - for (Publisher publisher : partitionPublishers.values()) { + for (Publisher publisher : partitionPublishers.values()) { publisher.cancelOutstandingPublishes(); } } @Override public void flush() throws IOException { - for (Publisher publisher : partitionPublishers.values()) { + for (Publisher publisher : partitionPublishers.values()) { publisher.flush(); } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisherBuilder.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisherBuilder.java index eb1b90af7..296c0ffc5 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisherBuilder.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisherBuilder.java @@ -18,9 +18,9 @@ import com.google.api.gax.rpc.ApiException; import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.PartitionLookupUtils; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.DefaultRoutingPolicy; import com.google.cloud.pubsublite.internal.Publisher; @@ -53,7 +53,7 @@ public abstract static class Builder { abstract RoutingPublisherBuilder autoBuild(); - public Publisher build() throws ApiException { + public Publisher build() throws ApiException { RoutingPublisherBuilder builder = autoBuild(); int numPartitions; if (builder.numPartitions().isPresent()) { @@ -62,7 +62,7 @@ public Publisher build() throws ApiException { numPartitions = PartitionLookupUtils.numPartitions(builder.topic()); } - ImmutableMap.Builder> publisherMapBuilder = + ImmutableMap.Builder> publisherMapBuilder = ImmutableMap.builder(); for (int i = 0; i < numPartitions; i++) { publisherMapBuilder.put( diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java index 510724b85..b4a662e8a 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisher.java @@ -20,16 +20,16 @@ import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.TrivialProxyService; import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; public class SinglePartitionPublisher extends TrivialProxyService - implements Publisher { + implements Publisher { private final Publisher publisher; private final Partition partition; @@ -41,10 +41,10 @@ public class SinglePartitionPublisher extends TrivialProxyService // Publisher implementation. @Override - public ApiFuture publish(Message message) { + public ApiFuture publish(Message message) { return ApiFutures.transform( publisher.publish(message), - offset -> PublishMetadata.of(partition, offset), + offset -> MessageMetadata.of(partition, offset), MoreExecutors.directExecutor()); } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java index 35a5706b3..9faf22b06 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java @@ -19,8 +19,8 @@ import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.rpc.ApiException; import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.v1.PublisherServiceClient; @@ -63,7 +63,7 @@ public abstract static class Builder { abstract SinglePartitionPublisherBuilder autoBuild(); - public Publisher build() throws ApiException { + public Publisher build() throws ApiException { SinglePartitionPublisherBuilder builder = autoBuild(); PublisherBuilder.Builder publisherBuilder = builder diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/PublishMetadataTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/MessageMetadataTest.java similarity index 76% rename from google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/PublishMetadataTest.java rename to google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/MessageMetadataTest.java index bb46259ea..c56fab8b4 100755 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/PublishMetadataTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/MessageMetadataTest.java @@ -25,26 +25,26 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public final class PublishMetadataTest { +public final class MessageMetadataTest { @Test public void roundTripThroughString() { - PublishMetadata metadata = PublishMetadata.of(Partition.of(10), Offset.of(20)); - PublishMetadata metadata2 = PublishMetadata.decode(metadata.encode()); + MessageMetadata metadata = MessageMetadata.of(Partition.of(10), Offset.of(20)); + MessageMetadata metadata2 = MessageMetadata.decode(metadata.encode()); assertThat(metadata2).isEqualTo(metadata); } @Test public void invalidString() { - assertThrows(ApiException.class, () -> PublishMetadata.decode("999")); + assertThrows(ApiException.class, () -> MessageMetadata.decode("999")); } @Test public void invalidPartition() { - assertThrows(ApiException.class, () -> PublishMetadata.decode("abc:999")); + assertThrows(ApiException.class, () -> MessageMetadata.decode("abc:999")); } @Test public void invalidOffset() { - assertThrows(ApiException.class, () -> PublishMetadata.decode("999:abc")); + assertThrows(ApiException.class, () -> MessageMetadata.decode("999:abc")); } } diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/MessageTransformsTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/MessageTransformsTest.java index a8e0d628b..2595c77c4 100755 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/MessageTransformsTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/MessageTransformsTest.java @@ -16,14 +16,20 @@ package com.google.cloud.pubsublite.cloudpubsub; +import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.MessageTransformer; import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.common.collect.ImmutableListMultimap; @@ -141,7 +147,6 @@ public void subscribeTransformCorrect() throws ApiException { PubsubMessage result = PubsubMessage.newBuilder() .setData(message.message().data()) - .setMessageId("7") // The offset as a string .setOrderingKey("some_key") // The key field .setPublishTime(message.publishTime()) .putAttributes("abc", "") @@ -153,6 +158,72 @@ public void subscribeTransformCorrect() throws ApiException { assertThat(subscribeTransformer.transform(message)).isEqualTo(result); } + @Test + public void wrappedSubscribeTransformerSetsIdFailure() throws ApiException { + MessageTransformer mockTransformer = + mock(MessageTransformer.class); + MessageTransformer wrapped = + MessageTransforms.addIdCpsSubscribeTransformer(example(Partition.class), mockTransformer); + when(mockTransformer.transform(any())) + .thenReturn(PubsubMessage.newBuilder().setMessageId("3").build()); + ApiException e = + assertThrows( + ApiException.class, + () -> + wrapped.transform( + SequencedMessage.of( + Message.builder() + .setAttributes( + ImmutableListMultimap.builder() + .put("abc", ByteString.EMPTY) + .put("def", ByteString.copyFromUtf8("hij")) + .build()) + .setData(ByteString.copyFrom(notUtf8Array)) + .setEventTime(Timestamps.fromNanos(10)) + .setKey(ByteString.copyFromUtf8("some_key")) + .build(), + Timestamps.fromSeconds(5), + Offset.of(7), + 2))); + assertThat(e.getStatusCode().getCode()).isEqualTo(Code.INVALID_ARGUMENT); + } + + @Test + public void wrappedSubscribeTransformerMetadataId() throws ApiException { + MessageTransformer wrapped = + MessageTransforms.addIdCpsSubscribeTransformer( + example(Partition.class), subscribeTransformer); + SequencedMessage message = + SequencedMessage.of( + Message.builder() + .setAttributes( + ImmutableListMultimap.builder() + .put("abc", ByteString.EMPTY) + .put("def", ByteString.copyFromUtf8("hij")) + .build()) + .setData(ByteString.copyFrom(notUtf8Array)) + .setEventTime(Timestamps.fromNanos(10)) + .setKey(ByteString.copyFromUtf8("some_key")) + .build(), + Timestamps.fromSeconds(5), + example(Offset.class), + 2); + PubsubMessage result = + PubsubMessage.newBuilder() + .setData(message.message().data()) + .setMessageId( + MessageMetadata.of(example(Partition.class), example(Offset.class)).encode()) + .setOrderingKey("some_key") // The key field + .setPublishTime(message.publishTime()) + .putAttributes("abc", "") + .putAttributes("def", "hij") + .putAttributes( + MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO, + MessageTransforms.encodeAttributeEventTime(message.message().eventTime().get())) + .build(); + assertThat(wrapped.transform(message)).isEqualTo(result); + } + @Test public void publishTransformExtractorFailure() { MessageTransformer transformer = diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java index acdc76d36..09031b991 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java @@ -23,8 +23,8 @@ import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.ProjectNumber; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.TopicName; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.CheckedApiException; @@ -49,7 +49,7 @@ TopicPath getPath() throws CheckedApiException { } abstract static class FakePublisher extends FakeApiService - implements Publisher {} + implements Publisher {} abstract static class FakeConfigWatcher extends FakeApiService implements PartitionCountWatcher {} diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisherTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisherTest.java index ff9c0e1d6..6bc4d7b57 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisherTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisherTest.java @@ -27,9 +27,9 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor; import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms; import com.google.cloud.pubsublite.internal.ApiExceptionMatcher; @@ -48,7 +48,7 @@ @RunWith(JUnit4.class) public class WrappingPublisherTest { abstract static class FakePublisher extends FakeApiService - implements Publisher {} + implements Publisher {} @Spy private FakePublisher underlying; @@ -76,12 +76,12 @@ public void tearDown() { public void validPublish() throws Exception { PubsubMessage message = PubsubMessage.newBuilder().setOrderingKey("abc").build(); Message wireMessage = Message.builder().setKey(ByteString.copyFromUtf8("abc")).build(); - SettableApiFuture metadataFuture = SettableApiFuture.create(); + SettableApiFuture metadataFuture = SettableApiFuture.create(); when(underlying.publish(wireMessage)).thenReturn(metadataFuture); ApiFuture published = publisher.publish(message); verify(underlying).publish(wireMessage); assertThat(published.isDone()).isFalse(); - PublishMetadata metadata = PublishMetadata.of(Partition.of(3), Offset.of(88)); + MessageMetadata metadata = MessageMetadata.of(Partition.of(3), Offset.of(88)); metadataFuture.set(metadata); assertThat(published.isDone()).isTrue(); assertThat(published.get()).isEqualTo(metadata.encode()); diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherTest.java index 3ecd5ac28..a49aae5d0 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherTest.java @@ -40,7 +40,7 @@ @RunWith(JUnit4.class) public class PartitionCountWatchingPublisherTest { abstract static class FakePublisher extends FakeApiService - implements Publisher {} + implements Publisher {} abstract static class FakeConfigWatcher extends FakeApiService implements PartitionCountWatcher {} @@ -62,18 +62,18 @@ private static TopicPath path() { Consumer leakedConsumer; @Spy FakeConfigWatcher fakeConfigWatcher; - Publisher publisher; + Publisher publisher; @Before public void setUp() { initMocks(this); - doReturn(ApiFutures.immediateFuture(PublishMetadata.of(Partition.of(0), Offset.of(0)))) + doReturn(ApiFutures.immediateFuture(MessageMetadata.of(Partition.of(0), Offset.of(0)))) .when(publisher0) .publish(any()); - doReturn(ApiFutures.immediateFuture(PublishMetadata.of(Partition.of(1), Offset.of(0)))) + doReturn(ApiFutures.immediateFuture(MessageMetadata.of(Partition.of(1), Offset.of(0)))) .when(publisher1) .publish(any()); - doReturn(ApiFutures.immediateFuture(PublishMetadata.of(Partition.of(2), Offset.of(0)))) + doReturn(ApiFutures.immediateFuture(MessageMetadata.of(Partition.of(2), Offset.of(0)))) .when(publisher2) .publish(any()); when(mockPublisherFactory.newPublisher(Partition.of(0))).thenReturn(publisher0); diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisherTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisherTest.java index b5775148d..b14bc51c8 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisherTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisherTest.java @@ -26,9 +26,9 @@ import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.internal.ApiExceptionMatcher; import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.Publisher; @@ -46,7 +46,7 @@ @RunWith(JUnit4.class) public class RoutingPublisherTest { abstract static class FakePublisher extends FakeApiService - implements Publisher {} + implements Publisher {} @Spy private FakePublisher publisher0; @Spy private FakePublisher publisher1; @@ -84,9 +84,9 @@ public void cancelOutstandingCancelsAll() throws Exception { public void publishValidRoute() throws Exception { Message message = Message.builder().setKey(ByteString.copyFromUtf8("abc")).build(); when(routingPolicy.route(message.key())).thenReturn(Partition.of(1)); - PublishMetadata meta = PublishMetadata.of(Partition.of(1), Offset.of(3)); + MessageMetadata meta = MessageMetadata.of(Partition.of(1), Offset.of(3)); when(publisher1.publish(message)).thenReturn(ApiFutures.immediateFuture(meta)); - ApiFuture fut = routing.publish(message); + ApiFuture fut = routing.publish(message); verify(publisher1, times(1)).publish(message); assertThat(fut.get()).isEqualTo(meta); this.routing.stopAsync().awaitTerminated(); @@ -96,7 +96,7 @@ public void publishValidRoute() throws Exception { public void publishInvalidRoute() throws Exception { Message message = Message.builder().setKey(ByteString.copyFromUtf8("abc")).build(); when(routingPolicy.route(message.key())).thenReturn(Partition.of(77)); - ApiFuture fut = routing.publish(message); + ApiFuture fut = routing.publish(message); ApiExceptionMatcher.assertFutureThrowsCode(fut, Code.FAILED_PRECONDITION); } } diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java index 7229e55ff..ad81ec7c5 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java @@ -30,10 +30,10 @@ import com.google.cloud.pubsublite.CloudRegion; import com.google.cloud.pubsublite.CloudZone; import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.ProjectNumber; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.TopicName; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.Publisher; @@ -52,7 +52,7 @@ abstract static class FakeOffsetPublisher extends FakeApiService implements Publ @Spy private FakeOffsetPublisher underlying; - private Publisher pub; + private Publisher pub; @Before public void setUp() { @@ -86,11 +86,11 @@ public void publishResultTransformed() throws Exception { SettableApiFuture offsetFuture = SettableApiFuture.create(); Message message = Message.builder().setData(ByteString.copyFromUtf8("xyz")).build(); when(underlying.publish(message)).thenReturn(offsetFuture); - ApiFuture metadataFuture = pub.publish(message); + ApiFuture metadataFuture = pub.publish(message); assertThat(metadataFuture.isDone()).isFalse(); offsetFuture.set(Offset.of(7)); assertThat(metadataFuture.isDone()).isTrue(); - assertThat(metadataFuture.get()).isEqualTo(PublishMetadata.of(Partition.of(3), Offset.of(7))); + assertThat(metadataFuture.get()).isEqualTo(MessageMetadata.of(Partition.of(3), Offset.of(7))); pub.stopAsync().awaitTerminated(); } diff --git a/samples/snippets/src/main/java/pubsublite/PublishWithBatchSettingsExample.java b/samples/snippets/src/main/java/pubsublite/PublishWithBatchSettingsExample.java index b8402f372..a1ea03002 100644 --- a/samples/snippets/src/main/java/pubsublite/PublishWithBatchSettingsExample.java +++ b/samples/snippets/src/main/java/pubsublite/PublishWithBatchSettingsExample.java @@ -23,8 +23,8 @@ import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.CloudRegion; import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.ProjectNumber; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.TopicName; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.cloudpubsub.Publisher; @@ -101,7 +101,7 @@ public static void publishWithBatchSettingsExample( futures.add(future); } } finally { - ArrayList metadata = new ArrayList<>(); + ArrayList metadata = new ArrayList<>(); List ackIds = ApiFutures.allAsList(futures).get(); System.out.println("Published " + ackIds.size() + " messages with batch settings."); diff --git a/samples/snippets/src/main/java/pubsublite/PublishWithCustomAttributesExample.java b/samples/snippets/src/main/java/pubsublite/PublishWithCustomAttributesExample.java index 168bdab16..f633aa15e 100644 --- a/samples/snippets/src/main/java/pubsublite/PublishWithCustomAttributesExample.java +++ b/samples/snippets/src/main/java/pubsublite/PublishWithCustomAttributesExample.java @@ -21,8 +21,8 @@ import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.CloudRegion; import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.ProjectNumber; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.TopicName; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.cloudpubsub.Publisher; @@ -82,7 +82,7 @@ public static void publishWithCustomAttributesExample( publisher.stopAsync().awaitTerminated(); String ackId = future.get(); - PublishMetadata metadata = PublishMetadata.decode(ackId); + MessageMetadata metadata = MessageMetadata.decode(ackId); System.out.println("Published a message with custom attributes:\n" + metadata); } } diff --git a/samples/snippets/src/main/java/pubsublite/PublishWithOrderingKeyExample.java b/samples/snippets/src/main/java/pubsublite/PublishWithOrderingKeyExample.java index c08eb1234..87881ec82 100644 --- a/samples/snippets/src/main/java/pubsublite/PublishWithOrderingKeyExample.java +++ b/samples/snippets/src/main/java/pubsublite/PublishWithOrderingKeyExample.java @@ -21,8 +21,8 @@ import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.CloudRegion; import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.ProjectNumber; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.TopicName; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.cloudpubsub.Publisher; @@ -83,7 +83,7 @@ public static void publishWithOrderingKeyExample( publisher.stopAsync().awaitTerminated(); String ackId = future.get(); - PublishMetadata metadata = PublishMetadata.decode(ackId); + MessageMetadata metadata = MessageMetadata.decode(ackId); System.out.println("Published a message with ordering key:\n" + metadata); } } diff --git a/samples/snippets/src/main/java/pubsublite/PublisherExample.java b/samples/snippets/src/main/java/pubsublite/PublisherExample.java index e6f534d36..e7312923b 100644 --- a/samples/snippets/src/main/java/pubsublite/PublisherExample.java +++ b/samples/snippets/src/main/java/pubsublite/PublisherExample.java @@ -22,8 +22,8 @@ import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.CloudRegion; import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.ProjectNumber; -import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.TopicName; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.cloudpubsub.Publisher; @@ -83,11 +83,11 @@ public static void publisherExample( futures.add(future); } } finally { - ArrayList metadata = new ArrayList<>(); + ArrayList metadata = new ArrayList<>(); List ackIds = ApiFutures.allAsList(futures).get(); for (String id : ackIds) { // Decoded metadata contains partition and offset. - metadata.add(PublishMetadata.decode(id)); + metadata.add(MessageMetadata.decode(id)); } System.out.println(metadata + "\nPublished " + ackIds.size() + " messages."); diff --git a/samples/snippets/src/main/java/pubsublite/SubscriberExample.java b/samples/snippets/src/main/java/pubsublite/SubscriberExample.java index fb7064beb..a6e0c76f1 100644 --- a/samples/snippets/src/main/java/pubsublite/SubscriberExample.java +++ b/samples/snippets/src/main/java/pubsublite/SubscriberExample.java @@ -22,6 +22,7 @@ import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsublite.CloudRegion; import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.ProjectNumber; import com.google.cloud.pubsublite.SubscriptionName; import com.google.cloud.pubsublite.SubscriptionPath; @@ -68,7 +69,7 @@ public static void subscriberExample( MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> { - System.out.println("Id : " + message.getMessageId()); + System.out.println("Id : " + MessageMetadata.decode(message.getMessageId())); System.out.println("Data : " + message.getData().toStringUtf8()); consumer.ack(); };