Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .readme-partials.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ custom_content: |
futures.add(future);
}
} finally {
ArrayList<PublishMetadata> metadata = new ArrayList<>();
ArrayList<MessageMetadata> metadata = new ArrayList<>();
List<String> ackIds = ApiFutures.allAsList(futures).get();
for (String id : ackIds) {
// Decoded metadata contains partition and offset.
metadata.add(PublishMetadata.decode(id));
metadata.add(MessageMetadata.decode(id));
}
System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,11 @@ try {
futures.add(future);
}
} finally {
ArrayList<PublishMetadata> metadata = new ArrayList<>();
ArrayList<MessageMetadata> metadata = new ArrayList<>();
List<String> ackIds = ApiFutures.allAsList(futures).get();
for (String id : ackIds) {
// Decoded metadata contains partition and offset.
metadata.add(PublishMetadata.decode(id));
metadata.add(MessageMetadata.decode(id));
}
System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");

Expand Down
6 changes: 6 additions & 0 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- TODO: Remove on next release -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/PublishMetadata</className>
<method>*</method>
</difference>
<!-- Added abstract method to AutoValue.Builder class (Always okay) -->
<difference>
<differenceType>7013</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,33 @@
import com.google.cloud.pubsublite.internal.CheckedApiException;

/**
* Information about a successful publish operation. Can be encoded in the string returned by the
* Cloud Pub/Sub {@link com.google.cloud.pubsub.v1.Publisher#publish} api.
* Information about a message in Pub/Sub Lite. Can be encoded in the string returned by the Cloud
* Pub/Sub {@link com.google.cloud.pubsub.v1.Publisher#publish} api or the {@link
* com.google.pubsub.v1.PubsubMessage#getMessageId} field on received messages.
*/
@AutoValue
public abstract class PublishMetadata {
public abstract class MessageMetadata {
/** The partition a message was published to. */
public abstract Partition partition();

/** The offset a message was assigned. */
public abstract Offset offset();

/** Construct a PublishMetadata from a Partition and Offset. */
public static PublishMetadata of(Partition partition, Offset offset) {
return new AutoValue_PublishMetadata(partition, offset);
/** Construct a MessageMetadata from a Partition and Offset. */
public static MessageMetadata of(Partition partition, Offset offset) {
return new AutoValue_MessageMetadata(partition, offset);
}

/** Decode a PublishMetadata from the Cloud Pub/Sub ack id. */
public static PublishMetadata decode(String encoded) throws ApiException {
/** Decode a MessageMetadata from the Cloud Pub/Sub ack id. */
public static MessageMetadata decode(String encoded) throws ApiException {
String[] split = encoded.split(":");
checkArgument(split.length == 2, "Invalid encoded PublishMetadata.");
checkArgument(split.length == 2, "Invalid encoded MessageMetadata.");
try {
Partition partition = Partition.of(Long.parseLong(split[0]));
Offset offset = Offset.of(Long.parseLong(split[1]));
return of(partition, offset);
} catch (NumberFormatException e) {
throw new CheckedApiException("Invalid encoded PublishMetadata.", e, Code.INVALID_ARGUMENT)
throw new CheckedApiException("Invalid encoded MessageMetadata.", e, Code.INVALID_ARGUMENT)
.underlying;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.MessageTransformer;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.common.collect.ImmutableListMultimap;
Expand Down Expand Up @@ -72,15 +74,30 @@ private static String parseAttributes(Collection<ByteString> values) throws ApiE
"Received an unparseable message with multiple values for an attribute.");
ByteString attribute = values.iterator().next();
checkArgument(
attribute.isValidUtf8(), "Received an unparseable message with a non-utf8 attribute.");
attribute.isValidUtf8(),
String.format(
"Received an unparseable message with a non-utf8 attribute value: %s",
Base64.getEncoder().encodeToString(attribute.toByteArray())));
return attribute.toStringUtf8();
}

static MessageTransformer<SequencedMessage, PubsubMessage> addIdCpsSubscribeTransformer(
Partition partition, MessageTransformer<SequencedMessage, PubsubMessage> toWrap) {
return message -> {
PubsubMessage out = toWrap.transform(message);
checkArgument(
out.getMessageId().isEmpty(),
String.format("Received non-empty message id for PubsubMessage: %s", out));
return out.toBuilder()
.setMessageId(MessageMetadata.of(partition, message.offset()).encode())
.build();
};
}

public static MessageTransformer<SequencedMessage, PubsubMessage> toCpsSubscribeTransformer() {
return message -> {
PubsubMessage.Builder outBuilder =
toCpsPublishTransformer().transform(message.message()).toBuilder();
outBuilder.setMessageId(Long.toString(message.offset().value()));
outBuilder.setPublishTime(message.publishTime());
return outBuilder.build();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ public abstract class SubscriberSettings {
*/
abstract List<Partition> partitions();

/** The MessageTransformer to get PubsubMessages from Pub/Sub Lite wire messages. */
/**
* The MessageTransformer to get PubsubMessages from Pub/Sub Lite wire messages. The messageId
* field must not be set on the returned message.
*/
abstract Optional<MessageTransformer<SequencedMessage, PubsubMessage>> transformer();

/** A provider for credentials. */
Expand Down Expand Up @@ -155,7 +158,10 @@ public abstract static class Builder {
*/
public abstract Builder setPartitions(List<Partition> partition);

/** The MessageTransformer to get PubsubMessages from Pub/Sub Lite wire messages. */
/**
* The MessageTransformer to get PubsubMessages from Pub/Sub Lite wire messages. The messageId
* field must not be set on the returned message.
*/
public abstract Builder setTransformer(
MessageTransformer<SequencedMessage, PubsubMessage> transformer);

Expand Down Expand Up @@ -243,7 +249,8 @@ Subscriber newPartitionSubscriber(Partition partition) throws CheckedApiExceptio

return new SinglePartitionSubscriber(
receiver(),
transformer().orElse(MessageTransforms.toCpsSubscribeTransformer()),
MessageTransforms.addIdCpsSubscribeTransformer(
partition, transformer().orElse(MessageTransforms.toCpsSubscribeTransformer())),
new AckSetTrackerImpl(wireCommitter),
nackHandler().orElse(new NackHandler() {}),
messageConsumer -> wireSubscriberBuilder.setMessageConsumer(messageConsumer).build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.MessageTransformer;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;

// A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant
// publisher. It encodes a PublishMetadata object in the response string.
// publisher. It encodes a MessageMetadata object in the response string.
public class WrappingPublisher extends TrivialProxyService implements Publisher {
private final com.google.cloud.pubsublite.internal.Publisher<PublishMetadata> wirePublisher;
private final com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> wirePublisher;
private final MessageTransformer<PubsubMessage, Message> transformer;

public WrappingPublisher(
com.google.cloud.pubsublite.internal.Publisher<PublishMetadata> wirePublisher,
com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> wirePublisher,
MessageTransformer<PubsubMessage, Message> transformer)
throws ApiException {
super(wirePublisher);
Expand All @@ -58,7 +58,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
}
return ApiFutures.transform(
wirePublisher.publish(wireMessage),
PublishMetadata::encode,
MessageMetadata::encode,
MoreExecutors.directExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
Expand All @@ -39,23 +39,23 @@
import java.util.stream.LongStream;

public class PartitionCountWatchingPublisher extends ProxyService
implements Publisher<PublishMetadata> {
implements Publisher<MessageMetadata> {
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
private final PartitionPublisherFactory publisherFactory;
private final RoutingPolicy.Factory policyFactory;

private static class PartitionsWithRouting {
public final ImmutableMap<Partition, Publisher<PublishMetadata>> publishers;
public final ImmutableMap<Partition, Publisher<MessageMetadata>> publishers;
private final RoutingPolicy routingPolicy;

private PartitionsWithRouting(
ImmutableMap<Partition, Publisher<PublishMetadata>> publishers,
ImmutableMap<Partition, Publisher<MessageMetadata>> publishers,
RoutingPolicy routingPolicy) {
this.publishers = publishers;
this.routingPolicy = routingPolicy;
}

public ApiFuture<PublishMetadata> publish(Message message) throws CheckedApiException {
public ApiFuture<MessageMetadata> publish(Message message) throws CheckedApiException {
try {
Partition routedPartition =
message.key().isEmpty()
Expand All @@ -73,13 +73,13 @@ public ApiFuture<PublishMetadata> publish(Message message) throws CheckedApiExce
}

public void cancelOutstandingPublishes() {
for (Publisher<PublishMetadata> publisher : publishers.values()) {
for (Publisher<MessageMetadata> publisher : publishers.values()) {
publisher.cancelOutstandingPublishes();
}
}

public void flush() throws IOException {
for (Publisher<PublishMetadata> publisher : publishers.values()) {
for (Publisher<MessageMetadata> publisher : publishers.values()) {
publisher.flush();
}
}
Expand Down Expand Up @@ -109,7 +109,7 @@ public void stop() {
}

@Override
public ApiFuture<PublishMetadata> publish(Message message) {
public ApiFuture<MessageMetadata> publish(Message message) {
Optional<PartitionsWithRouting> partitions;
try (CloseableMonitor.Hold h = monitor.enter()) {
partitions = partitionsWithRouting;
Expand Down Expand Up @@ -150,12 +150,12 @@ public void flush() throws IOException {
partitions.get().flush();
}

private ImmutableMap<Partition, Publisher<PublishMetadata>> getNewPartitionPublishers(
private ImmutableMap<Partition, Publisher<MessageMetadata>> getNewPartitionPublishers(
LongStream newPartitions) {
ImmutableMap.Builder<Partition, Publisher<PublishMetadata>> mapBuilder = ImmutableMap.builder();
ImmutableMap.Builder<Partition, Publisher<MessageMetadata>> mapBuilder = ImmutableMap.builder();
newPartitions.forEach(
i -> {
Publisher<PublishMetadata> p = publisherFactory.newPublisher(Partition.of(i));
Publisher<MessageMetadata> p = publisherFactory.newPublisher(Partition.of(i));
p.addListener(
new Listener() {
@Override
Expand All @@ -167,7 +167,7 @@ public void failed(State from, Throwable failure) {
mapBuilder.put(Partition.of(i), p);
p.startAsync();
});
ImmutableMap<Partition, Publisher<PublishMetadata>> partitions = mapBuilder.build();
ImmutableMap<Partition, Publisher<MessageMetadata>> partitions = mapBuilder.build();
partitions.values().forEach(ApiService::awaitRunning);
return partitions;
}
Expand All @@ -189,12 +189,12 @@ private void handleConfig(long partitionCount) {
partitionCount);
return;
}
ImmutableMap.Builder<Partition, Publisher<PublishMetadata>> mapBuilder =
ImmutableMap.Builder<Partition, Publisher<MessageMetadata>> mapBuilder =
ImmutableMap.builder();
current.ifPresent(p -> p.publishers.forEach(mapBuilder::put));
getNewPartitionPublishers(LongStream.range(currentSize, partitionCount))
.forEach(mapBuilder::put);
ImmutableMap<Partition, Publisher<PublishMetadata>> newMap = mapBuilder.build();
ImmutableMap<Partition, Publisher<MessageMetadata>> newMap = mapBuilder.build();

partitionsWithRouting =
Optional.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public abstract static class Builder {
public abstract PartitionCountWatchingPublisherSettings build();
}

public Publisher<PublishMetadata> instantiate() throws ApiException {
public Publisher<MessageMetadata> instantiate() throws ApiException {
return new PartitionCountWatchingPublisher(
publisherFactory(),
DefaultRoutingPolicy::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.internal.Publisher;

public interface PartitionPublisherFactory {
Publisher<PublishMetadata> newPublisher(Partition partition) throws ApiException;
Publisher<MessageMetadata> newPublisher(Partition partition) throws ApiException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.RoutingPolicy;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import java.io.IOException;
import java.util.Map;

public class RoutingPublisher extends TrivialProxyService implements Publisher<PublishMetadata> {
private final Map<Partition, Publisher<PublishMetadata>> partitionPublishers;
public class RoutingPublisher extends TrivialProxyService implements Publisher<MessageMetadata> {
private final Map<Partition, Publisher<MessageMetadata>> partitionPublishers;
private final RoutingPolicy policy;

RoutingPublisher(
Map<Partition, Publisher<PublishMetadata>> partitionPublishers, RoutingPolicy policy)
Map<Partition, Publisher<MessageMetadata>> partitionPublishers, RoutingPolicy policy)
throws ApiException {
super(partitionPublishers.values());
this.partitionPublishers = partitionPublishers;
Expand All @@ -46,7 +46,7 @@ public class RoutingPublisher extends TrivialProxyService implements Publisher<P

// Publisher implementation.
@Override
public ApiFuture<PublishMetadata> publish(Message message) {
public ApiFuture<MessageMetadata> publish(Message message) {
try {
Partition routedPartition =
message.key().isEmpty() ? policy.routeWithoutKey() : policy.route(message.key());
Expand All @@ -65,14 +65,14 @@ public ApiFuture<PublishMetadata> publish(Message message) {

@Override
public void cancelOutstandingPublishes() {
for (Publisher<PublishMetadata> publisher : partitionPublishers.values()) {
for (Publisher<MessageMetadata> publisher : partitionPublishers.values()) {
publisher.cancelOutstandingPublishes();
}
}

@Override
public void flush() throws IOException {
for (Publisher<PublishMetadata> publisher : partitionPublishers.values()) {
for (Publisher<MessageMetadata> publisher : partitionPublishers.values()) {
publisher.flush();
}
}
Expand Down
Loading