diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ExtractStatus.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ExtractStatus.java index 98adde43f..fc8652f58 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ExtractStatus.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ExtractStatus.java @@ -23,17 +23,21 @@ import io.grpc.StatusRuntimeException; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; public final class ExtractStatus { public static Optional extract(Throwable t) { - if (t instanceof StatusException) { - return Optional.of(((StatusException) t).getStatus()); + try { + throw t; + } catch (StatusException e) { + return Optional.of(e.getStatus()); + } catch (StatusRuntimeException e) { + return Optional.of(e.getStatus()); + } catch (Throwable e) { + return Optional.empty(); } - if (t instanceof StatusRuntimeException) { - return Optional.of(((StatusRuntimeException) t).getStatus()); - } - return Optional.empty(); } public static StatusException toCanonical(Throwable t) { @@ -56,5 +60,47 @@ public static void addFailureHandler(ApiFuture future, Consumer { + O apply(I input) throws StatusException; + } + + public interface StatusConsumer { + void apply(I input) throws StatusException; + } + + public interface StatusBiconsumer { + void apply(K key, V value) throws StatusException; + } + + public static Function rethrowAsRuntime(StatusFunction function) { + return i -> { + try { + return function.apply(i); + } catch (StatusException e) { + throw e.getStatus().asRuntimeException(); + } + }; + } + + public static Consumer rethrowAsRuntime(StatusConsumer consumer) { + return i -> { + try { + consumer.apply(i); + } catch (StatusException e) { + throw e.getStatus().asRuntimeException(); + } + }; + } + + public static BiConsumer rethrowAsRuntime(StatusBiconsumer consumer) { + return (k, v) -> { + try { + consumer.apply(k, v); + } catch (StatusException e) { + throw e.getStatus().asRuntimeException(); + } + }; + } + private ExtractStatus() {} } diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java index 29462eeef..6cd492302 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java @@ -26,8 +26,8 @@ import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.TopicName; import com.google.cloud.pubsublite.TopicPath; -import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder; import com.google.cloud.pubsublite.proto.PublisherServiceGrpc; import io.grpc.Channel; diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/AckSetTrackerImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/AckSetTrackerImplTest.java index a0234c780..522c87b9a 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/AckSetTrackerImplTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/AckSetTrackerImplTest.java @@ -29,8 +29,8 @@ import com.google.api.core.SettableApiFuture; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.SequencedMessage; -import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.cloud.pubsublite.internal.StatusExceptionMatcher; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.proto.Cursor; import com.google.common.util.concurrent.MoreExecutors; diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/SinglePartitionSubscriberTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/SinglePartitionSubscriberTest.java index 5ad78aca1..6050d9492 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/SinglePartitionSubscriberTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/SinglePartitionSubscriberTest.java @@ -34,8 +34,8 @@ import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms; import com.google.cloud.pubsublite.cloudpubsub.NackHandler; -import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.cloud.pubsublite.internal.StatusExceptionMatcher; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.cloud.pubsublite.internal.wire.Subscriber; import com.google.cloud.pubsublite.internal.wire.SubscriberFactory; import com.google.cloud.pubsublite.proto.Cursor; diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisherTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisherTest.java index d9016dff7..ecd857bfb 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisherTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisherTest.java @@ -31,9 +31,9 @@ import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor; import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms; -import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.StatusExceptionMatcher; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import io.grpc.Status.Code; diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisherTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisherTest.java index f658c7a5d..994f0b3ec 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisherTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisherTest.java @@ -28,10 +28,10 @@ import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.PublishMetadata; -import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.RoutingPolicy; import com.google.cloud.pubsublite.internal.StatusExceptionMatcher; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; import io.grpc.Status.Code; diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java index 1793f9815..b30e5031f 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java @@ -33,8 +33,8 @@ import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.TopicName; import com.google.cloud.pubsublite.TopicPath; -import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.protobuf.ByteString; import org.junit.Before; import org.junit.Test; diff --git a/pom.xml b/pom.xml index b00e1a78a..185be562d 100644 --- a/pom.xml +++ b/pom.xml @@ -29,6 +29,40 @@ pom import + + com.google.flogger + google-extensions + 0.5.1 + + + com.google.errorprone + error_prone_annotations + 2.4.0 + + + com.google.truth + truth + 1.0.1 + test + + + com.google.truth.extensions + truth-java8-extension + 1.0.1 + test + + + org.mockito + mockito-core + 3.5.13 + test + + + junit + junit + 4.13 + test + diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteSinkTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteSinkTest.java index ca63d1fba..17502120e 100644 --- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteSinkTest.java +++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteSinkTest.java @@ -39,8 +39,8 @@ import com.google.cloud.pubsublite.TopicName; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.ExtractStatus; -import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import io.grpc.Status; diff --git a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedReaderTest.java b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedReaderTest.java index a29b2b45c..7a3346771 100644 --- a/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedReaderTest.java +++ b/pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedReaderTest.java @@ -30,8 +30,8 @@ import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.beam.PubsubLiteUnboundedReader.SubscriberState; -import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.cloud.pubsublite.internal.PullSubscriber; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse; import com.google.common.base.Ticker; diff --git a/pubsublite-kafka-shim/pom.xml b/pubsublite-kafka-shim/pom.xml index 00d71630e..2ec626612 100644 --- a/pubsublite-kafka-shim/pom.xml +++ b/pubsublite-kafka-shim/pom.xml @@ -14,6 +14,59 @@ Pub/Sub Lite Kafka Shim https://github.com/googleapis/java-pubsublite Kafka Producer and Consumer for Google Cloud Pub/Sub Lite + + + com.google.api.grpc + proto-google-cloud-pubsublite-v1 + 0.4.2-SNAPSHOT + + + com.google.cloud + google-cloud-pubsublite + 0.4.2-SNAPSHOT + + + org.apache.kafka + kafka-clients + 2.5.0 + + + io.grpc + grpc-api + + + com.google.protobuf + protobuf-java + + + com.google.protobuf + protobuf-java-util + + + com.google.api + api-common + + + com.google.guava + guava + + + + junit + junit + test + + + com.google.truth + truth + test + + + com.google.truth.extensions + truth-java8-extension + test + + diff --git a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/CommitterFactory.java b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/CommitterFactory.java new file mode 100644 index 000000000..7652aefc5 --- /dev/null +++ b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/CommitterFactory.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.internal.wire.Committer; +import io.grpc.StatusException; + +/** A factory for making new PullSubscribers for a given partition of a subscription. */ +interface CommitterFactory { + Committer newCommitter(Partition partition) throws StatusException; +} diff --git a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerFactory.java b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerFactory.java new file mode 100644 index 000000000..668f6d7b7 --- /dev/null +++ b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerFactory.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +interface ConsumerFactory { + SingleSubscriptionConsumer newConsumer(); +} diff --git a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/KafkaExceptionUtils.java b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/KafkaExceptionUtils.java new file mode 100644 index 000000000..7b92d1a7f --- /dev/null +++ b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/KafkaExceptionUtils.java @@ -0,0 +1,91 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import com.google.cloud.pubsublite.internal.ExtractStatus; +import io.grpc.StatusException; +import java.util.concurrent.TimeoutException; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.BrokerNotAvailableException; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidRequestException; + +class KafkaExceptionUtils { + private KafkaExceptionUtils() {} + + static KafkaException toKafkaException(StatusException source) { + switch (source.getStatus().getCode()) { + case OK: + throw source.getStatus().asRuntimeException(); + case ABORTED: + return new BrokerNotAvailableException("Aborted.", source); + case ALREADY_EXISTS: + return new KafkaException("Already exists.", source); + case CANCELLED: + return new BrokerNotAvailableException("Cancelled.", source); + case DATA_LOSS: + return new KafkaException("Data loss.", source); + case DEADLINE_EXCEEDED: + return new BrokerNotAvailableException("Deadline exceeded.", source); + case FAILED_PRECONDITION: + return new InvalidRequestException("Failed precondition.", source); + case INTERNAL: + return new BrokerNotAvailableException("Internal.", source); + case INVALID_ARGUMENT: + return new InvalidRequestException("Invalid argument.", source); + case NOT_FOUND: + return new KafkaException("Not found.", source); + case OUT_OF_RANGE: + return new KafkaException("Out of range.", source); + case PERMISSION_DENIED: + return new AuthorizationException("Permission denied.", source); + case RESOURCE_EXHAUSTED: + return new KafkaException("Resource exhausted.", source); + case UNAUTHENTICATED: + return new AuthenticationException("Unauthenticated.", source); + case UNAVAILABLE: + return new BrokerNotAvailableException("Unavailable.", source); + case UNIMPLEMENTED: + return new KafkaException("Unimplemented.", source); + case UNKNOWN: + return new KafkaException("Unknown.", source); + } + return new KafkaException("No case.", source); + } + + /** + * Transform an exception into a kind that is likely to be thrown through kafka interfaces. + * + * @param t A throwable to transform. + * @return The transformed exception suitable for re-throwing. + */ + static RuntimeException toKafka(Throwable t) { + try { + throw t; + } catch (KafkaException | UnsupportedOperationException | IllegalStateException e) { + return e; + } catch (InterruptedException e) { + return new InterruptException(e); + } catch (TimeoutException e) { + return new org.apache.kafka.common.errors.TimeoutException(e); + } catch (Throwable e) { + return KafkaExceptionUtils.toKafkaException(ExtractStatus.toCanonical(t)); + } + } +} diff --git a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/LiteHeaders.java b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/LiteHeaders.java new file mode 100644 index 000000000..fd097c56f --- /dev/null +++ b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/LiteHeaders.java @@ -0,0 +1,98 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.protobuf.ByteString; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; + +class LiteHeaders implements Headers { + private ImmutableListMultimap attributes; + + LiteHeaders(ImmutableListMultimap attributes) { + this.attributes = attributes; + } + + static Header toHeader(String key, ByteString value) { + return new Header() { + @Override + public String key() { + return key; + } + + @Override + public byte[] value() { + return value.toByteArray(); + } + }; + } + + private static List
toHeaders(String key, Collection values) { + ImmutableList.Builder
headersBuilder = ImmutableList.builder(); + values.forEach(value -> headersBuilder.add(toHeader(key, value))); + return headersBuilder.build(); + } + + @Override + public Headers add(Header header) throws IllegalStateException { + throw new IllegalStateException(); + } + + @Override + public Headers add(String s, byte[] bytes) throws IllegalStateException { + throw new IllegalStateException(); + } + + @Override + public Headers remove(String s) throws IllegalStateException { + throw new IllegalStateException(); + } + + @Override + public Header lastHeader(String s) { + return Iterables.getLast(this); + } + + @Override + public Iterable
headers(String s) { + if (attributes.containsKey(s)) + return Iterables.transform(attributes.get(s), value -> toHeader(s, value)); + return ImmutableList.of(); + } + + @Override + public Header[] toArray() { + ImmutableList.Builder
arrayBuilder = ImmutableList.builder(); + attributes + .entries() + .forEach(entry -> arrayBuilder.add(toHeader(entry.getKey(), entry.getValue()))); + return (Header[]) arrayBuilder.build().toArray(); + } + + @Override + public Iterator
iterator() { + return Iterators.transform( + attributes.entries().iterator(), entry -> toHeader(entry.getKey(), entry.getValue())); + } +} diff --git a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteNode.java b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteNode.java new file mode 100644 index 000000000..7a63b9223 --- /dev/null +++ b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteNode.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import org.apache.kafka.common.Node; + +final class PubsubLiteNode { + private PubsubLiteNode() {} + + public static final Node NODE = new Node(0, "pubsublite.googleapis.com", 443); + public static final Node[] NODES = {NODE}; +} diff --git a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java new file mode 100644 index 000000000..ff45ed137 --- /dev/null +++ b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.internal.PullSubscriber; +import com.google.cloud.pubsublite.proto.SeekRequest; +import io.grpc.StatusException; + +/** A factory for making new PullSubscribers for a given partition of a subscription. */ +interface PullSubscriberFactory { + PullSubscriber newPullSubscriber(Partition partition, SeekRequest initial) + throws StatusException; +} diff --git a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java new file mode 100644 index 000000000..dddbcd223 --- /dev/null +++ b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java @@ -0,0 +1,75 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.TopicPath; +import com.google.common.collect.ImmutableListMultimap; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.protobuf.util.Timestamps; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.record.TimestampType; + +class RecordTransforms { + private RecordTransforms() {} + + static Message toMessage(ProducerRecord record) { + Message.Builder builder = + Message.builder() + .setKey(ByteString.copyFrom(record.key())) + .setData(ByteString.copyFrom(record.value())); + if (record.timestamp() != null) { + builder.setEventTime(Timestamps.fromMillis(record.timestamp())); + } + ImmutableListMultimap.Builder attributes = ImmutableListMultimap.builder(); + record + .headers() + .forEach(header -> attributes.put(header.key(), ByteString.copyFrom(header.value()))); + return builder.setAttributes(attributes.build()).build(); + } + + static ConsumerRecord fromMessage( + SequencedMessage message, TopicPath topic, Partition partition) { + Headers headers = new LiteHeaders(message.message().attributes()); + TimestampType type; + Timestamp timestamp; + if (message.message().eventTime().isPresent()) { + type = TimestampType.CREATE_TIME; + timestamp = message.message().eventTime().get(); + } else { + type = TimestampType.LOG_APPEND_TIME; + timestamp = message.publishTime(); + } + return new ConsumerRecord<>( + topic.toString(), + (int) partition.value(), + message.offset().value(), + Timestamps.toMillis(timestamp), + type, + 0L, + message.message().key().size(), + message.message().data().size(), + message.message().key().toByteArray(), + message.message().data().toByteArray(), + headers); + } +} diff --git a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumer.java b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumer.java new file mode 100644 index 000000000..c44420b76 --- /dev/null +++ b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumer.java @@ -0,0 +1,53 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import com.google.api.core.ApiFuture; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.proto.SeekRequest; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.KafkaException; + +/** A stripped down KafkaConsumer interface that operates on a single subscription. */ +interface SingleSubscriptionConsumer { + void setAssignment(Set partitions); + + Set assignment(); + + ConsumerRecords poll(Duration duration); + + ApiFuture> commitAll(); + + ApiFuture commit(Map commitOffsets); + + void doSeek(Partition partition, SeekRequest request) throws KafkaException; + + Optional position(Partition partition); + + void close(Duration duration); + + /** + * Cause the outstanding or next call to poll to throw a WakeupException. The consumer is left in + * an unspecified state. + */ + void wakeup(); +} diff --git a/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/RecordTransformsTest.java b/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/RecordTransformsTest.java new file mode 100644 index 000000000..20c2a8e94 --- /dev/null +++ b/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/RecordTransformsTest.java @@ -0,0 +1,93 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example; +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.TopicPath; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableListMultimap; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.record.TimestampType; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class RecordTransformsTest { + private static final Message MESSAGE = + Message.builder() + .setKey(ByteString.copyFromUtf8("abc")) + .setData(ByteString.copyFromUtf8("def")) + .setEventTime(Timestamp.newBuilder().setSeconds(1).setNanos(1000000).build()) + .setAttributes( + ImmutableListMultimap.of( + "xxx", + ByteString.copyFromUtf8("yyy"), + "zzz", + ByteString.copyFromUtf8("zzz"), + "zzz", + ByteString.copyFromUtf8("zzz"))) + .build(); + + @Test + public void publishTransform() { + ProducerRecord record = + new ProducerRecord<>( + example(TopicPath.class).toString(), + null, + 1001L, + "abc".getBytes(), + "def".getBytes(), + ImmutableList.of( + LiteHeaders.toHeader("xxx", ByteString.copyFromUtf8("yyy")), + LiteHeaders.toHeader("zzz", ByteString.copyFromUtf8("zzz")), + LiteHeaders.toHeader("zzz", ByteString.copyFromUtf8("zzz")))); + Message message = RecordTransforms.toMessage(record); + assertThat(message).isEqualTo(MESSAGE); + } + + @Test + public void subscribeTransform() { + SequencedMessage sequencedMessage = + SequencedMessage.of( + MESSAGE, Timestamp.newBuilder().setNanos(12345).build(), example(Offset.class), 123L); + ConsumerRecord record = + RecordTransforms.fromMessage( + sequencedMessage, example(TopicPath.class), example(Partition.class)); + assertThat(record.key()).isEqualTo("abc".getBytes()); + assertThat(record.value()).isEqualTo("def".getBytes()); + assertThat(record.timestampType()).isEqualTo(TimestampType.CREATE_TIME); + assertThat(record.timestamp()).isEqualTo(1001L); + ImmutableListMultimap.Builder headers = ImmutableListMultimap.builder(); + record + .headers() + .forEach(header -> headers.put(header.key(), ByteString.copyFrom(header.value()))); + assertThat(headers.build()).isEqualTo(MESSAGE.attributes()); + assertThat(record.offset()).isEqualTo(example(Offset.class).value()); + assertThat(record.topic()).isEqualTo(example(TopicPath.class).toString()); + assertThat(record.partition()).isEqualTo(example(Partition.class).value()); + } +} diff --git a/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/StatusTestHelpers.java b/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/StatusTestHelpers.java new file mode 100755 index 000000000..34f1b4aab --- /dev/null +++ b/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/StatusTestHelpers.java @@ -0,0 +1,39 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth8.assertThat; +import static org.junit.Assert.assertThrows; + +import com.google.cloud.pubsublite.internal.ExtractStatus; +import io.grpc.Status; +import io.grpc.Status.Code; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +public class StatusTestHelpers { + private StatusTestHelpers() {} + + public static void assertFutureThrowsCode(Future f, Code code) { + ExecutionException exception = assertThrows(ExecutionException.class, f::get); + Optional statusOr = ExtractStatus.extract(exception.getCause()); + assertThat(statusOr).isPresent(); + assertThat(statusOr.get().getCode()).isEqualTo(code); + } +}